Tất tần tật về Airflow (Phần 2) Giới thiệu về DAG
Giới thiệu
Trong phần 1, ta đã biết được cách cài đặt Airflow sử dụng Docker. Trong bài viết này, ta sẽ cùng nhau tìm hiểu một khái niệm quan trọng trong Airflow là DAG (Directed Acyclic Graph) DAG là một tập hợp các nhiệm vụ (tasks) được sắp xếp theo một thứ tự nhất định, trong đó mỗi task đại diện cho một công việc cụ thể trong quá trình xử lý dữ liệu.
Trong Airflow, DAG được sử dụng để định nghĩa các quá trình xử lý dữ liệu phức tạp thành các tác vụ (tasks) đơn giản hơn, sau đó sắp xếp các tác vụ này theo một thứ tự nhất định để tạo thành một quá trình hoàn chỉnh. Trong bài viết này mình sẽ giới thiệu các thành phần và cách sử dụng của DAG
DAG là gì?
Qua phần giới thiệu ta đã biết được ý tưởng chung của DAG Nói ngắn gọn, đây chính là nơi bạn định nghĩa pipeline cho dữ liệu và đơn giản chỉ là một file python. Trong một file python, bạn cũng có thể định nghĩa nhiều DAG khác nhau.
Khai báo DAG
Có 3 cách để khai báo một DAG. Ở cách 1, ta sẽ sử dụng một context manager như sau:
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2023, 6, 20),
schedule="@daily",
):
EmptyOperator(task_id="task")
ở cách này để chỉ định một operator thuộc DAG nào, ta chỉ cần khai báo operator ở trong "with DAG".
Cách thứ 2, ta có thể sử dụng một constructor, sau đó truyền DAG vào tất cả các operator thuộc DAG đó:
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
my_dag = DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2023, 6, 20),
schedule="@daily",
)
EmptyOperator(task_id="task", dag=my_dag)
Cách thứ 3, ta có thể sử dụng decorator @dag
để chuyển một function thành DAG generator:
import datetime
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
@dag(start_date=datetime.datetime(2023, 6, 20), schedule="@daily")
def generate_dag():
EmptyOperator(task_id="task")
generate_dag()
Hãy cùng tìm hiểu các tham số có trong DAG
Đầu tiên, đó là dag_id
là id riêng biệt cho mỗi DAG trong Airflow. Tiếp theo, để tạo DAG ta cần chỉ định một số thông tin:
schedule_interval
là tham số chỉ định xem khi nào DAG sẽ chạy. Dưới đây là bảng các giá trị bạn có thể dùng để đặt lịch DAG, bạn có thể đặt bằng giá trị cron "preset" hoặc cron expression tùy ý.
Preset | Ý nghĩa | Cron |
---|---|---|
None | Không đặt lịch cho DAG | |
@once | Đặt lịch chạy một lần và chỉ một lần | |
@continuous | Kích hoạt DAG khi lần chạy trước kết thúc (chạy liên tục) | |
@hourly | Chạy mỗi tiếng một lần | 0 * * * * |
@daily | Chạy mỗi ngày một lần, kích hoạt vào 24:00 | 0 0 * * * |
@weekly | Chạy mỗi tuần một lần, kích hoạt vào 24:00 chủ nhật | 0 0 * * 0 |
@monthly | Chạy mỗi tháng một lần, kích hoạt vào 24:00 ngày đầu tiên của tháng | 0 0 1 * * |
@quarterly | Chạy mỗi quý một lần, kích hoạt vào 24:00 ngày đầu tiên | 0 0 1 */3 * |
@yearly | Chạy một năm một lần, kích hoạt vào 24:00 ngày 1 tháng 1 | 0 0 1 1 * |
Nếu schedule chưa đủ đô, bạn có thể sử dụng Timetables để custom lập lịch chạy.
start_date
Là thời gian mà DAG bắt đầu chạy. Thời gian được chỉ định sử dụngdatetime
object.
Task Dependencies
Sau khi khai báo DAG, ta sẽ chỉ định các task trong DAG. Thông thường, một task sẽ phụ thuộc vào kết quả của một (hoặc nhiều) task trước đó. Việc khai báo các phụ thuộc này chính là chúng ta đang xây dựng lên một cấu trúc DAG
Có 2 cách chính để khai báo sự phụ thuộc giữa các task. Cách 1 là ta sẽ sử dụng operator >>
và <<
. Ví dụ:
import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
with DAG(
"random_dag",
start_date=start_date=datetime.datetime(2023, 6, 20),
schedule_interval=None,
) as dag:
task_a = DummyOperator(task_id="task_a")
task_b = DummyOperator(task_id="task_b")
task_c = DummyOperator(task_id="task_c")
task_d = DummyOperator(task_id="task_d")
task_a >> [task_b, task_c]
task_c >> task_d
Trong ví dụ trên, task_a
được gọi là upstream của task_b
và task_c
, ngược lại task_b
và task_c
được gọi là downstream của task_a
.
Cách 2 ta sử dụng method là set_upstream
và set_downstream
:
task_a.set_downstream(task_b, task_c)
task_d.set_upstream(task_c)
Loading DAGs
File DAG của bạn cần được lưu trong folder /dags
(nếu đọc phần 1 thì bạn đã biết cần tạo folder này ) để tải lên Airflow. Airflow sẽ thực thi từng file và tải các DAG object từ các file đó. Điều này có nghĩa là bạn có thể đặt nhiều DAG trong một python file hoặc là chia chúng sang nhiều file khác nhau.
Lưu ý rằng khi Airflow load DAG từ file python, nó chỉ load object ở top level. Ví dụ:
dag_1 = DAG('this_dag_will_be_discovered')
def my_function():
dag_2 = DAG('but_this_dag_will_not')
my_function()
Trong đoạn code trên chỉ có dag_1
là được load, dag_2
nằm trong hàm nên sẽ bị bỏ qua
Running DAGs
DAG được chạy theo 2 cách:
- Trigger thủ công hoặc qua API
- Bạn thiết lập schedule cho DAG và đến giờ theo schedule thì DAG chạy
Bạn có thể thiết lập schedule cho DAG hoặc không tùy vào yêu cầu của bài toán.
Control Flow
Mặc định thì một DAG sẽ chạy một task khi các task mà nó phụ thuộc chạy thành công. Tuy nhiên, có nhiều cách custom cho luồng chạy của DAG. Ở đây mình sẽ giới thiệu 2 control flow chính:
- Branching: Bạn có thể đặt điều kiện để chạy một task tiếp theo tùy ý nào đó.
- Trigger Rules: Đặt điều kiện để chạy một task dựa vào trạng thái của task trước đó (successed, failed, skipped).
Branching
Ta sử dụng branch khi cần chạy các luồng khác nhau phụ thuộc xem các điều kiện có được thỏa mãn hay không. Để khai báo một branch ta sử dụng @task.branch
hoặc BranchPythonOperator. Một DAG sử dụng branch như hình dưới:
Ví dụ, định nghĩa branch sử dụng @task.branch
như sau:
@task.branch(task_id="branch_task")
def branch_func(ti=None):
xcom_value = int(ti.xcom_pull(task_ids="start_task"))
if xcom_value >= 5:
return "continue_task"
elif xcom_value >= 3:
return "stop_task"
else:
return None
start_op = BashOperator(
task_id="start_task",
bash_command="echo 5",
xcom_push=True,
dag=dag,
)
branch_op = branch_func()
continue_op = EmptyOperator(task_id="continue_task", dag=dag)
stop_op = EmptyOperator(task_id="stop_task", dag=dag)
start_op >> branch_op >> [continue_op, stop_op]
Hoặc sử dụng BaseBranchOperator:
class MyBranchOperator(BaseBranchOperator):
def choose_branch(self, context):
"""
Run an extra branch on the first day of the month
"""
if context['data_interval_start'].day == 1:
return ['daily_task_id', 'monthly_task_id']
elif context['data_interval_start'].day == 2:
return 'daily_task_id'
else:
return None
Trigger Rules
Mặc định thì Airflow sẽ chờ cho tất cả upstream task của một task chạy thành công trước khi thực hiện chạy task đó. Tuy nhiên ta có thể thêm sửa tham số trigger_rule
cho những yêu cầu DAG phức tạp hơn. Cụ thể:
-
all_success (mặc định): Tất cả các task upstream đã thành công
-
all_failed: Tất cả các task upstream đều ở trạng thái failed hoặc upstream_failed
-
all_done: Tất cả các task upstream đã hoàn thành việc thực thi của chúng
-
all_skipped: Tất cả các task upstream đều ở trạng thái skipped
-
one_failed: Ít nhất một task upstream đã thất bại (không đợi tất cả các task upstream hoàn thành)
-
one_success: Ít nhất một task upstream đã thành công (không đợi tất cả các task upstream hoàn thành)
-
one_done: Ít nhất một task upstream đã thành công hoặc thất bại
-
none_failed: Tất cả các task upstream không thất bại hoặc upstream_failed - tức là tất cả các task upstream đã thành công hoặc được bỏ qua (skipped)
-
none_failed_min_one_success: Tất cả các task upstream không thất bại hoặc upstream_failed, và ít nhất một task upstream đã thành công.
-
none_skipped: Không có task upstream nào ở trạng thái skipped - tức là tất cả các task upstream ở trạng thái success, failed hoặc upstream_failed
-
always: Không có sự phụ thuộc nào, task này có thể chạy bất kỳ lúc nào
Dynamic DAGs
Vì DAG được định nghĩa bằng Python code nên ta có thể thoải mái định nghĩa DAG sử dụng vòng lặp, hàm,... ví dụ:
with DAG("loop_example", ...):
first = EmptyOperator(task_id="first")
last = EmptyOperator(task_id="last")
options = ["branch_a", "branch_b", "branch_c", "branch_d"]
for option in options:
t = EmptyOperator(task_id=option)
first >> t >> last
SubDAGs
Trong trường hợp muốn nhóm các task nhỏ thành một task lớn, ta sẽ sử dụng SubDAGs. Ví dụ ta có nhiều task chạy song song trong 2 section như sau:
Ta có thể gộp các task chạy song song thành một SubDAG, kết quả như sau:
Hàm subdag được định nghĩa như sau:
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
def subdag(parent_dag_name, child_dag_name, args) -> DAG:
"""
Generate a DAG to be used as a subdag.
:param str parent_dag_name: Id of the parent DAG
:param str child_dag_name: Id of the child DAG
:param dict args: Default arguments to provide to the subdag
:return: DAG to use as a subdag
"""
dag_subdag = DAG(
dag_id=f"{parent_dag_name}.{child_dag_name}",
default_args=args,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule="@daily",
)
for i in range(5):
EmptyOperator(
task_id=f"{child_dag_name}-task-{i + 1}",
default_args=args,
dag=dag_subdag,
)
return dag_subdag
Hàm này sẽ được gọi trong DAG như sau:
import datetime
from airflow import DAG
from airflow.example_dags.subdags.subdag import subdag
from airflow.operators.empty import EmptyOperator
from airflow.operators.subdag import SubDagOperator
DAG_NAME = "example_subdag_operator"
with DAG(
dag_id=DAG_NAME,
default_args={"retries": 2},
start_date=datetime.datetime(2022, 1, 1),
schedule="@once",
tags=["example"],
) as dag:
start = EmptyOperator(
task_id="start",
)
section_1 = SubDagOperator(
task_id="section-1",
subdag=subdag(DAG_NAME, "section-1", dag.default_args),
)
some_other_task = EmptyOperator(
task_id="some-other-task",
)
section_2 = SubDagOperator(
task_id="section-2",
subdag=subdag(DAG_NAME, "section-2", dag.default_args),
)
end = EmptyOperator(
task_id="end",
)
start >> section_1 >> some_other_task >> section_2 >> end
Kết luận
Qua bài viết ta đã biết thêm những ý chính của một concept quan trọng trong Airflow là DAG. Trong bài viết tiếp theo, ta sẽ tìm hiểu thêm các concept khác trong Airflow.
Tham khảo
[1] https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/index.html
[2] https://www.youtube.com/watch?v=kAtaj_s4f-w&ab_channel=ApacheAirflow
All rights reserved