Good to see that more people have been working around this. I'm curious what the impact on the community is (are they writing Operators that combine Sensor + Operator over and over to cover async cases? Are they using custom code to get atomicity, like Python functions used in the hooks?)
Also, some interesting points were raised; maybe the usage of sensors should be reviewed and good patterns vs anti-patterns should be highlighted in the docs. Here's how I tend to reason about sensors / operators:

- Sensors -> I see those as triggers (or they should only be used as such)
- Operators -> do work that is synchronous (blocks the Python thread)
- Operator + Sensor -> covers asynchronous cases like running on external compute environments, but leads to bad Airflow code (callbacks to retry the pairs, bigger and more complex DAGs)

I think for that last case we might be missing a base class; as Jonathan pointed out, BaseAsyncOperator might be a good solution!

Also, Ash and Jonathan, it seems you have both combined operators that are often used together (not necessarily sensor + operator but also operator + operator). Just wondering if there would be an easy way to combine Operators without using SubDags? Like function composition for Operators, if that makes sense?

I added some code below for reference. It's not something I use in production, but it gives an idea of how people might have been patching two classes together for async cases (or cases where two operators are so often used together, and need to be retried atomically, that it makes sense to merge them).

Thanks for the discussion!

from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.utils import apply_defaults


class EmrRunSparkJobOperator(EmrAddStepsOperator, EmrStepSensor):

    template_fields = EmrAddStepsOperator.template_fields + EmrStepSensor.template_fields

    @apply_defaults
    def __init__(self, job_flow_id, jar, entry_class, job_args, *args, **kwargs):
        # Build the single spark-submit step this operator will add and then wait for.
        self.steps = [{
            'Name': kwargs['task_id'],
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['spark-submit', '--class', entry_class, jar] + job_args}}]
        self.job_flow_id = job_flow_id
        EmrStepSensor.__init__(self, job_flow_id=job_flow_id, step_id='', *args, **kwargs)

    def execute(self, context):
        # Add the step, then reuse the sensor's execute() to poll until it completes,
        # so a retry of this one task redoes both the submission and the wait.
        self.step_id = EmrAddStepsOperator.execute(self, context)[0]
        EmrStepSensor.execute(self, context)

On Wed, 16 Oct 2019 at 06:25, Jonathan Miles <j...@cybus.co.uk> wrote:
> That's a great idea, to provide a generic way to do these. I feel like
> standalone sensors are a bit abused in the framework, like they're
> better suited as triggers when an external source is ready (e.g. new S3
> file appears) than to poll for completion of a previous task (e.g.
> EmrStepSensor); exactly because of the lack of atomicity with retries
> discussed earlier in the thread. Whereas the BaseAsyncOperator (I'd
> prefer BaseAtomicOperator or modify existing BaseSensorOperator) removes
> the need for the sensor misuse. Ideally all operators should have the
> ability to use pre/action/sense/post hooks.
>
> It still doesn't solve the problem that we need to write a combined
> EmrAddStep + Sensor operator, but helps all of these types of
> implementations be more consistent.
>
> I've added some comments to the PR, thanks for bringing it up!
>
> Jon
>
> On 15/10/2019 20:04, Jarek Potiuk wrote:
> > I think it could be solved in a more consistent and future-proof way.
> >
> > There is this new proposal for Base Async operators proposed by Jacob:
> > https://github.com/apache/airflow/pull/6210
> >
> > This is an interesting approach that might solve the problem in a
> > slightly different way.
> > Rather than combining operators and sensors, it introduces an Async
> > Operator that has two stages: "initiate operation" and sensor-like
> > "wait for the operator to complete". In this model a lot of
> > long-running operators we have now could be rewritten (easily) using
> > the Async operator model and you could run them in either sync mode
> > (like there are now) or with the Async mode that would be equivalent
> > to the "Operator" + "Sensor" pattern that is indeed a bit problematic.
> >
> > The EMRAddStep + Sensor could be an Async Operator then.
> >
> > J.
> >
> > On Tue, Oct 15, 2019 at 7:13 PM Jonathan Miles <j...@cybus.co.uk> wrote:
> >
> >> Yes, I often refer to this as the "atomicity problem": we usually want
> >> all four "create", "add steps", "wait for steps", "terminate cluster"
> >> tasks to retry together and in order if any one of them fails (well,
> >> the terminate one is questionable). In our current Dags we resolved
> >> this by putting the tasks in a SubDag and changing the
> >> on_retry_callback to clear the state of the sub-tasks. But use of
> >> SubDags makes navigation in the UI a bit of a pain, so we've planned
> >> to merge them into a single custom operator soon. There's also the
> >> problem that for big workflows, doing this adds a lot of duration due
> >> to the management overhead of starting/stopping EMR clusters instead
> >> of reusing them. I'm about to send a separate e-mail about that.
> >>
> >> I think it'd be great to have a combined operator upstream in the
> >> codebase!
> >>
> >> Jon
> >>
> >> On 14/10/2019 20:42, Daniel Mateus Pires wrote:
> >>> Hi there!
> >>>
> >>> Would it make sense to add an operator that is both the EmrAddStep
> >>> operator and the step sensor?
> >>>
> >>> In a past role we were using Airflow heavily for all things EMR, and
> >>> I found myself writing an Operator that combined the emr_add_step
> >>> operator and the sensor, it made the pipelines simpler (less operator
> >>> instances per DAG) and retries were easy
> >>>
> >>> There is still value in keeping those 2 other classes around when we
> >>> don't care about the result of an EMR step or we are polling for the
> >>> completion of an EMR step we did not start from Airflow, but for most
> >>> tasks wouldn't a "merged operator" make sense?
> >>>
> >>> Thanks!
> >>> Daniel
> >>>
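
P.S. To check I'm reading the async proposal correctly, the shape I have in mind is roughly the following. To be clear, this is only my sketch of the two stages Jarek describes ("initiate operation" then "wait for completion"); the class name, method names and signatures are made up and are not taken from PR 6210:

# Rough sketch only; NOT the actual code from PR 6210, just the shape of the idea.
from time import sleep

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class BaseAsyncOperator(BaseOperator):
    """Run a "submit" stage and a "wait" stage inside a single task."""

    @apply_defaults
    def __init__(self, poke_interval=60, *args, **kwargs):
        super(BaseAsyncOperator, self).__init__(*args, **kwargs)
        self.poke_interval = poke_interval

    def submit(self, context):
        # Kick off the long-running work and return a handle, e.g. an EMR step id.
        raise NotImplementedError

    def poke(self, context, handle):
        # Return True once the work identified by `handle` has finished.
        raise NotImplementedError

    def execute(self, context):
        # Submitting and waiting happen in the same task instance, so a retry
        # re-runs both stages; the Operator + Sensor pair collapses into one task.
        handle = self.submit(context)
        while not self.poke(context, handle):
            sleep(self.poke_interval)
        return handle

If that is roughly the intent, a retry of such a task redoes both the submit and the wait, which is exactly the atomicity discussed earlier in the thread.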
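And on the "function composition for Operators" question I raised above, here is very roughly what I had in mind. ComposedOperator is a made-up name, not an existing Airflow class, and this is a sketch rather than something I would ship:

# Illustrative sketch; ComposedOperator is not a real Airflow class.
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class ComposedOperator(BaseOperator):
    """Run the execute() of several wrapped operators, in order, in one task."""

    @apply_defaults
    def __init__(self, operators, *args, **kwargs):
        super(ComposedOperator, self).__init__(*args, **kwargs)
        # Already-instantiated operators, built outside the DAG context so they
        # don't get added to the DAG as separate tasks.
        self.operators = operators

    def execute(self, context):
        result = None
        for operator in self.operators:
            # If any wrapped operator raises, the whole composed task fails and
            # retries as a single atomic unit.
            result = operator.execute(context)
        return result

Compared to the multiple-inheritance patching in my EMR snippet, this keeps each operator intact and only changes where retries happen; the trade-off, as with any merged operator, is losing per-step visibility in the UI.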