0

Tất tần tật về Airflow (Phần 2) Giới thiệu về DAG

Giới thiệu

Trong phần 1, ta đã biết được cách cài đặt Airflow sử dụng Docker. Trong bài viết này, ta sẽ cùng nhau tìm hiểu một khái niệm quan trọng trong Airflow là DAG (Directed Acyclic Graph) 😄 DAG là một tập hợp các nhiệm vụ (tasks) được sắp xếp theo một thứ tự nhất định, trong đó mỗi task đại diện cho một công việc cụ thể trong quá trình xử lý dữ liệu.

Trong Airflow, DAG được sử dụng để định nghĩa các quá trình xử lý dữ liệu phức tạp thành các tác vụ (tasks) đơn giản hơn, sau đó sắp xếp các tác vụ này theo một thứ tự nhất định để tạo thành một quá trình hoàn chỉnh. Trong bài viết này mình sẽ giới thiệu các thành phần và cách sử dụng của DAG 😄

DAG là gì?

Qua phần giới thiệu ta đã biết được ý tưởng chung của DAG 😄 Nói ngắn gọn, đây chính là nơi bạn định nghĩa pipeline cho dữ liệu và đơn giản chỉ là một file python. Trong một file python, bạn cũng có thể định nghĩa nhiều DAG khác nhau.

Khai báo DAG

Có 3 cách để khai báo một DAG. Ở cách 1, ta sẽ sử dụng một context manager như sau:

import datetime

 from airflow import DAG
 from airflow.operators.empty import EmptyOperator

 with DAG(
     dag_id="my_dag_name",
     start_date=datetime.datetime(2023, 6, 20),
     schedule="@daily",
 ):
     EmptyOperator(task_id="task")

ở cách này để chỉ định một operator thuộc DAG nào, ta chỉ cần khai báo operator ở trong "with DAG".

Cách thứ 2, ta có thể sử dụng một constructor, sau đó truyền DAG vào tất cả các operator thuộc DAG đó:

 import datetime

 from airflow import DAG
 from airflow.operators.empty import EmptyOperator

 my_dag = DAG(
     dag_id="my_dag_name",
     start_date=datetime.datetime(2023, 6, 20),
     schedule="@daily",
 )
 EmptyOperator(task_id="task", dag=my_dag)

Cách thứ 3, ta có thể sử dụng decorator @dag để chuyển một function thành DAG generator:

import datetime

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

@dag(start_date=datetime.datetime(2023, 6, 20), schedule="@daily")
def generate_dag():
    EmptyOperator(task_id="task")

generate_dag()

Hãy cùng tìm hiểu các tham số có trong DAG 😄

Đầu tiên, đó là dag_id là id riêng biệt cho mỗi DAG trong Airflow. Tiếp theo, để tạo DAG ta cần chỉ định một số thông tin:

  • schedule_interval là tham số chỉ định xem khi nào DAG sẽ chạy. Dưới đây là bảng các giá trị bạn có thể dùng để đặt lịch DAG, bạn có thể đặt bằng giá trị cron "preset" hoặc cron expression tùy ý.
Preset Ý nghĩa Cron
None Không đặt lịch cho DAG
@once Đặt lịch chạy một lần và chỉ một lần
@continuous Kích hoạt DAG khi lần chạy trước kết thúc (chạy liên tục)
@hourly Chạy mỗi tiếng một lần 0 * * * *
@daily Chạy mỗi ngày một lần, kích hoạt vào 24:00 0 0 * * *
@weekly Chạy mỗi tuần một lần, kích hoạt vào 24:00 chủ nhật 0 0 * * 0
@monthly Chạy mỗi tháng một lần, kích hoạt vào 24:00 ngày đầu tiên của tháng 0 0 1 * *
@quarterly Chạy mỗi quý một lần, kích hoạt vào 24:00 ngày đầu tiên 0 0 1 */3 *
@yearly Chạy một năm một lần, kích hoạt vào 24:00 ngày 1 tháng 1 0 0 1 1 *

Nếu schedule chưa đủ đô, bạn có thể sử dụng Timetables để custom lập lịch chạy.

  • start_date Là thời gian mà DAG bắt đầu chạy. Thời gian được chỉ định sử dụng datetime object.

Task Dependencies

