Hello,

I wonder to what extent this PR solves your problems:
https://github.com/apache/airflow/pull/6210
This PR improves the use of resources when the operation is performed remotely.

Best regards,

On Thu, Oct 31, 2019 at 2:05 AM Aizhamal Nurmamat kyzy
<[email protected]> wrote:
>
> Is anyone able to give useful pointers to Jon?
>
> +Ash Berlin-Taylor <[email protected]> [email protected]
> <[email protected]> +Driesprong, Fokko <[email protected]> , +Jarek
> Potiuk <[email protected]> tagging you just because I know you all
> and feel safe about not being judged :D
>
> On Tue, Oct 15, 2019 at 1:41 PM Jonathan Miles <[email protected]> wrote:
>
> > Hi all,
> >
> > Posting this to dev instead of users because it crosses into framework
> > territory.
> >
> > I've been using Airflow for six months or so and I'm starting to think
> > about how to better manage Airflow tasks that are proxies for compute
> > tasks running elsewhere, e.g. steps on Amazon EMR clusters. It's easy
> > enough to use a DAG with the various existing Emr...Operators to create
> > clusters, add steps and tear down. However, with large numbers of
> > parallel steps it's hard to manage the creation of EMR clusters, reuse
> > them in various steps and potentially even dynamically scale the EMR
> > cluster count. I want something more akin to a producer/consumer queue
> > for EMR steps. Before I write an AIP, I want to see if anyone's aware of
> > any other work or development in this area?
> >
> > *Example Problem 1*: cluster reuse.
> >
> > A workflow might need to spin up and execute steps on multiple EMR
> > clusters of different sizes, e.g.
> >
> > - EMR Cluster A
> > -- Phase 1 EMR Steps
> > -- Phase 2 EMR Steps
> > -- Phase 3 EMR Steps
> > -- Phase 4 EMR Steps
> >
> > - EMR Cluster B
> > -- Phase 5 EMR Steps
> > -- Phase 6 EMR Steps
> >
> > - EMR Cluster C
> > -- Phase 7 EMR Steps
> > -- Phase 8 EMR Steps
> >
> > The above steps are serial, requiring the previous phase to finish. The
> > most basic way to model it in a DAG is to use EmrCreateJobFlowOperator
> > for a cluster, then a set of EmrAddStepsOperator and EmrStepSensor pairs
> > for each phase and finally terminate the cluster with
> > EmrTerminateJobFlowOperator. We can use XCom to fetch the cluster id for
> > AddSteps/TerminateJobFlow and to fetch the step ids for the StepSensor.
> >
> > - EmrCreateJobFlowOperator
> > - Phase 1: EmrAddStepsOperator + EmrStepSensor
> > - Phase 2: EmrAddStepsOperator + EmrStepSensor
> > - Phase 3: EmrAddStepsOperator + EmrStepSensor
> > - Phase 4: EmrAddStepsOperator + EmrStepSensor
> > - EmrTerminateJobFlowOperator
> >
> > One problem here is that if the underlying EMR cluster fails at any time
> > - e.g. it could be using spot EC2 instances and AWS capacity runs out -
> > someone needs to manually attend to the failed task instance: restart
> > the EMR cluster; then reset the state of the failed AddSteps/StepSensor
> > task pairs. It needs close supervision.
> >
> > There are other ways to model this workflow in a DAG with different
> > trade-offs, e.g.
> >
> > 1. Put each phase in a SubDag then create and terminate the cluster for
> > each phase, with any failed task causing the whole SubDag to retry. But
> > this adds extra total duration due to stopping/starting the cluster for
> > each phase, which is not insignificant.
> >
> > 2. Write custom operators. One to represent an EMR cluster as an
> > EmrSubDagOperator to create and eventually terminate the cluster. Then
> > another to create sub-tasks that use XCom to fetch the cluster
> > id from the "parent" SubDag, add the EMR steps and wait.
> >
> > Ideally, however, I'd like Airflow to manage this itself: it could know
> > that task instances require certain resources or resource instances,
> > then start/stop them as required.
> >
> > *Example Problem 2*: parallel tasks.
> >
> > A workflow might be split into many parallel steps, e.g. for different
> > data partitions or bins.
> >
> > - Phase P EMR Steps
> > -- Parallel EMR step for bin 0
> > -- ...
> > -- Parallel EMR step for bin N
> >
> > The above steps are independent: each computes a subset of the larger
> > problem, so they can run in parallel. A basic way to model this as
> > a DAG is to create as many branches as the desired parallelism level,
> > e.g. for parallelism of two:
> >
> > - EmrCreateJobFlowOperator
> > -- Bin 0: EmrAddStepsOperator + EmrStepSensor
> > -- ...
> > -- Bin max(even(B)): EmrAddStepsOperator + EmrStepSensor
> > -- EmrTerminateJobFlowOperator
> >
> > - EmrCreateJobFlowOperator
> > -- Bin 1: EmrAddStepsOperator + EmrStepSensor
> > -- ...
> > -- Bin max(odd(B)): EmrAddStepsOperator + EmrStepSensor
> > -- EmrTerminateJobFlowOperator
> >
> > This has the same management problem as the previous example when
> > clusters fail, but also another challenge: the parallelism level is
> > statically coded into the DAG topology. It's hard to scale up/down and
> > rebalance the tasks for the bins.
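
To make the static-topology point concrete, here is a minimal sketch of how the bins could be dealt out to branches when the DAG file is parsed. The function name assign_bins is hypothetical and nothing here is an Airflow API; it only illustrates that the even/odd split above is a round-robin over a parallelism level fixed at parse time.

```python
def assign_bins(num_bins, parallelism):
    """Deal bins 0..num_bins-1 round-robin over `parallelism` branches.

    Each inner list stands for one branch of the DAG, i.e. one EMR
    cluster that runs its bins one after another.
    """
    if parallelism < 1:
        raise ValueError("parallelism must be at least 1")
    branches = [[] for _ in range(parallelism)]
    for bin_id in range(num_bins):
        branches[bin_id % parallelism].append(bin_id)
    return branches


# With parallelism of two this reproduces the even/odd split above.
print(assign_bins(7, 2))  # [[0, 2, 4, 6], [1, 3, 5]]
```

Changing the parallelism changes the list of branches, and therefore the DAG topology, which is why rebalancing a run that is already in flight is awkward.
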
> >
> > I could create a separate "EMR cluster" management service outside of
> > Airflow and write a custom Airflow operator to put EMR steps into a
> > queue then have that service auto-scale depending on queue depth etc. If
> > the queue were Celery, it starts to look like a specialised Airflow
> > Executor.
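
One possible shape for the "auto-scale depending on queue depth" rule, as a rough sketch only. The function name, the steps_per_cluster parameter and the min/max bounds are all assumptions made for illustration; they are not taken from the original mail or from any existing service.

```python
import math


def desired_cluster_count(queue_depth, steps_per_cluster,
                          min_clusters=0, max_clusters=10):
    """Return how many EMR clusters a hypothetical service would want.

    queue_depth: number of EMR steps currently waiting in the queue.
    steps_per_cluster: assumed number of queued steps one cluster absorbs.
    The result is clamped to the range [min_clusters, max_clusters].
    """
    if steps_per_cluster < 1:
        raise ValueError("steps_per_cluster must be at least 1")
    wanted = math.ceil(queue_depth / steps_per_cluster)
    return max(min_clusters, min(max_clusters, wanted))


print(desired_cluster_count(9, 4))  # 3
```

A real service would also need to damp scale-down decisions, since starting and stopping clusters is not cheap, as noted earlier in the thread.
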
> >
> > *Solutions*:
> >
> > Without yet doing much research, I've considered some competing
> > solutions for both problems:
> >
> > 1. Service for managing resources. Custom operator to communicate with
> > the service and schedule an atomic set of "EMR steps" with a given EMR
> > cluster specification. Service decides whether or not to reuse an
> > existing cluster or spin up a new one, can auto-scale etc. We can
> > represent both serial and parallel step patterns in the Airflow DAG itself.
> >
> > 2. Build resource management into Airflow
> >
> > 2a. Allow tasks to specify *resource* dependencies. As written above, a
> > new dimension of dependencies that lets Airflow manage when instances of
> > a resource should be spun up or otherwise acquired.
> >
> > 2b. Allow Airflow to have multiple Executors (could be an implementation
> > of 2a), e.g. EmrClusterExecutor. The scheduler still does its thing, but
> > tasks are run on a specialised kind of executor that understands EMR steps.
> >
> > Any thoughts? Has anyone worked on this set of problems before? I'm
> > specifically looking at EMR right now, but I suspect there are many
> > other use-cases.
> >
> > Regards,
> >
> > Jon
> >
> >
