Skip to content

Airflow on Kubernetes (2)

  • DataEngineering

📅 July 12, 2020

⏱️5 min read

최근 Airflow에는 Kubernetes 지원을 위해 다양한 컴포넌트들이 추가되고 있습니다. 이러한 변화의 흐름에 따라 Airflow를 Kubernetes 위에 배포하고 운영하는 방법에 대해 글을 작성해보고자 합니다. 이 글은 시리즈로 연재됩니다.


Airflow on Kubernetes

지난 글에서는 stable/airflow helm chart를 이용하여 CeleryExecutor의 각 모듈을 Kubernetes 위에 올리는 방식에 대해 설명하였습니다. 이번 글에서는 많이 사용하는 Airflow Helm Chart에 대해 알아보고 최근에 추가된 Official Airflow Helm Chart를 이용하여 KubernetesExecutor를 배포했을 때 어떤 아키텍쳐를 가지는지에 대해 설명드리려 합니다. 먼저 많이 사용하는 차트는 아래와 같이 3가지가 있습니다.


1. stable/airflow: 다양한 옵션을 지원하고 많이 사용하지만 커뮤니티 버전입니다. 공식 릴리즈 이후에 개발이 중단될 예정입니다.

2. astronomer/airflow-chart: Airflow as a Service를 개발하는 astronomer에서 공개한 차트입니다. airflow 2.0의 공식 차트로 활용될 예정입니다. (merge된 상태)

3. apache/airflow-on-k8s-operator: Kubernetes Operator를 활용한 방식으로 위와 다른 구성을 가지고 있습니다. 구글에서 apache에 기증했으며 GCP의 Composer에서 활용되고 있다고 알려져 있습니다.


이외에도 최근에 공식 차트가 PR-8777을 통해 merge 되었습니다. 아직 정식 릴리즈는 아니지만 큰 이슈는 없는 것으로 보여 공식 차트 기준으로 설명하겠습니다.


Airflow Executor on Kubernetes

task_lifecycle

먼저 공식 차트 기준으로 executor마다 컴포넌트가 어떤 형태로 올라가는지 알아보겠습니다. 컴포넌트는 크게 아래와 같이 구분하고 있으며 위의 그림과 같은 라이프사이클에 따라 동작합니다.

  • webserver: Airflow UI, RBAC, DAG monitoring
  • scheduler: task monitoring, trigger, DAG sync, DAG processing
  • executor: how task instance running (pluggable)
  • worker: task instance processing

LocalExecutor

localexecutor

LocalExecutor는 Scheduler에서 각 task가 subprocess 형태로 돌아가는 구조입니다. Scale-Out이 어렵기 때문에 간단한 테스트 용도로 사용하는 경우가 많습니다.


CeleryExecutor + DAG PV

celeryexecutor3

CeleryExecutor는 Scheduler가 task queue에 작업을 전달하고 worker에서 작업이 수행되는 구조입니다. 지난 번 글에서 언급했듯이 여러 노드에 걸쳐 있는 DAG 파일을 동기화하기 위해 PV, git-sync 2가지 옵션을 지원합니다. 이 옵션은 KubernetesExecutor에서도 지원합니다.


위의 그림에서는 AWS EFS를 기준으로 표현했지만 NFS를 지원하는 스토리지에서 모두 활용 가능합니다. 이 방식은 스토리지를 별도로 두기 때문에 git과 다르게 배포 주기를 가져갈 수 있습니다. 그리고 worker pod이 statefulset 형태로 변경되었습니다. 이를 통해 각 worker에 PV를 연결하고 airflow UI에서 각 task의 로그를 볼 수 있습니다.


CeleryExecutor + DAG git-sync

celeryexecutor4

git-sync 옵션을 사용한다면 위와 같은 그림으로 구성됩니다. airflow의 각 컴포넌트에 git-sync 컨테이너가 sidecar 형태로 추가됩니다. 이 방식은 DAG 전용 git repository가 있다면 자동으로 배포를 구성할 수 있다는 장점이 있습니다. 처음 차트를 배포할때 init container 단계에서 git clone을 수행하고 이후부터는 git-sync 사이드카 컨테이너를 통해 주기적으로 pull을 수행하게 됩니다.


CeleryExecutor + KEDA AutoScaler

keda


KEDA AutoScaler는 공식 차트에만 추가된 옵션입니다. 기존의 Horizontal Pod Autoscaler는 리소스(CPU, Memory) 메트릭을 기반으로 스케일 여부를 결정하게 됩니다. 반면에 KEDA는 특정 이벤트를 기반으로 스케일 여부를 결정할 수 있습니다. 예를 들어 airflow는 metadb를 통해 현재 실행 중이거나 대기 중인 task가 얼마나 존재하는지 알 수 있습니다. 이러한 이벤트를 활용하여 worker의 scale을 결정한다면 queue에 task가 많이 추가되는 시점에 더 빠르게 확장할 수 있습니다.

SELECT ceil(COUNT(*)::decimal / 16)
FROM task_instance
WHERE state='running' OR state='queued'

