Airflow
Rerun failed tasks
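In the web UI, a failed task instance can simply be cleared so the scheduler re-queues it. From the CLI, one option is airflow tasks clear with the failed-only filter (a sketch; the DAG id is a placeholder and flags can vary slightly between Airflow versions):
airflow tasks clear my_dag --only-failed --yes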

Reload (re-serialize DAGs)
airflow dags reserialize
The default admin password is printed at startup; to change it:
airflow users reset-password --username admin --password 新密码
Install (Docker Compose)
services:
  airflow:
    build:
      context: .
      dockerfile: Dockerfile
      tags:
        - "registry.cn-hangzhou.aliyuncs.com/wjn0918/soft:airflow-latest"
    image: airflow
    container_name: airflow-standalone
    environment:
      - AIRFLOW__CORE__LOAD_EXAMPLES=False  # do not load the example DAGs
    volumes:
      - ./dags:/root/airflow/dags
      - airflow-data:/root/airflow
    ports:
      - 8082:8080
volumes:
  airflow-data:
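Build the image and start the container; the web UI is then reachable on the mapped host port (Docker Compose v2 commands):
docker compose up -d --build
# web UI: http://localhost:8082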
Test a DAG file by running it directly with Python (it should import without errors):
python xx.py
Example
from airflow.sdk import Variable, dag, task
import pendulum
from airflow.operators.python import get_current_context


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,  # whether to backfill missed runs
    tags=["example"],
)
def hello_variable():
    """
    hello variable
    """

    @task
    def get_variable():
        """
        get variable
        """
        appid = Variable.get("appid")
        print(f"the variable appid is {appid}")

    @task
    def get_current_time():
        """
        Print the run date of the current task
        """
        context = get_current_context()
        execution_date = context['ds']
        print(f"Execution date is: {execution_date}")

    t1 = get_variable()
    t2 = get_current_time()
    t1 >> t2


hello_variable()
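The appid variable must exist before the DAG runs; it can be created in the UI under Admin → Variables or from the CLI (the value below is a placeholder):
airflow variables set appid my-app-id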
Referencing your own modules
Note
The following must be added at the top of the DAG file:
import sys
import os
# Dynamically add the dags folder to Python path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from airflow.sdk import Variable, dag, task
import pendulum
import sys
import os

# Dynamically add the dags folder to Python path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from utils.my_util import hello_util


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,  # whether to backfill missed runs
    tags=["example"],
)
def demo():
    """
    hello utils
    """

    @task
    def get_function():
        """
        get function from third py
        """
        hello_util()

    get_function()


demo()
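For the import above to resolve, the dags folder needs a utils module next to the DAG file; a minimal sketch (the layout and the function body are assumptions, only hello_util is required by the DAG):
# dags/utils/__init__.py  (can be empty)
# dags/utils/my_util.py
def hello_util():
    print("hello from my_util")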
Install (pip)
Note
On Windows, run this inside WSL.
pip install apache-airflow -i https://pypi.tuna.tsinghua.edu.cn/simple/
airflow standalone
Configuration
## do not load the example DAGs
[core]
load_examples = False
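The same switch can be set as an environment variable instead of editing airflow.cfg, which is what the Docker Compose file above does:
export AIRFLOW__CORE__LOAD_EXAMPLES=False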
Reset the database
airflow db reset
hello world
Run
Verify the script
Run from the command line
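A DAG can also be exercised end to end without the scheduler via airflow dags test; a sketch using the hello_variable DAG defined earlier (the logical date argument is optional in recent versions):
airflow dags test hello_variable 2021-01-01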
providers
Scheduling
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 7) (0 and 7 both mean Sunday)
│ │ │ │ │
│ │ │ │ │
*/5 * * * * your-command
Variables
Configuration
Change the port
base_url, port, and internal_api_url all need to be updated.
Timezone issues
@dag(
    schedule="*/10 * * * *",
    start_date=pendulum.datetime(2025, 7, 15, 17, 30, tz="Asia/Shanghai"),  # explicitly use the Shanghai timezone
    catchup=True,
    tags=["cl"],
)
Backfilling
catchup = True
With catchup=True, Airflow automatically backfills runs starting from start_date, so adjust start_date after enabling it; otherwise every interval from start_date up to now will be backfilled.
Kill all running Airflow processes:
pkill -f "airflow"
If a task group contains a SQLExecuteQueryOperator, it has to be placed after the @task-decorated tasks.
import pendulum
import subprocess
import re
import pytz
from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun
from airflow.executors.workloads import TaskInstance
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.sdk import dag, task, task_group
from airflow.providers.standard.operators.empty import EmptyOperator


@dag(
    schedule="*/5 * * * *",
    start_date=pendulum.datetime(2025, 7, 15, 17, tz="Asia/Shanghai"),
    catchup=False,
    tags=["sjzx"],
)
def sjzx_xmjcjl():
    """
    Data center: school-gate entry/exit records
    """

    @task
    def extract_data(ti: TaskInstance = None, dag_run: DagRun = None):
        """Run DataX and extract the record count"""
        d = dag_run.data_interval_start
        shanghai_tz = pytz.timezone('Asia/Shanghai')
        d_shanghai = d.astimezone(shanghai_tz)
        bizdate = d_shanghai.strftime("%Y-%m-%d")
        print(f"DAG Run queued at: {d}")

        cmd = f"python /data/soft/datax_cs/bin/datax.py /data/soft/airflow/datax/ods_sr_sjzx_xmjcjl.json -p '-Dbizdate={bizdate} -Dhh={d_shanghai.hour:02d}'"
        print(f"Running: {cmd}")
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        log_output = result.stdout + result.stderr

        # DataX reports the total number of records read in a Chinese log line; keep the pattern as-is
        match = re.search(r"读出记录总数\s*:\s*(\d+)", log_output)
        record_count = int(match.group(1)) if match else 0
        print(f"[DataX] total records read: {record_count}")
        print(log_output)

        ti.xcom_push(key='record_count', value=record_count)
        ti.xcom_push(key='bizdate', value=bizdate)
        ti.xcom_push(key='hh', value=f"{d_shanghai.hour:02d}")

    @task.branch
    def branch_func(ti=None):
        record_count = int(ti.xcom_pull(key='record_count', task_ids='extract_data'))
        if record_count > 0:
            return "export_task"
        else:
            return "skip_task"

    @task_group(group_id="export_task")
    def export_task():
        @task.bash(cwd="/data/soft/airflow/etl")
        def export_access_person_record(ti=None):
            """Access-control entry/exit records"""
            bizdate = str(ti.xcom_pull(key='bizdate', task_ids='extract_data'))
            hh = str(ti.xcom_pull(key='hh', task_ids='extract_data'))
            return f"uv run access_person_record.py config_sr_pro.yaml {bizdate} {hh}"

        @task.bash(cwd="/data/soft/airflow/etl")
        def export_visitor_access_record(ti=None):
            """Visitor entry/exit records"""
            bizdate = str(ti.xcom_pull(key='bizdate', task_ids='extract_data'))
            hh = str(ti.xcom_pull(key='hh', task_ids='extract_data'))
            return f"uv run visitor_access_record.py config_sr_pro.yaml {bizdate} {hh}"

        update_sx_fkzt = SQLExecuteQueryOperator(
            task_id="update_sx_fkzt",
            conn_id="sr_ywxt",
            sql="""
            with t as (
                -- today's personnel entry/exit records from access control
                select
                    job_no,
                    access_type,
                    access_time,
                    row_number() over(partition by job_no order by access_time desc) as n
                from t_access_person_record
                where workspace_id = 'SXXQ'
                  and to_char(access_time, 'yyyy-MM-dd') = '${bizdate}'
                  and name is not null
            )
            update ads_t_dp_ryzt
            set
                sfzx = case when access_type = '1' then 1 else 0 end
                ,jxsj = case when access_type = '1' then cast(access_time as varchar) else jxsj end
                ,lxsj = case when access_type != '1' then cast(access_time as varchar) else lxsj end
            from (
                select * from t where n = 1
            ) as t1
            where ads_t_dp_ryzt.xh = t1.job_no
              and workspace_id = 'SXXQ'
            """,
        )

        update_hz_fkzt = SQLExecuteQueryOperator(
            task_id="update_hz_fkzt",
            conn_id="sr_ywxt",
            sql="""
            with t as (
                -- today's personnel entry/exit records from access control
                select
                    job_no,
                    access_type,
                    access_time,
                    row_number() over(partition by job_no order by access_time desc) as n
                from t_access_person_record
                where workspace_id = 'HZXQ'
                  and to_char(access_time, 'yyyy-MM-dd') = '${bizdate}'
                  and name is not null
            )
            update ads_t_dp_ryzt
            set
                sfzx = case when access_type = '1' then 1 else 0 end
                ,jxsj = case when access_type = '1' then cast(access_time as varchar) else jxsj end
                ,lxsj = case when access_type != '1' then cast(access_time as varchar) else lxsj end
            from (
                select * from t where n = 1
            ) as t1
            where ads_t_dp_ryzt.xh = t1.job_no
              and workspace_id = 'HZXQ'
            """,
        )

        export_access_person_record() >> export_visitor_access_record() >> update_hz_fkzt >> update_sx_fkzt

    skip_task = EmptyOperator(task_id="skip_task")

    extract_data() >> branch_func() >> [export_task(), skip_task]


sjzx_xmjcjl()
Auto-start (systemd)
/etc/systemd/system/airflow.service
[Unit]
Description=Airflow Standalone
After=network.target
[Service]
User=root
Environment="PATH=/root/miniconda3/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
ExecStart=/root/miniconda3/bin/airflow standalone
Restart=always
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
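Register the unit, start it at boot, and follow its logs through the journal (standard systemd commands):
systemctl daemon-reload
systemctl enable --now airflow
journalctl -u airflow -f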