Skip to content
cover-dataengineering

여러 조직이 함께 사용하는 Airflow 만들기

  • DataEngineering

📅 August 15, 2021

⏱️6 min read

사내 데이터가 다양해지고 사용자가 많아지면 접근 제어와 권한 등 다양한 고민이 생기게 됩니다. 이 글에서는 여러 조직이 함께 사용하는 Airflow를 만들 때 알아두면 좋은 내용들에 대해 정리해보려고 합니다.


접근 제어가 필요한 경우

먼저 접근 제어는 모든 조직에 필요한 내용은 아닙니다. 다만 아래와 같은 경우에는 필요할 수 있습니다.

  • 다른 사람이 실행, 중지 권한을 가져서는 안될 만큼 중요한 DAG이 존재하는 경우
  • 민감한 데이터를 다루는 DAG이 존재하는 경우 (HR, 매출 데이터 등)
  • 팀에서 운영하는 DAG, Connection, Variable을 우리 팀만 보고 싶은 경우

특히 Airflow Connections, Variable에는 DB 또는 클러스터 접속 정보, API키 등 민감한 정보가 많이 저장됩니다. 물론 마스킹 기능을 통해 UI에서 볼 수 없게 만들 수 있지만 id는 볼 수 있기 때문에 쉽게 값을 가져올 수 있습니다.

from airflow.models import Variable
from airflow.hooks.base_hook import BaseHook

variable = Variable.get("myvar")
connection = BaseHook.get_connection("myconn")

이 문제를 해결하기 위한 방법으로 조직마다 Airflow 환경을 분리하는 방법이 있습니다. 하지만 이 방법은 운영과 모니터링이 힘들 수 있어 프라이빗 클라우드를 운영해야하는 상황이 아니라면 추천하지 않습니다. 두 번째 방법은 Airflow의 RBAC 기능을 활용하는 방법 입니다.


Airflow RBAC

airflow-rbac

Airflow RBAC은 1.10 버전부터 추가되었고 2.0 버전부터 기본 설정으로 제공됩니다. Airflow의 Security Model은 위의 그림과 같은 구조를 따르고 있습니다. 사용자는 User, Role로 구성되어 있습니다. 여기서 User는 하나 이상의 Role을 가질 수 있습니다.

접근 권한은 Permission, ViewMenu 그리고 이를 조합한 PermissionView로 구성되어 있습니다. Role은 여러 개의 Permission을 가질 수 있습니다. PermissionView에 대한 예시는 아래와 같습니다.

permissionview

Connections ViewMenu 와 can_edit Permission 을 조합하면 can edit on Connections라는 PermissionView 가 생성됩니다. 이 권한을 가진 사용자만 Connections UI에서 편집을 할 수 있습니다. 이러한 방식을 Airflow에서는 Resource-Based permissions라고 정의하고 있습니다.

Airflow에는 다양한 리소스에 대해 권한이 이미 정의되어 있고, 기본적으로 Admin을 포함한 5개의 Role을 제공합니다. 조직마다 다른 Role을 가지고 싶은 경우, BaseRole을 정의하고 Copy Role을 통해 새로 만들면 편하게 운영할 수 있습니다.

리소스 기반의 권한 제어도 필요하지만 이 기능에서는 DAGs 라는 단일 리소스로 보고 있기 때문에 DAG 단위로 접근 제어를 할 수 없습니다. 이를 지원하기 위해 2.0+ 버전부터 DAG-level Permission이 추가되었습니다.


DAG-level Permissions

DAG-level Permission을 사용하면 다음과 같은 접근 제어를 할 수 있습니다.

  • A 사용자는 A 사용자의 DAG만 볼 수 있음
  • A 사용자는 B 사용자의 DAG을 볼 수 없음
  • B 사용자가 A 사용자에게 권한을 부여하면 볼 수 있음

DAG-level Permission은 앞서 얘기했던 리소스 기반 접근 제어에 DAG:dag_id라는 리소스를 추가하는 방식으로 구현되었습니다. 예를 들어 A 사용자와 B 사용자에게 example DAG에 대한 읽기 권한을 부여하고 싶은 경우, DAG:example.can_read라는 권한을 추가해주어야 합니다.

with DAG(
    "example_dag",
    default_args=default_args,
    description="example dags",
    schedule_interval="@once",
    access_control={"myrole": {"can_dag_read"}},
    start_date=days_ago(2),
) as dag:

위와 같이 DAG을 정의하는 단계에서도 access_control 파라메터를 통해 DAG의 접근 권한을 정의해주어야 합니다. 이후 BaseRole에 DAGs 리소스 접근 권한을 제거하면 사용자는 오직 허용된 DAG에 대해서만 접근할 수 있게 됩니다.

DAG access_control이 변경될 때마다 Role에 권한을 추가하는 일은 보통 번거로운 일이 아닙니다. 이를 위해 Airflow에서는 airflow sync-perm 이라는 명령어를 제공합니다. 해당 명령어를 실행하면 모든 DAG에 정의된 권한이 연관된 Role에 반영됩니다. Permission Sync 사이드카 컨테이너를 webserver에 배포하면 이 과정을 자동화할 수 있습니다. 관련 내용은 사이드카 컨테이너로 Airflow 기능 확장하기 글을 참고해주시면 됩니다.