이를 위해 airflow에서는 KEDA의 PostgreSQL trigger를 활용하였고 실제 위와 같은 쿼리가 등록되어 있습니다. KEDA는 CRD와 custom controller로 구성되어 있기 때문에 기존 HPA와 함께 사용 가능하며 모든 K8S 클러스터에 추가할 수 있습니다.


CeleryExecutor vs KubernetesExecutor

여기까지 CeleryExecutor에 대해 알아보았습니다. CeleryExecutor 또한 Kubernetes 위에 배포하면 Helm 차트를 통한 선언형 리소스 관리, 쉬운 버전 업데이트, DAG 배포 자동화, 쉬운 리소스 확장 등의 장점을 가질 수 있습니다. 하지만 Celery에 대한 의존성이 남아있기 때문에 Redis, Celery Worker에 대한 리소스를 계속 점유하고 있어야 합니다. 다시 말해서, Scale to Zero가 어렵다는 단점이 있습니다. KubernetesExecutor는 task가 존재할때만 pod이 생성되고 task가 완료되면 종료되기 때문에 더 리소스를 효율적으로 사용한다고 볼 수 있습니다.


KubernetesExecutor, KubernetesPodOperator

kubernetesexecutor


위의 그림처럼 KubernetesExecutor는 Broker와 같은 리소스를 점유하고 있을 필요가 없습니다. 리소스를 할당하고 스케줄링 하는 역할은 Kubernetes Scheduler가 수행하게 됩니다. Airflow Scheduler는 API Server에게 task 수행을 위한 Pod 생성을 요청합니다. worker는 images.airflow에 설정한 이미지로 Pod이 생성되기 때문에 추가로 필요한 파이썬 패키지가 존재한다면 별도의 이미지를 만들어주어야 합니다. 만일 task pod 마다 다른 이미지와 리소스 설정을 가지도록 하고 싶다면 KubernetesPodOperator를 사용하시면 됩니다. KubernetesPodOperator는 worker를 통해 pod이 생성되는 구조이므로 파라메터를 통해 사용자가 원하는 설정으로 변경할 수 있습니다.


KubernetesExecutor Process

kubeexecutor2


KubernetesExecutor는 위와 같은 프로세스를 통해 동작합니다. 일반적으로 Pod이 생성되는 과정과 동일하며 airflow에서는 내부적으로 python kubernetes client library를 통해 k8s_model 이라는 객체로 K8S API를 추상화하여 사용하고 있습니다.


kubeexecutor3


task가 완료되기 전에 Airflow DB 상태 업데이트 단계에서 OOM 등의 이유로 Pod Crash가 언제나 발생할 수 있기 때문에 이에 대한 장애 시나리오도 준비되어 있습니다. DB 업데이트에 실패하더라도 airflow scheduler는 Kubernetes Watch API를 통해 pod의 상태를 전달받아 다시 DB 상태를 업데이트 할 수 있습니다. CeleryExecutor의 경우, task 상태에 대한 처리를 celery에 주기적으로 확인하는 방식이라면 KubernetesExecutor는 이벤트 스트림으로 전달받기 때문에 스케줄러에 대한 부하가 더 낮다고 볼 수 있습니다.


KubernetesExecutor Batch, CronJob

공식 차트에서는 사용자의 편의를 위해 RBAC 초기 사용자를 생성해주는 create-user BatchJob이 추가되었습니다. Helm Hooks (post-install) 를 통해 차트 리소스가 모두 생성된 이후에 수행됩니다. 더 이상 exec 명령어로 bash에 들어가 create-user 명령어를 수행할 필요가 없습니다!

추가로 cleanup CronJob이 있습니다. AIRFLOW__KUBERNETES__DELETE_WORKER_PODS 옵션을 통해 task가 끝나더라도 pod이 종료되지 않도록 설정할 수 있는데 이때 내가 원하는 주기마다 오래된 pod을 삭제할 수 있는 CronJob 입니다.


Official Helm Chart Issue

공식 버전 차트는 아래와 같은 이슈가 남아있지만 2.0 정식 버전 출시와 함께 해결될 예정입니다. 글을 작성하는 과정에서 DAG 동기화 관련 버그를 발견하였지만 리뷰를 통해 곧바로 수정되었습니다. (PR-9371). stable/airflow 차트와 비교했을때 아쉬운 점은 아래와 같습니다.

  • 현재 버전에서는 backend로 postgresql만 지원 (ISSUE-9627)
  • pip 등 작업 실행에 필요한 패키지 설치하는 옵션이 없음
  • initContainer를 수정해서 설치하거나 이미지 별도로 생성해야함
  • 차트에 Ingress 설정에 대한 옵션이 부족
  • KubernetesExecutor의 경우 remote logging 설정을 해야 UI에서 로그 확인 가능

Deploy

사실 배포와 옵션에 대한 내용은 지난 글에서 말한 내용과 크게 다름이 없습니다. 아직 정식 릴리즈까지 변경될 여지가 많다보니 아래 공식 문서 따라하시는 방법을 추천드립니다 (apache/airflow/chart). 다음 글에서는 KubernetesExecutor의 로깅과 모니터링에 대해 다루어보겠습니다!

Next →
  • Powered by Contentful
  • COPYRIGHT © 2020 by @swalloow