Hi there,

I have been struggling to get the BranchPythonOperator to work correctly.
The documentation [1] is relatively thin, and a related answer on
StackOverflow [2] did not clarify a lot. So I set up a bunch of demo DAGs
that show the in my opinion faulty behaviour and a workaround with a subdag
that I currently use. I'd be interested in opinions on that workaround some
fix propositions that I have.

The basic set up is one base task ("step1"), then a branching for either of
two tasks ("branch1" and "branch2"), depending on the execution date, and
then a last task ("final_step") to be executed when either of the branches
has succeeded.

The problem is that the the join task "final_step" will never fail for
upstream failures, since one upstream is always skipped.

I have created three demo DAGs [3] that you can throw at a fresh Airflow
1.9 instance that will illustrate the behavior of the different
implementations. All tasks are DummyOperators except for branch2, which is
a PythonOperator set up to always fail for demo purposes.

The DAG runs/intervals and branching condition is setup in a way that the
scheduler will produce two runs for every DAG, each of which will have one
run taking branch1 (which succeeds) and another run taking branch2 (which
fails).

These are the different implementations

   1. Basic setup as described, all operators have the default trigger rule
   'all_success'
   2. Verbatim exactly as 1., except that the joining task "final_step" has
   trigger_rule 'one_success'
   3. Do the branching in a SubDag(Operator), do not join downstream from
   the PythonBranchOperator in the subdag

This is how they behave

   1. Irrespective of which branch is taken, final_step will be skipped,
   because the skip state propagates downstream
   2. This fixes the branch1 (success) case, in the sense that branch2 is
   skipped and final_step is run. But in the branch2 (failure) case, the
   final_step is skipped instead of failing (I would expect upstream_failed).
   Instead, the skip state from branch1 propagates to the final_step.
   3. This is my current workaround: Put the branching into a subdag, where
   no joining task is present after the branching. The semantics of
   "successful DagRun" being "all final tasks are skipped or success", the
   subdagOperator is correctly fails in the branch2 case, hence correctly
   failing the whole DagRun.

Questions

   - What are the exact semantics of the skip state? The documentation is
   quite sparse, only the section on the branch operator [1] mentions "
   skipped states propagates where all directly upstream tasks are skipped".
   This however does not match my above observation where a task has upstream
   skip and success but is skipped nevertheless. Is the skipped state meant to
   trickle down like that?
   - It seems clear to me that the join task after a branch operator needs
   special handling in terms of trigger rules, as there are upstream tasks
   that are not relevant, depending on which branch was taken. Would this need
   a special trigger rule?
   - Or, alternatively, is the 'one_success' trigger rule simply broken?
   The respective trigger rule code [4] simply doesn't handle an upstream
   failure case, should it do that? My naive understanding of the one_success
   trigger rule would be:
      - at least one success upstream -> run,
      - all done and only skips upstream -> skip,
      - all done and no successes and some (upstream_)failures upstream ->
      upstream_failed.
   - (Given the above expectation is shared), I would provide a fix, but it
   seems to me the unit tests do not at all exercise upstream failure cases
   [6] (only flag_upstream_failed=False). Are there other tests I have
   missed?

I realize I have written quite a long post, but I hope this way the problem
statement is clear and maybe this can be transformed into a bug report
easily, provided it is considered such.

Best regards,
Benjamin


[1] https://airflow.incubator.apache.org/concepts.html#branching
[2] https://stackoverflow.com/a/44531885/1382008
[3]
https://gist.github.com/BenjaminDebeerst/fc217aa178aa648db2aa04ab6fdd71ae
[4]
https://github.com/apache/incubator-airflow/blob/master/airflow/ti_deps/deps/trigger_rule_dep.py#L143
[5]
https://github.com/apache/incubator-airflow/blob/master/tests/ti_deps/deps/test_trigger_rule_dep.py

Reply via email to