1. 安装
1 | pip install airflow |
1 | # airflow needs a home, ~/airflow is the default, |
启动
首先打开 airflow 守护进程1
airflow scheduler
2. 使用Airflow的大致步骤:
- 写任务脚本(.py)
- 测试任务脚本(command)
- WebUI 自查
测试任务脚本1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# -*- coding: utf-8 -*-
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import timedelta
# -------------------------------------------------------------------------------
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {
'owner': 'shehuang',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
# 'email': ['@qq.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'adhoc':False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'trigger_rule': u'all_success'
}
# -------------------------------------------------------------------------------
# dag
dag = DAG(
'example_hello_world_dag',
default_args=default_args,
description='my first DAG',
schedule_interval=timedelta(days=1))
# -------------------------------------------------------------------------------
# first operator
date_operator = BashOperator(
task_id='date_task',
bash_command='date',
dag=dag)
# -------------------------------------------------------------------------------
# second operator
sleep_operator = BashOperator(
task_id='sleep_task',
depends_on_past=False,
bash_command='sleep 5',
dag=dag)
# -------------------------------------------------------------------------------
# third operator
def print_hello():
return 'Hello world!'
hello_operator = PythonOperator(
task_id='hello_task',
python_callable=print_hello,
dag=dag)
# -------------------------------------------------------------------------------
# dependencies
sleep_operator.set_upstream(date_operator)
hello_operator.set_upstream(date_operator)
3. 测试
测试代码有没有语法错误。1
python3 test.py
查看任务有没有生成1
airflow list_dags
List tasks1
2
3
4airflow list_tasks example_hello_world_dag
date_task
hello_task
sleep_task
测试 task 能不能跑成功1
airflow test example_hello_world_dag date_task 20190424
因为我们是朋友,所以你可以使用我的文字,但请注明出处:http://alwa.info