IT/Data

[data] Amazon Managed Workflows for Apache Airflow(MWAA) & GCP Cloud Composer

Hayley Shim 2023. 10. 28. 18:04

안녕하세요. AWS의 Apache Airflow 용 관리형 오케스트레이션 서비스인 Amazon Managed Workflows for Apache Airflow(MWAA)와 GCP의 Cloud Composer에 대해 이해하고자 작성한 글입니다.

일반적으로 Airflow는 빅데이터 플랫폼 환경에서 Airflow 내 다양한 Operator 기능을 통해 워크플로우(workflow)를 오케스트레이션 해주는 툴로 일반적인 빅데이터 플랫폼 환경에서 많이 사용되고 데이터의 수집, 저장 및 데이터 변환 작업을 일반적으로 순차작업(Airflow DAG)으로 구성하여 배치 작업(Airflow 스케줄)으로 수행할 수 있습니다.

Airflow 동작을 이해하기 위해 데이터 분석에 관련된 용어를 간단히 살펴보겠습니다. [참고]

  • 워크플로 : 데이터 수집, 변환, 분석, 활용을 위한 일련의 태스크. Airflow에서 워크플로우(workflow)는 DAG(또는 Directed Acyclic Graph)를 사용하여 생성
  • DAG : 관계 및 종속 항목을 반영하는 방식으로 구성된 예약하고 실행하려는 태스크의 모음. DAG는 코드를 사용하여 DAG 구조(태스크 및 종속 항목)를 정의하는 Python 스크립트에서 생성
 

DAG와 태스크의 관계

Amazon Managed Workflows for Apache Airflow(MWAA)

  • Amazon Managed Workflows for Apache Airflow : Apache Airflow 용 관리형 오케스트레이션 서비스입니다.대규모 클라우드에서 데이터 파이프라인을 설정하고 운영하는 데 사용할 수 있습니다.
  • Apache Airflow : 워크플로라고 하는 프로세스 및 작업 의 시퀀스를 프로그래밍 방식으로 작성, 예약 및 모니터링하는 데 사용되는 오픈 소스 도구
  • Amazon MWAA를 사용하면 확장성, 가용성 및 보안을 위해 기본 인프라를 관리할 필요 없이 Apache Airflow 및 Python을 사용하여 워크플로를 생성할 수 있습니다.

MWAA 튜토리얼

  • MWAA에 대해 이해하기 위해 간단한 AWS MWAA 공식 튜토리얼을 살펴보겠습니다.
  • 이 튜토리얼에서는 1)DAG를 Amazon S3에 업로드하고, 2)Apache Airflow에서 DAG를 실행하고, 3)CloudWatch에서 로그를 보는 내용을 안내합니다.

1. AWS CloudFormation으로 인프라 구성

  • mwaa_public_network.yaml. 템플릿을 다운로드 하여 인프라를 구성해줍니다.
  • Amazon VPC 인프라, Amazon S3 버킷 및 Amazon MWAA 환경을 생성하는 데 30분 이상 걸립니다.

2. DAG를 Amazon S3에 업로드

  • 아래 Airflow 예제 파이프라인 tutorial.py을 다운받아 S3에 업로드합니다. 해당 코드에 대한 설명은 링크에 자세히 설명되어 있습니다.

# 모듈 가져오기
from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

# 기본 인수
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

# DAG 인스턴스화
with DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

# Task : 연산자에서 인스턴스화된 개체
    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)

    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
    )

    t1 >> [t2, t3]
$ aws s3 ls s3://YOUR_S3_BUCKET_NAME

$ aws s3 cp tutorial.py s3://YOUR_S3_BUCKET_NAME/dags/

3. Apache Airflow에서 DAG를 실행

  • Amazon MWAA 콘솔에서 Open Airflow UI를 선택합니다.
  • Apache Airflow UI의 사용 가능한 DAG 목록에서 tutorial DAG를 선택합니다.
  • DAG 세부 정보 페이지에서 DAG 이름 옆에 있는 Pause/Unpause DAG 토글을 선택하여 DAG 일시 중지를 해제합니다.

4. CloudWatch에서 로그 보기

  • CloudWatch 콘솔에서 Apache Airflow 로그를 볼 수 있습니다.
  • Airflow 웹 서버 로그 그룹을 선택하고 로그 스트림webserver_console_ip 에서 로그인을 선택합니다 .

Cloud Composer

위 AWS MWAA의 튜토리얼과 마찬가지로 Cloud Composer 역시Airflow의 DAG 파일을 Cloud Storage에 업로드해서 사용합니다. Cloud Storage에 Airflow DAG 파일을 업로드하면 Cloud Composer에서 해당 파일을 읽어 작업을 실행합니다. 사용자 입장에서 Airflow 구성 요소에 대해 크게 신경 쓸 필요가 없이 DAG 파일 생성에 집중하면 됩니다.

Cloud Composer 튜토리얼

  • Cloud Composer에 대해 이해하기 위해 간단한 Cloud Composer 공식 튜토리얼을 참고하면 됩니다.
  • 아래와 같이 전반적인 Logic은 위 AWS MWAA와 같습니다.
  1. Cloud Composer 환경 만들기
  2. DAG 만들기
  3. Cloud Storage에 DAG 업로드
  4. Airflow UI에서 DAG 보기
  5. Airflow 로그에서 태스크 인스턴스 세부정보 보기

복잡한 task flow를 병렬로 처리하고 시각화해서 운영할 경우 CSP 환경에 맞게 Managed Service인 AWS Managed Apache Airflow(MWAA) 또는 Cloud Composer를 사용하여 간편하게 워크플로우를 관리할 수 있습니다.

 

 

참고

https://airflow.apache.org/docs/apache-airflow/2.2.2/tutorial.html

'IT > Data' 카테고리의 다른 글

[data] AWS Lake Formation & Redshift Spectrum  (0) 2023.10.28