Sau khi khai báo DAG, ta sẽ chỉ định các task trong DAG. Thông thường, một task sẽ phụ thuộc vào kết quả của một (hoặc nhiều) task trước đó. Việc khai báo các phụ thuộc này chính là chúng ta đang xây dựng lên một cấu trúc DAG 😄

Có 2 cách chính để khai báo sự phụ thuộc giữa các task. Cách 1 là ta sẽ sử dụng operator >><<. Ví dụ:

import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
with DAG(
  "random_dag",
  start_date=start_date=datetime.datetime(2023, 6, 20),
  schedule_interval=None,
) as dag:
   task_a = DummyOperator(task_id="task_a")
   task_b = DummyOperator(task_id="task_b")
   task_c = DummyOperator(task_id="task_c")
   task_d = DummyOperator(task_id="task_d")
   task_a >> [task_b, task_c]
   task_c >> task_d

Trong ví dụ trên, task_a được gọi là upstream của task_btask_c, ngược lại task_btask_c được gọi là downstream của task_a.

Cách 2 ta sử dụng method là set_upstreamset_downstream:

task_a.set_downstream(task_b, task_c)
task_d.set_upstream(task_c)

Loading DAGs

File DAG của bạn cần được lưu trong folder /dags (nếu đọc phần 1 thì bạn đã biết cần tạo folder này 😄) để tải lên Airflow. Airflow sẽ thực thi từng file và tải các DAG object từ các file đó. Điều này có nghĩa là bạn có thể đặt nhiều DAG trong một python file hoặc là chia chúng sang nhiều file khác nhau.

Lưu ý rằng khi Airflow load DAG từ file python, nó chỉ load object ở top level. Ví dụ:

dag_1 = DAG('this_dag_will_be_discovered')

def my_function():
    dag_2 = DAG('but_this_dag_will_not')

my_function()

Trong đoạn code trên chỉ có dag_1 là được load, dag_2 nằm trong hàm nên sẽ bị bỏ qua 😄

Running DAGs

DAG được chạy theo 2 cách:

  • Trigger thủ công hoặc qua API
  • Bạn thiết lập schedule cho DAG và đến giờ theo schedule thì DAG chạy 😄

Bạn có thể thiết lập schedule cho DAG hoặc không tùy vào yêu cầu của bài toán.

Control Flow

Mặc định thì một DAG sẽ chạy một task khi các task mà nó phụ thuộc chạy thành công. Tuy nhiên, có nhiều cách custom cho luồng chạy của DAG. Ở đây mình sẽ giới thiệu 2 control flow chính:

  • Branching: Bạn có thể đặt điều kiện để chạy một task tiếp theo tùy ý nào đó.
  • Trigger Rules: Đặt điều kiện để chạy một task dựa vào trạng thái của task trước đó (successed, failed, skipped).

Branching

Ta sử dụng branch khi cần chạy các luồng khác nhau phụ thuộc xem các điều kiện có được thỏa mãn hay không. Để khai báo một branch ta sử dụng @task.branch hoặc BranchPythonOperator. Một DAG sử dụng branch như hình dưới:

image.png

Ví dụ, định nghĩa branch sử dụng @task.branch như sau:

@task.branch(task_id="branch_task")
def branch_func(ti=None):
    xcom_value = int(ti.xcom_pull(task_ids="start_task"))
    if xcom_value >= 5:
        return "continue_task"
    elif xcom_value >= 3:
        return "stop_task"
    else:
        return None


start_op = BashOperator(
    task_id="start_task",
    bash_command="echo 5",
    xcom_push=True,
    dag=dag,
)

branch_op = branch_func()

continue_op = EmptyOperator(task_id="continue_task", dag=dag)
stop_op = EmptyOperator(task_id="stop_task", dag=dag)

start_op >> branch_op >> [continue_op, stop_op]

Hoặc sử dụng BaseBranchOperator:

class MyBranchOperator(BaseBranchOperator):
    def choose_branch(self, context):
        """
        Run an extra branch on the first day of the month
        """
        if context['data_interval_start'].day == 1:
            return ['daily_task_id', 'monthly_task_id']
        elif context['data_interval_start'].day == 2:
            return 'daily_task_id'
        else:
            return None

Trigger Rules