Connection, Variable Access Control

앞서 DAG-level Permission을 보셨다면 느끼셨겠지만 Connection, Variable 또한 각 변수에 대해 접근 제어를 할 수 없고 관련 기능도 없습니다. 하지만 Alternative Secrets Backend 라는 기능을 통해 Custom Backend 클래스를 만들면 접근 제어를 구현할 수 있습니다.


Alternative Secrets Backend

원래 Connection, Variable은 Meta DB에 저장됩니다. 하지만 이 기능을 사용하면 AWS Parameter Store, Vault 등 외부 자원을 저장소로 사용할 수 있습니다. airflow에 구현된 코드는 아래와 같습니다.

@classmethod
def get_connection_from_secrets(cls, conn_id: str) -> 'Connection':
    """
    Get connection by conn_id.
    :param conn_id: connection id
    :return: connection
    """
    for secrets_backend in ensure_secrets_loaded():
        conn = secrets_backend.get_connection(conn_id=conn_id)
        if conn:
            return conn
    raise AirflowNotFoundException(f"The conn_id `{conn_id}` not defined")

BaseHook에서 호출하는 get_connection_from_secrets 메서드는 여러 backend로부터 conn_id에 대한 값을 받아오고 리턴합니다. 즉 기존 Meta DB를 사용하고 있더라도 유지하면서 새로운 backend와 호환 가능합니다.

AWS Parameter Store는 Path 단위로 키를 다르게 값을 저장할 수 있습니다. 이 점을 활용해서 id 상위 경로로 role을 지정한다면 role 단위로 접근 제어가 가능해집니다. 접근 제어를 위한 AWS Parameter Store에 저장되는 규칙은 아래와 같습니다. Airflow 환경, 역할 별로 구분해서 저장합니다.

secrets:
    backend: "airflow...SystemsManagerParameterStoreBackend"
    backend_kwargs: {
        "connections_prefix": "/airflow/prod/connections",
        "variables_prefix": "/airflow/prod/variables",
        "profile_name": null
    }
  • /airflow/prod/connections/myrole/connection_id
  • /airflow/prod/variables/myrole/variable_id

기본으로 제공하는 Connections, Variables UI는 세부 경로로 값을 가져오는게 아니기 때문에 secrets backend 설정과 함께 Custom UI Plugin이 필요합니다.


Access Control UI Plugin

conn ui

플러그인의 역할은 다음과 같습니다. myrole이라는 Airflow Role을 가진 사용자가 Connections UI 페이지에 접근하면 Custom Backend를 통해 Paramter Store의 /airflow/prod/connections/myrole 경로 하위의 값들을 받아오도록 요청해야 합니다. list 뿐만 아니라 create, edit, delete에 대한 기능도 추가해주어야 합니다.

이를 위해 UI 플러그인에서 현재 접속한 사용자의 Role 이름을 받아올 수 있어야 합니다. 이 때 flask의 global session을 활용하면 쉽게 받아올 수 있습니다.

from flask import g

role_name = g.user.roles[0].name

이제 UI에서 추가, 편집, 삭제 시 Secrets Backend를 통해 AWS Parameter Store에 반영됩니다. 오직 권한을 가진 사용자만이 DAG, Connection, Variable에 접근할 수 있습니다.


Cluster Policy

DAG 작성에 대한 가이드가 있더라도 모두 만족하는지 체크하는건 상당히 번거로운 일 입니다. Airflow 2.0+에서는 Cluster Policy를 통해 클러스터 전체에서 DAG 또는 task에 대한 정책을 정의하고 강제하도록 설정할 수 있습니다. 예를 들면 다음과 같은 정책을 정의할 수 있습니다.

  • 모든 DAG에는 적어도 하나의 태그를 달아야 한다
  • 특정 task의 timeout은 48시간을 넘을 수 없다

airflow_local_settings.py 파일을 만들고 정의하면 적용할 수 있습니다. 태그를 강제하는 정책 예시는 아래와 같습니다.

def dag_policy(dag: DAG):
    """Ensure that DAG has at least one tag"""
    if not dag.tags:
        raise AirflowClusterPolicyViolation(
            f"DAG {dag.dag_id} has no tags. At least one tag required. File path: {dag.filepath}"
        )

위 정책이 적용된 클러스터에 태그가 없는 DAG을 배포하는 경우, AirflowClusterPolicyViolation 오류가 발생하기 때문에 DAG을 등록할 수 없습니다. 자세한 내용은 공식문서를 참고하시면 됩니다.


정리

최근 Airflow Summit에서 Multi-Tenent와 관련된 영상들이 많이 올라와서 함께 참고하면 도움이 될 것 같습니다.

← PrevNext →
  • Powered by Contentful
  • COPYRIGHT © 2020 by @swalloow