[ thanks Aizhamal for bumping up this thread ]
Yes, since I posted originally I was pointed at @jaketf's PR -
https://github.com/apache/airflow/pull/6210 - and I have been active in
the discussions there (I'm @JonnyIncognito) along with others. @Fokko is
also helping to figure out how to store state between the different
phases of that new operator, which has turned into a prerequisite to
making it work because Airflow currently clears all XCom state before
starting a task, even if it's rescheduled. There was a PoC PR addressing
that emergent issue here https://github.com/apache/airflow/pull/6370 but
the approach needs some more thought.
That new operator concept helps to reduce the need for SubDags in my two
problems below because it would allow operators and sensors to be
combined into a more atomic unit wrt retries. I like to think of it as a
multi-phased or "atomic" operator, where the coupling is a bit tighter
than having separate tasks.
An implementation deriving from this new base operator could also solve
the first problem below, "EMR cluster reuse". We could conditionally
start each cluster in the first phase of the task by passing in both a
cluster id over XCom (an existing cluster) and an EMR cluster spec (in
case the cluster needs to be started or restarted) - see the rough
sketch below. I think solving the problems below this way is somewhat
skirting around the issue, but it's a good start.
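To make that concrete, here's a very rough sketch of the shape I have in
mind, flavoured for EMR. The class name and constructor arguments are mine
for illustration only, and this blocking version just polls inside
execute(); the point of the new base operator is to do that waiting via
reschedules rather than holding a worker slot:

import time

from airflow.contrib.hooks.emr_hook import EmrHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class EmrPhaseOperator(BaseOperator):
    """Hypothetical 'atomic' operator: acquire/reuse a cluster, add steps, wait."""

    @apply_defaults
    def __init__(self, steps, job_flow_overrides=None, cluster_id_task=None,
                 aws_conn_id='aws_default', poke_interval=60, *args, **kwargs):
        super(EmrPhaseOperator, self).__init__(*args, **kwargs)
        self.steps = steps
        self.job_flow_overrides = job_flow_overrides or {}
        self.cluster_id_task = cluster_id_task  # upstream task that pushed a cluster id
        self.aws_conn_id = aws_conn_id
        self.poke_interval = poke_interval

    def execute(self, context):
        emr = EmrHook(aws_conn_id=self.aws_conn_id)
        client = emr.get_conn()  # boto3 EMR client

        # Phase 1: reuse an existing cluster if we were handed one and it's
        # alive, otherwise (re)create it from the spec.
        cluster_id = None
        if self.cluster_id_task:
            cluster_id = context['task_instance'].xcom_pull(task_ids=self.cluster_id_task)
        if cluster_id:
            state = client.describe_cluster(ClusterId=cluster_id)['Cluster']['Status']['State']
            if state not in ('WAITING', 'RUNNING'):
                cluster_id = None
        if not cluster_id:
            cluster_id = emr.create_job_flow(self.job_flow_overrides)['JobFlowId']

        # Phase 2: add this phase's steps.
        step_ids = client.add_job_flow_steps(JobFlowId=cluster_id, Steps=self.steps)['StepIds']

        # Phase 3: wait for completion (the real thing would reschedule here).
        for step_id in step_ids:
            while True:
                status = client.describe_step(ClusterId=cluster_id, StepId=step_id)
                state = status['Step']['Status']['State']
                if state == 'COMPLETED':
                    break
                if state in ('CANCELLED', 'FAILED', 'INTERRUPTED'):
                    raise AirflowException('EMR step %s ended in state %s' % (step_id, state))
                time.sleep(self.poke_interval)

        # Pushed to XCom so a later phase can reuse the cluster.
        return cluster_id

The important property is that create-or-reuse, add-steps and wait retry
as one unit, and the returned cluster id lets the next phase reuse the
cluster.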
It doesn't, however, help to solve the second problem of resource
management / auto-scaling with parallel tasks. I think here is where we
get into the territory of the other potential solutions I proposed:
2. Build resource management into Airflow
2a. Allow tasks to specify *resource* dependencies. As written above, a
new dimension of dependencies that lets Airflow manage when instances of
a resource should be spun up or otherwise acquired.
2b. Allow Airflow to have multiple Executors (could be an implementation
of 2a), e.g. EmrClusterExecutor. The scheduler still does its thing, but
tasks are run on a specialised kind of executor that understands EMR steps.
... which I'd love some feedback on. It's a fairly meaty subject, so
perhaps I should create a draft AIP to at least get the problems in a
format that's easier to digest.
Regards,
Jon
On 31/10/2019 08:42, Kamil Breguła wrote:
Hello,
I wonder how much this PR is the solution to your problems.
https://github.com/apache/airflow/pull/6210
This PR improves the use of resources when the operation is performed remotely.
Best regards,
On Thu, Oct 31, 2019 at 2:05 AM Aizhamal Nurmamat kyzy
<[email protected]> wrote:
Is anyone able to give useful pointers to Jon?
+Ash Berlin-Taylor <[email protected]>, +Driesprong, Fokko <[email protected]>,
+Jarek Potiuk <[email protected]> tagging you just because I know you all
and feel safe about not being judged :D
On Tue, Oct 15, 2019 at 1:41 PM Jonathan Miles <[email protected]> wrote:
Hi all,
Posting this to dev instead of users because it crosses into framework
territory.
I've been using Airflow for six months or so and I'm starting to think
about how to better manage Airflow tasks that are proxies for compute
tasks running elsewhere, e.g. steps on Amazon EMR clusters. It's easy
enough to use a DAG with the various existing Emr...Operators to create
clusters, add steps and tear them down. However, with large numbers of
parallel steps it's hard to manage the creation of EMR clusters, reuse
them across steps and potentially even dynamically scale the EMR
cluster count. I want something more akin to a producer/consumer queue
for EMR steps. Before I write an AIP, I want to see if anyone's aware of
any other work or development in this area.
*Example Problem 1*: cluster reuse.
A workflow might need to spin up and execute steps on multiple EMR
clusters of different sizes, e.g.
- EMR Cluster A
-- Phase 1 EMR Steps
-- Phase 2 EMR Steps
-- Phase 3 EMR Steps
-- Phase 4 EMR Steps
- EMR Cluster B
-- Phase 5 EMR Steps
-- Phase 6 EMR Steps
- EMR Cluster C
-- Phase 7 EMR Steps
-- Phase 8 EMR Steps
The above steps are serial, requiring the previous phase to finish. The
most basic way to model it in a DAG is to use EmrCreateJobFlowOperator
for a cluster, then a set of EmrAddStepsOperator and EmrStepSensor pairs
for each phase and finally terminate the cluster with
EmrTerminateJobFlowOperator. We can use XCom to fetch the cluster id for
AddSteps/TerminateJobFlow and to fetch the step ids for the StepSensor
(rough sketch after the outline below).
- EmrCreateJobFlowOperator
- Phase 1: EmrAddStepsOperator + EmrStepSensor
- Phase 2: EmrAddStepsOperator + EmrStepSensor
- Phase 3: EmrAddStepsOperator + EmrStepSensor
- Phase 4: EmrAddStepsOperator + EmrStepSensor
- EmrTerminateJobFlowOperator
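For concreteness, that basic modelling looks roughly like this with the
existing contrib operators (cluster spec and step definitions stubbed out,
and to keep it short each sensor only watches the first step of its phase):

from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.utils.dates import days_ago

JOB_FLOW_SPEC = {'Name': 'my-cluster'}       # placeholder; real cluster spec omitted
PHASE_STEPS = {'phase1': [], 'phase2': []}   # placeholder step definitions per phase

CLUSTER_ID = "{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}"

with DAG('emr_phases', start_date=days_ago(1), schedule_interval=None) as dag:
    create = EmrCreateJobFlowOperator(
        task_id='create_cluster',
        job_flow_overrides=JOB_FLOW_SPEC,
    )
    terminate = EmrTerminateJobFlowOperator(
        task_id='terminate_cluster',
        job_flow_id=CLUSTER_ID,
        trigger_rule='all_done',  # tear the cluster down even if a phase failed
    )
    previous = create
    for phase in ['phase1', 'phase2']:  # ... and so on for phases 3 and 4
        add = EmrAddStepsOperator(
            task_id='add_%s_steps' % phase,
            job_flow_id=CLUSTER_ID,
            steps=PHASE_STEPS[phase],
        )
        watch = EmrStepSensor(
            task_id='watch_%s' % phase,
            job_flow_id=CLUSTER_ID,
            step_id="{{ task_instance.xcom_pull(task_ids='add_%s_steps', key='return_value')[0] }}" % phase,
        )
        previous >> add >> watch
        previous = watch
    previous >> terminate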
One problem here is that if the underlying EMR cluster fails at any time
- e.g. it could be using spot EC2 instances and AWS capacity runs out -
someone needs to manually attend to the failed task instance: restart
the EMR cluster; then reset the state of the failed AddSteps/StepSensor
task pairs. It needs close supervision.
There are other ways to model this workflow in a DAG with different
trade-offs, e.g.
1. Put each phase in a SubDag, then create and terminate the cluster for
each phase, with any failed task causing the whole SubDag to retry
(rough sketch after this list). But this adds non-trivial extra total
duration due to stopping/starting the cluster for each phase.
2. Write custom operators. One to represent an EMR cluster, e.g. an
EmrSubDagOperator that creates and eventually terminates the cluster.
Then another to create sub-tasks that use XCom to fetch the cluster
id from the "parent" SubDag, add the EMR steps and wait.
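The shape of option 1 would be roughly the following, with a SubDag
factory per phase so that each phase owns its own cluster lifecycle
(step lists and cluster spec stubbed out, and glossing over the usual
SubDag retry caveats):

from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.dates import days_ago

CLUSTER_ID = "{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}"
DEFAULT_ARGS = {'start_date': days_ago(1)}


def emr_phase_subdag(parent_dag_id, phase, steps, job_flow_spec):
    """One phase = its own cluster: create -> add steps -> watch -> terminate."""
    subdag = DAG(dag_id='%s.%s' % (parent_dag_id, phase),
                 default_args=DEFAULT_ARGS, schedule_interval=None)
    create = EmrCreateJobFlowOperator(task_id='create_cluster',
                                      job_flow_overrides=job_flow_spec, dag=subdag)
    add = EmrAddStepsOperator(task_id='add_steps', job_flow_id=CLUSTER_ID,
                              steps=steps, dag=subdag)
    watch = EmrStepSensor(
        task_id='watch_step', job_flow_id=CLUSTER_ID,
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        dag=subdag)
    terminate = EmrTerminateJobFlowOperator(task_id='terminate_cluster',
                                            job_flow_id=CLUSTER_ID,
                                            trigger_rule='all_done', dag=subdag)
    create >> add >> watch >> terminate
    return subdag


with DAG('emr_phase_subdags', default_args=DEFAULT_ARGS, schedule_interval=None) as dag:
    previous = None
    for phase in ['phase1', 'phase2']:  # placeholder phases with empty step lists
        task = SubDagOperator(task_id=phase, retries=2,
                              subdag=emr_phase_subdag(dag.dag_id, phase, [], {}))
        if previous:
            previous >> task
        previous = task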
Ideally, however, I'd like Airflow to manage this itself: it could know
that task instances require certain resources or resource instances,
then start/stop them as required.
*Example Problem 2*: parallel tasks.
A workflow might be split into many parallel steps, e.g. for different
data partitions or bins.
- Phase P EMR Steps
-- Parallel EMR step for bin 0
-- ...
-- Parallel EMR step for bin N
The above steps are independent, each computing a subset of the larger
problem, so they can run in parallel. A basic way to model this as a DAG
is to create as many branches as the desired parallelism level, e.g. for
a parallelism of two (rough sketch after the outline below):
- EmrCreateJobFlowOperator
-- Bin 0: EmrAddStepsOperator + EmrStepSensor
-- ...
-- Bin max(even(B)): EmrAddStepsOperator + EmrStepSensor
-- EmrTerminateJobFlowOperator
- EmrCreateJobFlowOperator
-- Bin 1: EmrAddStepsOperator + EmrStepSensor
-- ...
-- Bin max(odd(B)): EmrAddStepsOperator + EmrStepSensor
-- EmrTerminateJobFlowOperator
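A rough sketch of what I mean by the parallelism being baked in: the
number of clusters and the bin-to-branch assignment are fixed at the time
the DAG file is parsed, so rebalancing means editing the DAG. Step specs
are stubbed out; within a branch the bins are chained since EMR runs
steps on a cluster serially by default anyway:

from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.utils.dates import days_ago

NUM_BINS = 8     # number of data partitions
PARALLELISM = 2  # number of clusters; static once the DAG is parsed

with DAG('emr_parallel_bins', start_date=days_ago(1), schedule_interval=None) as dag:
    for branch in range(PARALLELISM):
        cluster_id = ("{{ task_instance.xcom_pull(task_ids='create_cluster_%d', "
                      "key='return_value') }}" % branch)
        create = EmrCreateJobFlowOperator(task_id='create_cluster_%d' % branch,
                                          job_flow_overrides={})  # placeholder spec
        terminate = EmrTerminateJobFlowOperator(task_id='terminate_cluster_%d' % branch,
                                                job_flow_id=cluster_id,
                                                trigger_rule='all_done')
        previous = create
        # Bins are statically assigned round-robin to branches.
        for b in range(branch, NUM_BINS, PARALLELISM):
            add = EmrAddStepsOperator(task_id='add_bin_%d' % b, job_flow_id=cluster_id,
                                      steps=[])  # placeholder step spec for bin b
            watch = EmrStepSensor(
                task_id='watch_bin_%d' % b, job_flow_id=cluster_id,
                step_id=("{{ task_instance.xcom_pull(task_ids='add_bin_%d', "
                         "key='return_value')[0] }}" % b))
            previous >> add >> watch
            previous = watch
        previous >> terminate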
This has the same management problem as the previous example when
clusters fail, but also another challenge: the parallelism level is
statically coded into the DAG topology. It's hard to scale up/down and
rebalance the tasks for the bins.
I could create a separate "EMR cluster" management service outside of
Airflow and write a custom Airflow operator to put EMR steps into a
queue then have that service auto-scale depending on queue depth etc. If
the queue were Celery, it starts to look like a specialised Airflow
Executor.
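The Airflow side of that would be little more than an enqueue. A rough
sketch, where the operator name, queue and message format are entirely
made up and SQS is used purely for illustration:

import json

from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class EnqueueEmrStepsOperator(BaseOperator):
    """Hypothetical: hand an atomic set of EMR steps to an external manager service."""

    @apply_defaults
    def __init__(self, queue_url, steps, cluster_spec, aws_conn_id='aws_default',
                 *args, **kwargs):
        super(EnqueueEmrStepsOperator, self).__init__(*args, **kwargs)
        self.queue_url = queue_url
        self.steps = steps
        self.cluster_spec = cluster_spec
        self.aws_conn_id = aws_conn_id

    def execute(self, context):
        sqs = AwsHook(aws_conn_id=self.aws_conn_id).get_client_type('sqs')
        # The external service consumes these messages, decides whether to reuse
        # or start a cluster, auto-scales on queue depth, and reports completion
        # back somehow; sensing that completion is the open question.
        sqs.send_message(
            QueueUrl=self.queue_url,
            MessageBody=json.dumps({'dag_id': self.dag_id,
                                    'task_id': self.task_id,
                                    'run_id': context['run_id'],
                                    'cluster_spec': self.cluster_spec,
                                    'steps': self.steps}))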
*Solutions*:
Without yet doing much research, I've considered some competing
solutions for both problems:
1. Service for managing resources. Custom operator to communicate with
the service and schedule an atomic set of "EMR steps" with a given EMR
cluster specification. The service decides whether to reuse an existing
cluster or spin up a new one, can auto-scale, etc. We can represent both
serial and parallel step patterns in the Airflow DAG itself.
2. Build resource management into Airflow
2a. Allow tasks to specify *resource* dependencies. As written above, a
new dimension of dependencies that lets Airflow manage when instances of
a resource should be spun up or otherwise acquired.
2b. Allow Airflow to have multiple Executors (could be an implementation
of 2a), e.g. EmrClusterExecutor. The scheduler still does its thing, but
tasks are run on a specialised kind of executor that understands EMR steps.
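To make 2a a bit more tangible, here is a strawman of what declaring a
resource dependency might look like. None of these classes or parameters
exist today; it's purely to frame the discussion:

class EmrClusterResource(object):
    """Hypothetical resource type the scheduler would know how to manage."""

    def __init__(self, name, cluster_spec, max_instances=1, idle_timeout=600):
        self.name = name                    # resource identity, shared across tasks
        self.cluster_spec = cluster_spec    # how to create an instance if none is free
        self.max_instances = max_instances  # upper bound for scaling out parallel tasks
        self.idle_timeout = idle_timeout    # seconds before an idle cluster is torn down


# In the DAG file a task would then declare a resource dependency, e.g.:
#
#   cluster = EmrClusterResource('etl-cluster', cluster_spec=JOB_FLOW_SPEC, max_instances=4)
#   step = EmrStepOperator(task_id='bin_0', steps=BIN_0_STEPS, resource_deps=[cluster])
#
# The scheduler would only queue the task once an instance of the resource is
# available (spinning one up if allowed), inject the concrete cluster id into the
# task's context, and tear instances down when idle.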
Any thoughts? Has anyone worked on this set of problems before? I'm
specifically looking at EMR right now, but I suspect there are many
other use-cases.
Regards,
Jon