Mặc định thì Airflow sẽ chờ cho tất cả upstream task của một task chạy thành công trước khi thực hiện chạy task đó. Tuy nhiên ta có thể thêm sửa tham số trigger_rule cho những yêu cầu DAG phức tạp hơn. Cụ thể:

  • all_success (mặc định): Tất cả các task upstream đã thành công

  • all_failed: Tất cả các task upstream đều ở trạng thái failed hoặc upstream_failed

  • all_done: Tất cả các task upstream đã hoàn thành việc thực thi của chúng

  • all_skipped: Tất cả các task upstream đều ở trạng thái skipped

  • one_failed: Ít nhất một task upstream đã thất bại (không đợi tất cả các task upstream hoàn thành)

  • one_success: Ít nhất một task upstream đã thành công (không đợi tất cả các task upstream hoàn thành)

  • one_done: Ít nhất một task upstream đã thành công hoặc thất bại

  • none_failed: Tất cả các task upstream không thất bại hoặc upstream_failed - tức là tất cả các task upstream đã thành công hoặc được bỏ qua (skipped)

  • none_failed_min_one_success: Tất cả các task upstream không thất bại hoặc upstream_failed, và ít nhất một task upstream đã thành công.

  • none_skipped: Không có task upstream nào ở trạng thái skipped - tức là tất cả các task upstream ở trạng thái success, failed hoặc upstream_failed

  • always: Không có sự phụ thuộc nào, task này có thể chạy bất kỳ lúc nào

Dynamic DAGs

Vì DAG được định nghĩa bằng Python code nên ta có thể thoải mái định nghĩa DAG sử dụng vòng lặp, hàm,... ví dụ:

 with DAG("loop_example", ...):

     first = EmptyOperator(task_id="first")
     last = EmptyOperator(task_id="last")

     options = ["branch_a", "branch_b", "branch_c", "branch_d"]
     for option in options:
         t = EmptyOperator(task_id=option)
         first >> t >> last

SubDAGs

Trong trường hợp muốn nhóm các task nhỏ thành một task lớn, ta sẽ sử dụng SubDAGs. Ví dụ ta có nhiều task chạy song song trong 2 section như sau:

image.png

Ta có thể gộp các task chạy song song thành một SubDAG, kết quả như sau:

image.png

Hàm subdag được định nghĩa như sau:

import pendulum

from airflow import DAG
from airflow.operators.empty import EmptyOperator


def subdag(parent_dag_name, child_dag_name, args) -> DAG:
    """
    Generate a DAG to be used as a subdag.

    :param str parent_dag_name: Id of the parent DAG
    :param str child_dag_name: Id of the child DAG
    :param dict args: Default arguments to provide to the subdag
    :return: DAG to use as a subdag
    """
    dag_subdag = DAG(
        dag_id=f"{parent_dag_name}.{child_dag_name}",
        default_args=args,
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        schedule="@daily",
    )

    for i in range(5):
        EmptyOperator(
            task_id=f"{child_dag_name}-task-{i + 1}",
            default_args=args,
            dag=dag_subdag,
        )

    return dag_subdag

Hàm này sẽ được gọi trong DAG như sau:

import datetime

from airflow import DAG
from airflow.example_dags.subdags.subdag import subdag
from airflow.operators.empty import EmptyOperator
from airflow.operators.subdag import SubDagOperator

DAG_NAME = "example_subdag_operator"

with DAG(
    dag_id=DAG_NAME,
    default_args={"retries": 2},
    start_date=datetime.datetime(2022, 1, 1),
    schedule="@once",
    tags=["example"],
) as dag:

    start = EmptyOperator(
        task_id="start",
    )

    section_1 = SubDagOperator(
        task_id="section-1",
        subdag=subdag(DAG_NAME, "section-1", dag.default_args),
    )

    some_other_task = EmptyOperator(
        task_id="some-other-task",
    )

    section_2 = SubDagOperator(
        task_id="section-2",
        subdag=subdag(DAG_NAME, "section-2", dag.default_args),
    )

    end = EmptyOperator(
        task_id="end",
    )

    start >> section_1 >> some_other_task >> section_2 >> end

Kết luận

Qua bài viết ta đã biết thêm những ý chính của một concept quan trọng trong Airflow là DAG. Trong bài viết tiếp theo, ta sẽ tìm hiểu thêm các concept khác trong Airflow.

Tham khảo

[1] https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/index.html

[2] https://www.youtube.com/watch?v=kAtaj_s4f-w&ab_channel=ApacheAirflow


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí