This is an automated email from the ASF dual-hosted git repository.
kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b64a0e636d9 Add `awaiting_input` task state for Human-in-the-loop, off
the triggerer (#68028)
b64a0e636d9 is described below
commit b64a0e636d9b693a1a17d6f226e3a78014c13afc
Author: Kaxil Naik <[email protected]>
AuthorDate: Sat Jun 6 05:03:59 2026 +0100
Add `awaiting_input` task state for Human-in-the-loop, off the triggerer
(#68028)
HITL tasks previously deferred onto the triggerer, so a triggerer had to
stay running for the whole (potentially days-long) wait and could never
scale to zero while any task awaited a response. They now park in a new
scheduler-managed `awaiting_input` state that holds neither a worker slot
nor the triggerer, the same way `up_for_reschedule` waits. A human
response (Core API) or the scheduler timeout sweep resumes the task
directly. On Airflow < 3.3 the operator falls back to the trigger path.
---
.../docs/authoring-and-scheduling/deferring.rst | 6 ++
airflow-core/docs/core-concepts/overview.rst | 3 +-
airflow-core/docs/core-concepts/tasks.rst | 1 +
.../docs/img/diagram_task_lifecycle.md5sum | 2 +-
airflow-core/docs/img/diagram_task_lifecycle.png | Bin 601178 -> 671852 bytes
airflow-core/docs/img/diagram_task_lifecycle.py | 23 ++++-
airflow-core/docs/security/audit_logs.rst | 1 +
airflow-core/docs/tutorial/hitl.rst | 11 +++
airflow-core/newsfragments/68028.feature.rst | 1 +
airflow-core/src/airflow/api/common/mark_tasks.py | 1 +
.../src/airflow/api_fastapi/common/parameters.py | 3 +-
.../core_api/datamodels/ui/dashboard.py | 1 +
.../api_fastapi/core_api/openapi/_private_ui.yaml | 5 +
.../core_api/openapi/v2-rest-api-generated.yaml | 1 +
.../api_fastapi/core_api/routes/public/hitl.py | 52 ++++++++++
.../airflow/api_fastapi/core_api/routes/ui/dags.py | 2 +-
.../execution_api/datamodels/taskinstance.py | 30 ++++++
.../execution_api/routes/task_instances.py | 47 ++++++++-
.../api_fastapi/execution_api/versions/__init__.py | 8 +-
.../execution_api/versions/v2026_06_30.py | 18 +++-
.../src/airflow/jobs/scheduler_job_runner.py | 106 ++++++++++++++++++++-
airflow-core/src/airflow/models/dagrun.py | 5 +-
airflow-core/src/airflow/models/hitl.py | 18 ++++
airflow-core/src/airflow/models/taskinstance.py | 12 ++-
.../src/airflow/ti_deps/dependencies_states.py | 7 ++
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 8 +-
.../airflow/ui/openapi-gen/requests/types.gen.ts | 3 +-
.../airflow/ui/public/i18n/locales/en/common.json | 1 +
.../ui/src/components/NeedsReviewButton.tsx | 2 +-
.../src/airflow/ui/src/components/StateIcon.tsx | 11 ++-
.../src/airflow/ui/src/constants/stateOptions.ts | 1 +
.../airflow/ui/src/hooks/useRequiredActionTabs.ts | 4 +-
.../HistoricalMetrics/TaskInstanceMetrics.tsx | 1 +
.../HITLTaskInstances/HITLResponseForm.test.tsx | 34 +++++++
.../pages/HITLTaskInstances/HITLResponseForm.tsx | 4 +-
.../HITLTaskInstances/HITLTaskInstances.test.tsx | 49 ++++++++++
.../pages/HITLTaskInstances/HITLTaskInstances.tsx | 12 +--
airflow-core/src/airflow/ui/src/theme.ts | 1 +
airflow-core/src/airflow/ui/src/utils/hitl.test.ts | 52 +++++++++-
airflow-core/src/airflow/ui/src/utils/hitl.ts | 12 ++-
airflow-core/src/airflow/ui/src/utils/query.ts | 1 +
.../src/airflow/ui/src/utils/stateUtils.ts | 1 +
.../src/airflow/ui/tests/e2e/utils/test-helpers.ts | 10 +-
airflow-core/src/airflow/utils/state.py | 5 +
.../core_api/routes/public/test_hitl.py | 55 +++++++++++
.../core_api/routes/ui/test_dashboard.py | 3 +
.../tests/unit/dag_processing/test_processor.py | 1 +
airflow-core/tests/unit/jobs/test_scheduler_job.py | 92 ++++++++++++++++++
airflow-core/tests/unit/jobs/test_triggerer_job.py | 1 +
.../src/airflowctl/api/datamodels/generated.py | 1 +
.../task_lifecycle/awaiting_input_state.png | Bin 0 -> 2467 bytes
.../edge3/worker_api/v2-edge-generated.yaml | 1 +
.../airflow/providers/standard/operators/hitl.py | 37 +++++--
.../tests/unit/standard/operators/test_hitl.py | 83 ++++++++++------
task-sdk/src/airflow/sdk/api/client.py | 6 ++
.../src/airflow/sdk/api/datamodels/_generated.py | 17 ++++
task-sdk/src/airflow/sdk/exceptions.py | 42 ++++++++
task-sdk/src/airflow/sdk/execution_time/comms.py | 10 +-
.../airflow/sdk/execution_time/schema/schema.json | 77 ++++++++++++++-
.../src/airflow/sdk/execution_time/supervisor.py | 18 +++-
.../src/airflow/sdk/execution_time/task_runner.py | 29 ++++++
.../task_sdk/execution_time/test_supervisor.py | 9 ++
62 files changed, 975 insertions(+), 83 deletions(-)
diff --git a/airflow-core/docs/authoring-and-scheduling/deferring.rst
b/airflow-core/docs/authoring-and-scheduling/deferring.rst
index 0e54a9ca593..95c7c2b0b47 100644
--- a/airflow-core/docs/authoring-and-scheduling/deferring.rst
+++ b/airflow-core/docs/authoring-and-scheduling/deferring.rst
@@ -54,6 +54,12 @@ If you want to use pre-written deferrable operators that
come with Airflow, such
Airflow automatically handles and implements the deferral processes for you.
+.. note::
+
+ :doc:`Human-in-the-loop <../tutorial/hitl>` operators do **not** use
deferral or the triggerer.
+ They wait in the scheduler-managed ``awaiting_input`` task state, so a
deployment that waits only
+ on human input rather than deferrable operators does not need a running
triggerer.
+
If you're upgrading existing Dags to use deferrable operators, Airflow
contains API-compatible sensor variants. Add these variants into your Dag to
use deferrable operators with no other changes required.
Note that you can't use the deferral ability from inside custom PythonOperator
or TaskFlow Python functions. Deferral is only available to traditional,
class-based operators.
diff --git a/airflow-core/docs/core-concepts/overview.rst
b/airflow-core/docs/core-concepts/overview.rst
index dc51476a263..d8c1d5f959e 100644
--- a/airflow-core/docs/core-concepts/overview.rst
+++ b/airflow-core/docs/core-concepts/overview.rst
@@ -77,7 +77,8 @@ performance in your Airflow:
* Optional *triggerer*, which executes deferred tasks in an asyncio event
loop. In basic installation
where deferred tasks are not used, a triggerer is not necessary. More about
deferring tasks can be
- found in :doc:`/authoring-and-scheduling/deferring`.
+ found in :doc:`/authoring-and-scheduling/deferring`. Note that
:doc:`Human-in-the-loop </tutorial/hitl>`
+ tasks wait in the scheduler-managed ``awaiting_input`` state and do not use
the triggerer.
* Optional folder of *plugins*. Plugins are a way to extend Airflow's
functionality (similar to installed
packages). Plugins are read by the *scheduler*, *Dag processor*, *triggerer*
and *webserver*. More about
diff --git a/airflow-core/docs/core-concepts/tasks.rst
b/airflow-core/docs/core-concepts/tasks.rst
index 24553f8a8ab..95d9846dd5e 100644
--- a/airflow-core/docs/core-concepts/tasks.rst
+++ b/airflow-core/docs/core-concepts/tasks.rst
@@ -83,6 +83,7 @@ The possible states for a Task Instance are:
* ``up_for_retry``: The task failed, but has retry attempts left and will be
rescheduled.
* ``up_for_reschedule``: The task is a :doc:`Sensor <sensors>` that is in
``reschedule`` mode
* ``deferred``: The task has been :doc:`deferred to a trigger
<../authoring-and-scheduling/deferring>`
+* ``awaiting_input``: The task is a :doc:`Human-in-the-loop
<../tutorial/hitl>` task waiting for a human response. It is managed by the
scheduler and uses neither a worker slot nor the triggerer.
* ``removed``: The task has vanished from the Dag since the run started
.. image:: /img/diagram_task_lifecycle.png
diff --git a/airflow-core/docs/img/diagram_task_lifecycle.md5sum
b/airflow-core/docs/img/diagram_task_lifecycle.md5sum
index 78135f22287..6cc716bd17e 100644
--- a/airflow-core/docs/img/diagram_task_lifecycle.md5sum
+++ b/airflow-core/docs/img/diagram_task_lifecycle.md5sum
@@ -1 +1 @@
-f129c61777c2b56652b2c68d36c309ee
+0821383375d44dc24d69c5b68d50f423
diff --git a/airflow-core/docs/img/diagram_task_lifecycle.png
b/airflow-core/docs/img/diagram_task_lifecycle.png
index 6c2cea9f78a..5687cdb4ad9 100644
Binary files a/airflow-core/docs/img/diagram_task_lifecycle.png and
b/airflow-core/docs/img/diagram_task_lifecycle.png differ
diff --git a/airflow-core/docs/img/diagram_task_lifecycle.py
b/airflow-core/docs/img/diagram_task_lifecycle.py
index aa24e2fc3ae..061a3912d03 100644
--- a/airflow-core/docs/img/diagram_task_lifecycle.py
+++ b/airflow-core/docs/img/diagram_task_lifecycle.py
@@ -47,6 +47,7 @@ SHARED_STATE_IMG = (DIAGRAMS_DIR / "task_lifecycle" /
"shared_state.png").as_pos
TERMINAL_STATE_IMG = (DIAGRAMS_DIR / "task_lifecycle" /
"terminal_state.png").as_posix()
SENSOR_STATE_IMG = (DIAGRAMS_DIR / "task_lifecycle" /
"sensor_state.png").as_posix()
DEFERRABLE_STATE_IMG = (DIAGRAMS_DIR / "task_lifecycle" /
"deferrable_state.png").as_posix()
+AWAITING_INPUT_STATE_IMG = (DIAGRAMS_DIR / "task_lifecycle" /
"awaiting_input_state.png").as_posix()
STATE_NODE_ATTRS = {"width": "4.16", "height": "1", "fontname": "Monospace",
"fontsize": "20"}
COMPONENT_NODE_ATTRS = {
@@ -107,6 +108,7 @@ def generate_task_lifecycle_diagram():
state_scheduled = Custom("scheduled", SHARED_STATE_IMG,
**STATE_NODE_ATTRS)
state_queued = Custom("queued", SHARED_STATE_IMG, **STATE_NODE_ATTRS)
state_deferred = Custom("deferred", DEFERRABLE_STATE_IMG,
**STATE_NODE_ATTRS)
+ state_awaiting_input = Custom("awaiting_input",
AWAITING_INPUT_STATE_IMG, **STATE_NODE_ATTRS)
state_running = Custom("running", SHARED_STATE_IMG, **STATE_NODE_ATTRS)
state_up_for_reschedule = Custom("up_for_reschedule",
SENSOR_STATE_IMG, **STATE_NODE_ATTRS)
state_restarting = Custom("restarting", SHARED_STATE_IMG,
**STATE_NODE_ATTRS)
@@ -150,6 +152,11 @@ def generate_task_lifecycle_diagram():
CONDITION_IMG,
**CONDITION_NODE_ATTRS,
)
+ cond_hitl_requested = Custom(
+ "\n\n\n\n\nHuman-in-the-loop task,\nand human input is requested?",
+ CONDITION_IMG,
+ **CONDITION_NODE_ATTRS,
+ )
cond_skip_signal = Custom("\n\n\n\n\nSkip signal is raised?",
CONDITION_IMG, **CONDITION_NODE_ATTRS)
cond_sensor_reschedule = Custom(
"\n\n\n\n\nTask is a sensor in reschedule mode,\nand result is
undetermined?",
@@ -183,7 +190,14 @@ def generate_task_lifecycle_diagram():
cond_trigger_task_2 >> Edge(label="YES") >> cond_task_complete_1
cond_task_complete_1 >> Edge(label="NO") >> state_deferred
cond_task_complete_1 >> Edge(label="YES") >> cond_defer_signal_raised
- component_worker >> state_running >> cond_defer_signal_raised
+ component_worker >> state_running >> cond_hitl_requested
+ cond_hitl_requested >> Edge(label="YES") >> state_awaiting_input
+ cond_hitl_requested >> Edge(label="NO") >> cond_defer_signal_raised
+ (
+ state_awaiting_input
+ >> Edge(label="response received,\nor timeout default applied")
+ >> component_scheduler
+ )
cond_defer_signal_raised >> Edge(label="NO") >> cond_skip_signal
cond_defer_signal_raised >> Edge(label="YES") >> component_triggerer
cond_skip_signal >> Edge(label="NO") >> cond_sensor_reschedule
@@ -218,6 +232,13 @@ def generate_task_lifecycle_diagram():
height="0.77",
**LEGEND_NODE_ATTRS,
)
+ Custom(
+ "\n\nState for Human-in-the-loop Tasks",
+ AWAITING_INPUT_STATE_IMG,
+ width="3.2",
+ height="0.77",
+ **LEGEND_NODE_ATTRS,
+ )
Custom("\n\nTerminal State", TERMINAL_STATE_IMG, width="3.2",
height="0.77", **LEGEND_NODE_ATTRS)
Custom("\n\nShared State", SHARED_STATE_IMG, width="3.2",
height="0.77", **LEGEND_NODE_ATTRS)
Custom("\n\nComponent", COMPONENT_IMG, width="2.53",
height="0.77", **LEGEND_NODE_ATTRS)
diff --git a/airflow-core/docs/security/audit_logs.rst
b/airflow-core/docs/security/audit_logs.rst
index 3b8a09435e1..3c7df803cb0 100644
--- a/airflow-core/docs/security/audit_logs.rst
+++ b/airflow-core/docs/security/audit_logs.rst
@@ -187,6 +187,7 @@ Task Instance Events
- ``queued``: Task instance is queued for execution
- ``scheduled``: Task instance is scheduled
- ``deferred``: Task instance is deferred (waiting for trigger)
+- ``awaiting_input``: Task instance is waiting for human input
(Human-in-the-loop)
- ``restarting``: Task instance is restarting
- ``removed``: Task instance was removed
diff --git a/airflow-core/docs/tutorial/hitl.rst
b/airflow-core/docs/tutorial/hitl.rst
index 3a5e35c6f79..8e89b49d88d 100644
--- a/airflow-core/docs/tutorial/hitl.rst
+++ b/airflow-core/docs/tutorial/hitl.rst
@@ -23,6 +23,17 @@ HITLOperator (Human-in-the-loop)
Human-in-the-Loop (HITL) functionality allows you to incorporate human
decision-making directly into your workflows.
This powerful feature enables workflows to pause and wait for human input,
making it perfect for approval processes, manual quality checks, and scenarios
where human judgment is essential.
+.. versionchanged:: 3.3
+
+ A HITL task waiting for input now uses a dedicated, scheduler-managed
``awaiting_input`` task state
+ instead of deferring onto the triggerer. While waiting, the task holds
neither a worker slot nor the
+ triggerer, so the triggerer can scale to zero even while tasks await a
response; tasks resume on a human
+ response or on the scheduler's response-timeout sweep. On Airflow 3.1 and
3.2, HITL tasks use the older
+ trigger-based deferral.
+
+ A waiting ``awaiting_input`` task does not occupy a pool slot. This differs
from the older deferral
+ path, where a deferred HITL task counted against a pool that had
``include_deferred`` enabled.
+
In this tutorial, we will explore how to use the HITL operators in workflows
and demonstrate how it would look like in Airflow UI.
An HITL Example Dag
diff --git a/airflow-core/newsfragments/68028.feature.rst
b/airflow-core/newsfragments/68028.feature.rst
new file mode 100644
index 00000000000..fa9db5b21d8
--- /dev/null
+++ b/airflow-core/newsfragments/68028.feature.rst
@@ -0,0 +1 @@
+Human-in-the-loop tasks now wait in a new ``awaiting_input`` task state
managed by the scheduler instead of deferring onto the triggerer, so the
triggerer can scale to zero while tasks await a human response. Waiting tasks
resume directly on a response or on the scheduler's response-timeout sweep. On
Airflow versions before 3.3 the operator falls back to the previous
trigger-based path.
diff --git a/airflow-core/src/airflow/api/common/mark_tasks.py
b/airflow-core/src/airflow/api/common/mark_tasks.py
index 915af5952a3..efc2a016a43 100644
--- a/airflow-core/src/airflow/api/common/mark_tasks.py
+++ b/airflow-core/src/airflow/api/common/mark_tasks.py
@@ -301,6 +301,7 @@ def set_dag_run_state_to_failed(
TaskInstanceState.RUNNING,
TaskInstanceState.DEFERRED,
TaskInstanceState.UP_FOR_RESCHEDULE,
+ TaskInstanceState.AWAITING_INPUT,
)
# Mark only RUNNING task instances.
diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py
b/airflow-core/src/airflow/api_fastapi/common/parameters.py
index 56b4c20884c..4daf3418e2c 100644
--- a/airflow-core/src/airflow/api_fastapi/common/parameters.py
+++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py
@@ -1220,7 +1220,7 @@ class _PendingActionsFilter(BaseParam[bool]):
.join(TaskInstance, HITLDetail.ti_id == TaskInstance.id)
.where(
HITLDetail.responded_at.is_(None),
- TaskInstance.state == TaskInstanceState.DEFERRED,
+ TaskInstance.state.in_((TaskInstanceState.DEFERRED,
TaskInstanceState.AWAITING_INPUT)),
)
.where(TaskInstance.dag_id == DagModel.dag_id)
.scalar_subquery()
@@ -1600,6 +1600,7 @@ state_priority: list[None | TaskInstanceState] = [
TaskInstanceState.QUEUED,
TaskInstanceState.SCHEDULED,
TaskInstanceState.DEFERRED,
+ TaskInstanceState.AWAITING_INPUT,
TaskInstanceState.RUNNING,
TaskInstanceState.RESTARTING,
None,
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dashboard.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dashboard.py
index 4f48ea48351..46fc051daf3 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dashboard.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dashboard.py
@@ -44,6 +44,7 @@ class TaskInstanceStateCount(BaseModel):
upstream_failed: int
skipped: int
deferred: int
+ awaiting_input: int
class HistoricalMetricDataResponse(BaseModel):
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index de3f5869864..8b28661c26a 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -3758,6 +3758,7 @@ components:
- upstream_failed
- skipped
- deferred
+ - awaiting_input
title: TaskInstanceState
description: 'All possible states that a Task Instance can be in.
@@ -3804,6 +3805,9 @@ components:
deferred:
type: integer
title: Deferred
+ awaiting_input:
+ type: integer
+ title: Awaiting Input
type: object
required:
- no_status
@@ -3819,6 +3823,7 @@ components:
- upstream_failed
- skipped
- deferred
+ - awaiting_input
title: TaskInstanceStateCount
description: TaskInstance serializer for responses.
TeamCollectionResponse:
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 6c0ec008322..180d9b98a43 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -15867,6 +15867,7 @@ components:
- upstream_failed
- skipped
- deferred
+ - awaiting_input
title: TaskInstanceState
description: 'All possible states that a Task Instance can be in.
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py
index 45285e108a7..2ad11a245d6 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py
@@ -67,6 +67,9 @@ from airflow.models.dagrun import DagRun
from airflow.models.hitl import HITLDetail as HITLDetailModel, HITLUser
from airflow.models.taskinstance import TaskInstance as TI
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
+from airflow.models.trigger import handle_event_submit
+from airflow.triggers.base import TriggerEvent
+from airflow.utils.state import TaskInstanceState
task_instances_hitl_router = AirflowRouter(
tags=["Task Instance"],
@@ -161,6 +164,23 @@ def update_hitl_detail(
map_index=map_index,
)
+ # Acquire row locks in a fixed order -- TaskInstance first, then the HITL
row -- matching the
+ # Execution API park transition, so a human response racing the worker's
park cannot deadlock.
+ # Locking the TI also serializes respond-vs-clear (the clear path locks
the TI, not the HITL row).
+ locked_ti = (
+ session.get(TI, task_instance.id, with_for_update={"of": TI})
+ if isinstance(task_instance, TI)
+ else None
+ )
+ # Lock the hitl_detail row (FOR UPDATE OF hitl_detail). of= scopes the
lock to hitl_detail, which
+ # eager-joins task_instance (lazy="joined"); a bare with_for_update()
would emit FOR UPDATE against
+ # the nullable side of that outer join, which Postgres rejects. The
joinedloaded relationship object
+ # reused below is the same identity-mapped row, now locked for this
transaction.
+ session.execute(
+ select(HITLDetailModel)
+ .where(HITLDetailModel.ti_id == task_instance.id)
+ .with_for_update(of=HITLDetailModel)
+ )
hitl_detail_model = task_instance.hitl_detail
if hitl_detail_model.response_received:
raise HTTPException(
@@ -187,11 +207,43 @@ def update_hitl_detail(
f"User={user_name} (id={user_id}) is not a respondent for the
task.",
)
+ # Write-side validation: reject an invalid response here (400) instead of
accepting it and
+ # failing the task later on resume. Mirrors
HITLOperator.validate_chosen_options + cardinality.
+ allowed_options = set(hitl_detail_model.options or [])
+ invalid_options = set(update_hitl_detail_payload.chosen_options) -
allowed_options
+ if invalid_options:
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST,
+ f"Invalid options {sorted(invalid_options)}; allowed options are
{sorted(allowed_options)}.",
+ )
+ if not hitl_detail_model.multiple and
len(update_hitl_detail_payload.chosen_options) > 1:
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST,
+ "Multiple options chosen but this Human-in-the-loop task accepts
only a single option.",
+ )
+
hitl_detail_model.responded_by = hitl_user
hitl_detail_model.responded_at = timezone.utcnow()
hitl_detail_model.chosen_options =
update_hitl_detail_payload.chosen_options
hitl_detail_model.params_input = update_hitl_detail_payload.params_input
session.add(hitl_detail_model)
+
+ # Event-driven resume: if the task is parked waiting for this input,
transition it directly,
+ # without a trigger. handle_event_submit packs the response into
next_kwargs["event"], sets
+ # state=SCHEDULED + scheduled_dttm; the scheduler then re-queues
execute_complete. Gated on the
+ # parked states so a finished/cleared TI is never resurrected. `locked_ti`
was locked at the top
+ # (TI-before-HITLDetail order), so a concurrent clear cannot interleave
between this state check
+ # and the resume and have its reset silently overwritten.
+ if locked_ti is not None and locked_ti.state in (
+ TaskInstanceState.AWAITING_INPUT,
+ TaskInstanceState.DEFERRED,
+ ):
+ handle_event_submit(
+ TriggerEvent(hitl_detail_model.as_resume_event_payload()),
+ task_instance=locked_ti,
+ session=session,
+ )
+
session.commit()
return HITLDetailResponse.model_validate(hitl_detail_model)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py
index dfd3a71d815..b3f1325a34e 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py
@@ -233,7 +233,7 @@ def get_dags(
)
.where(
HITLDetail.responded_at.is_(None),
- TaskInstance.state == TaskInstanceState.DEFERRED,
+ TaskInstance.state.in_((TaskInstanceState.DEFERRED,
TaskInstanceState.AWAITING_INPUT)),
)
.where(TaskInstance.dag_id.in_([dag.dag_id for dag in dags]))
.order_by(TaskInstance.dag_id)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
index a0d97390801..c7f6b1ee9f8 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
@@ -168,6 +168,33 @@ class TIRescheduleStatePayload(StrictBaseModel):
end_date: UtcDateTime
+class TIAwaitingInputStatePayload(StrictBaseModel):
+ """Schema for parking a TaskInstance in an awaiting_input state
(Human-in-the-loop, no trigger)."""
+
+ state: Annotated[
+ Literal[IntermediateTIState.AWAITING_INPUT],
+ # Specify a default in the schema, but not in code, so Pydantic marks
it as required.
+ WithJsonSchema(
+ {
+ "type": "string",
+ "enum": [IntermediateTIState.AWAITING_INPUT],
+ "default": IntermediateTIState.AWAITING_INPUT,
+ }
+ ),
+ ]
+ timeout: timedelta | None = None
+ """Optional response deadline (relative); converted to an absolute
datetime server-side."""
+ next_method: str
+ """The name of the method on the operator to call in the worker after
input is received."""
+ next_kwargs: Annotated[dict[str, JsonValue], Field(default_factory=dict)]
+ """
+ Kwargs to pass to the above method, either a plain dict or an encrypted
string.
+
+ Both forms will be passed along to the TaskSDK upon resume, the server
will not handle either.
+ """
+ rendered_map_index: str | None = None
+
+
class TIRetryStatePayload(StrictBaseModel):
"""Schema for updating TaskInstance to up_for_retry."""
@@ -216,6 +243,8 @@ def ti_state_discriminator(v: dict[str, str] |
StrictBaseModel) -> str:
return "deferred"
if state == TIState.UP_FOR_RESCHEDULE:
return "up_for_reschedule"
+ if state == TIState.AWAITING_INPUT:
+ return "awaiting_input"
if state == TIState.UP_FOR_RETRY:
return "up_for_retry"
return "_other_"
@@ -229,6 +258,7 @@ TIStateUpdate = Annotated[
| Annotated[TITargetStatePayload, Tag("_other_")]
| Annotated[TIDeferredStatePayload, Tag("deferred")]
| Annotated[TIRescheduleStatePayload, Tag("up_for_reschedule")]
+ | Annotated[TIAwaitingInputStatePayload, Tag("awaiting_input")]
| Annotated[TIRetryStatePayload, Tag("up_for_retry")],
Field(discriminator=ti_state_discriminator),
]
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index e1062be1314..8a91614e09a 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -55,6 +55,7 @@ from
airflow.api_fastapi.execution_api.datamodels.taskinstance import (
PrevSuccessfulDagRunResponse,
TaskBreadcrumbsResponse,
TaskStatesResponse,
+ TIAwaitingInputStatePayload,
TIDeferredStatePayload,
TIEnterRunningPayload,
TIHeartbeatInfo,
@@ -79,14 +80,16 @@ from airflow.exceptions import TaskNotFound
from airflow.models.asset import AssetActive
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun as DR
+from airflow.models.hitl import HITLDetail
from airflow.models.log import Log
from airflow.models.taskinstance import TaskInstance as TI,
_stop_remaining_tasks
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
from airflow.models.taskreschedule import TaskReschedule
-from airflow.models.trigger import Trigger
+from airflow.models.trigger import Trigger, handle_event_submit
from airflow.models.xcom import XComModel
from airflow.serialization.definitions.assets import SerializedAsset,
SerializedAssetUniqueKey
from airflow.state import get_state_backend
+from airflow.triggers.base import TriggerEvent
from airflow.utils.sqlalchemy import get_dialect_name
from airflow.utils.state import DagRunState, TaskInstanceState, TerminalTIState
@@ -636,6 +639,48 @@ def _create_ti_state_update_query_and_update_state(
trigger_timeout=timeout,
)
updated_state = TaskInstanceState.DEFERRED
+ elif isinstance(ti_patch_payload, TIAwaitingInputStatePayload):
+ # Park the task waiting for human input (Human-in-the-loop). No
trigger / triggerer is
+ # created: the task is resumed by the Core API response handler or the
scheduler timeout
+ # sweep. The optional response deadline is stored on the existing
trigger_timeout column.
+ #
+ # Fixed lock order (TaskInstance -> HITLDetail), matching the Core API
response path, so a
+ # human response racing this park transition cannot deadlock.
+ ti = session.get(TI, task_instance_id, with_for_update={"of": TI})
+ # Lock only the hitl_detail row (of=...): HITLDetail eager-joins
task_instance (lazy="joined"),
+ # and Postgres rejects FOR UPDATE against the nullable side of that
outer join.
+ hitl_detail = session.scalar(
+ select(HITLDetail).where(HITLDetail.ti_id ==
task_instance_id).with_for_update(of=HITLDetail)
+ )
+ if ti is not None and hitl_detail is not None and
hitl_detail.response_received:
+ # The human responded in the window between the operator writing
the HITL request and
+ # the worker parking the task. Resume straight to execute_complete
instead of parking,
+ # which would otherwise strand an already-responded task (no
trigger/sweep would fire).
+ # Carry next_method/next_kwargs onto the TI first so the resume
dispatches correctly;
+ # handle_event_submit then injects the response event into
next_kwargs.
+ ti.next_method = ti_patch_payload.next_method
+ ti.next_kwargs = ti_patch_payload.next_kwargs
+ handle_event_submit(
+ TriggerEvent(hitl_detail.as_resume_event_payload()),
+ task_instance=ti,
+ session=session,
+ )
+ query = update(TI).where(TI.id ==
task_instance_id).values(state=TaskInstanceState.SCHEDULED)
+ updated_state = TaskInstanceState.SCHEDULED
+ else:
+ timeout = None
+ if ti_patch_payload.timeout is not None:
+ timeout = timezone.utcnow() + ti_patch_payload.timeout
+
+ query = update(TI).where(TI.id == task_instance_id)
+ query = query.values(
+ state=TaskInstanceState.AWAITING_INPUT,
+ trigger_id=None,
+ next_method=ti_patch_payload.next_method,
+ next_kwargs=ti_patch_payload.next_kwargs,
+ trigger_timeout=timeout,
+ )
+ updated_state = TaskInstanceState.AWAITING_INPUT
elif isinstance(ti_patch_payload, TIRescheduleStatePayload):
# Quick check for poke_interval isn't immediately over MySQL's
TIMESTAMP limit.
# This check is only rudimentary to catch trivial user errors, e.g.
mistakenly
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
index 50ddb6985e8..9e4d486aa30 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
@@ -47,13 +47,19 @@ from airflow.api_fastapi.execution_api.versions.v2026_06_16
import (
AddTeamNameField,
)
from airflow.api_fastapi.execution_api.versions.v2026_06_30 import (
+ AddAwaitingInputStatePayload,
AddConnectionTestEndpoint,
AddVariableKeysEndpoint,
)
bundle = VersionBundle(
HeadVersion(),
- Version("2026-06-30", AddVariableKeysEndpoint, AddConnectionTestEndpoint),
+ Version(
+ "2026-06-30",
+ AddVariableKeysEndpoint,
+ AddConnectionTestEndpoint,
+ AddAwaitingInputStatePayload,
+ ),
Version(
"2026-06-16",
AddRetryPolicyFields,
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py
index cc751bcc797..f9c22f6cd82 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py
@@ -17,7 +17,9 @@
from __future__ import annotations
-from cadwyn import VersionChange, endpoint
+from cadwyn import VersionChange, endpoint, schema
+
+from airflow.api_fastapi.execution_api.datamodels.taskinstance import
TIAwaitingInputStatePayload
class AddVariableKeysEndpoint(VersionChange):
@@ -37,3 +39,17 @@ class AddConnectionTestEndpoint(VersionChange):
endpoint("/connection-tests/{connection_test_id}",
["PATCH"]).didnt_exist,
endpoint("/connection-tests/{connection_test_id}/connection",
["GET"]).didnt_exist,
)
+
+
+class AddAwaitingInputStatePayload(VersionChange):
+ """Add the awaiting_input task instance state transition payload
(Human-in-the-loop, no trigger)."""
+
+ description = __doc__
+
+ instructions_to_migrate_to_previous_version = (
+ schema(TIAwaitingInputStatePayload).field("state").didnt_exist,
+ schema(TIAwaitingInputStatePayload).field("timeout").didnt_exist,
+ schema(TIAwaitingInputStatePayload).field("next_method").didnt_exist,
+ schema(TIAwaitingInputStatePayload).field("next_kwargs").didnt_exist,
+
schema(TIAwaitingInputStatePayload).field("rendered_map_index").didnt_exist,
+ )
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index ba39e44030c..a6496665e8b 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -107,7 +107,7 @@ from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.models.team import Team
-from airflow.models.trigger import TRIGGER_FAIL_REPR, Trigger,
TriggerFailureReason
+from airflow.models.trigger import TRIGGER_FAIL_REPR, Trigger,
TriggerFailureReason, handle_event_submit
from airflow.observability.metrics import stats_utils
from airflow.partition_mappers.base import is_rollup
from airflow.serialization.definitions.assets import SerializedAssetUniqueKey
@@ -115,6 +115,7 @@ from airflow.serialization.definitions.notset import NOTSET
from airflow.ti_deps.dependencies_states import ACTIVE_STATES, EXECUTION_STATES
from airflow.timetables.base import compute_rollup_fingerprint
from airflow.timetables.simple import AssetTriggeredTimetable,
PartitionedAssetTimetable
+from airflow.triggers.base import TriggerEvent
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.retries import MAX_DB_RETRIES, retry_db_transaction,
run_with_db_retries
@@ -249,9 +250,10 @@ class ConcurrencyMap:
# max_active_tis_per_dagrun), including DEFERRED.
self.task_concurrency_map[(dag_id, task_id)] += count
self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] +=
count
- # Only count non-deferred states towards DAG-run active tasks
- # (max_active_tasks / worker slot accounting).
- if state != TaskInstanceState.DEFERRED:
+ # Only count states that hold a worker slot towards DAG-run active
tasks
+ # (max_active_tasks / worker slot accounting). DEFERRED and
AWAITING_INPUT
+ # are in-flight but parked, holding no worker slot.
+ if state not in (TaskInstanceState.DEFERRED,
TaskInstanceState.AWAITING_INPUT):
self.dag_run_active_tasks_map[dag_id, run_id] += count
@@ -1644,6 +1646,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self.check_trigger_timeouts,
)
+ timers.call_regular_interval(
+ conf.getfloat("scheduler", "trigger_timeout_check_interval",
fallback=15.0),
+ self.check_awaiting_input_timeouts,
+ )
+
timers.call_regular_interval(
30,
self._mark_backfills_complete,
@@ -2956,7 +2963,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
@provide_session
def _emit_ti_metrics(self, *, session: Session = NEW_SESSION) -> None:
- metric_states = {State.SCHEDULED, State.QUEUED, State.RUNNING,
State.DEFERRED}
+ metric_states = {State.SCHEDULED, State.QUEUED, State.RUNNING,
State.DEFERRED, State.AWAITING_INPUT}
stmt = (
select(
TaskInstance.state,
@@ -3174,6 +3181,95 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
if num_timed_out_tasks:
self.log.info("Timed out %i deferred tasks without fired
triggers", num_timed_out_tasks)
+ @provide_session
+ def check_awaiting_input_timeouts(
+ self, max_retries: int = MAX_DB_RETRIES, *, session: Session =
NEW_SESSION
+ ) -> None:
+ """
+ Resolve Human-in-the-loop tasks parked in AWAITING_INPUT whose
response deadline has passed.
+
+ This is the scheduler-side liveness guarantee for HITL and runs
independently of the
+ triggerer. For each timed-out task instance: if a response arrived
just before the deadline,
+ resume with it; otherwise, if the request defines defaults, write the
defaults as the
+        response and resume to success; otherwise resume with a timeout error
event (``HITLTimeoutError``).
+ """
+ for attempt in run_with_db_retries(max_retries, logger=self.log):
+ with attempt:
+ now = timezone.utcnow()
+ query = (
+ select(TI)
+ .where(
+ TI.state == TaskInstanceState.AWAITING_INPUT,
+ TI.trigger_timeout < now,
+ )
+ .options(joinedload(TI.hitl_detail))
+ # Bound the batch so a single scheduler tick cannot
lock/process an unbounded
+ # backlog of timed-out tasks (which would block concurrent
responses/clears);
+ # any remaining rows are handled on subsequent ticks.
+ .limit(100)
+ )
+ # Lock only the TI rows (of=TI) so HA schedulers don't
double-resolve, and so the
+ # FOR UPDATE is not applied to the nullable side of the
hitl_detail outer join.
+ query = with_row_locks(query, of=TI, session=session,
skip_locked=True)
+ timed_out_tis = session.scalars(query).all()
+ if not timed_out_tis:
+ return
+
+ num_resolved = 0
+ num_failed = 0
+ for ti in timed_out_tis:
+ hitl_detail = ti.hitl_detail
+ if hitl_detail is not None and hitl_detail.responded_at is
not None:
+ # A response landed just before the deadline; resume
with it.
+ handle_event_submit(
+
TriggerEvent(hitl_detail.as_resume_event_payload(timedout=False)),
+ task_instance=ti,
+ session=session,
+ )
+ num_resolved += 1
+ elif hitl_detail is not None and hitl_detail.defaults is
not None:
+ # Apply the configured defaults as the response, then
resume to success.
+ hitl_detail.chosen_options = list(hitl_detail.defaults)
+ hitl_detail.params_input = {
+ key: value["value"] if isinstance(value, dict) and
"value" in value else value
+ for key, value in (hitl_detail.params or
{}).items()
+ }
+ hitl_detail.responded_by = None
+ hitl_detail.responded_at = now
+ session.add(hitl_detail)
+ handle_event_submit(
+
TriggerEvent(hitl_detail.as_resume_event_payload(timedout=True)),
+ task_instance=ti,
+ session=session,
+ )
+ num_resolved += 1
+ else:
+ # No defaults and no response: resume into
execute_complete with a timeout
+ # failure event so the operator raises
HITLTimeoutError (matching the old
+ # trigger path), rather than a generic
deferral-timeout failure.
+ handle_event_submit(
+ TriggerEvent(
+ {
+ "error": "The Human-in-the-loop response
timeout has passed "
+ "without a response.",
+ "error_type": "timeout",
+ }
+ ),
+ task_instance=ti,
+ session=session,
+ )
+ num_failed += 1
+
+            # Flush within the retry block so all three branches (late
response, defaults, timeout
+            # error) persist consistently; each already goes through
handle_event_submit.
+ session.flush()
+ if num_resolved or num_failed:
+ self.log.info(
+ "AWAITING_INPUT timeout sweep: %i resolved
(response/defaults), %i failed",
+ num_resolved,
+ num_failed,
+ )
+
# [START find_and_purge_task_instances_without_heartbeats]
def _find_and_purge_task_instances_without_heartbeats(self) -> None:
"""
diff --git a/airflow-core/src/airflow/models/dagrun.py
b/airflow-core/src/airflow/models/dagrun.py
index 564e11a9522..1ed4dd198a4 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -1137,7 +1137,10 @@ class DagRun(Base, LoggingMixin):
and all(
getattr(t.task, "max_active_tis_per_dagrun", None) is
None for t in self.tis if t.task
)
- and all(t.state != TaskInstanceState.DEFERRED for t in
self.tis)
+ and all(
+ t.state not in (TaskInstanceState.DEFERRED,
TaskInstanceState.AWAITING_INPUT)
+ for t in self.tis
+ )
)
def recalculate(self) -> _UnfinishedStates:
diff --git a/airflow-core/src/airflow/models/hitl.py
b/airflow-core/src/airflow/models/hitl.py
index 38e2d2f112f..dda93d9c86f 100644
--- a/airflow-core/src/airflow/models/hitl.py
+++ b/airflow-core/src/airflow/models/hitl.py
@@ -177,3 +177,21 @@ class HITLDetail(Base, HITLDetailPropertyMixin):
onupdate="CASCADE",
),
)
+
+ def as_resume_event_payload(self, *, timedout: bool = False) -> dict[str,
Any]:
+ """
+ Build the event payload that ``HITLOperator.execute_complete``
consumes on resume.
+
+ Single source of truth mapping the response columns to the event dict
the operator reads,
+ reproducing the provider's ``HITLTriggerEventSuccessPayload`` contract
so a human response
+ (via the Core API) or the scheduler timeout sweep can resume an
``awaiting_input`` task
+ directly, without a trigger. ``responded_at`` stays a ``datetime``
(``execute_complete``
+ calls ``.isoformat()`` on it); ``responded_by_user`` is ``None`` for
the timeout default.
+ """
+ return {
+ "chosen_options": list(self.chosen_options or []),
+ "params_input": self.params_input or {},
+ "responded_at": self.responded_at,
+ "responded_by_user": self.responded_by_user,
+ "timedout": timedout,
+ }
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index 0d0183665fa..92300f4ac31 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -2040,15 +2040,19 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
def get_num_active_task_instances(self, *, same_dagrun: bool = False,
session: Session) -> int:
"""
- Count active (running or deferred) TIs for this task from the DB.
+ Count active (running, deferred, or awaiting-input) TIs for this task
from the DB.
- Deferred TIs are included because they are still logically in-flight
- and must count against max_active_tis_per_dag /
max_active_tis_per_dagrun.
+ Deferred and awaiting-input TIs are included because they are still
logically
+ in-flight and must count against max_active_tis_per_dag /
max_active_tis_per_dagrun.
:meta private:
"""
return self._get_num_task_instances_of_state(
- [TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED],
+ [
+ TaskInstanceState.RUNNING,
+ TaskInstanceState.DEFERRED,
+ TaskInstanceState.AWAITING_INPUT,
+ ],
same_dagrun=same_dagrun,
session=session,
)
diff --git a/airflow-core/src/airflow/ti_deps/dependencies_states.py
b/airflow-core/src/airflow/ti_deps/dependencies_states.py
index 7d5bd5091e0..21fd8ff31da 100644
--- a/airflow-core/src/airflow/ti_deps/dependencies_states.py
+++ b/airflow-core/src/airflow/ti_deps/dependencies_states.py
@@ -31,6 +31,13 @@ EXECUTION_STATES = {
ACTIVE_STATES = {
*EXECUTION_STATES,
TaskInstanceState.DEFERRED,
+ # AWAITING_INPUT (HITL) is logically in-flight like DEFERRED: it counts
towards task-level
+ # concurrency (max_active_tis_per_dag / max_active_tis_per_dagrun) and,
like DEFERRED, is
+ # excluded from DAG-run active-task accounting. It is also excluded from
pool slot accounting,
+ # but -- unlike DEFERRED, which a pool can opt to count via
``include_deferred`` -- an
+ # awaiting_input task is *never* counted against pool slots: an open-ended
human wait should
+ # not reserve one.
+ TaskInstanceState.AWAITING_INPUT,
}
# In order to be able to get queued a task must have one of these states
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 23170a462ca..fbebbf10c8e 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -6612,7 +6612,7 @@ export const $TaskInstanceResponse = {
export const $TaskInstanceState = {
type: 'string',
- enum: ['removed', 'scheduled', 'queued', 'running', 'success',
'restarting', 'failed', 'up_for_retry', 'up_for_reschedule', 'upstream_failed',
'skipped', 'deferred'],
+ enum: ['removed', 'scheduled', 'queued', 'running', 'success',
'restarting', 'failed', 'up_for_retry', 'up_for_reschedule', 'upstream_failed',
'skipped', 'deferred', 'awaiting_input'],
title: 'TaskInstanceState',
description: `All possible states that a Task Instance can be in.
@@ -10177,10 +10177,14 @@ export const $TaskInstanceStateCount = {
deferred: {
type: 'integer',
title: 'Deferred'
+ },
+ awaiting_input: {
+ type: 'integer',
+ title: 'Awaiting Input'
}
},
type: 'object',
- required: ['no_status', 'removed', 'scheduled', 'queued', 'running',
'success', 'restarting', 'failed', 'up_for_retry', 'up_for_reschedule',
'upstream_failed', 'skipped', 'deferred'],
+ required: ['no_status', 'removed', 'scheduled', 'queued', 'running',
'success', 'restarting', 'failed', 'up_for_retry', 'up_for_reschedule',
'upstream_failed', 'skipped', 'deferred', 'awaiting_input'],
title: 'TaskInstanceStateCount',
description: 'TaskInstance serializer for responses.'
} as const;
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 15d50056069..f242a2235dd 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1709,7 +1709,7 @@ export type TaskInstanceResponse = {
*
* Note that None is also allowed, so always use this in a type hint with
Optional.
*/
-export type TaskInstanceState = 'removed' | 'scheduled' | 'queued' | 'running'
| 'success' | 'restarting' | 'failed' | 'up_for_retry' | 'up_for_reschedule' |
'upstream_failed' | 'skipped' | 'deferred';
+export type TaskInstanceState = 'removed' | 'scheduled' | 'queued' | 'running'
| 'success' | 'restarting' | 'failed' | 'up_for_retry' | 'up_for_reschedule' |
'upstream_failed' | 'skipped' | 'deferred' | 'awaiting_input';
/**
* Task Instance body for get batch.
@@ -2564,6 +2564,7 @@ export type TaskInstanceStateCount = {
upstream_failed: number;
skipped: number;
deferred: number;
+ awaiting_input: number;
};
/**
diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
index 3083bc27276..95a86d0fab1 100644
--- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
+++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
@@ -229,6 +229,7 @@
"startDate": "Start Date",
"state": "State",
"states": {
+ "awaiting_input": "Awaiting Input",
"deferred": "Deferred",
"failed": "Failed",
"no_status": "No Status",
diff --git a/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx
b/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx
index c0ec70f44dd..8e48c0c6fbe 100644
--- a/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx
+++ b/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx
@@ -41,7 +41,7 @@ export const NeedsReviewButton = ({
dagId: dagId ?? "~",
dagRunId: runId ?? "~",
responseReceived: false,
- state: ["deferred"],
+ state: ["deferred", "awaiting_input"],
taskId,
},
undefined,
diff --git a/airflow-core/src/airflow/ui/src/components/StateIcon.tsx
b/airflow-core/src/airflow/ui/src/components/StateIcon.tsx
index 3fad9f62edb..abfef20f216 100644
--- a/airflow-core/src/airflow/ui/src/components/StateIcon.tsx
+++ b/airflow-core/src/airflow/ui/src/components/StateIcon.tsx
@@ -18,7 +18,14 @@
*/
import type { IconBaseProps } from "react-icons";
import { FiActivity, FiCalendar, FiRepeat, FiSkipForward, FiSlash, FiWatch,
FiX } from "react-icons/fi";
-import { LuCalendarSync, LuCheck, LuCircleDashed, LuCircleFadingArrowUp,
LuRedo2 } from "react-icons/lu";
+import {
+ LuCalendarSync,
+ LuCheck,
+ LuCircleDashed,
+ LuCircleFadingArrowUp,
+ LuRedo2,
+ LuUserRoundPen,
+} from "react-icons/lu";
import { PiQueue } from "react-icons/pi";
import type { TaskInstanceState } from "openapi/requests/types.gen";
@@ -31,6 +38,8 @@ export const StateIcon = ({ state, ...rest }: Props) => {
// false positive eslint - we have a default.
// eslint-disable-next-line @typescript-eslint/switch-exhaustiveness-check
switch (state) {
+ case "awaiting_input":
+ return <LuUserRoundPen {...rest} />;
case "deferred":
return <FiWatch {...rest} />;
case "failed":
diff --git a/airflow-core/src/airflow/ui/src/constants/stateOptions.ts
b/airflow-core/src/airflow/ui/src/constants/stateOptions.ts
index b9df60a5208..f9237e1f221 100644
--- a/airflow-core/src/airflow/ui/src/constants/stateOptions.ts
+++ b/airflow-core/src/airflow/ui/src/constants/stateOptions.ts
@@ -37,6 +37,7 @@ export const taskInstanceStateOptions = createListCollection<{
{ label: "common:states.upstream_failed", value: "upstream_failed" },
{ label: "common:states.skipped", value: "skipped" },
{ label: "common:states.deferred", value: "deferred" },
+ { label: "common:states.awaiting_input", value: "awaiting_input" },
{ label: "common:states.removed", value: "removed" },
{ label: "common:states.none", value: "none" },
],
diff --git a/airflow-core/src/airflow/ui/src/hooks/useRequiredActionTabs.ts
b/airflow-core/src/airflow/ui/src/hooks/useRequiredActionTabs.ts
index 39ddbb8f335..0560327fac3 100644
--- a/airflow-core/src/airflow/ui/src/hooks/useRequiredActionTabs.ts
+++ b/airflow-core/src/airflow/ui/src/hooks/useRequiredActionTabs.ts
@@ -83,7 +83,9 @@ export const useRequiredActionTabs = (
const hasHitlData = (hitlData?.total_entries ?? 0) > 0;
const pendingActionsCount =
hitlData?.hitl_details.filter(
- (hitl) => hitl.task_instance.state === "deferred" &&
!hitl.response_received,
+ (hitl) =>
+ (hitl.task_instance.state === "deferred" || hitl.task_instance.state
=== "awaiting_input") &&
+ !hitl.response_received,
).length ?? 0;
const processedTabs = tabs
diff --git
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/TaskInstanceMetrics.tsx
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/TaskInstanceMetrics.tsx
index f36c0a4635b..617d3e62b04 100644
---
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/TaskInstanceMetrics.tsx
+++
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/TaskInstanceMetrics.tsx
@@ -43,6 +43,7 @@ const TASK_STATES: Array<keyof TaskInstanceStateCount> = [
"up_for_reschedule",
"upstream_failed",
"deferred",
+ "awaiting_input",
"no_status",
];
diff --git
a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.test.tsx
b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.test.tsx
index 09d9f0efc84..4627a326703 100644
---
a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.test.tsx
+++
b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.test.tsx
@@ -140,3 +140,37 @@ describe("HITLResponseForm – option button rendering
boundary", () => {
expect(screen.getByTestId("hitl-option-Reject")).toBeInTheDocument();
});
});
+
+// ---------------------------------------------------------------------------
+// Tests: pending-state enablement
+//
+// HITL tasks park in "deferred" (pre-3.3) or "awaiting_input" (from 3.3). Both
+// are "pending a response", so the option buttons must be enabled in either
+// state. Previously the form only treated "deferred" as pending, which
disabled
+// the response form for awaiting_input tasks.
+// ---------------------------------------------------------------------------
+describe("HITLResponseForm – pending-state enablement", () => {
+ it("enables option buttons for a deferred task", () => {
+ renderForm(["Yes", "No"], {
+ task_instance: { ...MOCK_TASK_INSTANCE, state: "deferred" },
+ });
+ expect(screen.getByTestId("hitl-option-Yes")).toBeEnabled();
+ expect(screen.getByTestId("hitl-option-No")).toBeEnabled();
+ });
+
+ it("enables option buttons for an awaiting_input task", () => {
+ renderForm(["Yes", "No"], {
+ task_instance: { ...MOCK_TASK_INSTANCE, state: "awaiting_input" },
+ });
+ expect(screen.getByTestId("hitl-option-Yes")).toBeEnabled();
+ expect(screen.getByTestId("hitl-option-No")).toBeEnabled();
+ });
+
+ it("disables option buttons for a finished task", () => {
+ renderForm(["Yes", "No"], {
+ task_instance: { ...MOCK_TASK_INSTANCE, state: "success" },
+ });
+ expect(screen.getByTestId("hitl-option-Yes")).toBeDisabled();
+ expect(screen.getByTestId("hitl-option-No")).toBeDisabled();
+ });
+});
diff --git
a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.tsx
b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.tsx
index 178962a4423..87bc57c1d42 100644
---
a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.tsx
+++
b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.tsx
@@ -28,7 +28,7 @@ import Time from "src/components/Time";
import { useParamStore } from "src/queries/useParamStore";
import { useUpdateHITLDetail } from "src/queries/useUpdateHITLDetail";
import { DEFAULT_DATETIME_FORMAT } from "src/utils/datetimeUtils";
-import { getHITLParamsDict, getHITLFormData, getPreloadHITLFormData } from
"src/utils/hitl";
+import { getHITLParamsDict, getHITLFormData, getPreloadHITLFormData,
isHITLPending } from "src/utils/hitl";
type HITLResponseFormProps = {
readonly hitlDetail: {
@@ -70,7 +70,7 @@ export const HITLResponseForm = ({ hitlDetail }:
HITLResponseFormProps) => {
const shouldRenderOptionButton =
hitlDetail.options.length <= 4 && !hitlDetail.multiple &&
preloadedHITLOptions.length === 0;
- const isPending = hitlDetail.task_instance.state === "deferred";
+ const isPending = isHITLPending(hitlDetail.task_instance.state);
const { updateHITLResponse } = useUpdateHITLDetail({
dagId: hitlDetail.task_instance.dag_id,
diff --git
a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.test.tsx
b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.test.tsx
index c149eac187f..534a7aa216c 100644
---
a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.test.tsx
+++
b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.test.tsx
@@ -148,6 +148,33 @@ describe("HITLTaskInstances – mapIndex URL param handling
(#66428)", () => {
});
});
+// ---------------------------------------------------------------------------
+// State filter: pending required-action rows park in either "deferred"
(pre-3.3)
+// or "awaiting_input" (from 3.3). Filtering on unreceived responses must
request
+// both states, otherwise awaiting_input rows are hidden from the listing.
+// ---------------------------------------------------------------------------
+describe("HITLTaskInstances – pending state filter", () => {
+ it("requests both deferred and awaiting_input when filtering for unreceived
responses", () => {
+ mockSearchParams = new URLSearchParams("response_received=false");
+
+ render(<HITLTaskInstances />, { wrapper: Wrapper });
+
+ const args = lastListingCall()?.[0] as { state?: Array<string> } |
undefined;
+
+ expect(args?.state).toEqual(["deferred", "awaiting_input"]);
+ });
+
+ it("does not send a state filter when not filtering for unreceived
responses", () => {
+ mockSearchParams = new URLSearchParams();
+
+ render(<HITLTaskInstances />, { wrapper: Wrapper });
+
+ const args = lastListingCall()?.[0] as { state?: Array<string> } |
undefined;
+
+ expect(args?.state).toBeUndefined();
+ });
+});
+
// ---------------------------------------------------------------------------
// Refetch predicate — the API serializes `responded_at` as JSON `null`, not an
// omitted field, so `=== undefined` never matched and the page never polled
@@ -198,6 +225,28 @@ describe("HITLTaskInstances – auto-refresh predicate", ()
=> {
expect(result).toBe(5000);
});
+ it("triggers refetch when an awaiting_input row has responded_at: null", ()
=> {
+ mockSearchParams = new URLSearchParams();
+
+ render(<HITLTaskInstances />, { wrapper: Wrapper });
+
+ const refetchInterval = getRefetchInterval();
+ const result = refetchInterval({
+ state: {
+ data: {
+ hitl_details: [
+ {
+ responded_at: null,
+ task_instance: { state: "awaiting_input" },
+ },
+ ],
+ },
+ },
+ });
+
+ expect(result).toBe(5000);
+ });
+
it("does not refetch when every row already has a responded_at value", () =>
{
mockSearchParams = new URLSearchParams();
diff --git
a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.tsx
b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.tsx
index e9e0e86e4d2..4ad3c945cf4 100644
---
a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.tsx
+++
b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.tsx
@@ -34,7 +34,7 @@ import { RouterLink } from "src/components/ui";
import { SearchParamsKeys, type SearchParamsKeysType } from
"src/constants/searchParams";
import { useAdvancedSearchArg } from "src/hooks/useAdvancedSearch";
import { useAutoRefresh } from "src/utils";
-import { getHITLState } from "src/utils/hitl";
+import { getHITLState, isHITLPending } from "src/utils/hitl";
import { getTaskInstanceLink } from "src/utils/links";
import { HITLFilters } from "./HITLFilters";
@@ -216,7 +216,7 @@ export const HITLTaskInstances = () => {
Boolean(effectiveResponseReceived) && effectiveResponseReceived !==
"all"
? effectiveResponseReceived === "true"
: undefined,
- state: effectiveResponseReceived === "false" ? ["deferred"] : undefined,
+ state: effectiveResponseReceived === "false" ? ["deferred",
"awaiting_input"] : undefined,
subjectSearch,
taskId,
...taskIdArg,
@@ -224,15 +224,15 @@ export const HITLTaskInstances = () => {
undefined,
{
// Only continue auto-refetching when filtering for unreceived responses
- // and at least one TaskInstance is still deferred without a response.
+ // and at least one TaskInstance is still pending (parked) without a
response.
refetchInterval: (query) => {
- const hasDeferredWithoutResponse = Boolean(
+ const hasPendingWithoutResponse = Boolean(
query.state.data?.hitl_details.some(
- (detail: HITLDetail) => detail.responded_at === null &&
detail.task_instance.state === "deferred",
+ (detail: HITLDetail) => detail.responded_at === null &&
isHITLPending(detail.task_instance.state),
),
);
- return hasDeferredWithoutResponse ? baseRefetchInterval : false;
+ return hasPendingWithoutResponse ? baseRefetchInterval : false;
},
},
);
diff --git a/airflow-core/src/airflow/ui/src/theme.ts
b/airflow-core/src/airflow/ui/src/theme.ts
index 5dea56794a7..8fc32ed4516 100644
--- a/airflow-core/src/airflow/ui/src/theme.ts
+++ b/airflow-core/src/airflow/ui/src/theme.ts
@@ -411,6 +411,7 @@ const defaultAirflowTheme: ThemingConfig = {
running: generateSemanticTokens("cyan"),
restarting: generateSemanticTokens("violet"),
deferred: generateSemanticTokens("purple"),
+ awaiting_input: generateSemanticTokens("orange"),
scheduled: generateSemanticTokens("zinc"),
none: generateSemanticTokens("gray"),
no_status: generateSemanticTokens("gray"),
diff --git a/airflow-core/src/airflow/ui/src/utils/hitl.test.ts
b/airflow-core/src/airflow/ui/src/utils/hitl.test.ts
index daead151359..90dd1c7fdaf 100644
--- a/airflow-core/src/airflow/ui/src/utils/hitl.test.ts
+++ b/airflow-core/src/airflow/ui/src/utils/hitl.test.ts
@@ -21,7 +21,7 @@ import { describe, it, expect, vi } from "vitest";
import type { HITLDetail } from "openapi/requests/types.gen";
-import { getHITLParamsDict } from "./hitl";
+import { getHITLParamsDict, getHITLState, isHITLPending } from "./hitl";
const mockTranslate = vi.fn((key: string) => key) as unknown as TFunction;
@@ -48,6 +48,56 @@ const createMockHITLDetail = (overrides?:
Partial<HITLDetail>): HITLDetail =>
...overrides,
}) as HITLDetail;
+describe("isHITLPending", () => {
+ it("treats a deferred task as pending", () => {
+ expect(isHITLPending("deferred")).toBe(true);
+ });
+
+ it("treats an awaiting_input task as pending", () => {
+ expect(isHITLPending("awaiting_input")).toBe(true);
+ });
+
+ it("treats a finished/cleared task as not pending", () => {
+ expect(isHITLPending("success")).toBe(false);
+ expect(isHITLPending(undefined)).toBe(false);
+ expect(isHITLPending(null)).toBe(false);
+ });
+});
+
+describe("getHITLState", () => {
+ it("reports a 'required' state for an awaiting_input task without a
response", () => {
+ const hitlDetail = createMockHITLDetail({
+ response_received: false,
+ task_instance: {
+ dag_id: "test_dag",
+ dag_run_id: "test_run",
+ map_index: -1,
+ state: "awaiting_input",
+ task_id: "test_task",
+ },
+ } as Partial<HITLDetail>);
+
+ // Empty params + non-approval options -> choice task; the point is that a
parked
+ // awaiting_input task is "required", not "noResponseReceived".
+ expect(getHITLState(mockTranslate,
hitlDetail)).toBe("state.choiceRequired");
+ });
+
+ it("reports no response received for a finished task without a response", ()
=> {
+ const hitlDetail = createMockHITLDetail({
+ response_received: false,
+ task_instance: {
+ dag_id: "test_dag",
+ dag_run_id: "test_run",
+ map_index: -1,
+ state: "success",
+ task_id: "test_task",
+ },
+ } as Partial<HITLDetail>);
+
+ expect(getHITLState(mockTranslate,
hitlDetail)).toBe("state.noResponseReceived");
+ });
+});
+
describe("getHITLParamsDict", () => {
it("correctly types object parameters as 'object' instead of 'string'", ()
=> {
const hitlDetail = createMockHITLDetail({
diff --git a/airflow-core/src/airflow/ui/src/utils/hitl.ts
b/airflow-core/src/airflow/ui/src/utils/hitl.ts
index 6ea2c88eca5..ab1ab135526 100644
--- a/airflow-core/src/airflow/ui/src/utils/hitl.ts
+++ b/airflow-core/src/airflow/ui/src/utils/hitl.ts
@@ -18,7 +18,7 @@
*/
import type { TFunction } from "i18next";
-import type { HITLDetail, HITLDetailHistory } from
"openapi/requests/types.gen";
+import type { HITLDetail, HITLDetailHistory, TaskInstanceState } from
"openapi/requests/types.gen";
import type { ParamSchema, ParamsSpec } from "src/queries/useDagParams";
export type HITLResponseParams = {
@@ -26,6 +26,12 @@ export type HITLResponseParams = {
params_input?: Record<string, unknown>;
};
+// A HITL task is "pending a response" while parked: pre-3.3 it parks in
"deferred", from 3.3 it
+// parks in "awaiting_input". Either way an unanswered parked task is
"response required", not
+// "no response received" (which is reserved for tasks no longer parked, e.g.
cleared/finished).
+export const isHITLPending = (state?: TaskInstanceState | null): boolean =>
+ state === "deferred" || state === "awaiting_input";
+
const getChosenOptionsValue = (hitlDetail: HITLDetailHistory) => {
// if response_received is true, display the chosen_options, otherwise
display the defaults
const sourceValues = hitlDetail.response_received ?
hitlDetail.chosen_options : hitlDetail.defaults;
@@ -211,11 +217,9 @@ export const getHITLState = (translate: TFunction,
hitlDetail: HITLDetail) => {
task_instance: { state: taskInstanceState },
} = hitlDetail;
- const isNotDeferred = taskInstanceState !== "deferred";
-
let stateType: [string, string] = ["responseRequired", "responseReceived"];
- if (!responseReceived && isNotDeferred) {
+ if (!responseReceived && !isHITLPending(taskInstanceState)) {
return translate("state.noResponseReceived");
}
diff --git a/airflow-core/src/airflow/ui/src/utils/query.ts
b/airflow-core/src/airflow/ui/src/utils/query.ts
index 10c031f47e3..ebdfdf16f63 100644
--- a/airflow-core/src/airflow/ui/src/utils/query.ts
+++ b/airflow-core/src/airflow/ui/src/utils/query.ts
@@ -21,6 +21,7 @@ import type { TaskInstanceState } from
"openapi/requests/types.gen";
import { useConfig } from "src/queries/useConfig";
export const isStatePending = (state?: TaskInstanceState | null) =>
+ state === "awaiting_input" ||
state === "deferred" ||
state === "scheduled" ||
state === "running" ||
diff --git a/airflow-core/src/airflow/ui/src/utils/stateUtils.ts
b/airflow-core/src/airflow/ui/src/utils/stateUtils.ts
index ea4de7a7b40..3d09c537643 100644
--- a/airflow-core/src/airflow/ui/src/utils/stateUtils.ts
+++ b/airflow-core/src/airflow/ui/src/utils/stateUtils.ts
@@ -33,6 +33,7 @@ export const STATE_PRIORITY: Array<string> = [
"running",
"restarting",
"deferred",
+ "awaiting_input",
"queued",
"scheduled",
"success",
diff --git a/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
b/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
index 178a2f3caa0..b0b159fce44 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
@@ -371,7 +371,7 @@ export async function setupHITLFlowViaAPI(
await waitForTaskInstanceState(request, {
dagId,
- expectedState: "deferred",
+ expectedState: "awaiting_input",
runId: dagRunId,
taskId: "wait_for_input",
});
@@ -385,7 +385,7 @@ export async function setupHITLFlowViaAPI(
await waitForTaskInstanceState(request, {
dagId,
- expectedState: "deferred",
+ expectedState: "awaiting_input",
runId: dagRunId,
taskId: "wait_for_option",
});
@@ -398,7 +398,7 @@ export async function setupHITLFlowViaAPI(
await waitForTaskInstanceState(request, {
dagId,
- expectedState: "deferred",
+ expectedState: "awaiting_input",
runId: dagRunId,
taskId: "wait_for_multiple_options",
});
@@ -431,7 +431,7 @@ export async function setupHITLFlowViaAPI(
await waitForTaskInstanceState(request, {
dagId,
- expectedState: "deferred",
+ expectedState: "awaiting_input",
runId: dagRunId,
taskId: "valid_input_and_options",
});
@@ -451,7 +451,7 @@ export async function setupHITLFlowViaAPI(
if (approve) {
await waitForTaskInstanceState(request, {
dagId,
- expectedState: "deferred",
+ expectedState: "awaiting_input",
runId: dagRunId,
taskId: "choose_a_branch_to_run",
});
diff --git a/airflow-core/src/airflow/utils/state.py
b/airflow-core/src/airflow/utils/state.py
index 332efb10553..89c8efcaa3e 100644
--- a/airflow-core/src/airflow/utils/state.py
+++ b/airflow-core/src/airflow/utils/state.py
@@ -56,6 +56,7 @@ class IntermediateTIState(str, Enum):
UP_FOR_RETRY = "up_for_retry"
UP_FOR_RESCHEDULE = "up_for_reschedule"
DEFERRED = "deferred"
+ AWAITING_INPUT = "awaiting_input"
def __str__(self) -> str:
return self.value
@@ -87,6 +88,7 @@ class TaskInstanceState(str, Enum):
UPSTREAM_FAILED = TerminalTIState.UPSTREAM_FAILED # One or more upstream
deps failed
SKIPPED = TerminalTIState.SKIPPED # Skipped by branching or some other
mechanism
DEFERRED = IntermediateTIState.DEFERRED # Deferrable operator waiting on
a trigger
+ AWAITING_INPUT = IntermediateTIState.AWAITING_INPUT # Parked waiting for
human input (HITL)
def __str__(self) -> str:
return self.value
@@ -130,6 +132,7 @@ class State:
UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
SKIPPED = TaskInstanceState.SKIPPED
DEFERRED = TaskInstanceState.DEFERRED
+ AWAITING_INPUT = TaskInstanceState.AWAITING_INPUT
finished_dr_states: frozenset[DagRunState] =
frozenset([DagRunState.SUCCESS, DagRunState.FAILED])
unfinished_dr_states: frozenset[DagRunState] =
frozenset([DagRunState.QUEUED, DagRunState.RUNNING])
@@ -157,6 +160,7 @@ class State:
TaskInstanceState.REMOVED: "lightgrey",
TaskInstanceState.SCHEDULED: "tan",
TaskInstanceState.DEFERRED: "mediumpurple",
+ TaskInstanceState.AWAITING_INPUT: "darkorange",
}
@classmethod
@@ -200,6 +204,7 @@ class State:
TaskInstanceState.UP_FOR_RETRY,
TaskInstanceState.UP_FOR_RESCHEDULE,
TaskInstanceState.DEFERRED,
+ TaskInstanceState.AWAITING_INPUT,
]
)
"""
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py
index eabbb0a7559..6d21cc8c6c4 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py
@@ -31,6 +31,7 @@ from sqlalchemy.orm import Session
from airflow._shared.timezones.timezone import utc, utcnow
from airflow.models.hitl import HITLDetail
from airflow.models.log import Log
+from airflow.models.taskinstance import TaskInstance as TIModel
from airflow.sdk.execution_time.hitl import HITLUser
from airflow.utils.state import TaskInstanceState
@@ -332,6 +333,60 @@ class TestUpdateHITLDetailEndpoint:
assert audit_log is not None
_assert_sample_audit_log(audit_log)
+ @time_machine.travel(datetime(2025, 7, 3, 0, 0, 0), tick=False)
+ @pytest.mark.usefixtures("sample_hitl_detail")
+ def test_response_resumes_awaiting_input_task_without_trigger(
+ self,
+ test_client: TestClient,
+ sample_ti_url_identifier: str,
+ sample_update_payload: dict[str, Any],
+ sample_ti: TaskInstance,
+ session: Session,
+ ) -> None:
+ """A human response transitions a parked AWAITING_INPUT task straight to SCHEDULED, no trigger."""
+ # Park the task exactly as the 3.3 operator does: AWAITING_INPUT, next_method set, no trigger.
+ ti = session.get(TIModel, sample_ti.id)
+ assert ti is not None
+ ti.state = TaskInstanceState.AWAITING_INPUT
+ ti.next_method = "execute_complete"
+ ti.next_kwargs = {}
+ ti.trigger_id = None
+ session.commit()
+ # Sanity: the park persisted before we respond.
+ session.expire_all()
+ parked = session.get(TIModel, sample_ti.id)
+ assert parked is not None
+ assert parked.state == TaskInstanceState.AWAITING_INPUT
+
+ response = test_client.patch(
+ f"{sample_ti_url_identifier}/hitlDetails",
+ json=sample_update_payload,
+ )
+ assert response.status_code == 200
+
+ session.expire_all()
+ refreshed = session.get(TIModel, sample_ti.id)
+ assert refreshed is not None
+ # Resumed so the scheduler re-queues execute_complete -- with no triggerer involved.
+ assert refreshed.state == TaskInstanceState.SCHEDULED
+ assert refreshed.trigger_id is None
+ assert refreshed.next_method == "execute_complete"
+ assert "event" in (refreshed.next_kwargs or {})
+
+ @pytest.mark.usefixtures("sample_hitl_detail")
+ def test_should_respond_400_for_invalid_option(
+ self,
+ test_client: TestClient,
+ sample_ti_url_identifier: str,
+ ) -> None:
+ """Write-side validation rejects an out-of-set option (400) instead of failing the task on resume."""
+ response = test_client.patch(
+ f"{sample_ti_url_identifier}/hitlDetails",
+ json={"chosen_options": ["Maybe"], "params_input": {}},
+ )
+ assert response.status_code == 400
+ assert "Invalid options" in response.json()["detail"]
+
@time_machine.travel(datetime(2025, 7, 3, 0, 0, 0), tick=False)
@pytest.mark.usefixtures("sample_hitl_detail_respondent")
def test_should_respond_200_to_assigned_users(
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py
index ccfc2b26f78..ebb31fff965 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py
@@ -246,6 +246,7 @@ class TestHistoricalMetricsDataEndpoint:
{
"dag_run_states": {"failed": 1, "queued": 1, "running": 1,
"success": 1},
"task_instance_states": {
+ "awaiting_input": 0,
"deferred": 0,
"failed": 2,
"no_status": 4,
@@ -268,6 +269,7 @@ class TestHistoricalMetricsDataEndpoint:
{
"dag_run_states": {"failed": 1, "queued": 0, "running": 0,
"success": 0},
"task_instance_states": {
+ "awaiting_input": 0,
"deferred": 0,
"failed": 2,
"no_status": 0,
@@ -290,6 +292,7 @@ class TestHistoricalMetricsDataEndpoint:
{
"dag_run_states": {"failed": 1, "queued": 1, "running": 1,
"success": 0},
"task_instance_states": {
+ "awaiting_input": 0,
"deferred": 0,
"failed": 2,
"no_status": 4,
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py
b/airflow-core/tests/unit/dag_processing/test_processor.py
index ef92372f573..c1d5c76712e 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -1944,6 +1944,7 @@ class TestDagProcessingMessageTypes:
dag_processor_types = get_type_names(ToDagProcessor)
in_supervisor_but_not_in_manager = {
+ "AwaitInputTask",
"DeferTask",
"DeleteXCom",
"GetAssetByName",
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 258ce3db4cf..cca74569bdf 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -82,6 +82,7 @@ from airflow.models.dagwarning import DagWarning
from airflow.models.db_callback_request import DbCallbackRequest
from airflow.models.deadline import Deadline
from airflow.models.deadline_alert import DeadlineAlert
+from airflow.models.hitl import HITLDetail
from airflow.models.log import Log
from airflow.models.pool import Pool
from airflow.models.serialized_dag import SerializedDagModel
@@ -7312,6 +7313,97 @@ class TestSchedulerJob:
assert ti1.next_method == "__fail__"
assert ti2.state == State.DEFERRED
+ def test_awaiting_input_timeout_with_defaults_resumes(self, dag_maker):
+ """
+ A parked ``awaiting_input`` task past its deadline with defaults is resumed to SCHEDULED by
+ the scheduler sweep (the default applied as the response), while a not-yet-expired one is
+ left untouched. This proves HITL timeout liveness with no triggerer running.
+ """
+ session = settings.Session()
+ with dag_maker(
+ dag_id="test_awaiting_input_defaults",
+ start_date=DEFAULT_DATE,
+ schedule="@once",
+ session=session,
+ ):
+ EmptyOperator(task_id="dummy1")
+ dr_past = dag_maker.create_dagrun()
+ dr_future = dag_maker.create_dagrun(
+ run_id="future", logical_date=DEFAULT_DATE +
datetime.timedelta(seconds=1)
+ )
+ ti_past = dr_past.get_task_instance("dummy1", session=session)
+ ti_future = dr_future.get_task_instance("dummy1", session=session)
+ for ti, offset in ((ti_past, -60), (ti_future, 60)):
+ ti.state = State.AWAITING_INPUT
+ ti.trigger_timeout = timezone.utcnow() +
datetime.timedelta(seconds=offset)
+ ti.next_method = "execute_complete"
+ ti.next_kwargs = {}
+ session.add(
+ HITLDetail(
+ ti_id=ti.id,
+ options=["Approve", "Reject"],
+ subject="approve?",
+ defaults=["Approve"],
+ multiple=False,
+ params={},
+ )
+ )
+ session.flush()
+
+ self.job_runner = SchedulerJobRunner(job=Job())
+ self.job_runner.check_awaiting_input_timeouts(session=session)
+
+ session.refresh(ti_past)
+ session.refresh(ti_future)
+ # Past-deadline task resumed with the default applied; the future one is left parked.
+ assert ti_past.state == State.SCHEDULED
+ assert ti_past.next_method == "execute_complete"
+ assert ti_past.next_kwargs["event"]["chosen_options"] == ["Approve"]
+ assert ti_past.next_kwargs["event"]["timedout"] is True
+ assert ti_future.state == State.AWAITING_INPUT
+ hitl_detail = session.get(HITLDetail, ti_past.id)
+ assert hitl_detail.chosen_options == ["Approve"]
+ assert hitl_detail.responded_by is None
+ assert hitl_detail.responded_at is not None
+
+ def test_awaiting_input_timeout_without_defaults_fails(self, dag_maker):
+ """A parked ``awaiting_input`` task past its deadline with no defaults is resumed with a timeout error, so it fails on resume."""
+ session = settings.Session()
+ with dag_maker(
+ dag_id="test_awaiting_input_nodefaults",
+ start_date=DEFAULT_DATE,
+ schedule="@once",
+ session=session,
+ ):
+ EmptyOperator(task_id="dummy1")
+ dr = dag_maker.create_dagrun()
+ ti = dr.get_task_instance("dummy1", session=session)
+ ti.state = State.AWAITING_INPUT
+ ti.trigger_timeout = timezone.utcnow() - datetime.timedelta(seconds=60)
+ ti.next_method = "execute_complete"
+ ti.next_kwargs = {}
+ session.add(
+ HITLDetail(
+ ti_id=ti.id,
+ options=["Approve", "Reject"],
+ subject="approve?",
+ defaults=None,
+ multiple=False,
+ params={},
+ )
+ )
+ session.flush()
+
+ self.job_runner = SchedulerJobRunner(job=Job())
+ self.job_runner.check_awaiting_input_timeouts(session=session)
+
+ session.refresh(ti)
+ # Resumed into execute_complete with a timeout failure event (raises HITLTimeoutError on
+ # resume), rather than the generic __fail__ deferral-timeout path.
+ assert ti.state == State.SCHEDULED
+ assert ti.next_method == "execute_complete"
+ assert ti.next_kwargs["event"]["error_type"] == "timeout"
+
def test_retry_on_db_error_when_update_timeout_triggers(self, dag_maker,
testing_dag_bundle, session):
"""
Tests that it will retry on DB error like deadlock when updating
timeout triggers.
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index eab9540df3c..4984c0864a8 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -1907,6 +1907,7 @@ class TestTriggererMessageTypes:
trigger_runner_types = get_type_names(ToTriggerRunner)
in_supervisor_but_not_in_trigger_supervisor = {
+ "AwaitInputTask",
"DeferTask",
"GetAssetByName",
"GetAssetByUri",
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index fff609bcaea..4aac0e9a6fa 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -994,6 +994,7 @@ class TaskInstanceState(str, Enum):
UPSTREAM_FAILED = "upstream_failed"
SKIPPED = "skipped"
DEFERRED = "deferred"
+ AWAITING_INPUT = "awaiting_input"
class TaskInstancesBatchBody(BaseModel):
diff --git
a/devel-common/src/docs/diagrams/task_lifecycle/awaiting_input_state.png
b/devel-common/src/docs/diagrams/task_lifecycle/awaiting_input_state.png
new file mode 100644
index 00000000000..3ba1c879ebe
Binary files /dev/null and
b/devel-common/src/docs/diagrams/task_lifecycle/awaiting_input_state.png differ
diff --git
a/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
index a09d22b76fd..8c0c6a3eacd 100644
---
a/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
+++
b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
@@ -1341,6 +1341,7 @@ components:
- upstream_failed
- skipped
- deferred
+ - awaiting_input
title: TaskInstanceState
description: 'All possible states that a Task Instance can be in.
diff --git
a/providers/standard/src/airflow/providers/standard/operators/hitl.py
b/providers/standard/src/airflow/providers/standard/operators/hitl.py
index bef40d504a3..ce42221e7d9 100644
--- a/providers/standard/src/airflow/providers/standard/operators/hitl.py
+++ b/providers/standard/src/airflow/providers/standard/operators/hitl.py
@@ -21,7 +21,11 @@ import warnings
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.common.compat.sdk import
AirflowOptionalProviderFeatureException
-from airflow.providers.standard.version_compat import AIRFLOW_V_3_1_3_PLUS,
AIRFLOW_V_3_1_PLUS
+from airflow.providers.standard.version_compat import (
+ AIRFLOW_V_3_1_3_PLUS,
+ AIRFLOW_V_3_1_PLUS,
+ AIRFLOW_V_3_3_PLUS,
+)
if not AIRFLOW_V_3_1_PLUS:
raise AirflowOptionalProviderFeatureException("Human in the loop
functionality needs Airflow 3.1+.")
@@ -40,6 +44,11 @@ from airflow.sdk.definitions.param import ParamsDict
from airflow.sdk.execution_time.hitl import upsert_hitl_detail
from airflow.sdk.timezone import utcnow
+if AIRFLOW_V_3_3_PLUS:
+ # On Airflow 3.3+ the operator parks the task in the first-class AWAITING_INPUT state instead of
+ # deferring to a trigger. On older cores this name is absent and the defer() fallback is used.
+ from airflow.sdk.exceptions import TaskAwaitingInput
+
if TYPE_CHECKING:
from airflow.providers.common.compat.sdk import Context
from airflow.sdk.execution_time.hitl import HITLUser
@@ -58,9 +67,10 @@ class HITLOperator(BaseOperator):
:param params: dictionary of parameter definitions that are in the format
of Dag params such that
a Form Field can be rendered. Entered data is validated (schema,
required fields) like for a Dag run
and added to XCom of the task result.
- :param response_timeout: Maximum time to wait for a human response after
deferring to the trigger.
- This is separate from ``execution_timeout`` which controls the
pre-defer execution phase.
- If not set, no timeout is applied to the human response wait.
+ :param response_timeout: Maximum time to wait for a human response. On Airflow 3.3+ this is
+ enforced by the scheduler's ``awaiting_input`` timeout sweep; on older versions it is passed
+ to the triggerer. This is separate from ``execution_timeout``, which controls the execution
+ phase before the task starts waiting. If not set, no timeout is applied to the human response wait.
"""
template_fields: Collection[str] = ("subject", "body")
@@ -174,7 +184,13 @@ class HITLOperator(BaseOperator):
raise ValueError('More than one defaults given when "multiple"
is set to False.')
def execute(self, context: Context):
- """Add a Human-in-the-loop Response and then defer to HITLTrigger and
wait for user input."""
+ """
+ Write the Human-in-the-loop request, then wait for a user response.
+
+ On Airflow 3.3+ the task waits in the ``awaiting_input`` state with no trigger or triggerer
+ involved; on older versions it defers to :class:`HITLTrigger`. Either way it resumes in
+ ``execute_complete`` once a response (or timeout default) arrives.
+ """
ti_id = context["task_instance"].id
# Write Human-in-the-loop input request to DB
upsert_hitl_detail(
@@ -200,7 +216,16 @@ class HITLOperator(BaseOperator):
for notifier in self.notifiers:
notifier(context)
- # Defer the Human-in-the-loop response checking process to HITLTrigger
+ if AIRFLOW_V_3_3_PLUS:
+ # New core (3.3+): park the task in AWAITING_INPUT -- no trigger, no triggerer. The task
+ # is resumed by the Core API response handler or the scheduler timeout sweep, so the
+ # triggerer no longer needs to run for Human-in-the-loop tasks to make progress.
+ raise TaskAwaitingInput(
+ method_name="execute_complete",
+ timeout=self.response_timeout,
+ )
+
+ # Fallback for cores < 3.3: defer the response check to HITLTrigger on the triggerer.
self.defer(
trigger=HITLTrigger(
ti_id=ti_id,
diff --git a/providers/standard/tests/unit/standard/operators/test_hitl.py
b/providers/standard/tests/unit/standard/operators/test_hitl.py
index fb6839e0967..7b7d5941b60 100644
--- a/providers/standard/tests/unit/standard/operators/test_hitl.py
+++ b/providers/standard/tests/unit/standard/operators/test_hitl.py
@@ -20,7 +20,11 @@ from uuid import UUID, uuid4
import pytest
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_1_PLUS,
AIRFLOW_V_3_2_PLUS
+from tests_common.test_utils.version_compat import (
+ AIRFLOW_V_3_1_PLUS,
+ AIRFLOW_V_3_2_PLUS,
+ AIRFLOW_V_3_3_PLUS,
+)
if not AIRFLOW_V_3_1_PLUS:
pytest.skip("Human in the loop is only compatible with Airflow >= 3.1.0",
allow_module_level=True)
@@ -47,10 +51,14 @@ from airflow.providers.standard.operators.hitl import (
from airflow.sdk import Param, timezone
from airflow.sdk.definitions.param import ParamsDict
from airflow.sdk.execution_time.hitl import HITLUser
+from airflow.utils.state import TaskInstanceState
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.version_compat import AIRFLOW_V_3_1_3_PLUS
+if AIRFLOW_V_3_3_PLUS:
+ from airflow.sdk.exceptions import TaskAwaitingInput
+
if TYPE_CHECKING:
from sqlalchemy.orm import Session
@@ -65,6 +73,23 @@ DEFAULT_DATE = timezone.datetime(2016, 1, 1)
INTERVAL = datetime.timedelta(hours=12)
+def _run_execute_until_park(op: HITLOperator, context: Any) -> None:
+ """
+ Run ``execute()`` through the HITL pause point.
+
+ Tolerates both the Airflow 3.3+ behavior (raises ``TaskAwaitingInput`` to park the task in
+ AWAITING_INPUT) and the < 3.3 fallback (calls ``self.defer()``, mocked here), so the
+ ``hitl_summary`` enriched just before the pause can be asserted afterwards.
+ """
+ with patch("airflow.providers.standard.operators.hitl.upsert_hitl_detail"):
+ if AIRFLOW_V_3_3_PLUS:
+ with pytest.raises(TaskAwaitingInput):
+ op.execute(context)
+ else:
+ with patch.object(op, "defer"):
+ op.execute(context)
+
+
@pytest.fixture
def hitl_task_and_ti_for_generating_link(dag_maker: DagMaker) ->
tuple[HITLOperator, TaskInstance]:
with dag_maker("test_dag"):
@@ -284,19 +309,31 @@ class TestHITLOperator:
else:
expected_params_in_trigger_kwargs = {"input_1": {"value": 1,
"description": None, "schema": {}}}
- registered_trigger = session.scalar(
- select(Trigger).where(Trigger.classpath ==
"airflow.providers.standard.triggers.hitl.HITLTrigger")
- )
- assert registered_trigger is not None
- assert registered_trigger.kwargs == {
- "ti_id": expected_ti_id,
- "options": ["1", "2", "3", "4", "5"],
- "defaults": ["1"],
- "params": expected_params_in_trigger_kwargs,
- "multiple": False,
- "timeout_datetime": None,
- "poke_interval": 5.0,
- }
+ if AIRFLOW_V_3_3_PLUS:
+ # On Airflow 3.3+ the task parks in AWAITING_INPUT with no trigger / triggerer.
+ assert ti.state == TaskInstanceState.AWAITING_INPUT
+ registered_trigger = session.scalar(
+ select(Trigger).where(
+ Trigger.classpath ==
"airflow.providers.standard.triggers.hitl.HITLTrigger"
+ )
+ )
+ assert registered_trigger is None
+ else:
+ registered_trigger = session.scalar(
+ select(Trigger).where(
+ Trigger.classpath ==
"airflow.providers.standard.triggers.hitl.HITLTrigger"
+ )
+ )
+ assert registered_trigger is not None
+ assert registered_trigger.kwargs == {
+ "ti_id": expected_ti_id,
+ "options": ["1", "2", "3", "4", "5"],
+ "defaults": ["1"],
+ "params": expected_params_in_trigger_kwargs,
+ "multiple": False,
+ "timeout_datetime": None,
+ "poke_interval": 5.0,
+ }
@pytest.mark.skipif(not AIRFLOW_V_3_1_3_PLUS, reason="This only works in
airflow-core >= 3.1.3")
@pytest.mark.parametrize(
@@ -1058,11 +1095,7 @@ class TestHITLSummaryForListeners:
response_timeout=datetime.timedelta(minutes=10),
)
- with (
-
patch("airflow.providers.standard.operators.hitl.upsert_hitl_detail"),
- patch.object(op, "defer"),
- ):
- op.execute({"task_instance": MagicMock(id=uuid4())}) # type:
ignore[arg-type]
+ _run_execute_until_park(op, {"task_instance": MagicMock(id=uuid4())})
s = op.hitl_summary
# Validate the timeout value is a parseable ISO string
@@ -1088,11 +1121,7 @@ class TestHITLSummaryForListeners:
options=["OK"],
)
- with (
-
patch("airflow.providers.standard.operators.hitl.upsert_hitl_detail"),
- patch.object(op, "defer"),
- ):
- op.execute({"task_instance": MagicMock(id=uuid4())}) # type:
ignore[arg-type]
+ _run_execute_until_park(op, {"task_instance": MagicMock(id=uuid4())})
assert op.hitl_summary == {
"subject": "Review",
@@ -1321,11 +1350,7 @@ class TestHITLSummaryForListeners:
}
# -- After execute (mocked defer): timeout_datetime added --
- with (
-
patch("airflow.providers.standard.operators.hitl.upsert_hitl_detail"),
- patch.object(op, "defer"),
- ):
- op.execute({"task_instance": MagicMock(id=uuid4())}) # type:
ignore[arg-type]
+ _run_execute_until_park(op, {"task_instance": MagicMock(id=uuid4())})
s = op.hitl_summary
timeout_dt_str = s["timeout_datetime"]
diff --git a/task-sdk/src/airflow/sdk/api/client.py
b/task-sdk/src/airflow/sdk/api/client.py
index 4a3160d6ecd..2a4bb63fd5d 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -69,6 +69,7 @@ from airflow.sdk.api.datamodels._generated import (
TaskStorePutBody,
TaskStoreResponse,
TerminalStateNonSuccess,
+ TIAwaitingInputStatePayload,
TIDeferredStatePayload,
TIEnterRunningPayload,
TIHeartbeatInfo,
@@ -307,6 +308,11 @@ class TaskInstanceOperations:
# Create a deferred state payload from msg
self.client.patch(f"task-instances/{id}/state",
content=body.model_dump_json())
+ def await_input(self, id: uuid.UUID, msg):
+ """Tell the API server that this TI is parked awaiting human input (Human-in-the-loop)."""
+ body =
TIAwaitingInputStatePayload(**msg.model_dump(exclude_unset=True,
exclude={"type"}))
+ self.client.patch(f"task-instances/{id}/state",
content=body.model_dump_json())
+
def reschedule(self, id: uuid.UUID, msg: RescheduleTask):
"""Tell the API server that this TI has been rescheduled."""
body = TIRescheduleStatePayload(**msg.model_dump(exclude_unset=True,
exclude={"type"}))
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index e1ac21585bf..9bdd8ac4fba 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -215,6 +215,7 @@ class IntermediateTIState(str, Enum):
UP_FOR_RETRY = "up_for_retry"
UP_FOR_RESCHEDULE = "up_for_reschedule"
DEFERRED = "deferred"
+ AWAITING_INPUT = "awaiting_input"
class PrevSuccessfulDagRunResponse(BaseModel):
@@ -245,6 +246,21 @@ class PreviousTIResponse(BaseModel):
duration: Annotated[float | None, Field(title="Duration")] = None
+class TIAwaitingInputStatePayload(BaseModel):
+ """
+ Schema for parking a TaskInstance in an awaiting_input state (Human-in-the-loop, no trigger).
+ """
+
+ model_config = ConfigDict(
+ extra="forbid",
+ )
+ state: Annotated[Literal["awaiting_input"] | None, Field(title="State")] =
"awaiting_input"
+ timeout: Annotated[timedelta | None, Field(title="Timeout")] = None
+ next_method: Annotated[str, Field(title="Next Method")]
+ next_kwargs: Annotated[dict[str, JsonValue] | None, Field(title="Next
Kwargs")] = None
+ rendered_map_index: Annotated[str | None, Field(title="Rendered Map
Index")] = None
+
+
class TIDeferredStatePayload(BaseModel):
"""
Schema for updating TaskInstance to a deferred state.
@@ -382,6 +398,7 @@ class TaskInstanceState(str, Enum):
UPSTREAM_FAILED = "upstream_failed"
SKIPPED = "skipped"
DEFERRED = "deferred"
+ AWAITING_INPUT = "awaiting_input"
class TaskStatesResponse(BaseModel):
diff --git a/task-sdk/src/airflow/sdk/exceptions.py
b/task-sdk/src/airflow/sdk/exceptions.py
index 0c211de4e36..6f43d5421ec 100644
--- a/task-sdk/src/airflow/sdk/exceptions.py
+++ b/task-sdk/src/airflow/sdk/exceptions.py
@@ -236,6 +236,48 @@ class TaskDeferred(BaseException):
return f"<TaskDeferred trigger={self.trigger}
method={self.method_name}>"
+class TaskAwaitingInput(BaseException):
+ """
+ Signal an operator parking the task in awaiting_input state (Human-in-the-loop).
+
+ Raised to signal that the operator wishes to pause until external human input arrives,
+ WITHOUT creating a trigger or involving the triggerer. Resumption is driven by the Core API
+ response handler (or the scheduler timeout sweep) flipping the task instance back to SCHEDULED
+ with ``next_method`` / ``next_kwargs`` intact, after which the worker calls
+ ``resume_execution(method_name, kwargs)``.
+
+ Subclasses ``BaseException`` (like ``TaskDeferred``) so that a user ``except Exception`` in
+ ``execute()`` cannot accidentally swallow the park signal.
+ """
+
+ def __init__(
+ self,
+ *,
+ method_name: str,
+ kwargs: dict[str, Any] | None = None,
+ timeout=None,
+ ):
+ super().__init__()
+ self.method_name = method_name
+ self.kwargs = kwargs
+ self.timeout = timeout
+
+ def serialize(self):
+ cls = self.__class__
+ return (
+ f"{cls.__module__}.{cls.__name__}",
+ (),
+ {
+ "method_name": self.method_name,
+ "kwargs": self.kwargs,
+ "timeout": self.timeout,
+ },
+ )
+
+ def __repr__(self) -> str:
+ return f"<TaskAwaitingInput method={self.method_name}>"
+
+
class TaskDeferralError(AirflowException):
"""Raised when a task failed during deferral for some reason."""
diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py
b/task-sdk/src/airflow/sdk/execution_time/comms.py
index 0e0483be70e..330fe934c7c 100644
--- a/task-sdk/src/airflow/sdk/execution_time/comms.py
+++ b/task-sdk/src/airflow/sdk/execution_time/comms.py
@@ -83,6 +83,7 @@ from airflow.sdk.api.datamodels._generated import (
TaskInstanceState,
TaskStatesResponse,
TaskStoreResponse,
+ TIAwaitingInputStatePayload,
TIDeferredStatePayload,
TIRescheduleStatePayload,
TIRetryStatePayload,
@@ -837,6 +838,12 @@ class DeferTask(TIDeferredStatePayload):
type: Literal["DeferTask"] = "DeferTask"
+class AwaitInputTask(TIAwaitingInputStatePayload):
+ """Park a task instance awaiting human input (Human-in-the-loop), without a trigger."""
+
+ type: Literal["AwaitInputTask"] = "AwaitInputTask"
+
+
class RetryTask(TIRetryStatePayload):
"""Update a task instance state to up_for_retry."""
@@ -1193,7 +1200,8 @@ class GetDag(BaseModel):
ToSupervisor = Annotated[
- ClearAssetStoreByName
+ AwaitInputTask
+ | ClearAssetStoreByName
| ClearAssetStoreByUri
| ClearTaskStore
| DeferTask
diff --git a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
index 094c3f94d9e..c02aa76791e 100644
--- a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
+++ b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
@@ -342,6 +342,80 @@
"title": "AssetsByAliasResult",
"type": "object"
},
+ "AwaitInputTask": {
+ "additionalProperties": false,
+ "description": "Park a task instance awaiting human input
(Human-in-the-loop), without a trigger.",
+ "properties": {
+ "state": {
+ "anyOf": [
+ {
+ "const": "awaiting_input",
+ "type": "string"
+ },
+ {
+ "type": "null"
+ }
+ ],
+ "default": "awaiting_input",
+ "title": "State"
+ },
+ "timeout": {
+ "anyOf": [
+ {
+ "format": "duration",
+ "type": "string"
+ },
+ {
+ "type": "null"
+ }
+ ],
+ "default": null,
+ "title": "Timeout"
+ },
+ "next_method": {
+ "title": "Next Method",
+ "type": "string"
+ },
+ "next_kwargs": {
+ "anyOf": [
+ {
+ "additionalProperties": {
+ "$ref": "#/$defs/JsonValue"
+ },
+ "type": "object"
+ },
+ {
+ "type": "null"
+ }
+ ],
+ "default": null,
+ "title": "Next Kwargs"
+ },
+ "rendered_map_index": {
+ "anyOf": [
+ {
+ "type": "string"
+ },
+ {
+ "type": "null"
+ }
+ ],
+ "default": null,
+ "title": "Rendered Map Index"
+ },
+ "type": {
+ "const": "AwaitInputTask",
+ "default": "AwaitInputTask",
+ "title": "Type",
+ "type": "string"
+ }
+ },
+ "required": [
+ "next_method"
+ ],
+ "title": "AwaitInputTask",
+ "type": "object"
+ },
"BundleInfo": {
"description": "Schema for telling task which bundle to run with.",
"properties": {
@@ -3989,7 +4063,8 @@
"up_for_reschedule",
"upstream_failed",
"skipped",
- "deferred"
+ "deferred",
+ "awaiting_input"
],
"title": "TaskInstanceState",
"type": "string"
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 0005c36bc7e..52821579759 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -63,6 +63,7 @@ from airflow.sdk.execution_time.comms import (
AssetEventsResult,
AssetResult,
AssetStoreResult,
+ AwaitInputTask,
ClearAssetStoreByName,
ClearAssetStoreByUri,
ClearTaskStore,
@@ -197,6 +198,7 @@ SERVER_TERMINATED = "SERVER_TERMINATED"
STATES_SENT_DIRECTLY: frozenset[TaskInstanceState | str] = frozenset(
{
TaskInstanceState.DEFERRED,
+ TaskInstanceState.AWAITING_INPUT,
TaskInstanceState.UP_FOR_RESCHEDULE,
TaskInstanceState.UP_FOR_RETRY,
TaskInstanceState.SUCCESS,
@@ -1289,9 +1291,9 @@ class ActivitySubprocess(WatchedSubprocess):
# falling back to `finish()`, which doesn't accept SUCCESS / DEFERRED /
# SERVER_TERMINATED on the server side. Cleared (and `_terminal_state`
# set) only after the API call returns successfully.
- _pending_terminal_state_msg: SucceedTask | RetryTask | DeferTask |
RescheduleTask | None = attrs.field(
- default=None, init=False
- )
+ _pending_terminal_state_msg: (
+ SucceedTask | RetryTask | DeferTask | RescheduleTask | AwaitInputTask
| None
+ ) = attrs.field(default=None, init=False)
_last_successful_heartbeat: float = attrs.field(default=0, init=False)
_last_heartbeat_attempt: float = attrs.field(default=0, init=False)
@@ -1455,7 +1457,9 @@ class ActivitySubprocess(WatchedSubprocess):
rendered_map_index=self._rendered_map_index,
)
- def _send_terminal_state_msg(self, msg: SucceedTask | RetryTask |
DeferTask | RescheduleTask) -> None:
+ def _send_terminal_state_msg(
+ self, msg: SucceedTask | RetryTask | DeferTask | RescheduleTask |
AwaitInputTask
+ ) -> None:
# Capture the message BEFORE the API call so the recovery dispatcher
# in `update_task_state_if_needed` can re-issue it if the call raises
# (network blip, transient server 5xx). Clear the pending slot and
@@ -1485,6 +1489,9 @@ class ActivitySubprocess(WatchedSubprocess):
elif isinstance(msg, RescheduleTask):
self.client.task_instances.reschedule(self.id, msg)
self._terminal_state = TaskInstanceState.UP_FOR_RESCHEDULE
+ elif isinstance(msg, AwaitInputTask):
+ self.client.task_instances.await_input(self.id, msg)
+ self._terminal_state = TaskInstanceState.AWAITING_INPUT
self._pending_terminal_state_msg = None
def _replay_pending_terminal_state_msg(self) -> None:
@@ -1707,6 +1714,9 @@ class ActivitySubprocess(WatchedSubprocess):
elif isinstance(msg, DeferTask):
self._rendered_map_index = msg.rendered_map_index
self._send_terminal_state_msg(msg)
+ elif isinstance(msg, AwaitInputTask):
+ self._rendered_map_index = msg.rendered_map_index
+ self._send_terminal_state_msg(msg)
elif isinstance(msg, RescheduleTask):
self._send_terminal_state_msg(msg)
elif isinstance(msg, SkipDownstreamTasks):
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 6f48756433b..123667bfa1d 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -77,11 +77,13 @@ from airflow.sdk.exceptions import (
AirflowRuntimeError,
AirflowTaskTimeout,
ErrorType,
+ TaskAwaitingInput,
TaskDeferred,
)
from airflow.sdk.execution_time.callback_runner import create_executable_runner
from airflow.sdk.execution_time.comms import (
AssetEventDagRunReferenceResult,
+ AwaitInputTask,
CommsDecoder,
DagResult,
DagRunStateResult,
@@ -1368,6 +1370,29 @@ def _defer_task(
return msg, state
+def _await_input_task(
+ awaiting: TaskAwaitingInput, ti: RuntimeTaskInstance, log: Logger
+) -> tuple[ToSupervisor, TaskInstanceState]:
+ """Build the message that parks a task in AWAITING_INPUT (HITL), with no trigger."""
+ log.info("Pausing task as AWAITING_INPUT.", dag_id=ti.dag_id,
task_id=ti.task_id, run_id=ti.run_id)
+
+ from airflow.sdk.serde import serialize as serde_serialize
+
+ next_kwargs = serde_serialize(awaiting.kwargs or {})
+
+ if TYPE_CHECKING:
+ assert isinstance(next_kwargs, dict)
+
+ msg = AwaitInputTask(
+ timeout=awaiting.timeout,
+ next_method=awaiting.method_name,
+ next_kwargs=next_kwargs,
+ )
+ state = TaskInstanceState.AWAITING_INPUT
+
+ return msg, state
+
+
@Sentry.enrich_errors
@detail_span("run")
def run(
@@ -1386,6 +1411,7 @@ def run(
AirflowTaskTerminated,
DagRunTriggerException,
DownstreamTasksSkipped,
+ TaskAwaitingInput,
TaskDeferred,
)
@@ -1470,6 +1496,9 @@ def run(
except TaskDeferred as defer:
log.info("::group::Post Execute")
msg, state = _defer_task(defer, ti, log)
+ except TaskAwaitingInput as awaiting:
+ log.info("::group::Post Execute")
+ msg, state = _await_input_task(awaiting, ti, log)
except AirflowSkipException as e:
log.info("::group::Post Execute")
if e.args:
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index b72fbdb1e30..62a9d00fdf2 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -74,6 +74,7 @@ from airflow.sdk.execution_time.comms import (
AssetResult,
AssetsByAliasResult,
AssetStoreResult,
+ AwaitInputTask,
ClearAssetStoreByName,
ClearAssetStoreByUri,
ClearTaskStore,
@@ -1717,6 +1718,14 @@ REQUEST_TEST_CASES = [
args=(TI_ID, DeferTask(next_method="execute_callback",
classpath="my-classpath")),
),
),
+ RequestTestCase(
+ message=AwaitInputTask(next_method="execute_complete"),
+ test_id="patch_task_instance_to_awaiting_input",
+ client_mock=ClientMock(
+ method_path="task_instances.await_input",
+ args=(TI_ID, AwaitInputTask(next_method="execute_complete")),
+ ),
+ ),
RequestTestCase(
message=RescheduleTask(
reschedule_date=timezone.parse("2024-10-31T12:00:00Z"),