Good to see that more people have been working around this. I'm curious
about the impact on the community: are they writing Operators that
combine a Sensor and an Operator over and over to cover async cases? Are
they using custom code to get atomicity, like Python functions used in
hooks?

Also, some interesting points were raised; maybe the usage of sensors
should be reviewed, with good patterns vs. anti-patterns highlighted in
the docs.

Here's how I tend to reason about sensors / operators:
- Sensors -> I see these as triggers (or think they should only be used
as such)
- Operators -> do work that is synchronous (blocks the Python thread)
- Operator + Sensor -> covers asynchronous cases like running on external
compute environments, but leads to bad Airflow code (callbacks to retry
the pairs, bigger and more complex DAGs); see the sketch right below
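
To make that last pattern concrete, here is roughly what the Operator +
Sensor pairing looks like in a DAG (a minimal sketch; the cluster id,
step definition and dag id are placeholders):

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

SPARK_STEPS = [{
    'Name': 'example_step',
    'ActionOnFailure': 'CONTINUE',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': ['spark-submit', '--class', 'com.example.Job',
                 's3://my-bucket/job.jar'],
    },
}]

with DAG('emr_pair_example', start_date=datetime(2019, 10, 1),
         schedule_interval=None) as dag:
    add_step = EmrAddStepsOperator(
        task_id='add_step',
        job_flow_id='j-XXXXXXXXXXXXX',
        steps=SPARK_STEPS,
    )
    watch_step = EmrStepSensor(
        task_id='watch_step',
        job_flow_id='j-XXXXXXXXXXXXX',
        step_id="{{ task_instance.xcom_pull(task_ids='add_step')[0] }}",
    )
    # If watch_step fails and is retried, add_step is not re-run:
    # the pair does not retry atomically.
    add_step >> watch_step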

I think for that last case we might be missing a base class; as Jonathan
pointed out, BaseAsyncOperator might be a good solution!
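
To sketch what I have in mind (just an illustration; the submit/poke
method names are mine, not necessarily what the PR proposes):

import time

from airflow.models import BaseOperator


class BaseAsyncOperator(BaseOperator):
    """Sketch: one task that submits work, then waits for it."""

    poke_interval = 60  # seconds between completion checks

    def submit(self, context):
        """Kick off the external work, return a handle (e.g. a step id)."""
        raise NotImplementedError

    def poke(self, context, handle):
        """Return True once the external work has completed."""
        raise NotImplementedError

    def execute(self, context):
        handle = self.submit(context)
        # Submit and wait happen in a single task, so a retry re-runs
        # both stages together -- the atomicity we are after.
        while not self.poke(context, handle):
            time.sleep(self.poke_interval)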

Also, Ash and Jonathan, it seems you have both combined operators that
are often used together (not necessarily sensor + operator, but also
operator + operator). I'm wondering if there would be an easy way to
combine Operators without using SubDags? Like function composition for
Operators, if that makes sense?
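
For instance (purely a sketch that ignores templating, XCom and operator
lifecycle subtleties), something along these lines:

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class ComposedOperator(BaseOperator):
    """Runs a list of pre-built operators in sequence inside one task."""

    @apply_defaults
    def __init__(self, operators, *args, **kwargs):
        super(ComposedOperator, self).__init__(*args, **kwargs)
        self.operators = operators

    def execute(self, context):
        result = None
        for op in self.operators:
            # Each inner execute() runs in the same task instance, so the
            # whole chain retries as one unit.
            result = op.execute(context)
        return result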

I just added some code for reference. This is not something I use in
production, but it gives an idea of how people might have been patching
two classes together for async cases (or cases where two operators are
used together so often, and need to be retried atomically, that merging
them makes sense).

Thanks for the discussion!

from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.utils.decorators import apply_defaults


class EmrRunSparkJobOperator(EmrAddStepsOperator, EmrStepSensor):
    # Expose the template fields of both parents so templating still works.
    template_fields = (EmrAddStepsOperator.template_fields +
                       EmrStepSensor.template_fields)

    @apply_defaults
    def __init__(
            self,
            job_flow_id,
            jar,
            entry_class,
            job_args,
            *args, **kwargs):
        # Build the single spark-submit step that EmrAddStepsOperator.execute
        # will submit to the cluster.
        self.steps = [{
            'Name': kwargs['task_id'],
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['spark-submit', '--class', entry_class, jar] + job_args,
            },
        }]
        self.job_flow_id = job_flow_id
        # The step id is unknown until the step is submitted;
        # execute() fills it in.
        EmrStepSensor.__init__(self, *args, job_flow_id=job_flow_id,
                               step_id='', **kwargs)

    def execute(self, context):
        # Submit the step, then block in the sensor until it completes,
        # so a retry re-runs both stages together.
        self.step_id = EmrAddStepsOperator.execute(self, context)[0]
        EmrStepSensor.execute(self, context)
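
In a DAG it would be used roughly like this (cluster id, jar, class and
arguments are placeholders):

run_spark = EmrRunSparkJobOperator(
    task_id='run_spark_job',
    job_flow_id='j-XXXXXXXXXXXXX',
    jar='s3://my-bucket/jobs/my-job.jar',
    entry_class='com.example.MySparkJob',
    job_args=['--date', '{{ ds }}'],
    dag=dag,
)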


On Wed, 16 Oct 2019 at 06:25, Jonathan Miles <j...@cybus.co.uk> wrote:

> That's a great idea, to provide a generic way to do these. I feel like
> standalone sensors are a bit abused in the framework: they're better
> suited as triggers for when an external source is ready (e.g. a new S3
> file appears) than for polling for completion of a previous task (e.g.
> EmrStepSensor), exactly because of the lack of atomicity with retries
> discussed earlier in the thread. The BaseAsyncOperator (I'd prefer
> BaseAtomicOperator, or modifying the existing BaseSensorOperator)
> removes the need for that sensor misuse. Ideally all operators should
> have the ability to use pre/action/sense/post hooks.
>
> It still doesn't solve the problem that we need to write a combined
> EmrAddStep + Sensor operator, but helps all of these types of
> implementations be more consistent.
>
> I've added some comments to the PR, thanks for bringing it up!
>
> Jon
>
> On 15/10/2019 20:04, Jarek Potiuk wrote:
> > I think it could be solved in a more consistent and future-proof way.
> >
> > There is a new proposal for Base Async operators from Jacob:
> > https://github.com/apache/airflow/pull/6210
> >
> > This is an interesting approach that might solve the problem in a
> > slightly different way. Rather than combining operators and sensors, it
> > introduces an Async Operator that has two stages: "initiate operation"
> > and a sensor-like "wait for the operation to complete". In this model a
> > lot of the long-running operators we have now could be (easily)
> > rewritten using the Async operator model, and you could run them either
> > in sync mode (as they are now) or in the Async mode that would be
> > equivalent to the "Operator" + "Sensor" pattern that is indeed a bit
> > problematic.
> >
> > The EMRAddStep + Sensor could be an Async Operator then.
> >
> > J.
> >
> > On Tue, Oct 15, 2019 at 7:13 PM Jonathan Miles <j...@cybus.co.uk> wrote:
> >
> >> Yes, I often refer to this as the "atomicity problem": we usually want
> >> all four "create", "add steps", "wait for steps", "terminate cluster"
> >> tasks to retry together and in order if any one of them fails (well, the
> >> terminate one is questionable). In our current DAGs we resolved this by
> >> putting the tasks in a SubDag and changing the on_retry_callback to
> >> clear the state of the sub-tasks. But use of SubDags makes navigation in
> >> the UI a bit of a pain, so we plan to merge them into a single
> >> custom operator soon. There's also the problem that for big workflows,
> >> doing this adds a lot of duration due to the management overhead of
> >> starting/stopping EMR clusters instead of reusing them. I'm about to
> >> send a separate e-mail about that.
> >>
> >> I think it'd be great to have a combined operator upstream in the
> >> codebase!
> >>
> >> Jon
> >>
> >> On 14/10/2019 20:42, Daniel Mateus Pires wrote:
> >>> Hi there!
> >>>
> >>> Would it make sense to add an operator that is both the EmrAddStep
> >>> operator and the step sensor?
> >>>
> >>> In a past role we were using Airflow heavily for all things EMR, and I
> >>> found myself writing an Operator that combined the emr_add_step
> >>> operator and the sensor; it made the pipelines simpler (fewer operator
> >>> instances per DAG) and retries were easy.
> >>>
> >>> There is still value in keeping those two other classes around when we
> >>> don't care about the result of an EMR step, or when we are polling for
> >>> the completion of an EMR step we did not start from Airflow; but for
> >>> most tasks, wouldn't a "merged operator" make sense?
> >>>
> >>> Thanks!
> >>> Daniel
> >>>
> >
>
