Hi all,

Posting this to dev instead of users because it crosses into framework territory.

I've been using Airflow for six months or so and I'm starting to think about how to better manage Airflow tasks that are proxies for compute tasks running elsewhere, e.g. steps on Amazon EMR clusters. It's easy enough to use a DAG with the various existing Emr...Operators to create clusters, add steps and tear down. However, with large numbers of parallel steps it's hard to manage the creation of EMR clusters, reuse them across steps and potentially even scale the EMR cluster count dynamically. I want something more akin to a producer/consumer queue for EMR steps. Before I write an AIP, I want to check whether anyone is aware of other work or development in this area.

*Example Problem 1*: cluster reuse.

A workflow might need to spin up and execute steps on multiple EMR clusters of different sizes, e.g.

- EMR Cluster A
-- Phase 1 EMR Steps
-- Phase 2 EMR Steps
-- Phase 3 EMR Steps
-- Phase 4 EMR Steps

- EMR Cluster B
-- Phase 5 EMR Steps
-- Phase 6 EMR Steps

- EMR Cluster C
-- Phase 7 EMR Steps
-- Phase 8 EMR Steps

The above steps are serial, requiring the previous phase to finish. The most basic way to model this in a DAG is to use EmrCreateJobFlowOperator for the cluster, then a pair of EmrAddStepsOperator and EmrStepSensor tasks for each phase, and finally terminate the cluster with EmrTerminateJobFlowOperator. We can use XCom to fetch the cluster id for AddSteps/TerminateJobFlow and the step ids for the StepSensor. A rough sketch of that wiring follows the outline below.

- EmrCreateJobFlowOperator
- Phase 1: EmrAddStepsOperator + EmrStepSensor
- Phase 2: EmrAddStepsOperator + EmrStepSensor
- Phase 3: EmrAddStepsOperator + EmrStepSensor
- Phase 4: EmrAddStepsOperator + EmrStepSensor
- EmrTerminateJobFlowOperator
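
For concreteness, here's a rough, untested sketch of that wiring. It assumes Airflow 1.10-style contrib import paths, and that JOB_FLOW_OVERRIDES (the cluster config) and PHASE_STEPS (one list of EMR steps per phase) are defined elsewhere:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

with DAG("emr_cluster_a", start_date=datetime(2019, 1, 1),
         schedule_interval=None) as dag:
    create_cluster = EmrCreateJobFlowOperator(
        task_id="create_cluster",
        job_flow_overrides=JOB_FLOW_OVERRIDES,
    )
    terminate_cluster = EmrTerminateJobFlowOperator(
        task_id="terminate_cluster",
        # cluster id comes from the create task's XCom
        job_flow_id="{{ ti.xcom_pull(task_ids='create_cluster') }}",
    )

    previous = create_cluster
    for phase, steps in enumerate(PHASE_STEPS, start=1):
        add_task_id = "phase_%d_add_steps" % phase
        add_steps = EmrAddStepsOperator(
            task_id=add_task_id,
            job_flow_id="{{ ti.xcom_pull(task_ids='create_cluster') }}",
            steps=steps,
        )
        watch_steps = EmrStepSensor(
            task_id="phase_%d_watch_steps" % phase,
            job_flow_id="{{ ti.xcom_pull(task_ids='create_cluster') }}",
            # wait on the last step added for this phase
            step_id="{{ ti.xcom_pull(task_ids='%s')[-1] }}" % add_task_id,
        )
        previous >> add_steps >> watch_steps
        previous = watch_steps

    previous >> terminate_cluster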

One problem here is that if the underlying EMR cluster fails at any time - e.g. it could be using spot EC2 instances and AWS capacity runs out - someone needs to manually attend to the failure: recreate the EMR cluster, then clear the state of the failed AddSteps/StepSensor task pairs so they re-run. It needs close supervision.

There are other ways to model this workflow in a DAG with different trade-offs, e.g.

1. Put each phase in a SubDag, creating and terminating the cluster within each phase, with any failed task causing the whole SubDag to retry. But this adds non-trivial extra duration, because the cluster is stopped and started for every phase.

2. Write custom operators: one to represent an EMR cluster as an EmrSubDagOperator that creates and eventually terminates the cluster, and another to create sub-tasks that use XCom to fetch the cluster id from the "parent" SubDag, add the EMR steps and wait.

Ideally, however, I'd like Airflow to manage this itself: it could know that task instances require certain resources or resource instances, then start/stop them as required.

*Example Problem 2*: parallel tasks.

A workflow might be split into many parallel steps, e.g. for different data partitions or bins.

- Phase P EMR Steps
-- Parallel EMR step for bin 0
-- ...
-- Parallel EMR step for bin N

The above steps are independent - each computes a subset of the larger problem - so they can run in parallel. A basic way to model this as a DAG is to create as many branches as the desired parallelism level, e.g. for a parallelism of two:

- EmrCreateJobFlowOperator
-- Bin 0: EmrAddStepsOperator + EmrStepSensor
-- ...
-- Bin max(even(B)): EmrAddStepsOperator + EmrStepSensor
-- EmrTerminateJobFlowOperator

- EmrCreateJobFlowOperator
-- Bin 1: EmrAddStepsOperator + EmrStepSensor
-- ...
-- Bin max(odd(B)): EmrAddStepsOperator + EmrStepSensor
-- EmrTerminateJobFlowOperator

This has the same management problem as the previous example when clusters fail, but also another challenge: the parallelism level is statically coded into the DAG topology. It's hard to scale up/down and rebalance the tasks for the bins.
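
To make the static-topology point concrete, here's a sketch in the same spirit as the one above; NUM_BINS, PARALLELISM, JOB_FLOW_OVERRIDES and the step_for_bin() helper are placeholders I've made up. The bin-to-cluster assignment is fixed when the DAG file is parsed, not when it runs:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

with DAG("emr_bins", start_date=datetime(2019, 1, 1),
         schedule_interval=None) as dag:
    for cluster_idx in range(PARALLELISM):
        create_task_id = "create_cluster_%d" % cluster_idx
        create = EmrCreateJobFlowOperator(
            task_id=create_task_id,
            job_flow_overrides=JOB_FLOW_OVERRIDES,
        )
        terminate = EmrTerminateJobFlowOperator(
            task_id="terminate_cluster_%d" % cluster_idx,
            job_flow_id="{{ ti.xcom_pull(task_ids='%s') }}" % create_task_id,
            trigger_rule="all_done",  # clean up even if a bin fails
        )

        previous = create
        # Static assignment: bins are interleaved across clusters at parse time.
        for b in range(cluster_idx, NUM_BINS, PARALLELISM):
            add_task_id = "bin_%d_add_steps" % b
            add_steps = EmrAddStepsOperator(
                task_id=add_task_id,
                job_flow_id="{{ ti.xcom_pull(task_ids='%s') }}" % create_task_id,
                steps=[step_for_bin(b)],
            )
            watch_step = EmrStepSensor(
                task_id="bin_%d_watch_step" % b,
                job_flow_id="{{ ti.xcom_pull(task_ids='%s') }}" % create_task_id,
                step_id="{{ ti.xcom_pull(task_ids='%s')[0] }}" % add_task_id,
            )
            previous >> add_steps >> watch_step
            previous = watch_step

        previous >> terminate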

I could create a separate "EMR cluster" management service outside of Airflow and write a custom Airflow operator that puts EMR steps onto a queue, then have that service auto-scale clusters depending on queue depth etc. If the queue were Celery, it would start to look like a specialised Airflow Executor.
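
For the operator side of that idea, here's a purely hypothetical sketch - the service, its /submit endpoint and the payload shape are all made up:

import requests

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class EmrStepsViaServiceOperator(BaseOperator):
    """Hands an atomic set of EMR steps, plus a cluster spec, to an
    out-of-band "EMR cluster manager" service and returns its job id."""

    template_fields = ("steps",)

    @apply_defaults
    def __init__(self, service_url, cluster_spec, steps, *args, **kwargs):
        super(EmrStepsViaServiceOperator, self).__init__(*args, **kwargs)
        self.service_url = service_url
        self.cluster_spec = cluster_spec
        self.steps = steps

    def execute(self, context):
        # The service decides whether to reuse a warm cluster or start a new
        # one; Airflow only ever sees an opaque job id.
        response = requests.post(
            self.service_url + "/submit",
            json={"cluster_spec": self.cluster_spec, "steps": self.steps},
        )
        response.raise_for_status()
        return response.json()["job_id"]  # pushed to XCom for a sensor

A companion sensor would then poll the service with the job id pulled from XCom, and the service would own cluster reuse, failure recovery and auto-scaling.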

*Solutions*:

Without yet doing much research, I've considered some competing solutions for both problems:

1. A service for managing resources, plus a custom operator to communicate with the service and schedule an atomic set of "EMR steps" against a given EMR cluster specification. The service decides whether to reuse an existing cluster or spin up a new one, can auto-scale, etc. We can represent both serial and parallel step patterns in the Airflow DAG itself.

2. Build resource management into Airflow.

2a. Allow tasks to specify *resource* dependencies. As written above, this is a new dimension of dependencies that lets Airflow manage when instances of a resource should be spun up or otherwise acquired. (A strawman sketch of what this could look like follows after 2b.)

2b. Allow Airflow to have multiple Executors (could be an implementation of 2a), e.g. EmrClusterExecutor. The scheduler still does its thing, but tasks are run on a specialised kind of executor that understands EMR steps.
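
To make 2a concrete, here's a strawman of what the DAG author might write. None of this API exists - EmrClusterResource and the resource_deps argument are invented purely for illustration:

# Hypothetical: declare a cluster as a resource rather than as tasks.
cluster_a = EmrClusterResource(
    resource_id="cluster_a",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
)

phase_1 = EmrAddStepsOperator(
    task_id="phase_1_add_steps",
    steps=PHASE_1_STEPS,
    # Hypothetical: the scheduler acquires (creates or reuses) the cluster
    # before this task runs and injects the cluster id at runtime.
    resource_deps=[cluster_a],
)

The scheduler, rather than the DAG author, would own the resource lifecycle: acquire the cluster before the first task that declares it, release it once no queued or running task still needs it, and re-acquire it if the cluster dies mid-run instead of failing the task outright.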

Any thoughts? Has anyone worked on this set of problems before? I'm specifically looking at EMR right now, but I suspect there are many other use-cases.

Regards,

Jon
