This is an automated email from the ASF dual-hosted git repository.

kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b64a0e636d9 Add `awaiting_input` task state for Human-in-the-loop, off 
the triggerer (#68028)
b64a0e636d9 is described below

commit b64a0e636d9b693a1a17d6f226e3a78014c13afc
Author: Kaxil Naik <[email protected]>
AuthorDate: Sat Jun 6 05:03:59 2026 +0100

    Add `awaiting_input` task state for Human-in-the-loop, off the triggerer 
(#68028)
    
    HITL tasks previously deferred onto the triggerer, so a triggerer had to
    stay running for the whole (potentially days-long) wait and could never
    scale to zero while any task awaited a response. They now park in a new
    scheduler-managed `awaiting_input` state that holds neither a worker slot
    nor the triggerer, the same way `up_for_reschedule` waits. A human
    response (Core API) or the scheduler timeout sweep resumes the task
    directly. On Airflow < 3.3 the operator falls back to the trigger path.
---
 .../docs/authoring-and-scheduling/deferring.rst    |   6 ++
 airflow-core/docs/core-concepts/overview.rst       |   3 +-
 airflow-core/docs/core-concepts/tasks.rst          |   1 +
 .../docs/img/diagram_task_lifecycle.md5sum         |   2 +-
 airflow-core/docs/img/diagram_task_lifecycle.png   | Bin 601178 -> 671852 bytes
 airflow-core/docs/img/diagram_task_lifecycle.py    |  23 ++++-
 airflow-core/docs/security/audit_logs.rst          |   1 +
 airflow-core/docs/tutorial/hitl.rst                |  11 +++
 airflow-core/newsfragments/68028.feature.rst       |   1 +
 airflow-core/src/airflow/api/common/mark_tasks.py  |   1 +
 .../src/airflow/api_fastapi/common/parameters.py   |   3 +-
 .../core_api/datamodels/ui/dashboard.py            |   1 +
 .../api_fastapi/core_api/openapi/_private_ui.yaml  |   5 +
 .../core_api/openapi/v2-rest-api-generated.yaml    |   1 +
 .../api_fastapi/core_api/routes/public/hitl.py     |  52 ++++++++++
 .../airflow/api_fastapi/core_api/routes/ui/dags.py |   2 +-
 .../execution_api/datamodels/taskinstance.py       |  30 ++++++
 .../execution_api/routes/task_instances.py         |  47 ++++++++-
 .../api_fastapi/execution_api/versions/__init__.py |   8 +-
 .../execution_api/versions/v2026_06_30.py          |  18 +++-
 .../src/airflow/jobs/scheduler_job_runner.py       | 106 ++++++++++++++++++++-
 airflow-core/src/airflow/models/dagrun.py          |   5 +-
 airflow-core/src/airflow/models/hitl.py            |  18 ++++
 airflow-core/src/airflow/models/taskinstance.py    |  12 ++-
 .../src/airflow/ti_deps/dependencies_states.py     |   7 ++
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts |   8 +-
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |   3 +-
 .../airflow/ui/public/i18n/locales/en/common.json  |   1 +
 .../ui/src/components/NeedsReviewButton.tsx        |   2 +-
 .../src/airflow/ui/src/components/StateIcon.tsx    |  11 ++-
 .../src/airflow/ui/src/constants/stateOptions.ts   |   1 +
 .../airflow/ui/src/hooks/useRequiredActionTabs.ts  |   4 +-
 .../HistoricalMetrics/TaskInstanceMetrics.tsx      |   1 +
 .../HITLTaskInstances/HITLResponseForm.test.tsx    |  34 +++++++
 .../pages/HITLTaskInstances/HITLResponseForm.tsx   |   4 +-
 .../HITLTaskInstances/HITLTaskInstances.test.tsx   |  49 ++++++++++
 .../pages/HITLTaskInstances/HITLTaskInstances.tsx  |  12 +--
 airflow-core/src/airflow/ui/src/theme.ts           |   1 +
 airflow-core/src/airflow/ui/src/utils/hitl.test.ts |  52 +++++++++-
 airflow-core/src/airflow/ui/src/utils/hitl.ts      |  12 ++-
 airflow-core/src/airflow/ui/src/utils/query.ts     |   1 +
 .../src/airflow/ui/src/utils/stateUtils.ts         |   1 +
 .../src/airflow/ui/tests/e2e/utils/test-helpers.ts |  10 +-
 airflow-core/src/airflow/utils/state.py            |   5 +
 .../core_api/routes/public/test_hitl.py            |  55 +++++++++++
 .../core_api/routes/ui/test_dashboard.py           |   3 +
 .../tests/unit/dag_processing/test_processor.py    |   1 +
 airflow-core/tests/unit/jobs/test_scheduler_job.py |  92 ++++++++++++++++++
 airflow-core/tests/unit/jobs/test_triggerer_job.py |   1 +
 .../src/airflowctl/api/datamodels/generated.py     |   1 +
 .../task_lifecycle/awaiting_input_state.png        | Bin 0 -> 2467 bytes
 .../edge3/worker_api/v2-edge-generated.yaml        |   1 +
 .../airflow/providers/standard/operators/hitl.py   |  37 +++++--
 .../tests/unit/standard/operators/test_hitl.py     |  83 ++++++++++------
 task-sdk/src/airflow/sdk/api/client.py             |   6 ++
 .../src/airflow/sdk/api/datamodels/_generated.py   |  17 ++++
 task-sdk/src/airflow/sdk/exceptions.py             |  42 ++++++++
 task-sdk/src/airflow/sdk/execution_time/comms.py   |  10 +-
 .../airflow/sdk/execution_time/schema/schema.json  |  77 ++++++++++++++-
 .../src/airflow/sdk/execution_time/supervisor.py   |  18 +++-
 .../src/airflow/sdk/execution_time/task_runner.py  |  29 ++++++
 .../task_sdk/execution_time/test_supervisor.py     |   9 ++
 62 files changed, 975 insertions(+), 83 deletions(-)

diff --git a/airflow-core/docs/authoring-and-scheduling/deferring.rst 
b/airflow-core/docs/authoring-and-scheduling/deferring.rst
index 0e54a9ca593..95c7c2b0b47 100644
--- a/airflow-core/docs/authoring-and-scheduling/deferring.rst
+++ b/airflow-core/docs/authoring-and-scheduling/deferring.rst
@@ -54,6 +54,12 @@ If you want to use pre-written deferrable operators that 
come with Airflow, such
 
 Airflow automatically handles and implements the deferral processes for you.
 
+.. note::
+
+    :doc:`Human-in-the-loop <../tutorial/hitl>` operators do **not** use 
deferral or the triggerer.
+    They wait in the scheduler-managed ``awaiting_input`` task state, so a 
deployment that waits only
+    on human input rather than deferrable operators does not need a running 
triggerer.
+
 If you're upgrading existing Dags to use deferrable operators, Airflow 
contains API-compatible sensor variants. Add these variants into your Dag to 
use deferrable operators with no other changes required.
 
 Note that you can't use the deferral ability from inside custom PythonOperator 
or TaskFlow Python functions. Deferral is only available to traditional, 
class-based operators.
diff --git a/airflow-core/docs/core-concepts/overview.rst 
b/airflow-core/docs/core-concepts/overview.rst
index dc51476a263..d8c1d5f959e 100644
--- a/airflow-core/docs/core-concepts/overview.rst
+++ b/airflow-core/docs/core-concepts/overview.rst
@@ -77,7 +77,8 @@ performance in your Airflow:
 
 * Optional *triggerer*, which executes deferred tasks in an asyncio event 
loop. In basic installation
   where deferred tasks are not used, a triggerer is not necessary. More about 
deferring tasks can be
-  found in :doc:`/authoring-and-scheduling/deferring`.
+  found in :doc:`/authoring-and-scheduling/deferring`. Note that 
:doc:`Human-in-the-loop </tutorial/hitl>`
+  tasks wait in the scheduler-managed ``awaiting_input`` state and do not use 
the triggerer.
 
 * Optional folder of *plugins*. Plugins are a way to extend Airflow's 
functionality (similar to installed
   packages). Plugins are read by the *scheduler*, *Dag processor*, *triggerer* 
and *webserver*. More about
diff --git a/airflow-core/docs/core-concepts/tasks.rst 
b/airflow-core/docs/core-concepts/tasks.rst
index 24553f8a8ab..95d9846dd5e 100644
--- a/airflow-core/docs/core-concepts/tasks.rst
+++ b/airflow-core/docs/core-concepts/tasks.rst
@@ -83,6 +83,7 @@ The possible states for a Task Instance are:
 * ``up_for_retry``: The task failed, but has retry attempts left and will be 
rescheduled.
 * ``up_for_reschedule``: The task is a :doc:`Sensor <sensors>` that is in 
``reschedule`` mode
 * ``deferred``: The task has been :doc:`deferred to a trigger 
<../authoring-and-scheduling/deferring>`
+* ``awaiting_input``: The task is a :doc:`Human-in-the-loop 
<../tutorial/hitl>` task waiting for a human response. It is managed by the 
scheduler and uses neither a worker slot nor the triggerer.
 * ``removed``: The task has vanished from the Dag since the run started
 
 .. image:: /img/diagram_task_lifecycle.png
diff --git a/airflow-core/docs/img/diagram_task_lifecycle.md5sum 
b/airflow-core/docs/img/diagram_task_lifecycle.md5sum
index 78135f22287..6cc716bd17e 100644
--- a/airflow-core/docs/img/diagram_task_lifecycle.md5sum
+++ b/airflow-core/docs/img/diagram_task_lifecycle.md5sum
@@ -1 +1 @@
-f129c61777c2b56652b2c68d36c309ee
+0821383375d44dc24d69c5b68d50f423
diff --git a/airflow-core/docs/img/diagram_task_lifecycle.png 
b/airflow-core/docs/img/diagram_task_lifecycle.png
index 6c2cea9f78a..5687cdb4ad9 100644
Binary files a/airflow-core/docs/img/diagram_task_lifecycle.png and 
b/airflow-core/docs/img/diagram_task_lifecycle.png differ
diff --git a/airflow-core/docs/img/diagram_task_lifecycle.py 
b/airflow-core/docs/img/diagram_task_lifecycle.py
index aa24e2fc3ae..061a3912d03 100644
--- a/airflow-core/docs/img/diagram_task_lifecycle.py
+++ b/airflow-core/docs/img/diagram_task_lifecycle.py
@@ -47,6 +47,7 @@ SHARED_STATE_IMG = (DIAGRAMS_DIR / "task_lifecycle" / 
"shared_state.png").as_pos
 TERMINAL_STATE_IMG = (DIAGRAMS_DIR / "task_lifecycle" / 
"terminal_state.png").as_posix()
 SENSOR_STATE_IMG = (DIAGRAMS_DIR / "task_lifecycle" / 
"sensor_state.png").as_posix()
 DEFERRABLE_STATE_IMG = (DIAGRAMS_DIR / "task_lifecycle" / 
"deferrable_state.png").as_posix()
+AWAITING_INPUT_STATE_IMG = (DIAGRAMS_DIR / "task_lifecycle" / 
"awaiting_input_state.png").as_posix()
 
 STATE_NODE_ATTRS = {"width": "4.16", "height": "1", "fontname": "Monospace", 
"fontsize": "20"}
 COMPONENT_NODE_ATTRS = {
@@ -107,6 +108,7 @@ def generate_task_lifecycle_diagram():
         state_scheduled = Custom("scheduled", SHARED_STATE_IMG, 
**STATE_NODE_ATTRS)
         state_queued = Custom("queued", SHARED_STATE_IMG, **STATE_NODE_ATTRS)
         state_deferred = Custom("deferred", DEFERRABLE_STATE_IMG, 
**STATE_NODE_ATTRS)
+        state_awaiting_input = Custom("awaiting_input", 
AWAITING_INPUT_STATE_IMG, **STATE_NODE_ATTRS)
         state_running = Custom("running", SHARED_STATE_IMG, **STATE_NODE_ATTRS)
         state_up_for_reschedule = Custom("up_for_reschedule", 
SENSOR_STATE_IMG, **STATE_NODE_ATTRS)
         state_restarting = Custom("restarting", SHARED_STATE_IMG, 
**STATE_NODE_ATTRS)
@@ -150,6 +152,11 @@ def generate_task_lifecycle_diagram():
             CONDITION_IMG,
             **CONDITION_NODE_ATTRS,
         )
+        cond_hitl_requested = Custom(
+            "\n\n\n\n\nHuman-in-the-loop task,\nand human input is requested?",
+            CONDITION_IMG,
+            **CONDITION_NODE_ATTRS,
+        )
         cond_skip_signal = Custom("\n\n\n\n\nSkip signal is raised?", 
CONDITION_IMG, **CONDITION_NODE_ATTRS)
         cond_sensor_reschedule = Custom(
             "\n\n\n\n\nTask is a sensor in reschedule mode,\nand result is 
undetermined?",
@@ -183,7 +190,14 @@ def generate_task_lifecycle_diagram():
         cond_trigger_task_2 >> Edge(label="YES") >> cond_task_complete_1
         cond_task_complete_1 >> Edge(label="NO") >> state_deferred
         cond_task_complete_1 >> Edge(label="YES") >> cond_defer_signal_raised
-        component_worker >> state_running >> cond_defer_signal_raised
+        component_worker >> state_running >> cond_hitl_requested
+        cond_hitl_requested >> Edge(label="YES") >> state_awaiting_input
+        cond_hitl_requested >> Edge(label="NO") >> cond_defer_signal_raised
+        (
+            state_awaiting_input
+            >> Edge(label="response received,\nor timeout default applied")
+            >> component_scheduler
+        )
         cond_defer_signal_raised >> Edge(label="NO") >> cond_skip_signal
         cond_defer_signal_raised >> Edge(label="YES") >> component_triggerer
         cond_skip_signal >> Edge(label="NO") >> cond_sensor_reschedule
@@ -218,6 +232,13 @@ def generate_task_lifecycle_diagram():
                 height="0.77",
                 **LEGEND_NODE_ATTRS,
             )
+            Custom(
+                "\n\nState for Human-in-the-loop Tasks",
+                AWAITING_INPUT_STATE_IMG,
+                width="3.2",
+                height="0.77",
+                **LEGEND_NODE_ATTRS,
+            )
             Custom("\n\nTerminal State", TERMINAL_STATE_IMG, width="3.2", 
height="0.77", **LEGEND_NODE_ATTRS)
             Custom("\n\nShared State", SHARED_STATE_IMG, width="3.2", 
height="0.77", **LEGEND_NODE_ATTRS)
             Custom("\n\nComponent", COMPONENT_IMG, width="2.53", 
height="0.77", **LEGEND_NODE_ATTRS)
diff --git a/airflow-core/docs/security/audit_logs.rst 
b/airflow-core/docs/security/audit_logs.rst
index 3b8a09435e1..3c7df803cb0 100644
--- a/airflow-core/docs/security/audit_logs.rst
+++ b/airflow-core/docs/security/audit_logs.rst
@@ -187,6 +187,7 @@ Task Instance Events
 - ``queued``: Task instance is queued for execution
 - ``scheduled``: Task instance is scheduled
 - ``deferred``: Task instance is deferred (waiting for trigger)
+- ``awaiting_input``: Task instance is waiting for human input 
(Human-in-the-loop)
 - ``restarting``: Task instance is restarting
 - ``removed``: Task instance was removed
 
diff --git a/airflow-core/docs/tutorial/hitl.rst 
b/airflow-core/docs/tutorial/hitl.rst
index 3a5e35c6f79..8e89b49d88d 100644
--- a/airflow-core/docs/tutorial/hitl.rst
+++ b/airflow-core/docs/tutorial/hitl.rst
@@ -23,6 +23,17 @@ HITLOperator (Human-in-the-loop)
 Human-in-the-Loop (HITL) functionality allows you to incorporate human 
decision-making directly into your workflows.
 This powerful feature enables workflows to pause and wait for human input, 
making it perfect for approval processes, manual quality checks, and scenarios 
where human judgment is essential.
 
+.. versionchanged:: 3.3
+
+   A HITL task waiting for input now uses a dedicated, scheduler-managed 
``awaiting_input`` task state
+   instead of deferring onto the triggerer. While waiting, the task holds 
neither a worker slot nor the
+   triggerer, so the triggerer can scale to zero even while tasks await a 
response; tasks resume on a human
+   response or on the scheduler's response-timeout sweep. On Airflow 3.1 and 
3.2, HITL tasks use the older
+   trigger-based deferral.
+
+   A waiting ``awaiting_input`` task does not occupy a pool slot. This differs 
from the older deferral
+   path, where a deferred HITL task counted against a pool that had 
``include_deferred`` enabled.
+
 In this tutorial, we will explore how to use the HITL operators in workflows 
and demonstrate how it would look like in Airflow UI.
 
 An HITL Example Dag
diff --git a/airflow-core/newsfragments/68028.feature.rst 
b/airflow-core/newsfragments/68028.feature.rst
new file mode 100644
index 00000000000..fa9db5b21d8
--- /dev/null
+++ b/airflow-core/newsfragments/68028.feature.rst
@@ -0,0 +1 @@
+Human-in-the-loop tasks now wait in a new ``awaiting_input`` task state 
managed by the scheduler instead of deferring onto the triggerer, so the 
triggerer can scale to zero while tasks await a human response. Waiting tasks 
resume directly on a response or on the scheduler's response-timeout sweep. On 
Airflow versions before 3.3 the operator falls back to the previous 
trigger-based path.
diff --git a/airflow-core/src/airflow/api/common/mark_tasks.py 
b/airflow-core/src/airflow/api/common/mark_tasks.py
index 915af5952a3..efc2a016a43 100644
--- a/airflow-core/src/airflow/api/common/mark_tasks.py
+++ b/airflow-core/src/airflow/api/common/mark_tasks.py
@@ -301,6 +301,7 @@ def set_dag_run_state_to_failed(
         TaskInstanceState.RUNNING,
         TaskInstanceState.DEFERRED,
         TaskInstanceState.UP_FOR_RESCHEDULE,
+        TaskInstanceState.AWAITING_INPUT,
     )
 
     # Mark only RUNNING task instances.
diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py 
b/airflow-core/src/airflow/api_fastapi/common/parameters.py
index 56b4c20884c..4daf3418e2c 100644
--- a/airflow-core/src/airflow/api_fastapi/common/parameters.py
+++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py
@@ -1220,7 +1220,7 @@ class _PendingActionsFilter(BaseParam[bool]):
             .join(TaskInstance, HITLDetail.ti_id == TaskInstance.id)
             .where(
                 HITLDetail.responded_at.is_(None),
-                TaskInstance.state == TaskInstanceState.DEFERRED,
+                TaskInstance.state.in_((TaskInstanceState.DEFERRED, 
TaskInstanceState.AWAITING_INPUT)),
             )
             .where(TaskInstance.dag_id == DagModel.dag_id)
             .scalar_subquery()
@@ -1600,6 +1600,7 @@ state_priority: list[None | TaskInstanceState] = [
     TaskInstanceState.QUEUED,
     TaskInstanceState.SCHEDULED,
     TaskInstanceState.DEFERRED,
+    TaskInstanceState.AWAITING_INPUT,
     TaskInstanceState.RUNNING,
     TaskInstanceState.RESTARTING,
     None,
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dashboard.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dashboard.py
index 4f48ea48351..46fc051daf3 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dashboard.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dashboard.py
@@ -44,6 +44,7 @@ class TaskInstanceStateCount(BaseModel):
     upstream_failed: int
     skipped: int
     deferred: int
+    awaiting_input: int
 
 
 class HistoricalMetricDataResponse(BaseModel):
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index de3f5869864..8b28661c26a 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -3758,6 +3758,7 @@ components:
       - upstream_failed
       - skipped
       - deferred
+      - awaiting_input
       title: TaskInstanceState
       description: 'All possible states that a Task Instance can be in.
 
@@ -3804,6 +3805,9 @@ components:
         deferred:
           type: integer
           title: Deferred
+        awaiting_input:
+          type: integer
+          title: Awaiting Input
       type: object
       required:
       - no_status
@@ -3819,6 +3823,7 @@ components:
       - upstream_failed
       - skipped
       - deferred
+      - awaiting_input
       title: TaskInstanceStateCount
       description: TaskInstance serializer for responses.
     TeamCollectionResponse:
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 6c0ec008322..180d9b98a43 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -15867,6 +15867,7 @@ components:
       - upstream_failed
       - skipped
       - deferred
+      - awaiting_input
       title: TaskInstanceState
       description: 'All possible states that a Task Instance can be in.
 
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py
index 45285e108a7..2ad11a245d6 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py
@@ -67,6 +67,9 @@ from airflow.models.dagrun import DagRun
 from airflow.models.hitl import HITLDetail as HITLDetailModel, HITLUser
 from airflow.models.taskinstance import TaskInstance as TI
 from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
+from airflow.models.trigger import handle_event_submit
+from airflow.triggers.base import TriggerEvent
+from airflow.utils.state import TaskInstanceState
 
 task_instances_hitl_router = AirflowRouter(
     tags=["Task Instance"],
@@ -161,6 +164,23 @@ def update_hitl_detail(
         map_index=map_index,
     )
 
+    # Acquire row locks in a fixed order -- TaskInstance first, then the HITL 
row -- matching the
+    # Execution API park transition, so a human response racing the worker's 
park cannot deadlock.
+    # Locking the TI also serializes respond-vs-clear (the clear path locks 
the TI, not the HITL row).
+    locked_ti = (
+        session.get(TI, task_instance.id, with_for_update={"of": TI})
+        if isinstance(task_instance, TI)
+        else None
+    )
+    # Lock the hitl_detail row (FOR UPDATE OF hitl_detail). of= scopes the 
lock to hitl_detail, which
+    # eager-joins task_instance (lazy="joined"); a bare with_for_update() 
would emit FOR UPDATE against
+    # the nullable side of that outer join, which Postgres rejects. The 
joinedloaded relationship object
+    # reused below is the same identity-mapped row, now locked for this 
transaction.
+    session.execute(
+        select(HITLDetailModel)
+        .where(HITLDetailModel.ti_id == task_instance.id)
+        .with_for_update(of=HITLDetailModel)
+    )
     hitl_detail_model = task_instance.hitl_detail
     if hitl_detail_model.response_received:
         raise HTTPException(
@@ -187,11 +207,43 @@ def update_hitl_detail(
                 f"User={user_name} (id={user_id}) is not a respondent for the 
task.",
             )
 
+    # Write-side validation: reject an invalid response here (400) instead of 
accepting it and
+    # failing the task later on resume. Mirrors 
HITLOperator.validate_chosen_options + cardinality.
+    allowed_options = set(hitl_detail_model.options or [])
+    invalid_options = set(update_hitl_detail_payload.chosen_options) - 
allowed_options
+    if invalid_options:
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST,
+            f"Invalid options {sorted(invalid_options)}; allowed options are 
{sorted(allowed_options)}.",
+        )
+    if not hitl_detail_model.multiple and 
len(update_hitl_detail_payload.chosen_options) > 1:
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST,
+            "Multiple options chosen but this Human-in-the-loop task accepts 
only a single option.",
+        )
+
     hitl_detail_model.responded_by = hitl_user
     hitl_detail_model.responded_at = timezone.utcnow()
     hitl_detail_model.chosen_options = 
update_hitl_detail_payload.chosen_options
     hitl_detail_model.params_input = update_hitl_detail_payload.params_input
     session.add(hitl_detail_model)
+
+    # Event-driven resume: if the task is parked waiting for this input, 
transition it directly,
+    # without a trigger. handle_event_submit packs the response into 
next_kwargs["event"], sets
+    # state=SCHEDULED + scheduled_dttm; the scheduler then re-queues 
execute_complete. Gated on the
+    # parked states so a finished/cleared TI is never resurrected. `locked_ti` 
was locked at the top
+    # (TI-before-HITLDetail order), so a concurrent clear cannot interleave 
between this state check
+    # and the resume and have its reset silently overwritten.
+    if locked_ti is not None and locked_ti.state in (
+        TaskInstanceState.AWAITING_INPUT,
+        TaskInstanceState.DEFERRED,
+    ):
+        handle_event_submit(
+            TriggerEvent(hitl_detail_model.as_resume_event_payload()),
+            task_instance=locked_ti,
+            session=session,
+        )
+
     session.commit()
     return HITLDetailResponse.model_validate(hitl_detail_model)
 
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py
index dfd3a71d815..b3f1325a34e 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py
@@ -233,7 +233,7 @@ def get_dags(
             )
             .where(
                 HITLDetail.responded_at.is_(None),
-                TaskInstance.state == TaskInstanceState.DEFERRED,
+                TaskInstance.state.in_((TaskInstanceState.DEFERRED, 
TaskInstanceState.AWAITING_INPUT)),
             )
             .where(TaskInstance.dag_id.in_([dag.dag_id for dag in dags]))
             .order_by(TaskInstance.dag_id)
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
index a0d97390801..c7f6b1ee9f8 100644
--- 
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
+++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
@@ -168,6 +168,33 @@ class TIRescheduleStatePayload(StrictBaseModel):
     end_date: UtcDateTime
 
 
+class TIAwaitingInputStatePayload(StrictBaseModel):
+    """Schema for parking a TaskInstance in an awaiting_input state 
(Human-in-the-loop, no trigger)."""
+
+    state: Annotated[
+        Literal[IntermediateTIState.AWAITING_INPUT],
+        # Specify a default in the schema, but not in code, so Pydantic marks 
it as required.
+        WithJsonSchema(
+            {
+                "type": "string",
+                "enum": [IntermediateTIState.AWAITING_INPUT],
+                "default": IntermediateTIState.AWAITING_INPUT,
+            }
+        ),
+    ]
+    timeout: timedelta | None = None
+    """Optional response deadline (relative); converted to an absolute 
datetime server-side."""
+    next_method: str
+    """The name of the method on the operator to call in the worker after 
input is received."""
+    next_kwargs: Annotated[dict[str, JsonValue], Field(default_factory=dict)]
+    """
+    Kwargs to pass to the above method, either a plain dict or an encrypted 
string.
+
+    Both forms will be passed along to the TaskSDK upon resume, the server 
will not handle either.
+    """
+    rendered_map_index: str | None = None
+
+
 class TIRetryStatePayload(StrictBaseModel):
     """Schema for updating TaskInstance to up_for_retry."""
 
@@ -216,6 +243,8 @@ def ti_state_discriminator(v: dict[str, str] | 
StrictBaseModel) -> str:
         return "deferred"
     if state == TIState.UP_FOR_RESCHEDULE:
         return "up_for_reschedule"
+    if state == TIState.AWAITING_INPUT:
+        return "awaiting_input"
     if state == TIState.UP_FOR_RETRY:
         return "up_for_retry"
     return "_other_"
@@ -229,6 +258,7 @@ TIStateUpdate = Annotated[
     | Annotated[TITargetStatePayload, Tag("_other_")]
     | Annotated[TIDeferredStatePayload, Tag("deferred")]
     | Annotated[TIRescheduleStatePayload, Tag("up_for_reschedule")]
+    | Annotated[TIAwaitingInputStatePayload, Tag("awaiting_input")]
     | Annotated[TIRetryStatePayload, Tag("up_for_retry")],
     Field(discriminator=ti_state_discriminator),
 ]
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index e1062be1314..8a91614e09a 100644
--- 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -55,6 +55,7 @@ from 
airflow.api_fastapi.execution_api.datamodels.taskinstance import (
     PrevSuccessfulDagRunResponse,
     TaskBreadcrumbsResponse,
     TaskStatesResponse,
+    TIAwaitingInputStatePayload,
     TIDeferredStatePayload,
     TIEnterRunningPayload,
     TIHeartbeatInfo,
@@ -79,14 +80,16 @@ from airflow.exceptions import TaskNotFound
 from airflow.models.asset import AssetActive
 from airflow.models.dag import DagModel
 from airflow.models.dagrun import DagRun as DR
+from airflow.models.hitl import HITLDetail
 from airflow.models.log import Log
 from airflow.models.taskinstance import TaskInstance as TI, 
_stop_remaining_tasks
 from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
 from airflow.models.taskreschedule import TaskReschedule
-from airflow.models.trigger import Trigger
+from airflow.models.trigger import Trigger, handle_event_submit
 from airflow.models.xcom import XComModel
 from airflow.serialization.definitions.assets import SerializedAsset, 
SerializedAssetUniqueKey
 from airflow.state import get_state_backend
+from airflow.triggers.base import TriggerEvent
 from airflow.utils.sqlalchemy import get_dialect_name
 from airflow.utils.state import DagRunState, TaskInstanceState, TerminalTIState
 
@@ -636,6 +639,48 @@ def _create_ti_state_update_query_and_update_state(
             trigger_timeout=timeout,
         )
         updated_state = TaskInstanceState.DEFERRED
+    elif isinstance(ti_patch_payload, TIAwaitingInputStatePayload):
+        # Park the task waiting for human input (Human-in-the-loop). No 
trigger / triggerer is
+        # created: the task is resumed by the Core API response handler or the 
scheduler timeout
+        # sweep. The optional response deadline is stored on the existing 
trigger_timeout column.
+        #
+        # Fixed lock order (TaskInstance -> HITLDetail), matching the Core API 
response path, so a
+        # human response racing this park transition cannot deadlock.
+        ti = session.get(TI, task_instance_id, with_for_update={"of": TI})
+        # Lock only the hitl_detail row (of=...): HITLDetail eager-joins 
task_instance (lazy="joined"),
+        # and Postgres rejects FOR UPDATE against the nullable side of that 
outer join.
+        hitl_detail = session.scalar(
+            select(HITLDetail).where(HITLDetail.ti_id == 
task_instance_id).with_for_update(of=HITLDetail)
+        )
+        if ti is not None and hitl_detail is not None and 
hitl_detail.response_received:
+            # The human responded in the window between the operator writing 
the HITL request and
+            # the worker parking the task. Resume straight to execute_complete 
instead of parking,
+            # which would otherwise strand an already-responded task (no 
trigger/sweep would fire).
+            # Carry next_method/next_kwargs onto the TI first so the resume 
dispatches correctly;
+            # handle_event_submit then injects the response event into 
next_kwargs.
+            ti.next_method = ti_patch_payload.next_method
+            ti.next_kwargs = ti_patch_payload.next_kwargs
+            handle_event_submit(
+                TriggerEvent(hitl_detail.as_resume_event_payload()),
+                task_instance=ti,
+                session=session,
+            )
+            query = update(TI).where(TI.id == 
task_instance_id).values(state=TaskInstanceState.SCHEDULED)
+            updated_state = TaskInstanceState.SCHEDULED
+        else:
+            timeout = None
+            if ti_patch_payload.timeout is not None:
+                timeout = timezone.utcnow() + ti_patch_payload.timeout
+
+            query = update(TI).where(TI.id == task_instance_id)
+            query = query.values(
+                state=TaskInstanceState.AWAITING_INPUT,
+                trigger_id=None,
+                next_method=ti_patch_payload.next_method,
+                next_kwargs=ti_patch_payload.next_kwargs,
+                trigger_timeout=timeout,
+            )
+            updated_state = TaskInstanceState.AWAITING_INPUT
     elif isinstance(ti_patch_payload, TIRescheduleStatePayload):
         # Quick check for poke_interval isn't immediately over MySQL's 
TIMESTAMP limit.
         # This check is only rudimentary to catch trivial user errors, e.g. 
mistakenly
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
index 50ddb6985e8..9e4d486aa30 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
@@ -47,13 +47,19 @@ from airflow.api_fastapi.execution_api.versions.v2026_06_16 
import (
     AddTeamNameField,
 )
 from airflow.api_fastapi.execution_api.versions.v2026_06_30 import (
+    AddAwaitingInputStatePayload,
     AddConnectionTestEndpoint,
     AddVariableKeysEndpoint,
 )
 
 bundle = VersionBundle(
     HeadVersion(),
-    Version("2026-06-30", AddVariableKeysEndpoint, AddConnectionTestEndpoint),
+    Version(
+        "2026-06-30",
+        AddVariableKeysEndpoint,
+        AddConnectionTestEndpoint,
+        AddAwaitingInputStatePayload,
+    ),
     Version(
         "2026-06-16",
         AddRetryPolicyFields,
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py
index cc751bcc797..f9c22f6cd82 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py
@@ -17,7 +17,9 @@
 
 from __future__ import annotations
 
-from cadwyn import VersionChange, endpoint
+from cadwyn import VersionChange, endpoint, schema
+
+from airflow.api_fastapi.execution_api.datamodels.taskinstance import 
TIAwaitingInputStatePayload
 
 
 class AddVariableKeysEndpoint(VersionChange):
@@ -37,3 +39,17 @@ class AddConnectionTestEndpoint(VersionChange):
         endpoint("/connection-tests/{connection_test_id}", 
["PATCH"]).didnt_exist,
         endpoint("/connection-tests/{connection_test_id}/connection", 
["GET"]).didnt_exist,
     )
+
+
+class AddAwaitingInputStatePayload(VersionChange):
+    """Add the awaiting_input task instance state transition payload 
(Human-in-the-loop, no trigger)."""
+
+    description = __doc__
+
+    instructions_to_migrate_to_previous_version = (
+        schema(TIAwaitingInputStatePayload).field("state").didnt_exist,
+        schema(TIAwaitingInputStatePayload).field("timeout").didnt_exist,
+        schema(TIAwaitingInputStatePayload).field("next_method").didnt_exist,
+        schema(TIAwaitingInputStatePayload).field("next_kwargs").didnt_exist,
+        
schema(TIAwaitingInputStatePayload).field("rendered_map_index").didnt_exist,
+    )
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index ba39e44030c..a6496665e8b 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -107,7 +107,7 @@ from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import TaskInstance
 from airflow.models.taskinstancekey import TaskInstanceKey
 from airflow.models.team import Team
-from airflow.models.trigger import TRIGGER_FAIL_REPR, Trigger, 
TriggerFailureReason
+from airflow.models.trigger import TRIGGER_FAIL_REPR, Trigger, 
TriggerFailureReason, handle_event_submit
 from airflow.observability.metrics import stats_utils
 from airflow.partition_mappers.base import is_rollup
 from airflow.serialization.definitions.assets import SerializedAssetUniqueKey
@@ -115,6 +115,7 @@ from airflow.serialization.definitions.notset import NOTSET
 from airflow.ti_deps.dependencies_states import ACTIVE_STATES, EXECUTION_STATES
 from airflow.timetables.base import compute_rollup_fingerprint
 from airflow.timetables.simple import AssetTriggeredTimetable, 
PartitionedAssetTimetable
+from airflow.triggers.base import TriggerEvent
 from airflow.utils.event_scheduler import EventScheduler
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.retries import MAX_DB_RETRIES, retry_db_transaction, 
run_with_db_retries
@@ -249,9 +250,10 @@ class ConcurrencyMap:
             # max_active_tis_per_dagrun), including DEFERRED.
             self.task_concurrency_map[(dag_id, task_id)] += count
             self.task_dagrun_concurrency_map[(dag_id, run_id, task_id)] += 
count
-            # Only count non-deferred states towards DAG-run active tasks
-            # (max_active_tasks / worker slot accounting).
-            if state != TaskInstanceState.DEFERRED:
+            # Only count states that hold a worker slot towards DAG-run active 
tasks
+            # (max_active_tasks / worker slot accounting). DEFERRED and 
AWAITING_INPUT
+            # are in-flight but parked, holding no worker slot.
+            if state not in (TaskInstanceState.DEFERRED, 
TaskInstanceState.AWAITING_INPUT):
                 self.dag_run_active_tasks_map[dag_id, run_id] += count
 
 
@@ -1644,6 +1646,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             self.check_trigger_timeouts,
         )
 
+        timers.call_regular_interval(
+            conf.getfloat("scheduler", "trigger_timeout_check_interval", 
fallback=15.0),
+            self.check_awaiting_input_timeouts,
+        )
+
         timers.call_regular_interval(
             30,
             self._mark_backfills_complete,
@@ -2956,7 +2963,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
     @provide_session
     def _emit_ti_metrics(self, *, session: Session = NEW_SESSION) -> None:
-        metric_states = {State.SCHEDULED, State.QUEUED, State.RUNNING, 
State.DEFERRED}
+        metric_states = {State.SCHEDULED, State.QUEUED, State.RUNNING, 
State.DEFERRED, State.AWAITING_INPUT}
         stmt = (
             select(
                 TaskInstance.state,
@@ -3174,6 +3181,95 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 if num_timed_out_tasks:
                     self.log.info("Timed out %i deferred tasks without fired 
triggers", num_timed_out_tasks)
 
+    @provide_session
+    def check_awaiting_input_timeouts(
+        self, max_retries: int = MAX_DB_RETRIES, *, session: Session = 
NEW_SESSION
+    ) -> None:
+        """
+        Resolve Human-in-the-loop tasks parked in AWAITING_INPUT whose 
response deadline has passed.
+
+        This is the scheduler-side liveness guarantee for HITL and runs 
independently of the
+        triggerer. For each timed-out task instance: if a response arrived 
just before the deadline,
+        resume with it; otherwise, if the request defines defaults, write the 
defaults as the
+        response and resume to success; otherwise fail the task (mirroring 
``check_trigger_timeouts``).
+        """
+        for attempt in run_with_db_retries(max_retries, logger=self.log):
+            with attempt:
+                now = timezone.utcnow()
+                query = (
+                    select(TI)
+                    .where(
+                        TI.state == TaskInstanceState.AWAITING_INPUT,
+                        TI.trigger_timeout < now,
+                    )
+                    .options(joinedload(TI.hitl_detail))
+                    # Bound the batch so a single scheduler tick cannot 
lock/process an unbounded
+                    # backlog of timed-out tasks (which would block concurrent 
responses/clears);
+                    # any remaining rows are handled on subsequent ticks.
+                    .limit(100)
+                )
+                # Lock only the TI rows (of=TI) so HA schedulers don't 
double-resolve, and so the
+                # FOR UPDATE is not applied to the nullable side of the 
hitl_detail outer join.
+                query = with_row_locks(query, of=TI, session=session, 
skip_locked=True)
+                timed_out_tis = session.scalars(query).all()
+                if not timed_out_tis:
+                    return
+
+                num_resolved = 0
+                num_failed = 0
+                for ti in timed_out_tis:
+                    hitl_detail = ti.hitl_detail
+                    if hitl_detail is not None and hitl_detail.responded_at is 
not None:
+                        # A response landed just before the deadline; resume 
with it.
+                        handle_event_submit(
+                            
TriggerEvent(hitl_detail.as_resume_event_payload(timedout=False)),
+                            task_instance=ti,
+                            session=session,
+                        )
+                        num_resolved += 1
+                    elif hitl_detail is not None and hitl_detail.defaults is 
not None:
+                        # Apply the configured defaults as the response, then 
resume to success.
+                        hitl_detail.chosen_options = list(hitl_detail.defaults)
+                        hitl_detail.params_input = {
+                            key: value["value"] if isinstance(value, dict) and 
"value" in value else value
+                            for key, value in (hitl_detail.params or 
{}).items()
+                        }
+                        hitl_detail.responded_by = None
+                        hitl_detail.responded_at = now
+                        session.add(hitl_detail)
+                        handle_event_submit(
+                            
TriggerEvent(hitl_detail.as_resume_event_payload(timedout=True)),
+                            task_instance=ti,
+                            session=session,
+                        )
+                        num_resolved += 1
+                    else:
+                        # No defaults and no response: resume into 
execute_complete with a timeout
+                        # failure event so the operator raises 
HITLTimeoutError (matching the old
+                        # trigger path), rather than a generic 
deferral-timeout failure.
+                        handle_event_submit(
+                            TriggerEvent(
+                                {
+                                    "error": "The Human-in-the-loop response 
timeout has passed "
+                                    "without a response.",
+                                    "error_type": "timeout",
+                                }
+                            ),
+                            task_instance=ti,
+                            session=session,
+                        )
+                        num_failed += 1
+
+                # Flush within the retry block so both branches persist 
consistently (the defaults
+                # branch already flushes via handle_event_submit; the fail 
branch relies on this).
+                session.flush()
+                if num_resolved or num_failed:
+                    self.log.info(
+                        "AWAITING_INPUT timeout sweep: %i resolved 
(response/defaults), %i failed",
+                        num_resolved,
+                        num_failed,
+                    )
+
     # [START find_and_purge_task_instances_without_heartbeats]
     def _find_and_purge_task_instances_without_heartbeats(self) -> None:
         """
diff --git a/airflow-core/src/airflow/models/dagrun.py 
b/airflow-core/src/airflow/models/dagrun.py
index 564e11a9522..1ed4dd198a4 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -1137,7 +1137,10 @@ class DagRun(Base, LoggingMixin):
                     and all(
                         getattr(t.task, "max_active_tis_per_dagrun", None) is 
None for t in self.tis if t.task
                     )
-                    and all(t.state != TaskInstanceState.DEFERRED for t in 
self.tis)
+                    and all(
+                        t.state not in (TaskInstanceState.DEFERRED, 
TaskInstanceState.AWAITING_INPUT)
+                        for t in self.tis
+                    )
                 )
 
             def recalculate(self) -> _UnfinishedStates:
diff --git a/airflow-core/src/airflow/models/hitl.py 
b/airflow-core/src/airflow/models/hitl.py
index 38e2d2f112f..dda93d9c86f 100644
--- a/airflow-core/src/airflow/models/hitl.py
+++ b/airflow-core/src/airflow/models/hitl.py
@@ -177,3 +177,21 @@ class HITLDetail(Base, HITLDetailPropertyMixin):
             onupdate="CASCADE",
         ),
     )
+
+    def as_resume_event_payload(self, *, timedout: bool = False) -> dict[str, 
Any]:
+        """
+        Build the event payload that ``HITLOperator.execute_complete`` 
consumes on resume.
+
+        Single source of truth mapping the response columns to the event dict 
the operator reads,
+        reproducing the provider's ``HITLTriggerEventSuccessPayload`` contract 
so a human response
+        (via the Core API) or the scheduler timeout sweep can resume an 
``awaiting_input`` task
+        directly, without a trigger. ``responded_at`` stays a ``datetime`` 
(``execute_complete``
+        calls ``.isoformat()`` on it); ``responded_by_user`` is ``None`` for 
the timeout default.
+        """
+        return {
+            "chosen_options": list(self.chosen_options or []),
+            "params_input": self.params_input or {},
+            "responded_at": self.responded_at,
+            "responded_by_user": self.responded_by_user,
+            "timedout": timedout,
+        }
diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index 0d0183665fa..92300f4ac31 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -2040,15 +2040,19 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
 
     def get_num_active_task_instances(self, *, same_dagrun: bool = False, 
session: Session) -> int:
         """
-        Count active (running or deferred) TIs for this task from the DB.
+        Count active (running, deferred, or awaiting-input) TIs for this task 
from the DB.
 
-        Deferred TIs are included because they are still logically in-flight
-        and must count against max_active_tis_per_dag / 
max_active_tis_per_dagrun.
+        Deferred and awaiting-input TIs are included because they are still 
logically
+        in-flight and must count against max_active_tis_per_dag / 
max_active_tis_per_dagrun.
 
         :meta private:
         """
         return self._get_num_task_instances_of_state(
-            [TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED],
+            [
+                TaskInstanceState.RUNNING,
+                TaskInstanceState.DEFERRED,
+                TaskInstanceState.AWAITING_INPUT,
+            ],
             same_dagrun=same_dagrun,
             session=session,
         )
diff --git a/airflow-core/src/airflow/ti_deps/dependencies_states.py 
b/airflow-core/src/airflow/ti_deps/dependencies_states.py
index 7d5bd5091e0..21fd8ff31da 100644
--- a/airflow-core/src/airflow/ti_deps/dependencies_states.py
+++ b/airflow-core/src/airflow/ti_deps/dependencies_states.py
@@ -31,6 +31,13 @@ EXECUTION_STATES = {
 ACTIVE_STATES = {
     *EXECUTION_STATES,
     TaskInstanceState.DEFERRED,
+    # AWAITING_INPUT (HITL) is logically in-flight like DEFERRED: it counts 
towards task-level
+    # concurrency (max_active_tis_per_dag / max_active_tis_per_dagrun) and, 
like DEFERRED, is
+    # excluded from DAG-run active-task accounting. It is also excluded from 
pool slot accounting,
+    # but -- unlike DEFERRED, which a pool can opt to count via 
``include_deferred`` -- an
+    # awaiting_input task is *never* counted against pool slots: an open-ended 
human wait should
+    # not reserve one.
+    TaskInstanceState.AWAITING_INPUT,
 }
 
 # In order to be able to get queued a task must have one of these states
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 23170a462ca..fbebbf10c8e 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -6612,7 +6612,7 @@ export const $TaskInstanceResponse = {
 
 export const $TaskInstanceState = {
     type: 'string',
-    enum: ['removed', 'scheduled', 'queued', 'running', 'success', 
'restarting', 'failed', 'up_for_retry', 'up_for_reschedule', 'upstream_failed', 
'skipped', 'deferred'],
+    enum: ['removed', 'scheduled', 'queued', 'running', 'success', 
'restarting', 'failed', 'up_for_retry', 'up_for_reschedule', 'upstream_failed', 
'skipped', 'deferred', 'awaiting_input'],
     title: 'TaskInstanceState',
     description: `All possible states that a Task Instance can be in.
 
@@ -10177,10 +10177,14 @@ export const $TaskInstanceStateCount = {
         deferred: {
             type: 'integer',
             title: 'Deferred'
+        },
+        awaiting_input: {
+            type: 'integer',
+            title: 'Awaiting Input'
         }
     },
     type: 'object',
-    required: ['no_status', 'removed', 'scheduled', 'queued', 'running', 
'success', 'restarting', 'failed', 'up_for_retry', 'up_for_reschedule', 
'upstream_failed', 'skipped', 'deferred'],
+    required: ['no_status', 'removed', 'scheduled', 'queued', 'running', 
'success', 'restarting', 'failed', 'up_for_retry', 'up_for_reschedule', 
'upstream_failed', 'skipped', 'deferred', 'awaiting_input'],
     title: 'TaskInstanceStateCount',
     description: 'TaskInstance serializer for responses.'
 } as const;
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 15d50056069..f242a2235dd 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1709,7 +1709,7 @@ export type TaskInstanceResponse = {
  *
  * Note that None is also allowed, so always use this in a type hint with 
Optional.
  */
-export type TaskInstanceState = 'removed' | 'scheduled' | 'queued' | 'running' 
| 'success' | 'restarting' | 'failed' | 'up_for_retry' | 'up_for_reschedule' | 
'upstream_failed' | 'skipped' | 'deferred';
+export type TaskInstanceState = 'removed' | 'scheduled' | 'queued' | 'running' 
| 'success' | 'restarting' | 'failed' | 'up_for_retry' | 'up_for_reschedule' | 
'upstream_failed' | 'skipped' | 'deferred' | 'awaiting_input';
 
 /**
  * Task Instance body for get batch.
@@ -2564,6 +2564,7 @@ export type TaskInstanceStateCount = {
     upstream_failed: number;
     skipped: number;
     deferred: number;
+    awaiting_input: number;
 };
 
 /**
diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json 
b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
index 3083bc27276..95a86d0fab1 100644
--- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
+++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
@@ -229,6 +229,7 @@
   "startDate": "Start Date",
   "state": "State",
   "states": {
+    "awaiting_input": "Awaiting Input",
     "deferred": "Deferred",
     "failed": "Failed",
     "no_status": "No Status",
diff --git a/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx 
b/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx
index c0ec70f44dd..8e48c0c6fbe 100644
--- a/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx
+++ b/airflow-core/src/airflow/ui/src/components/NeedsReviewButton.tsx
@@ -41,7 +41,7 @@ export const NeedsReviewButton = ({
       dagId: dagId ?? "~",
       dagRunId: runId ?? "~",
       responseReceived: false,
-      state: ["deferred"],
+      state: ["deferred", "awaiting_input"],
       taskId,
     },
     undefined,
diff --git a/airflow-core/src/airflow/ui/src/components/StateIcon.tsx 
b/airflow-core/src/airflow/ui/src/components/StateIcon.tsx
index 3fad9f62edb..abfef20f216 100644
--- a/airflow-core/src/airflow/ui/src/components/StateIcon.tsx
+++ b/airflow-core/src/airflow/ui/src/components/StateIcon.tsx
@@ -18,7 +18,14 @@
  */
 import type { IconBaseProps } from "react-icons";
 import { FiActivity, FiCalendar, FiRepeat, FiSkipForward, FiSlash, FiWatch, 
FiX } from "react-icons/fi";
-import { LuCalendarSync, LuCheck, LuCircleDashed, LuCircleFadingArrowUp, 
LuRedo2 } from "react-icons/lu";
+import {
+  LuCalendarSync,
+  LuCheck,
+  LuCircleDashed,
+  LuCircleFadingArrowUp,
+  LuRedo2,
+  LuUserRoundPen,
+} from "react-icons/lu";
 import { PiQueue } from "react-icons/pi";
 
 import type { TaskInstanceState } from "openapi/requests/types.gen";
@@ -31,6 +38,8 @@ export const StateIcon = ({ state, ...rest }: Props) => {
   // false positive eslint - we have a default.
   // eslint-disable-next-line @typescript-eslint/switch-exhaustiveness-check
   switch (state) {
+    case "awaiting_input":
+      return <LuUserRoundPen {...rest} />;
     case "deferred":
       return <FiWatch {...rest} />;
     case "failed":
diff --git a/airflow-core/src/airflow/ui/src/constants/stateOptions.ts 
b/airflow-core/src/airflow/ui/src/constants/stateOptions.ts
index b9df60a5208..f9237e1f221 100644
--- a/airflow-core/src/airflow/ui/src/constants/stateOptions.ts
+++ b/airflow-core/src/airflow/ui/src/constants/stateOptions.ts
@@ -37,6 +37,7 @@ export const taskInstanceStateOptions = createListCollection<{
     { label: "common:states.upstream_failed", value: "upstream_failed" },
     { label: "common:states.skipped", value: "skipped" },
     { label: "common:states.deferred", value: "deferred" },
+    { label: "common:states.awaiting_input", value: "awaiting_input" },
     { label: "common:states.removed", value: "removed" },
     { label: "common:states.none", value: "none" },
   ],
diff --git a/airflow-core/src/airflow/ui/src/hooks/useRequiredActionTabs.ts 
b/airflow-core/src/airflow/ui/src/hooks/useRequiredActionTabs.ts
index 39ddbb8f335..0560327fac3 100644
--- a/airflow-core/src/airflow/ui/src/hooks/useRequiredActionTabs.ts
+++ b/airflow-core/src/airflow/ui/src/hooks/useRequiredActionTabs.ts
@@ -83,7 +83,9 @@ export const useRequiredActionTabs = (
   const hasHitlData = (hitlData?.total_entries ?? 0) > 0;
   const pendingActionsCount =
     hitlData?.hitl_details.filter(
-      (hitl) => hitl.task_instance.state === "deferred" && 
!hitl.response_received,
+      (hitl) =>
+        (hitl.task_instance.state === "deferred" || hitl.task_instance.state 
=== "awaiting_input") &&
+        !hitl.response_received,
     ).length ?? 0;
 
   const processedTabs = tabs
diff --git 
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/TaskInstanceMetrics.tsx
 
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/TaskInstanceMetrics.tsx
index f36c0a4635b..617d3e62b04 100644
--- 
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/TaskInstanceMetrics.tsx
+++ 
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/TaskInstanceMetrics.tsx
@@ -43,6 +43,7 @@ const TASK_STATES: Array<keyof TaskInstanceStateCount> = [
   "up_for_reschedule",
   "upstream_failed",
   "deferred",
+  "awaiting_input",
   "no_status",
 ];
 
diff --git 
a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.test.tsx
 
b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.test.tsx
index 09d9f0efc84..4627a326703 100644
--- 
a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.test.tsx
+++ 
b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.test.tsx
@@ -140,3 +140,37 @@ describe("HITLResponseForm – option button rendering 
boundary", () => {
     expect(screen.getByTestId("hitl-option-Reject")).toBeInTheDocument();
   });
 });
+
+// ---------------------------------------------------------------------------
+// Tests: pending-state enablement
+//
+// HITL tasks park in "deferred" (pre-3.3) or "awaiting_input" (from 3.3). Both
+// are "pending a response", so the option buttons must be enabled in either
+// state. Previously the form only treated "deferred" as pending, which 
disabled
+// the response form for awaiting_input tasks.
+// ---------------------------------------------------------------------------
+describe("HITLResponseForm – pending-state enablement", () => {
+  it("enables option buttons for a deferred task", () => {
+    renderForm(["Yes", "No"], {
+      task_instance: { ...MOCK_TASK_INSTANCE, state: "deferred" },
+    });
+    expect(screen.getByTestId("hitl-option-Yes")).toBeEnabled();
+    expect(screen.getByTestId("hitl-option-No")).toBeEnabled();
+  });
+
+  it("enables option buttons for an awaiting_input task", () => {
+    renderForm(["Yes", "No"], {
+      task_instance: { ...MOCK_TASK_INSTANCE, state: "awaiting_input" },
+    });
+    expect(screen.getByTestId("hitl-option-Yes")).toBeEnabled();
+    expect(screen.getByTestId("hitl-option-No")).toBeEnabled();
+  });
+
+  it("disables option buttons for a finished task", () => {
+    renderForm(["Yes", "No"], {
+      task_instance: { ...MOCK_TASK_INSTANCE, state: "success" },
+    });
+    expect(screen.getByTestId("hitl-option-Yes")).toBeDisabled();
+    expect(screen.getByTestId("hitl-option-No")).toBeDisabled();
+  });
+});
diff --git 
a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.tsx 
b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.tsx
index 178962a4423..87bc57c1d42 100644
--- 
a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.tsx
+++ 
b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.tsx
@@ -28,7 +28,7 @@ import Time from "src/components/Time";
 import { useParamStore } from "src/queries/useParamStore";
 import { useUpdateHITLDetail } from "src/queries/useUpdateHITLDetail";
 import { DEFAULT_DATETIME_FORMAT } from "src/utils/datetimeUtils";
-import { getHITLParamsDict, getHITLFormData, getPreloadHITLFormData } from 
"src/utils/hitl";
+import { getHITLParamsDict, getHITLFormData, getPreloadHITLFormData, 
isHITLPending } from "src/utils/hitl";
 
 type HITLResponseFormProps = {
   readonly hitlDetail: {
@@ -70,7 +70,7 @@ export const HITLResponseForm = ({ hitlDetail }: 
HITLResponseFormProps) => {
   const shouldRenderOptionButton =
     hitlDetail.options.length <= 4 && !hitlDetail.multiple && 
preloadedHITLOptions.length === 0;
 
-  const isPending = hitlDetail.task_instance.state === "deferred";
+  const isPending = isHITLPending(hitlDetail.task_instance.state);
 
   const { updateHITLResponse } = useUpdateHITLDetail({
     dagId: hitlDetail.task_instance.dag_id,
diff --git 
a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.test.tsx
 
b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.test.tsx
index c149eac187f..534a7aa216c 100644
--- 
a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.test.tsx
+++ 
b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.test.tsx
@@ -148,6 +148,33 @@ describe("HITLTaskInstances – mapIndex URL param handling 
(#66428)", () => {
   });
 });
 
+// ---------------------------------------------------------------------------
+// State filter: pending required-action rows park in either "deferred" 
(pre-3.3)
+// or "awaiting_input" (from 3.3). Filtering on unreceived responses must 
request
+// both states, otherwise awaiting_input rows are hidden from the listing.
+// ---------------------------------------------------------------------------
+describe("HITLTaskInstances – pending state filter", () => {
+  it("requests both deferred and awaiting_input when filtering for unreceived 
responses", () => {
+    mockSearchParams = new URLSearchParams("response_received=false");
+
+    render(<HITLTaskInstances />, { wrapper: Wrapper });
+
+    const args = lastListingCall()?.[0] as { state?: Array<string> } | 
undefined;
+
+    expect(args?.state).toEqual(["deferred", "awaiting_input"]);
+  });
+
+  it("does not send a state filter when not filtering for unreceived 
responses", () => {
+    mockSearchParams = new URLSearchParams();
+
+    render(<HITLTaskInstances />, { wrapper: Wrapper });
+
+    const args = lastListingCall()?.[0] as { state?: Array<string> } | 
undefined;
+
+    expect(args?.state).toBeUndefined();
+  });
+});
+
 // ---------------------------------------------------------------------------
 // Refetch predicate — the API serializes `responded_at` as JSON `null`, not an
 // omitted field, so `=== undefined` never matched and the page never polled
@@ -198,6 +225,28 @@ describe("HITLTaskInstances – auto-refresh predicate", () 
=> {
     expect(result).toBe(5000);
   });
 
+  it("triggers refetch when an awaiting_input row has responded_at: null", () 
=> {
+    mockSearchParams = new URLSearchParams();
+
+    render(<HITLTaskInstances />, { wrapper: Wrapper });
+
+    const refetchInterval = getRefetchInterval();
+    const result = refetchInterval({
+      state: {
+        data: {
+          hitl_details: [
+            {
+              responded_at: null,
+              task_instance: { state: "awaiting_input" },
+            },
+          ],
+        },
+      },
+    });
+
+    expect(result).toBe(5000);
+  });
+
   it("does not refetch when every row already has a responded_at value", () => 
{
     mockSearchParams = new URLSearchParams();
 
diff --git 
a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.tsx 
b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.tsx
index e9e0e86e4d2..4ad3c945cf4 100644
--- 
a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.tsx
+++ 
b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.tsx
@@ -34,7 +34,7 @@ import { RouterLink } from "src/components/ui";
 import { SearchParamsKeys, type SearchParamsKeysType } from 
"src/constants/searchParams";
 import { useAdvancedSearchArg } from "src/hooks/useAdvancedSearch";
 import { useAutoRefresh } from "src/utils";
-import { getHITLState } from "src/utils/hitl";
+import { getHITLState, isHITLPending } from "src/utils/hitl";
 import { getTaskInstanceLink } from "src/utils/links";
 
 import { HITLFilters } from "./HITLFilters";
@@ -216,7 +216,7 @@ export const HITLTaskInstances = () => {
         Boolean(effectiveResponseReceived) && effectiveResponseReceived !== 
"all"
           ? effectiveResponseReceived === "true"
           : undefined,
-      state: effectiveResponseReceived === "false" ? ["deferred"] : undefined,
+      state: effectiveResponseReceived === "false" ? ["deferred", 
"awaiting_input"] : undefined,
       subjectSearch,
       taskId,
       ...taskIdArg,
@@ -224,15 +224,15 @@ export const HITLTaskInstances = () => {
     undefined,
     {
       // Only continue auto-refetching when filtering for unreceived responses
-      // and at least one TaskInstance is still deferred without a response.
+      // and at least one TaskInstance is still pending (parked) without a 
response.
       refetchInterval: (query) => {
-        const hasDeferredWithoutResponse = Boolean(
+        const hasPendingWithoutResponse = Boolean(
           query.state.data?.hitl_details.some(
-            (detail: HITLDetail) => detail.responded_at === null && 
detail.task_instance.state === "deferred",
+            (detail: HITLDetail) => detail.responded_at === null && 
isHITLPending(detail.task_instance.state),
           ),
         );
 
-        return hasDeferredWithoutResponse ? baseRefetchInterval : false;
+        return hasPendingWithoutResponse ? baseRefetchInterval : false;
       },
     },
   );
diff --git a/airflow-core/src/airflow/ui/src/theme.ts 
b/airflow-core/src/airflow/ui/src/theme.ts
index 5dea56794a7..8fc32ed4516 100644
--- a/airflow-core/src/airflow/ui/src/theme.ts
+++ b/airflow-core/src/airflow/ui/src/theme.ts
@@ -411,6 +411,7 @@ const defaultAirflowTheme: ThemingConfig = {
       running: generateSemanticTokens("cyan"),
       restarting: generateSemanticTokens("violet"),
       deferred: generateSemanticTokens("purple"),
+      awaiting_input: generateSemanticTokens("orange"),
       scheduled: generateSemanticTokens("zinc"),
       none: generateSemanticTokens("gray"),
       no_status: generateSemanticTokens("gray"),
diff --git a/airflow-core/src/airflow/ui/src/utils/hitl.test.ts 
b/airflow-core/src/airflow/ui/src/utils/hitl.test.ts
index daead151359..90dd1c7fdaf 100644
--- a/airflow-core/src/airflow/ui/src/utils/hitl.test.ts
+++ b/airflow-core/src/airflow/ui/src/utils/hitl.test.ts
@@ -21,7 +21,7 @@ import { describe, it, expect, vi } from "vitest";
 
 import type { HITLDetail } from "openapi/requests/types.gen";
 
-import { getHITLParamsDict } from "./hitl";
+import { getHITLParamsDict, getHITLState, isHITLPending } from "./hitl";
 
 const mockTranslate = vi.fn((key: string) => key) as unknown as TFunction;
 
@@ -48,6 +48,56 @@ const createMockHITLDetail = (overrides?: 
Partial<HITLDetail>): HITLDetail =>
     ...overrides,
   }) as HITLDetail;
 
+describe("isHITLPending", () => {
+  it("treats a deferred task as pending", () => {
+    expect(isHITLPending("deferred")).toBe(true);
+  });
+
+  it("treats an awaiting_input task as pending", () => {
+    expect(isHITLPending("awaiting_input")).toBe(true);
+  });
+
+  it("treats a finished/cleared task as not pending", () => {
+    expect(isHITLPending("success")).toBe(false);
+    expect(isHITLPending(undefined)).toBe(false);
+    expect(isHITLPending(null)).toBe(false);
+  });
+});
+
+describe("getHITLState", () => {
+  it("reports a 'required' state for an awaiting_input task without a 
response", () => {
+    const hitlDetail = createMockHITLDetail({
+      response_received: false,
+      task_instance: {
+        dag_id: "test_dag",
+        dag_run_id: "test_run",
+        map_index: -1,
+        state: "awaiting_input",
+        task_id: "test_task",
+      },
+    } as Partial<HITLDetail>);
+
+    // Empty params + non-approval options -> choice task; the point is that a 
parked
+    // awaiting_input task is "required", not "noResponseReceived".
+    expect(getHITLState(mockTranslate, 
hitlDetail)).toBe("state.choiceRequired");
+  });
+
+  it("reports no response received for a finished task without a response", () 
=> {
+    const hitlDetail = createMockHITLDetail({
+      response_received: false,
+      task_instance: {
+        dag_id: "test_dag",
+        dag_run_id: "test_run",
+        map_index: -1,
+        state: "success",
+        task_id: "test_task",
+      },
+    } as Partial<HITLDetail>);
+
+    expect(getHITLState(mockTranslate, 
hitlDetail)).toBe("state.noResponseReceived");
+  });
+});
+
 describe("getHITLParamsDict", () => {
   it("correctly types object parameters as 'object' instead of 'string'", () 
=> {
     const hitlDetail = createMockHITLDetail({
diff --git a/airflow-core/src/airflow/ui/src/utils/hitl.ts 
b/airflow-core/src/airflow/ui/src/utils/hitl.ts
index 6ea2c88eca5..ab1ab135526 100644
--- a/airflow-core/src/airflow/ui/src/utils/hitl.ts
+++ b/airflow-core/src/airflow/ui/src/utils/hitl.ts
@@ -18,7 +18,7 @@
  */
 import type { TFunction } from "i18next";
 
-import type { HITLDetail, HITLDetailHistory } from 
"openapi/requests/types.gen";
+import type { HITLDetail, HITLDetailHistory, TaskInstanceState } from 
"openapi/requests/types.gen";
 import type { ParamSchema, ParamsSpec } from "src/queries/useDagParams";
 
 export type HITLResponseParams = {
@@ -26,6 +26,12 @@ export type HITLResponseParams = {
   params_input?: Record<string, unknown>;
 };
 
+// A HITL task is "pending a response" while parked: pre-3.3 it parks in 
"deferred", from 3.3 it
+// parks in "awaiting_input". Either way an unanswered parked task is 
"response required", not
+// "no response received" (which is reserved for tasks no longer parked, e.g. 
cleared/finished).
+export const isHITLPending = (state?: TaskInstanceState | null): boolean =>
+  state === "deferred" || state === "awaiting_input";
+
 const getChosenOptionsValue = (hitlDetail: HITLDetailHistory) => {
   // if response_received is true, display the chosen_options, otherwise 
display the defaults
   const sourceValues = hitlDetail.response_received ? 
hitlDetail.chosen_options : hitlDetail.defaults;
@@ -211,11 +217,9 @@ export const getHITLState = (translate: TFunction, 
hitlDetail: HITLDetail) => {
     task_instance: { state: taskInstanceState },
   } = hitlDetail;
 
-  const isNotDeferred = taskInstanceState !== "deferred";
-
   let stateType: [string, string] = ["responseRequired", "responseReceived"];
 
-  if (!responseReceived && isNotDeferred) {
+  if (!responseReceived && !isHITLPending(taskInstanceState)) {
     return translate("state.noResponseReceived");
   }
 
diff --git a/airflow-core/src/airflow/ui/src/utils/query.ts 
b/airflow-core/src/airflow/ui/src/utils/query.ts
index 10c031f47e3..ebdfdf16f63 100644
--- a/airflow-core/src/airflow/ui/src/utils/query.ts
+++ b/airflow-core/src/airflow/ui/src/utils/query.ts
@@ -21,6 +21,7 @@ import type { TaskInstanceState } from 
"openapi/requests/types.gen";
 import { useConfig } from "src/queries/useConfig";
 
 export const isStatePending = (state?: TaskInstanceState | null) =>
+  state === "awaiting_input" ||
   state === "deferred" ||
   state === "scheduled" ||
   state === "running" ||
diff --git a/airflow-core/src/airflow/ui/src/utils/stateUtils.ts 
b/airflow-core/src/airflow/ui/src/utils/stateUtils.ts
index ea4de7a7b40..3d09c537643 100644
--- a/airflow-core/src/airflow/ui/src/utils/stateUtils.ts
+++ b/airflow-core/src/airflow/ui/src/utils/stateUtils.ts
@@ -33,6 +33,7 @@ export const STATE_PRIORITY: Array<string> = [
   "running",
   "restarting",
   "deferred",
+  "awaiting_input",
   "queued",
   "scheduled",
   "success",
diff --git a/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts 
b/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
index 178a2f3caa0..b0b159fce44 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
@@ -371,7 +371,7 @@ export async function setupHITLFlowViaAPI(
 
   await waitForTaskInstanceState(request, {
     dagId,
-    expectedState: "deferred",
+    expectedState: "awaiting_input",
     runId: dagRunId,
     taskId: "wait_for_input",
   });
@@ -385,7 +385,7 @@ export async function setupHITLFlowViaAPI(
 
   await waitForTaskInstanceState(request, {
     dagId,
-    expectedState: "deferred",
+    expectedState: "awaiting_input",
     runId: dagRunId,
     taskId: "wait_for_option",
   });
@@ -398,7 +398,7 @@ export async function setupHITLFlowViaAPI(
 
   await waitForTaskInstanceState(request, {
     dagId,
-    expectedState: "deferred",
+    expectedState: "awaiting_input",
     runId: dagRunId,
     taskId: "wait_for_multiple_options",
   });
@@ -431,7 +431,7 @@ export async function setupHITLFlowViaAPI(
 
   await waitForTaskInstanceState(request, {
     dagId,
-    expectedState: "deferred",
+    expectedState: "awaiting_input",
     runId: dagRunId,
     taskId: "valid_input_and_options",
   });
@@ -451,7 +451,7 @@ export async function setupHITLFlowViaAPI(
   if (approve) {
     await waitForTaskInstanceState(request, {
       dagId,
-      expectedState: "deferred",
+      expectedState: "awaiting_input",
       runId: dagRunId,
       taskId: "choose_a_branch_to_run",
     });
diff --git a/airflow-core/src/airflow/utils/state.py 
b/airflow-core/src/airflow/utils/state.py
index 332efb10553..89c8efcaa3e 100644
--- a/airflow-core/src/airflow/utils/state.py
+++ b/airflow-core/src/airflow/utils/state.py
@@ -56,6 +56,7 @@ class IntermediateTIState(str, Enum):
     UP_FOR_RETRY = "up_for_retry"
     UP_FOR_RESCHEDULE = "up_for_reschedule"
     DEFERRED = "deferred"
+    AWAITING_INPUT = "awaiting_input"
 
     def __str__(self) -> str:
         return self.value
@@ -87,6 +88,7 @@ class TaskInstanceState(str, Enum):
     UPSTREAM_FAILED = TerminalTIState.UPSTREAM_FAILED  # One or more upstream 
deps failed
     SKIPPED = TerminalTIState.SKIPPED  # Skipped by branching or some other 
mechanism
     DEFERRED = IntermediateTIState.DEFERRED  # Deferrable operator waiting on 
a trigger
+    AWAITING_INPUT = IntermediateTIState.AWAITING_INPUT  # Parked waiting for 
human input (HITL)
 
     def __str__(self) -> str:
         return self.value
@@ -130,6 +132,7 @@ class State:
     UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
     SKIPPED = TaskInstanceState.SKIPPED
     DEFERRED = TaskInstanceState.DEFERRED
+    AWAITING_INPUT = TaskInstanceState.AWAITING_INPUT
 
     finished_dr_states: frozenset[DagRunState] = 
frozenset([DagRunState.SUCCESS, DagRunState.FAILED])
     unfinished_dr_states: frozenset[DagRunState] = 
frozenset([DagRunState.QUEUED, DagRunState.RUNNING])
@@ -157,6 +160,7 @@ class State:
         TaskInstanceState.REMOVED: "lightgrey",
         TaskInstanceState.SCHEDULED: "tan",
         TaskInstanceState.DEFERRED: "mediumpurple",
+        TaskInstanceState.AWAITING_INPUT: "darkorange",
     }
 
     @classmethod
@@ -200,6 +204,7 @@ class State:
             TaskInstanceState.UP_FOR_RETRY,
             TaskInstanceState.UP_FOR_RESCHEDULE,
             TaskInstanceState.DEFERRED,
+            TaskInstanceState.AWAITING_INPUT,
         ]
     )
     """
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py
index eabbb0a7559..6d21cc8c6c4 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py
@@ -31,6 +31,7 @@ from sqlalchemy.orm import Session
 from airflow._shared.timezones.timezone import utc, utcnow
 from airflow.models.hitl import HITLDetail
 from airflow.models.log import Log
+from airflow.models.taskinstance import TaskInstance as TIModel
 from airflow.sdk.execution_time.hitl import HITLUser
 from airflow.utils.state import TaskInstanceState
 
@@ -332,6 +333,60 @@ class TestUpdateHITLDetailEndpoint:
         assert audit_log is not None
         _assert_sample_audit_log(audit_log)
 
+    @time_machine.travel(datetime(2025, 7, 3, 0, 0, 0), tick=False)
+    @pytest.mark.usefixtures("sample_hitl_detail")
+    def test_response_resumes_awaiting_input_task_without_trigger(
+        self,
+        test_client: TestClient,
+        sample_ti_url_identifier: str,
+        sample_update_payload: dict[str, Any],
+        sample_ti: TaskInstance,
+        session: Session,
+    ) -> None:
+        """A human response transitions a parked AWAITING_INPUT task straight 
to SCHEDULED, no trigger."""
+        # Park the task exactly as the 3.3 operator does: AWAITING_INPUT, 
next_method set, no trigger.
+        ti = session.get(TIModel, sample_ti.id)
+        assert ti is not None
+        ti.state = TaskInstanceState.AWAITING_INPUT
+        ti.next_method = "execute_complete"
+        ti.next_kwargs = {}
+        ti.trigger_id = None
+        session.commit()
+        # Sanity: the park persisted before we respond.
+        session.expire_all()
+        parked = session.get(TIModel, sample_ti.id)
+        assert parked is not None
+        assert parked.state == TaskInstanceState.AWAITING_INPUT
+
+        response = test_client.patch(
+            f"{sample_ti_url_identifier}/hitlDetails",
+            json=sample_update_payload,
+        )
+        assert response.status_code == 200
+
+        session.expire_all()
+        refreshed = session.get(TIModel, sample_ti.id)
+        assert refreshed is not None
+        # Resumed so the scheduler re-queues execute_complete -- with no 
triggerer involved.
+        assert refreshed.state == TaskInstanceState.SCHEDULED
+        assert refreshed.trigger_id is None
+        assert refreshed.next_method == "execute_complete"
+        assert "event" in (refreshed.next_kwargs or {})
+
+    @pytest.mark.usefixtures("sample_hitl_detail")
+    def test_should_respond_400_for_invalid_option(
+        self,
+        test_client: TestClient,
+        sample_ti_url_identifier: str,
+    ) -> None:
+        """Write-side validation rejects an out-of-set option (400) instead of 
failing the task on resume."""
+        response = test_client.patch(
+            f"{sample_ti_url_identifier}/hitlDetails",
+            json={"chosen_options": ["Maybe"], "params_input": {}},
+        )
+        assert response.status_code == 400
+        assert "Invalid options" in response.json()["detail"]
+
     @time_machine.travel(datetime(2025, 7, 3, 0, 0, 0), tick=False)
     @pytest.mark.usefixtures("sample_hitl_detail_respondent")
     def test_should_respond_200_to_assigned_users(
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py
index ccfc2b26f78..ebb31fff965 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py
@@ -246,6 +246,7 @@ class TestHistoricalMetricsDataEndpoint:
                 {
                     "dag_run_states": {"failed": 1, "queued": 1, "running": 1, 
"success": 1},
                     "task_instance_states": {
+                        "awaiting_input": 0,
                         "deferred": 0,
                         "failed": 2,
                         "no_status": 4,
@@ -268,6 +269,7 @@ class TestHistoricalMetricsDataEndpoint:
                 {
                     "dag_run_states": {"failed": 1, "queued": 0, "running": 0, 
"success": 0},
                     "task_instance_states": {
+                        "awaiting_input": 0,
                         "deferred": 0,
                         "failed": 2,
                         "no_status": 0,
@@ -290,6 +292,7 @@ class TestHistoricalMetricsDataEndpoint:
                 {
                     "dag_run_states": {"failed": 1, "queued": 1, "running": 1, 
"success": 0},
                     "task_instance_states": {
+                        "awaiting_input": 0,
                         "deferred": 0,
                         "failed": 2,
                         "no_status": 4,
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py 
b/airflow-core/tests/unit/dag_processing/test_processor.py
index ef92372f573..c1d5c76712e 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -1944,6 +1944,7 @@ class TestDagProcessingMessageTypes:
         dag_processor_types = get_type_names(ToDagProcessor)
 
         in_supervisor_but_not_in_manager = {
+            "AwaitInputTask",
             "DeferTask",
             "DeleteXCom",
             "GetAssetByName",
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 258ce3db4cf..cca74569bdf 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -82,6 +82,7 @@ from airflow.models.dagwarning import DagWarning
 from airflow.models.db_callback_request import DbCallbackRequest
 from airflow.models.deadline import Deadline
 from airflow.models.deadline_alert import DeadlineAlert
+from airflow.models.hitl import HITLDetail
 from airflow.models.log import Log
 from airflow.models.pool import Pool
 from airflow.models.serialized_dag import SerializedDagModel
@@ -7312,6 +7313,97 @@ class TestSchedulerJob:
         assert ti1.next_method == "__fail__"
         assert ti2.state == State.DEFERRED
 
+    def test_awaiting_input_timeout_with_defaults_resumes(self, dag_maker):
+        """
+        A parked ``awaiting_input`` task past its deadline with defaults is 
resumed to SCHEDULED by
+        the scheduler sweep (the default applied as the response), while a 
not-yet-expired one is
+        left untouched. This proves HITL timeout liveness with no triggerer 
running.
+        """
+        session = settings.Session()
+        with dag_maker(
+            dag_id="test_awaiting_input_defaults",
+            start_date=DEFAULT_DATE,
+            schedule="@once",
+            session=session,
+        ):
+            EmptyOperator(task_id="dummy1")
+        dr_past = dag_maker.create_dagrun()
+        dr_future = dag_maker.create_dagrun(
+            run_id="future", logical_date=DEFAULT_DATE + 
datetime.timedelta(seconds=1)
+        )
+        ti_past = dr_past.get_task_instance("dummy1", session=session)
+        ti_future = dr_future.get_task_instance("dummy1", session=session)
+        for ti, offset in ((ti_past, -60), (ti_future, 60)):
+            ti.state = State.AWAITING_INPUT
+            ti.trigger_timeout = timezone.utcnow() + 
datetime.timedelta(seconds=offset)
+            ti.next_method = "execute_complete"
+            ti.next_kwargs = {}
+            session.add(
+                HITLDetail(
+                    ti_id=ti.id,
+                    options=["Approve", "Reject"],
+                    subject="approve?",
+                    defaults=["Approve"],
+                    multiple=False,
+                    params={},
+                )
+            )
+        session.flush()
+
+        self.job_runner = SchedulerJobRunner(job=Job())
+        self.job_runner.check_awaiting_input_timeouts(session=session)
+
+        session.refresh(ti_past)
+        session.refresh(ti_future)
+        # Past-deadline task resumed with the default applied; the future one 
is left parked.
+        assert ti_past.state == State.SCHEDULED
+        assert ti_past.next_method == "execute_complete"
+        assert ti_past.next_kwargs["event"]["chosen_options"] == ["Approve"]
+        assert ti_past.next_kwargs["event"]["timedout"] is True
+        assert ti_future.state == State.AWAITING_INPUT
+        hitl_detail = session.get(HITLDetail, ti_past.id)
+        assert hitl_detail.chosen_options == ["Approve"]
+        assert hitl_detail.responded_by is None
+        assert hitl_detail.responded_at is not None
+
+    def test_awaiting_input_timeout_without_defaults_fails(self, dag_maker):
+        """A parked ``awaiting_input`` task past its deadline with no defaults 
is failed by the sweep."""
+        session = settings.Session()
+        with dag_maker(
+            dag_id="test_awaiting_input_nodefaults",
+            start_date=DEFAULT_DATE,
+            schedule="@once",
+            session=session,
+        ):
+            EmptyOperator(task_id="dummy1")
+        dr = dag_maker.create_dagrun()
+        ti = dr.get_task_instance("dummy1", session=session)
+        ti.state = State.AWAITING_INPUT
+        ti.trigger_timeout = timezone.utcnow() - datetime.timedelta(seconds=60)
+        ti.next_method = "execute_complete"
+        ti.next_kwargs = {}
+        session.add(
+            HITLDetail(
+                ti_id=ti.id,
+                options=["Approve", "Reject"],
+                subject="approve?",
+                defaults=None,
+                multiple=False,
+                params={},
+            )
+        )
+        session.flush()
+
+        self.job_runner = SchedulerJobRunner(job=Job())
+        self.job_runner.check_awaiting_input_timeouts(session=session)
+
+        session.refresh(ti)
+        # Resumed into execute_complete with a timeout failure event (raises 
HITLTimeoutError on
+        # resume), rather than the generic __fail__ deferral-timeout path.
+        assert ti.state == State.SCHEDULED
+        assert ti.next_method == "execute_complete"
+        assert ti.next_kwargs["event"]["error_type"] == "timeout"
+
     def test_retry_on_db_error_when_update_timeout_triggers(self, dag_maker, 
testing_dag_bundle, session):
         """
         Tests that it will retry on DB error like deadlock when updating 
timeout triggers.
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py 
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index eab9540df3c..4984c0864a8 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -1907,6 +1907,7 @@ class TestTriggererMessageTypes:
         trigger_runner_types = get_type_names(ToTriggerRunner)
 
         in_supervisor_but_not_in_trigger_supervisor = {
+            "AwaitInputTask",
             "DeferTask",
             "GetAssetByName",
             "GetAssetByUri",
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py 
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index fff609bcaea..4aac0e9a6fa 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -994,6 +994,7 @@ class TaskInstanceState(str, Enum):
     UPSTREAM_FAILED = "upstream_failed"
     SKIPPED = "skipped"
     DEFERRED = "deferred"
+    AWAITING_INPUT = "awaiting_input"
 
 
 class TaskInstancesBatchBody(BaseModel):
diff --git 
a/devel-common/src/docs/diagrams/task_lifecycle/awaiting_input_state.png 
b/devel-common/src/docs/diagrams/task_lifecycle/awaiting_input_state.png
new file mode 100644
index 00000000000..3ba1c879ebe
Binary files /dev/null and 
b/devel-common/src/docs/diagrams/task_lifecycle/awaiting_input_state.png differ
diff --git 
a/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml 
b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
index a09d22b76fd..8c0c6a3eacd 100644
--- 
a/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
+++ 
b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
@@ -1341,6 +1341,7 @@ components:
       - upstream_failed
       - skipped
       - deferred
+      - awaiting_input
       title: TaskInstanceState
       description: 'All possible states that a Task Instance can be in.
 
diff --git 
a/providers/standard/src/airflow/providers/standard/operators/hitl.py 
b/providers/standard/src/airflow/providers/standard/operators/hitl.py
index bef40d504a3..ce42221e7d9 100644
--- a/providers/standard/src/airflow/providers/standard/operators/hitl.py
+++ b/providers/standard/src/airflow/providers/standard/operators/hitl.py
@@ -21,7 +21,11 @@ import warnings
 
 from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.common.compat.sdk import 
AirflowOptionalProviderFeatureException
-from airflow.providers.standard.version_compat import AIRFLOW_V_3_1_3_PLUS, 
AIRFLOW_V_3_1_PLUS
+from airflow.providers.standard.version_compat import (
+    AIRFLOW_V_3_1_3_PLUS,
+    AIRFLOW_V_3_1_PLUS,
+    AIRFLOW_V_3_3_PLUS,
+)
 
 if not AIRFLOW_V_3_1_PLUS:
     raise AirflowOptionalProviderFeatureException("Human in the loop 
functionality needs Airflow 3.1+.")
@@ -40,6 +44,11 @@ from airflow.sdk.definitions.param import ParamsDict
 from airflow.sdk.execution_time.hitl import upsert_hitl_detail
 from airflow.sdk.timezone import utcnow
 
+if AIRFLOW_V_3_3_PLUS:
+    # On Airflow 3.3+ the operator parks the task in the first-class 
AWAITING_INPUT state instead of
+    # deferring to a trigger. On older cores this name is absent and the 
defer() fallback is used.
+    from airflow.sdk.exceptions import TaskAwaitingInput
+
 if TYPE_CHECKING:
     from airflow.providers.common.compat.sdk import Context
     from airflow.sdk.execution_time.hitl import HITLUser
@@ -58,9 +67,10 @@ class HITLOperator(BaseOperator):
     :param params: dictionary of parameter definitions that are in the format 
of Dag params such that
         a Form Field can be rendered. Entered data is validated (schema, 
required fields) like for a Dag run
         and added to XCom of the task result.
-    :param response_timeout: Maximum time to wait for a human response after 
deferring to the trigger.
-        This is separate from ``execution_timeout`` which controls the 
pre-defer execution phase.
-        If not set, no timeout is applied to the human response wait.
+    :param response_timeout: Maximum time to wait for a human response. On 
Airflow 3.3+ this is
+        enforced by the scheduler's ``awaiting_input`` timeout sweep; on older 
versions it is passed
+        to the triggerer. This is separate from ``execution_timeout``, which 
controls the execution
+        phase before the task starts waiting. If not set, no timeout is 
applied to the human response wait.
     """
 
     template_fields: Collection[str] = ("subject", "body")
@@ -174,7 +184,13 @@ class HITLOperator(BaseOperator):
                 raise ValueError('More than one defaults given when "multiple" 
is set to False.')
 
     def execute(self, context: Context):
-        """Add a Human-in-the-loop Response and then defer to HITLTrigger and 
wait for user input."""
+        """
+        Write the Human-in-the-loop request, then wait for a user response.
+
+        On Airflow 3.3+ the task waits in the ``awaiting_input`` state with no 
trigger or triggerer
+        involved; on older versions it defers to :class:`HITLTrigger`. Either 
way it resumes in
+        ``execute_complete`` once a response (or timeout default) arrives.
+        """
         ti_id = context["task_instance"].id
         # Write Human-in-the-loop input request to DB
         upsert_hitl_detail(
@@ -200,7 +216,16 @@ class HITLOperator(BaseOperator):
         for notifier in self.notifiers:
             notifier(context)
 
-        # Defer the Human-in-the-loop response checking process to HITLTrigger
+        if AIRFLOW_V_3_3_PLUS:
+            # New core (3.3+): park the task in AWAITING_INPUT -- no trigger, 
no triggerer. The task
+            # is resumed by the Core API response handler or the scheduler 
timeout sweep, so the
+            # triggerer no longer needs to run for Human-in-the-loop tasks to 
make progress.
+            raise TaskAwaitingInput(
+                method_name="execute_complete",
+                timeout=self.response_timeout,
+            )
+
+        # Fallback for cores < 3.3: defer the response check to HITLTrigger on 
the triggerer.
         self.defer(
             trigger=HITLTrigger(
                 ti_id=ti_id,
diff --git a/providers/standard/tests/unit/standard/operators/test_hitl.py 
b/providers/standard/tests/unit/standard/operators/test_hitl.py
index fb6839e0967..7b7d5941b60 100644
--- a/providers/standard/tests/unit/standard/operators/test_hitl.py
+++ b/providers/standard/tests/unit/standard/operators/test_hitl.py
@@ -20,7 +20,11 @@ from uuid import UUID, uuid4
 
 import pytest
 
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_1_PLUS, 
AIRFLOW_V_3_2_PLUS
+from tests_common.test_utils.version_compat import (
+    AIRFLOW_V_3_1_PLUS,
+    AIRFLOW_V_3_2_PLUS,
+    AIRFLOW_V_3_3_PLUS,
+)
 
 if not AIRFLOW_V_3_1_PLUS:
     pytest.skip("Human in the loop is only compatible with Airflow >= 3.1.0", 
allow_module_level=True)
@@ -47,10 +51,14 @@ from airflow.providers.standard.operators.hitl import (
 from airflow.sdk import Param, timezone
 from airflow.sdk.definitions.param import ParamsDict
 from airflow.sdk.execution_time.hitl import HITLUser
+from airflow.utils.state import TaskInstanceState
 
 from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.version_compat import AIRFLOW_V_3_1_3_PLUS
 
+if AIRFLOW_V_3_3_PLUS:
+    from airflow.sdk.exceptions import TaskAwaitingInput
+
 if TYPE_CHECKING:
     from sqlalchemy.orm import Session
 
@@ -65,6 +73,23 @@ DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 INTERVAL = datetime.timedelta(hours=12)
 
 
+def _run_execute_until_park(op: HITLOperator, context: Any) -> None:
+    """
+    Run ``execute()`` through the HITL pause point.
+
+    Tolerates both the Airflow 3.3+ behavior (raises ``TaskAwaitingInput`` to 
park the task in
+    AWAITING_INPUT) and the < 3.3 fallback (calls ``self.defer()``, mocked 
here), so the
+    ``hitl_summary`` enriched just before the pause can be asserted afterwards.
+    """
+    with patch("airflow.providers.standard.operators.hitl.upsert_hitl_detail"):
+        if AIRFLOW_V_3_3_PLUS:
+            with pytest.raises(TaskAwaitingInput):
+                op.execute(context)
+        else:
+            with patch.object(op, "defer"):
+                op.execute(context)
+
+
 @pytest.fixture
 def hitl_task_and_ti_for_generating_link(dag_maker: DagMaker) -> 
tuple[HITLOperator, TaskInstance]:
     with dag_maker("test_dag"):
@@ -284,19 +309,31 @@ class TestHITLOperator:
         else:
             expected_params_in_trigger_kwargs = {"input_1": {"value": 1, 
"description": None, "schema": {}}}
 
-        registered_trigger = session.scalar(
-            select(Trigger).where(Trigger.classpath == 
"airflow.providers.standard.triggers.hitl.HITLTrigger")
-        )
-        assert registered_trigger is not None
-        assert registered_trigger.kwargs == {
-            "ti_id": expected_ti_id,
-            "options": ["1", "2", "3", "4", "5"],
-            "defaults": ["1"],
-            "params": expected_params_in_trigger_kwargs,
-            "multiple": False,
-            "timeout_datetime": None,
-            "poke_interval": 5.0,
-        }
+        if AIRFLOW_V_3_3_PLUS:
+            # On Airflow 3.3+ the task parks in AWAITING_INPUT with no trigger 
/ triggerer.
+            assert ti.state == TaskInstanceState.AWAITING_INPUT
+            registered_trigger = session.scalar(
+                select(Trigger).where(
+                    Trigger.classpath == 
"airflow.providers.standard.triggers.hitl.HITLTrigger"
+                )
+            )
+            assert registered_trigger is None
+        else:
+            registered_trigger = session.scalar(
+                select(Trigger).where(
+                    Trigger.classpath == 
"airflow.providers.standard.triggers.hitl.HITLTrigger"
+                )
+            )
+            assert registered_trigger is not None
+            assert registered_trigger.kwargs == {
+                "ti_id": expected_ti_id,
+                "options": ["1", "2", "3", "4", "5"],
+                "defaults": ["1"],
+                "params": expected_params_in_trigger_kwargs,
+                "multiple": False,
+                "timeout_datetime": None,
+                "poke_interval": 5.0,
+            }
 
     @pytest.mark.skipif(not AIRFLOW_V_3_1_3_PLUS, reason="This only works in 
airflow-core >= 3.1.3")
     @pytest.mark.parametrize(
@@ -1058,11 +1095,7 @@ class TestHITLSummaryForListeners:
             response_timeout=datetime.timedelta(minutes=10),
         )
 
-        with (
-            
patch("airflow.providers.standard.operators.hitl.upsert_hitl_detail"),
-            patch.object(op, "defer"),
-        ):
-            op.execute({"task_instance": MagicMock(id=uuid4())})  # type: 
ignore[arg-type]
+        _run_execute_until_park(op, {"task_instance": MagicMock(id=uuid4())})
 
         s = op.hitl_summary
         # Validate the timeout value is a parseable ISO string
@@ -1088,11 +1121,7 @@ class TestHITLSummaryForListeners:
             options=["OK"],
         )
 
-        with (
-            
patch("airflow.providers.standard.operators.hitl.upsert_hitl_detail"),
-            patch.object(op, "defer"),
-        ):
-            op.execute({"task_instance": MagicMock(id=uuid4())})  # type: 
ignore[arg-type]
+        _run_execute_until_park(op, {"task_instance": MagicMock(id=uuid4())})
 
         assert op.hitl_summary == {
             "subject": "Review",
@@ -1321,11 +1350,7 @@ class TestHITLSummaryForListeners:
         }
 
         # -- After execute (mocked defer): timeout_datetime added --
-        with (
-            
patch("airflow.providers.standard.operators.hitl.upsert_hitl_detail"),
-            patch.object(op, "defer"),
-        ):
-            op.execute({"task_instance": MagicMock(id=uuid4())})  # type: 
ignore[arg-type]
+        _run_execute_until_park(op, {"task_instance": MagicMock(id=uuid4())})
 
         s = op.hitl_summary
         timeout_dt_str = s["timeout_datetime"]
diff --git a/task-sdk/src/airflow/sdk/api/client.py 
b/task-sdk/src/airflow/sdk/api/client.py
index 4a3160d6ecd..2a4bb63fd5d 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -69,6 +69,7 @@ from airflow.sdk.api.datamodels._generated import (
     TaskStorePutBody,
     TaskStoreResponse,
     TerminalStateNonSuccess,
+    TIAwaitingInputStatePayload,
     TIDeferredStatePayload,
     TIEnterRunningPayload,
     TIHeartbeatInfo,
@@ -307,6 +308,11 @@ class TaskInstanceOperations:
         # Create a deferred state payload from msg
         self.client.patch(f"task-instances/{id}/state", 
content=body.model_dump_json())
 
+    def await_input(self, id: uuid.UUID, msg):
+        """Tell the API server that this TI is parked awaiting human input 
(Human-in-the-loop)."""
+        body = 
TIAwaitingInputStatePayload(**msg.model_dump(exclude_unset=True, 
exclude={"type"}))
+        self.client.patch(f"task-instances/{id}/state", 
content=body.model_dump_json())
+
     def reschedule(self, id: uuid.UUID, msg: RescheduleTask):
         """Tell the API server that this TI has been reschduled."""
         body = TIRescheduleStatePayload(**msg.model_dump(exclude_unset=True, 
exclude={"type"}))
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py 
b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index e1ac21585bf..9bdd8ac4fba 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -215,6 +215,7 @@ class IntermediateTIState(str, Enum):
     UP_FOR_RETRY = "up_for_retry"
     UP_FOR_RESCHEDULE = "up_for_reschedule"
     DEFERRED = "deferred"
+    AWAITING_INPUT = "awaiting_input"
 
 
 class PrevSuccessfulDagRunResponse(BaseModel):
@@ -245,6 +246,21 @@ class PreviousTIResponse(BaseModel):
     duration: Annotated[float | None, Field(title="Duration")] = None
 
 
+class TIAwaitingInputStatePayload(BaseModel):
+    """
+    Schema for parking a TaskInstance in an awaiting_input state 
(Human-in-the-loop, no trigger).
+    """
+
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    state: Annotated[Literal["awaiting_input"] | None, Field(title="State")] = 
"awaiting_input"
+    timeout: Annotated[timedelta | None, Field(title="Timeout")] = None
+    next_method: Annotated[str, Field(title="Next Method")]
+    next_kwargs: Annotated[dict[str, JsonValue] | None, Field(title="Next 
Kwargs")] = None
+    rendered_map_index: Annotated[str | None, Field(title="Rendered Map 
Index")] = None
+
+
 class TIDeferredStatePayload(BaseModel):
     """
     Schema for updating TaskInstance to a deferred state.
@@ -382,6 +398,7 @@ class TaskInstanceState(str, Enum):
     UPSTREAM_FAILED = "upstream_failed"
     SKIPPED = "skipped"
     DEFERRED = "deferred"
+    AWAITING_INPUT = "awaiting_input"
 
 
 class TaskStatesResponse(BaseModel):
diff --git a/task-sdk/src/airflow/sdk/exceptions.py 
b/task-sdk/src/airflow/sdk/exceptions.py
index 0c211de4e36..6f43d5421ec 100644
--- a/task-sdk/src/airflow/sdk/exceptions.py
+++ b/task-sdk/src/airflow/sdk/exceptions.py
@@ -236,6 +236,48 @@ class TaskDeferred(BaseException):
         return f"<TaskDeferred trigger={self.trigger} 
method={self.method_name}>"
 
 
+class TaskAwaitingInput(BaseException):
+    """
+    Signal an operator parking the task in awaiting_input state 
(Human-in-the-loop).
+
+    Raised to signal that the operator wishes to pause until external human 
input arrives,
+    WITHOUT creating a trigger or involving the triggerer. Resumption is 
driven by the Core API
+    response handler (or the scheduler timeout sweep) flipping the task 
instance back to SCHEDULED
+    with ``next_method`` / ``next_kwargs`` intact, after which the worker calls
+    ``resume_execution(method_name, kwargs)``.
+
+    Subclasses ``BaseException`` (like ``TaskDeferred``) so that a user 
``except Exception`` in
+    ``execute()`` cannot accidentally swallow the park signal.
+    """
+
+    def __init__(
+        self,
+        *,
+        method_name: str,
+        kwargs: dict[str, Any] | None = None,
+        timeout=None,
+    ):
+        super().__init__()
+        self.method_name = method_name
+        self.kwargs = kwargs
+        self.timeout = timeout
+
+    def serialize(self):
+        cls = self.__class__
+        return (
+            f"{cls.__module__}.{cls.__name__}",
+            (),
+            {
+                "method_name": self.method_name,
+                "kwargs": self.kwargs,
+                "timeout": self.timeout,
+            },
+        )
+
+    def __repr__(self) -> str:
+        return f"<TaskAwaitingInput method={self.method_name}>"
+
+
 class TaskDeferralError(AirflowException):
     """Raised when a task failed during deferral for some reason."""
 
diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py 
b/task-sdk/src/airflow/sdk/execution_time/comms.py
index 0e0483be70e..330fe934c7c 100644
--- a/task-sdk/src/airflow/sdk/execution_time/comms.py
+++ b/task-sdk/src/airflow/sdk/execution_time/comms.py
@@ -83,6 +83,7 @@ from airflow.sdk.api.datamodels._generated import (
     TaskInstanceState,
     TaskStatesResponse,
     TaskStoreResponse,
+    TIAwaitingInputStatePayload,
     TIDeferredStatePayload,
     TIRescheduleStatePayload,
     TIRetryStatePayload,
@@ -837,6 +838,12 @@ class DeferTask(TIDeferredStatePayload):
     type: Literal["DeferTask"] = "DeferTask"
 
 
+class AwaitInputTask(TIAwaitingInputStatePayload):
+    """Park a task instance awaiting human input (Human-in-the-loop), without 
a trigger."""
+
+    type: Literal["AwaitInputTask"] = "AwaitInputTask"
+
+
 class RetryTask(TIRetryStatePayload):
     """Update a task instance state to up_for_retry."""
 
@@ -1193,7 +1200,8 @@ class GetDag(BaseModel):
 
 
 ToSupervisor = Annotated[
-    ClearAssetStoreByName
+    AwaitInputTask
+    | ClearAssetStoreByName
     | ClearAssetStoreByUri
     | ClearTaskStore
     | DeferTask
diff --git a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json 
b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
index 094c3f94d9e..c02aa76791e 100644
--- a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
+++ b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
@@ -342,6 +342,80 @@
       "title": "AssetsByAliasResult",
       "type": "object"
     },
+    "AwaitInputTask": {
+      "additionalProperties": false,
+      "description": "Park a task instance awaiting human input 
(Human-in-the-loop), without a trigger.",
+      "properties": {
+        "state": {
+          "anyOf": [
+            {
+              "const": "awaiting_input",
+              "type": "string"
+            },
+            {
+              "type": "null"
+            }
+          ],
+          "default": "awaiting_input",
+          "title": "State"
+        },
+        "timeout": {
+          "anyOf": [
+            {
+              "format": "duration",
+              "type": "string"
+            },
+            {
+              "type": "null"
+            }
+          ],
+          "default": null,
+          "title": "Timeout"
+        },
+        "next_method": {
+          "title": "Next Method",
+          "type": "string"
+        },
+        "next_kwargs": {
+          "anyOf": [
+            {
+              "additionalProperties": {
+                "$ref": "#/$defs/JsonValue"
+              },
+              "type": "object"
+            },
+            {
+              "type": "null"
+            }
+          ],
+          "default": null,
+          "title": "Next Kwargs"
+        },
+        "rendered_map_index": {
+          "anyOf": [
+            {
+              "type": "string"
+            },
+            {
+              "type": "null"
+            }
+          ],
+          "default": null,
+          "title": "Rendered Map Index"
+        },
+        "type": {
+          "const": "AwaitInputTask",
+          "default": "AwaitInputTask",
+          "title": "Type",
+          "type": "string"
+        }
+      },
+      "required": [
+        "next_method"
+      ],
+      "title": "AwaitInputTask",
+      "type": "object"
+    },
     "BundleInfo": {
       "description": "Schema for telling task which bundle to run with.",
       "properties": {
@@ -3989,7 +4063,8 @@
         "up_for_reschedule",
         "upstream_failed",
         "skipped",
-        "deferred"
+        "deferred",
+        "awaiting_input"
       ],
       "title": "TaskInstanceState",
       "type": "string"
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py 
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 0005c36bc7e..52821579759 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -63,6 +63,7 @@ from airflow.sdk.execution_time.comms import (
     AssetEventsResult,
     AssetResult,
     AssetStoreResult,
+    AwaitInputTask,
     ClearAssetStoreByName,
     ClearAssetStoreByUri,
     ClearTaskStore,
@@ -197,6 +198,7 @@ SERVER_TERMINATED = "SERVER_TERMINATED"
 STATES_SENT_DIRECTLY: frozenset[TaskInstanceState | str] = frozenset(
     {
         TaskInstanceState.DEFERRED,
+        TaskInstanceState.AWAITING_INPUT,
         TaskInstanceState.UP_FOR_RESCHEDULE,
         TaskInstanceState.UP_FOR_RETRY,
         TaskInstanceState.SUCCESS,
@@ -1289,9 +1291,9 @@ class ActivitySubprocess(WatchedSubprocess):
     # falling back to `finish()`, which doesn't accept SUCCESS / DEFERRED /
     # SERVER_TERMINATED on the server side. Cleared (and `_terminal_state`
     # set) only after the API call returns successfully.
-    _pending_terminal_state_msg: SucceedTask | RetryTask | DeferTask | 
RescheduleTask | None = attrs.field(
-        default=None, init=False
-    )
+    _pending_terminal_state_msg: (
+        SucceedTask | RetryTask | DeferTask | RescheduleTask | AwaitInputTask 
| None
+    ) = attrs.field(default=None, init=False)
 
     _last_successful_heartbeat: float = attrs.field(default=0, init=False)
     _last_heartbeat_attempt: float = attrs.field(default=0, init=False)
@@ -1455,7 +1457,9 @@ class ActivitySubprocess(WatchedSubprocess):
                 rendered_map_index=self._rendered_map_index,
             )
 
-    def _send_terminal_state_msg(self, msg: SucceedTask | RetryTask | 
DeferTask | RescheduleTask) -> None:
+    def _send_terminal_state_msg(
+        self, msg: SucceedTask | RetryTask | DeferTask | RescheduleTask | 
AwaitInputTask
+    ) -> None:
         # Capture the message BEFORE the API call so the recovery dispatcher
         # in `update_task_state_if_needed` can re-issue it if the call raises
         # (network blip, transient server 5xx). Clear the pending slot and
@@ -1485,6 +1489,9 @@ class ActivitySubprocess(WatchedSubprocess):
         elif isinstance(msg, RescheduleTask):
             self.client.task_instances.reschedule(self.id, msg)
             self._terminal_state = TaskInstanceState.UP_FOR_RESCHEDULE
+        elif isinstance(msg, AwaitInputTask):
+            self.client.task_instances.await_input(self.id, msg)
+            self._terminal_state = TaskInstanceState.AWAITING_INPUT
         self._pending_terminal_state_msg = None
 
     def _replay_pending_terminal_state_msg(self) -> None:
@@ -1707,6 +1714,9 @@ class ActivitySubprocess(WatchedSubprocess):
         elif isinstance(msg, DeferTask):
             self._rendered_map_index = msg.rendered_map_index
             self._send_terminal_state_msg(msg)
+        elif isinstance(msg, AwaitInputTask):
+            self._rendered_map_index = msg.rendered_map_index
+            self._send_terminal_state_msg(msg)
         elif isinstance(msg, RescheduleTask):
             self._send_terminal_state_msg(msg)
         elif isinstance(msg, SkipDownstreamTasks):
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py 
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 6f48756433b..123667bfa1d 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -77,11 +77,13 @@ from airflow.sdk.exceptions import (
     AirflowRuntimeError,
     AirflowTaskTimeout,
     ErrorType,
+    TaskAwaitingInput,
     TaskDeferred,
 )
 from airflow.sdk.execution_time.callback_runner import create_executable_runner
 from airflow.sdk.execution_time.comms import (
     AssetEventDagRunReferenceResult,
+    AwaitInputTask,
     CommsDecoder,
     DagResult,
     DagRunStateResult,
@@ -1368,6 +1370,29 @@ def _defer_task(
     return msg, state
 
 
+def _await_input_task(
+    awaiting: TaskAwaitingInput, ti: RuntimeTaskInstance, log: Logger
+) -> tuple[ToSupervisor, TaskInstanceState]:
+    """Build the message that parks a task in AWAITING_INPUT (HITL), with no 
trigger."""
+    log.info("Pausing task as AWAITING_INPUT.", dag_id=ti.dag_id, 
task_id=ti.task_id, run_id=ti.run_id)
+
+    from airflow.sdk.serde import serialize as serde_serialize
+
+    next_kwargs = serde_serialize(awaiting.kwargs or {})
+
+    if TYPE_CHECKING:
+        assert isinstance(next_kwargs, dict)
+
+    msg = AwaitInputTask(
+        timeout=awaiting.timeout,
+        next_method=awaiting.method_name,
+        next_kwargs=next_kwargs,
+    )
+    state = TaskInstanceState.AWAITING_INPUT
+
+    return msg, state
+
+
 @Sentry.enrich_errors
 @detail_span("run")
 def run(
@@ -1386,6 +1411,7 @@ def run(
         AirflowTaskTerminated,
         DagRunTriggerException,
         DownstreamTasksSkipped,
+        TaskAwaitingInput,
         TaskDeferred,
     )
 
@@ -1470,6 +1496,9 @@ def run(
     except TaskDeferred as defer:
         log.info("::group::Post Execute")
         msg, state = _defer_task(defer, ti, log)
+    except TaskAwaitingInput as awaiting:
+        log.info("::group::Post Execute")
+        msg, state = _await_input_task(awaiting, ti, log)
     except AirflowSkipException as e:
         log.info("::group::Post Execute")
         if e.args:
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py 
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index b72fbdb1e30..62a9d00fdf2 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -74,6 +74,7 @@ from airflow.sdk.execution_time.comms import (
     AssetResult,
     AssetsByAliasResult,
     AssetStoreResult,
+    AwaitInputTask,
     ClearAssetStoreByName,
     ClearAssetStoreByUri,
     ClearTaskStore,
@@ -1717,6 +1718,14 @@ REQUEST_TEST_CASES = [
             args=(TI_ID, DeferTask(next_method="execute_callback", 
classpath="my-classpath")),
         ),
     ),
+    RequestTestCase(
+        message=AwaitInputTask(next_method="execute_complete"),
+        test_id="patch_task_instance_to_awaiting_input",
+        client_mock=ClientMock(
+            method_path="task_instances.await_input",
+            args=(TI_ID, AwaitInputTask(next_method="execute_complete")),
+        ),
+    ),
     RequestTestCase(
         message=RescheduleTask(
             reschedule_date=timezone.parse("2024-10-31T12:00:00Z"),

Reply via email to