Apache Airflow에 기여하면서 배운 점들

Apache Airflow는 코드를 통해 워크플로우를 관리하고 모니터링 할 수 있도록 도와주는 플랫폼이다. Airflow 프로젝트에 대한 설명은 다른 글에서도 많이 다루기 때문에 생략하고 이 글에서는 처음으로 아파치 프로젝트에 기여해본 경험을 정리해보려 한다.

기여하게 된 배경

당시에 관리하던 데이터 인프라에는 의존성이 얽혀있는 배치 작업이 상당히 많았다. 여기에서 의존성이 얽혀있다는 말은 A 작업과 B 작업이 성공적으로 끝나고 난 뒤 C 작업을 해야하는 경우를 말한다. 또한 각 작업들은 서로 다른 시간에 스케줄링 되어야 했고, 작업이 실패하는 경우 재시도 또는 특정 로직을 실행시킬 수 있어야 했다.

처음에는 단순한 구조이다 보니 스크립트로 관리했지만 점차 늘어나는 운영 이슈에 대응하기 위해 Airflow를 활용하기로 결정했다. 하지만 운영하다 보니 AWS 관련 컴포넌트들의 여러 버그를 발견하게 되었고 이를 수정하기 위해 PR을 추가했었다.

아파치 프로젝트 PR 프로세스

아파치 프로젝트는 이슈 관리 도구로 JIRA를 사용한다. CI 도구는 프로젝트마다 다른 편인데 Airflow의 경우 TravisCI를 사용한다. 모든 프로젝트에는 처음 프로젝트에 기여하려는 개발자를 위해 CONTRIBUTING.md 라는 문서를 제공한다. 문서에는 개발 및 테스트 환경을 어떻게 구축해야하는지, 지켜야할 규칙, PR 가이드라인 등에 대해 설명되어 있다. 그리고 PR template를 준수해야 하는데 잘 모르겠다면, 이전 PR들을 확인하고 비슷한 양식으로 작성하면 된다.

내가 처음 접했던 Airflow 문서에는 AWS 관련 Hook, Operator도 반영되어 있지 않았다. 그래서 첫 PR로 AWS, GCP 관련 컴포넌트를 업데이트하는 문서 기여를 하게 되었다. 문서 관리에는 readthedocs를 사용하고 있었고 Sphinx 빌드를 통해 문서를 확인할 수 있었다.

사용하다보니 특히 EMR 관련 Hook과 Operator에 버그가 많았다. 만일 JIRA에 이미 등록되어 있는 이슈가 아니라면 이슈를 새로 생성한 다음 PR을 추가해주어야 한다.

비슷한 이슈를 겪고 있는 사람들이 있어서 좀 신기했다. 그리고 아주 작은 수정이라도 테스트 케이스를 추가해야 한다는 사실을 알게 되었다.

양식만 잘 지키면 커미터들은 정말 친절하다. 내가 파악하지 못한 부분까지 알려주고, 코드 리뷰도 받을 수 있다. 다른 PR을 참고하면서 많이 배울 수 있었다.

클라우드 인프라 테스트 방법

AWS는 기본적으로 클라우드 환경이다. 따라서 과금문제로 인해 실제로 추가, 변경한 오퍼레이터가 잘 동작하는지 매번 확인해보기가 힘들다. Airflow에서는 AWS 서비스를 Mocking 하기 위해 moto 라는 라이브러를 활용해서 테스트를 작성한다.

@mock_s3
def test_my_model_save():
    # Create Bucket so that test can run
    conn = boto3.resource('s3', region_name='us-east-1')
    conn.create_bucket(Bucket='mybucket')
    model_instance = MyModel('steve', 'is awesome')
    model_instance.save()
    body = conn.Object('mybucket', 'steve').get()['Body'].read().decode()

    assert body == 'is awesome'

위와 같이 moto에서 미리 정의한 mock object를 decorator를 사용하여 쉽게 활용할 수 있다. 하지만 AWS에서 공식으로 지원하는 라이브러리가 아니다보니 업데이트가 늦어지기도 한다. 이런 이유로 인해 unittest의 mock으로 작성된 테스트 코드도 많이 있다.

class TestEmrAddStepsOperator(unittest.TestCase):
    # When
    _config = [{
        'Name': 'test_step',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                '/usr/lib/spark/bin/run-example'
            ]
        }
    }]

    def setUp(self):
        configuration.load_test_config()

        # Mock out the emr_client (moto has incorrect response)
        self.emr_client_mock = MagicMock()
        self.operator = EmrAddStepsOperator(
            task_id='test_task',
            job_flow_id='j-8989898989',
            aws_conn_id='aws_default',
            steps=self._config
        )

    def test_init(self):
        self.assertEqual(self.operator.aws_conn_id, 'aws_default')
        self.assertEqual(self.operator.emr_conn_id, 'emr_default')

    def test_render_template(self):
        ti = TaskInstance(self.operator, DEFAULT_DATE)
        ti.render_templates()

        expected_args = [{
            'Name': 'test_step',
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': [
                    '/usr/lib/spark/bin/run-example'
                ]
            }
        }]

        self.assertListEqual(self.operator.steps, expected_args)


if __name__ == '__main__':
    unittest.main()

unittest로 작성된 테스트 케이스는 API로 주고 받는 json을 직접 정의해줘야 하는 번거로움이 있다. 테스트 케이스를 작성하고 난 다음 바로 PR을 추가하는 것보다 로컬 CI를 미리 돌려보는게 좋다.

TravisCI는 오픈소스인 경우 무료로 사용할 수 있으며, yml 파일에 미리 정의되어 있으니 참고하면 된다. 로컬에서 CI가 통과되고 나면 PR을 추가해도 좋다. 작업이 길어지면서 커밋이 여러 개로 늘어나는 경우, commit을 squash 해주는 것이 좋다. (나중에 문제가 생겼을 때 쉽게 rebase 하기 위함)

잡다한 정리

그 동안 5개 정도의 버그를 해결했고 수정했던 AWS EMR 관련 버그들은 1.9 - 10 버전에 모두 반영 되었다. 이외에도 Airflow에는 여전히 자잘한 버그가 많이 남아있다. (Docker로 운영했을 때 로그가 이상하게 나타난다거나, SubDag Deadlock 문제 등) 당시에 블로그를 열심히 했다면 운영 관련해서 글을 남겼을텐데 하는 아쉬움이 남아있다.

어쨋든 Airflow를 적용하고 난 뒤, 편히 새벽에 잠들 수 있게 되었다. 지금은 머신러닝 파이프라인 관련 도구가 많이 나왔지만, Airflow도 충분히 해당 영역을 커버할 수 있다.

그리고 오픈소스에 대해 다시 한번 생각해보게 되었다. 많은 사람들이 참여하는 오픈소스이다 보니 당연히 버그나 이슈가 생길 수 있고, 문제가 생겼을 때 고쳐달라고 강요하거나 기다리는 것보다 스스로 수정해서 기여하는 것이 올바른 태도가 아닌가 싶다.