shahar1 commented on code in PR #68492:
URL: https://github.com/apache/airflow/pull/68492#discussion_r3408997061
##########
task-sdk/src/airflow/sdk/definitions/dag.py:
##########
@@ -1400,8 +1400,23 @@ def test(
# triggerer may mark tasks scheduled so we read from DB
all_tis = set(dr.get_task_instances(session=session))
scheduled_tis = {x for x in all_tis if x.state == TaskInstanceState.SCHEDULED}
- ids_unrunnable = {x for x in all_tis if x.state not in FINISHED_STATES} - scheduled_tis
- if not scheduled_tis and ids_unrunnable:
+ awaiting_input_tis = {x for x in all_tis if x.state == TaskInstanceState.AWAITING_INPUT}
+ ids_unrunnable = (
+ {x for x in all_tis if x.state not in FINISHED_STATES}
+ - scheduled_tis
+ - awaiting_input_tis
+ )
+ if not scheduled_tis and awaiting_input_tis:
+ # Human-in-the-loop tasks stay parked in AWAITING_INPUT: dag.test() never
+ # resolves them itself. Keep the run alive until a response recorded from
+ # outside -- the `airflow dags test` console prompt, or the HITL REST API of
+ # a deployment sharing this metadata DB -- flips them back to SCHEDULED.
+ log.info(
+ "Waiting for Human-in-the-loop input for tasks: %s",
+ sorted(x.task_id for x in awaiting_input_tis),
+ )
Review Comment:
These days I'm into runtime optimizations (mostly to make the CI run faster,
but also for better UX in core), so I nitpick more than usual about
deliberate bottlenecks :)
In this case it is indeed justified by design, as you mentioned, so I
don't have much to add.
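
For reference, the state partitioning in the hunk above can be sketched in isolation. This is only a minimal illustration under stated assumptions, not the Airflow implementation: `State`, `FINISHED`, `classify`, and `should_wait_for_input` are hypothetical stand-ins for `TaskInstanceState`, `FINISHED_STATES`, and the inline logic in `dag.test()`, and tasks are represented by plain task-id strings rather than task instances.

```python
from enum import Enum


class State(Enum):
    # Hypothetical stand-in for TaskInstanceState; only a few values are modeled.
    SCHEDULED = "scheduled"
    AWAITING_INPUT = "awaiting_input"
    UP_FOR_RETRY = "up_for_retry"
    SUCCESS = "success"
    FAILED = "failed"


# Hypothetical stand-in for FINISHED_STATES.
FINISHED = frozenset({State.SUCCESS, State.FAILED})


def classify(states):
    """Partition a {task_id: State} mapping into three sets of task ids."""
    scheduled = {t for t, s in states.items() if s is State.SCHEDULED}
    awaiting_input = {t for t, s in states.items() if s is State.AWAITING_INPUT}
    # Unfinished tasks that are neither scheduled nor parked waiting for a human.
    unrunnable = (
        {t for t, s in states.items() if s not in FINISHED}
        - scheduled
        - awaiting_input
    )
    return scheduled, awaiting_input, unrunnable


def should_wait_for_input(states):
    """Mirror the new branch condition: nothing scheduled, something parked."""
    scheduled, awaiting_input, _ = classify(states)
    return not scheduled and bool(awaiting_input)
```

With this split, a task parked in the awaiting-input state no longer counts as unrunnable, so a run whose only unfinished tasks are waiting for a human response takes the waiting branch instead of the unrunnable one.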