Hi all,
Posting this to dev instead of users because it crosses into framework
territory.
I've been using Airflow for six months or so and I'm starting to think
about how to better manage Airflow tasks that are proxies for compute
tasks running elsewhere, e.g. steps on Amazon EMR clusters. It's easy
enough to use a DAG with the various existing Emr...Operators to create
clusters, add steps and tear down. However, with large numbers of
parallel steps it's hard to manage the creation of EMR clusters, reuse
them in various steps and potentially even dynamically scale the EMR
cluster count. I want something more akin to a producer/consumer queue
for EMR steps. Before I write an AIP, I'd like to ask: is anyone aware of
other work or development in this area?
*Example Problem 1*: cluster reuse.
A workflow might need to spin up and execute steps on multiple EMR
clusters of different sizes, e.g.
- EMR Cluster A
-- Phase 1 EMR Steps
-- Phase 2 EMR Steps
-- Phase 3 EMR Steps
-- Phase 4 EMR Steps
- EMR Cluster B
-- Phase 5 EMR Steps
-- Phase 6 EMR Steps
- EMR Cluster C
-- Phase 7 EMR Steps
-- Phase 8 EMR Steps
The above steps are serial: each phase requires the previous one to
finish. The most basic way to model this in a DAG is to use
EmrCreateJobFlowOperator for the cluster, then an EmrAddStepsOperator and
EmrStepSensor pair for each phase, and finally EmrTerminateJobFlowOperator
to tear the cluster down. We can use XCom to fetch the cluster id for
AddSteps/TerminateJobFlow and the step ids for the StepSensor (a minimal
sketch follows the list below).
- EmrCreateJobFlowOperator
- Phase 1: EmrAddStepsOperator + EmrStepSensor
- Phase 2: EmrAddStepsOperator + EmrStepSensor
- Phase 3: EmrAddStepsOperator + EmrStepSensor
- Phase 4: EmrAddStepsOperator + EmrStepSensor
- EmrTerminateJobFlowOperator
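A minimal sketch of this shape, assuming the contrib Emr operators from
Airflow 1.10; JOB_FLOW_OVERRIDES and PHASE_STEPS below are placeholders
for a real cluster spec and per-phase step definitions, not working
configs:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

JOB_FLOW_OVERRIDES = {'Name': 'cluster-a'}  # placeholder cluster spec
PHASE_STEPS = [[], [], [], []]              # placeholder steps for phases 1-4
CLUSTER_ID = "{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}"

with DAG('emr_serial_phases', start_date=datetime(2019, 1, 1),
         schedule_interval=None) as dag:
    create_cluster = EmrCreateJobFlowOperator(
        task_id='create_cluster',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
    )
    terminate_cluster = EmrTerminateJobFlowOperator(
        task_id='terminate_cluster',
        job_flow_id=CLUSTER_ID,   # cluster id via XCom
        trigger_rule='all_done',  # tear down even after a failed phase
    )

    prev = create_cluster
    for phase, steps in enumerate(PHASE_STEPS, start=1):
        add_steps = EmrAddStepsOperator(
            task_id='phase_%d_add_steps' % phase,
            job_flow_id=CLUSTER_ID,
            steps=steps,
        )
        watch = EmrStepSensor(
            task_id='phase_%d_watch' % phase,
            job_flow_id=CLUSTER_ID,
            # EMR runs a cluster's steps serially, so watching the last
            # step added covers the whole phase.
            step_id="{{ task_instance.xcom_pull(task_ids='phase_%d_add_steps', key='return_value')[-1] }}" % phase,
        )
        prev >> add_steps >> watch
        prev = watch
    prev >> terminate_cluster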
One problem here is that if the underlying EMR cluster fails at any time
- e.g. it uses spot EC2 instances and AWS runs out of capacity - someone
has to attend to the failed task instance manually: restart the EMR
cluster, then reset the state of the failed AddSteps/StepSensor task
pairs. It needs close supervision.
There are other ways to model this workflow in a DAG with different
trade-offs, e.g.
1. Put each phase in a SubDag that creates and terminates its own
cluster, with any failed task causing the whole SubDag to retry (rough
sketch after this list). But this adds significant extra duration because
the cluster is stopped and started for every phase.
2. Write custom operators: one to represent an EMR cluster as an
EmrSubDagOperator that creates and eventually terminates the cluster, and
another to create sub-tasks that use XCom to fetch the cluster id from
the "parent" SubDag, add the EMR steps and wait.
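For option 1, a rough sketch, assuming a SubDagOperator retry re-runs the
inner DAG (in practice inner task states can still need clearing, which
is part of the pain); phase_subdag() would contain the same
create >> add >> watch >> terminate chain as the sketch above, scoped to
one phase:

from airflow.operators.subdag_operator import SubDagOperator

def phase_subdag(parent_dag_id, phase, steps):
    subdag = DAG('%s.phase_%d' % (parent_dag_id, phase),
                 start_date=datetime(2019, 1, 1), schedule_interval=None)
    # create cluster >> add steps >> watch >> terminate cluster, all
    # inside this subdag, so a retry repeats the whole lifecycle.
    return subdag

phase_1 = SubDagOperator(
    task_id='phase_1',
    subdag=phase_subdag('emr_serial_phases', 1, PHASE_STEPS[0]),
    retries=2,  # heals a dead cluster, but pays the start/stop cost
    dag=dag,
)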
Ideally, however, I'd like Airflow to manage this itself: it could know
that task instances require certain resources or resource instances,
then start/stop them as required.
*Example Problem 2*: parallel tasks.
A workflow might be split into many parallel steps, e.g. for different
data partitions or bins.
- Phase P EMR Steps
-- Parallel EMR step for bin 0
-- ...
-- Parallel EMR step for bin N
The above steps are independent, each computing a subset of the larger
problem, so they can run in parallel. A basic way to model this as a DAG
is to create as many branches as the desired parallelism level, e.g. for
a parallelism of two (sketched in code after the list):
- EmrCreateJobFlowOperator
-- Bin 0: EmrAddStepsOperator + EmrStepSensor
-- ...
-- Bin max(even(B)): EmrAddStepsOperator + EmrStepSensor
-- EmrTerminateJobFlowOperator
- EmrCreateJobFlowOperator
-- Bin 1: EmrAddStepsOperator + EmrStepSensor
-- ...
-- Bin max(odd(B)): EmrAddStepsOperator + EmrStepSensor
-- EmrTerminateJobFlowOperator
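Continuing the earlier sketch (same imports, DAG and placeholders), the
static fan-out for a parallelism of two looks roughly like this; note how
the parallelism is frozen into the task ids and edges, so rebalancing
bins means regenerating the DAG:

PARALLELISM = 2
NUM_BINS = 8                               # placeholder
BIN_STEPS = [[] for _ in range(NUM_BINS)]  # placeholder per-bin steps

for branch in range(PARALLELISM):
    cluster_id = ("{{ task_instance.xcom_pull(task_ids='create_cluster_%d', "
                  "key='return_value') }}" % branch)
    create = EmrCreateJobFlowOperator(
        task_id='create_cluster_%d' % branch,
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        dag=dag,
    )
    terminate = EmrTerminateJobFlowOperator(
        task_id='terminate_cluster_%d' % branch,
        job_flow_id=cluster_id,
        trigger_rule='all_done',
        dag=dag,
    )
    # Static round-robin assignment: even bins on branch 0, odd on branch 1.
    for b in range(branch, NUM_BINS, PARALLELISM):
        add = EmrAddStepsOperator(
            task_id='bin_%d_add_steps' % b,
            job_flow_id=cluster_id,
            steps=BIN_STEPS[b],
            dag=dag,
        )
        watch = EmrStepSensor(
            task_id='bin_%d_watch' % b,
            job_flow_id=cluster_id,
            step_id=("{{ task_instance.xcom_pull(task_ids='bin_%d_add_steps', "
                     "key='return_value')[0] }}" % b),
            dag=dag,
        )
        create >> add >> watch >> terminate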
This has the same management problem as the previous example when
clusters fail, but also another challenge: the parallelism level is
statically coded into the DAG topology. It's hard to scale up/down and
rebalance the tasks for the bins.
I could create a separate "EMR cluster" management service outside of
Airflow and write a custom Airflow operator that puts EMR steps into a
queue, then have that service auto-scale on queue depth etc. (rough
sketch below). If the queue were Celery, this starts to look like a
specialised Airflow Executor.
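A loose sketch of that operator; everything here is hypothetical
(EmrStepQueueOperator and the queue message contract are invented), and
the consuming auto-scaler service is left entirely unwritten:

import json

import boto3
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class EmrStepQueueOperator(BaseOperator):
    """Hypothetical: enqueue EMR steps for an external service to run."""

    @apply_defaults
    def __init__(self, queue_url, steps, cluster_spec, *args, **kwargs):
        super(EmrStepQueueOperator, self).__init__(*args, **kwargs)
        self.queue_url = queue_url
        self.steps = steps
        self.cluster_spec = cluster_spec

    def execute(self, context):
        # The service picks (or creates) a matching cluster and runs the
        # steps; a companion sensor would poll for its completion signal.
        boto3.client('sqs').send_message(
            QueueUrl=self.queue_url,
            MessageBody=json.dumps({
                'cluster_spec': self.cluster_spec,
                'steps': self.steps,
            }),
        )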
*Solutions*:
Without yet doing much research, I've considered some competing
solutions for both problems:
1. Service for managing resources. A custom operator communicates with
the service to schedule an atomic set of "EMR steps" against a given EMR
cluster specification. The service decides whether to reuse an existing
cluster or spin up a new one, can auto-scale, etc. We can represent both
serial and parallel step patterns in the Airflow DAG itself.
2. Build resource management into Airflow.
2a. Allow tasks to specify *resource* dependencies. As described above, a
new dimension of dependencies would let Airflow manage when instances of
a resource should be spun up or otherwise acquired (invented syntax
sketched below).
2b. Allow Airflow to have multiple Executors (which could be an
implementation of 2a), e.g. EmrClusterExecutor. The scheduler still does
its thing, but tasks run on a specialised kind of executor that
understands EMR steps (skeleton below).
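To make 2a concrete, some invented syntax; EmrClusterResource and
resource_deps do not exist in Airflow, this only illustrates tasks
declaring the resources they depend on:

cluster_a = EmrClusterResource(spec=JOB_FLOW_OVERRIDES)  # hypothetical

add_steps = EmrAddStepsOperator(
    task_id='phase_1_add_steps',
    resource_deps=[cluster_a],  # hypothetical: Airflow would acquire or
    steps=PHASE_STEPS[0],       # start the cluster before the task runs,
    dag=dag,                    # and release it once nothing pending
)                               # still needs it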
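And for 2b, a skeleton of what such an executor might look like; the
BaseExecutor hooks are real, but everything EMR-specific is hypothetical
and the interface details vary by Airflow version:

from airflow.executors.base_executor import BaseExecutor

class EmrClusterExecutor(BaseExecutor):
    """Hypothetical executor that runs task work as EMR steps."""

    def start(self):
        # Acquire or create EMR clusters matching the configured specs.
        pass

    def execute_async(self, key, command, queue=None, executor_config=None):
        # Rather than running `command` locally, submit the task's EMR
        # steps to a matching cluster and record the (key, step_id) pair.
        pass

    def sync(self):
        # Poll step states and report back via self.success(key) /
        # self.fail(key).
        pass

    def end(self):
        # Tear down clusters that no longer have pending work.
        pass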
Any thoughts? Has anyone worked on this set of problems before? I'm
specifically looking at EMR right now, but I suspect there are many
other use-cases.
Regards,
Jon