This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fc36f89a665 feat: Add OpenLineage support for HITL operators (#61867)
fc36f89a665 is described below
commit fc36f89a665cba1ab9696765cb88fe927d1fa400
Author: Kacper Muda <[email protected]>
AuthorDate: Mon Feb 16 00:41:30 2026 +0100
feat: Add OpenLineage support for HITL operators (#61867)
---
.../airflow/providers/openlineage/utils/utils.py | 1 +
.../tests/unit/openlineage/utils/test_utils.py | 4 +
.../airflow/providers/standard/operators/hitl.py | 31 ++
.../tests/unit/standard/operators/test_hitl.py | 461 ++++++++++++++++++++-
4 files changed, 495 insertions(+), 2 deletions(-)
diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index 0ad0f0e88bc..6c6f5c6c670 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -861,6 +861,7 @@ class TaskInfo(InfoJsonEncodable):
"postoperator", # SQLInsertRowsOperator
"table_name_with_schema", # SQLInsertRowsOperator
"column_names", # SQLInsertRowsOperator
+ "hitl_summary", # All HITLOperator based operators
]
casts = {
"operator_class": lambda task: task.task_type,
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
index cb3eaee8218..914b928a50e 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
@@ -2474,6 +2474,7 @@ def test_task_info_af3():
self.tol = "tol" # SQLValueCheckOperator
self.trigger_dag_id = "trigger_dag_id" # TriggerDagRunOperator
self.trigger_run_id = "trigger_run_id" # TriggerDagRunOperator
+ self.hitl_summary = "hitl_summary" # HITLOperator
super().__init__(*args, **kwargs)
with DAG(
@@ -2512,6 +2513,7 @@ def test_task_info_af3():
"downstream_task_ids": "['task_1']",
"execution_timeout": None,
"executor_config": {},
+ "hitl_summary": "hitl_summary",
"ignore_first_depends_on_past": False,
"inlets": "[{'uri': 'uri1', 'extra': {'a': 1}}]",
"mapped": False,
@@ -2599,6 +2601,7 @@ def test_task_info_af2():
self.tol = "tol" # SQLValueCheckOperator
self.trigger_dag_id = "trigger_dag_id" # TriggerDagRunOperator
self.trigger_run_id = "trigger_run_id" # TriggerDagRunOperator
+ self.hitl_summary = "hitl_summary" # HITLOperator
super().__init__(*args, **kwargs)
with DAG(
@@ -2637,6 +2640,7 @@ def test_task_info_af2():
"downstream_task_ids": "['task_1']",
"execution_timeout": None,
"executor_config": {},
+ "hitl_summary": "hitl_summary",
"ignore_first_depends_on_past": True,
"is_setup": False,
"is_teardown": False,
diff --git a/providers/standard/src/airflow/providers/standard/operators/hitl.py b/providers/standard/src/airflow/providers/standard/operators/hitl.py
index 842b448b08e..fc999bf8cd2 100644
--- a/providers/standard/src/airflow/providers/standard/operators/hitl.py
+++ b/providers/standard/src/airflow/providers/standard/operators/hitl.py
@@ -101,6 +101,17 @@ class HITLOperator(BaseOperator):
self.validate_params()
self.validate_defaults()
+ # HITL summary for the use of listeners; subclasses can extend it.
+ self.hitl_summary: dict[str, Any] = {
+ "subject": self.subject,
+ "body": self.body,
+ "options": self.options,
+ "defaults": self.defaults,
+ "multiple": self.multiple,
+ "assigned_users": self.assigned_users,
+ "serialized_params": self.serialized_params or None,
+ }
+
def validate_options(self) -> None:
"""
Validate the `options` attribute of the instance.
@@ -156,6 +167,9 @@ class HITLOperator(BaseOperator):
else:
timeout_datetime = None
+ # Enrich summary with runtime info
+ self.hitl_summary["timeout_datetime"] = timeout_datetime.isoformat() if timeout_datetime else None
+
self.log.info("Waiting for response")
for notifier in self.notifiers:
notifier(context)
@@ -181,12 +195,23 @@ class HITLOperator(BaseOperator):
def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:
if "error" in event:
+ self.hitl_summary["error_type"] = event["error_type"]
self.process_trigger_event_error(event)
chosen_options = event["chosen_options"]
params_input = event["params_input"] or {}
self.validate_chosen_options(chosen_options)
self.validate_params_input(params_input)
+
+ self.hitl_summary.update(
+ {
+ "chosen_options": chosen_options,
+ "params_input": params_input,
+ "responded_at": event["responded_at"].isoformat(),
+ "responded_by_user": event["responded_by_user"],
+ }
+ )
+
return HITLTriggerEventSuccessPayload(
chosen_options=chosen_options,
params_input=params_input,
@@ -356,10 +381,14 @@ class ApprovalOperator(HITLOperator, SkipMixin):
**kwargs,
)
+ self.hitl_summary["ignore_downstream_trigger_rules"] = self.ignore_downstream_trigger_rules
+ self.hitl_summary["fail_on_reject"] = self.fail_on_reject
+
def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:
ret = super().execute_complete(context=context, event=event)
chosen_option = ret["chosen_options"][0]
+ self.hitl_summary["approved"] = chosen_option == self.APPROVE
if chosen_option == self.APPROVE:
self.log.info("Approved. Proceeding with downstream tasks...")
return ret
@@ -413,6 +442,7 @@ class HITLBranchOperator(HITLOperator, BranchMixIn):
super().__init__(**kwargs)
self.options_mapping = options_mapping or {}
self.validate_options_mapping()
+ self.hitl_summary["options_mapping"] = self.options_mapping
def validate_options_mapping(self) -> None:
"""
@@ -447,6 +477,7 @@ class HITLBranchOperator(HITLOperator, BranchMixIn):
# Map options to task IDs using the mapping, fallback to original option
chosen_options = [self.options_mapping.get(option, option) for option in chosen_options]
+ self.hitl_summary["branches_to_execute"] = chosen_options
return self.do_branch(context=context, branches_to_execute=chosen_options)
diff --git a/providers/standard/tests/unit/standard/operators/test_hitl.py b/providers/standard/tests/unit/standard/operators/test_hitl.py
index 108a6935b38..e4bc4414750 100644
--- a/providers/standard/tests/unit/standard/operators/test_hitl.py
+++ b/providers/standard/tests/unit/standard/operators/test_hitl.py
@@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations
-from uuid import UUID
+from uuid import UUID, uuid4
import pytest
@@ -27,7 +27,7 @@ if not AIRFLOW_V_3_1_PLUS:
import datetime
from typing import TYPE_CHECKING, Any
-from unittest.mock import MagicMock
+from unittest.mock import MagicMock, patch
from urllib.parse import parse_qs, urlparse
import pytest
@@ -871,3 +871,460 @@ class TestHITLBranchOperator:
options=["Approve", "Reject"],
options_mapping={"NotAnOption": "publish"},
)
+
+
+class TestHITLSummaryForListeners:
+ """Verify hitl_summary dict at all lifecycle stages: __init__, execute, execute_complete."""
+
+ def test_hitl_operator_init_all_fields(self) -> None:
+ """hitl_summary is exactly the expected dict after __init__ with all fields set."""
+ op = HITLOperator(
+ task_id="test",
+ subject="Please review",
+ body="Details here",
+ options=["Yes", "No"],
+ defaults=["Yes"],
+ multiple=False,
+ assigned_users=HITLUser(id="u1", name="Alice"),
+ params={"env": Param("prod", type="string", description="Target env")},
+ )
+
+ assert op.hitl_summary == {
+ "subject": "Please review",
+ "body": "Details here",
+ "options": ["Yes", "No"],
+ "defaults": ["Yes"],
+ "multiple": False,
+ "assigned_users": [{"id": "u1", "name": "Alice"}],
+ "serialized_params": op.serialized_params,
+ }
+
+ def test_hitl_operator_init_minimal(self) -> None:
+ """hitl_summary with only required fields; optional ones default to None."""
+ op = HITLOperator(
+ task_id="test",
+ subject="Review",
+ options=["A", "B"],
+ )
+
+ assert op.hitl_summary == {
+ "subject": "Review",
+ "body": None,
+ "options": ["A", "B"],
+ "defaults": None,
+ "multiple": False,
+ "assigned_users": None,
+ "serialized_params": None,
+ }
+
+ def test_approval_operator_init_summary(self) -> None:
+ """ApprovalOperator hitl_summary includes base + approval-specific fields."""
+ op = ApprovalOperator(
+ task_id="test",
+ subject="Deploy?",
+ ignore_downstream_trigger_rules=True,
+ fail_on_reject=True,
+ )
+
+ assert op.hitl_summary == {
+ "subject": "Deploy?",
+ "body": None,
+ "options": ["Approve", "Reject"],
+ "defaults": None,
+ "multiple": False,
+ "assigned_users": None,
+ "serialized_params": None,
+ "ignore_downstream_trigger_rules": True,
+ "fail_on_reject": True,
+ }
+
+ def test_approval_operator_init_defaults(self) -> None:
+ """ApprovalOperator with default settings."""
+ op = ApprovalOperator(task_id="test", subject="Deploy?")
+
+ assert op.hitl_summary == {
+ "subject": "Deploy?",
+ "body": None,
+ "options": ["Approve", "Reject"],
+ "defaults": None,
+ "multiple": False,
+ "assigned_users": None,
+ "serialized_params": None,
+ "ignore_downstream_trigger_rules": False,
+ "fail_on_reject": False,
+ }
+
+ def test_hitl_branch_operator_init_with_mapping(self) -> None:
+ """HITLBranchOperator hitl_summary includes base + options_mapping."""
+ op = HITLBranchOperator(
+ task_id="test",
+ subject="Choose",
+ options=["A", "B"],
+ options_mapping={"A": "task_a", "B": "task_b"},
+ )
+
+ assert op.hitl_summary == {
+ "subject": "Choose",
+ "body": None,
+ "options": ["A", "B"],
+ "defaults": None,
+ "multiple": False,
+ "assigned_users": None,
+ "serialized_params": None,
+ "options_mapping": {"A": "task_a", "B": "task_b"},
+ }
+
+ def test_hitl_branch_operator_init_without_mapping(self) -> None:
+ """HITLBranchOperator stores empty dict for options_mapping when not provided."""
+ op = HITLBranchOperator(
+ task_id="test",
+ subject="Choose",
+ options=["A", "B"],
+ )
+
+ assert op.hitl_summary == {
+ "subject": "Choose",
+ "body": None,
+ "options": ["A", "B"],
+ "defaults": None,
+ "multiple": False,
+ "assigned_users": None,
+ "serialized_params": None,
+ "options_mapping": {},
+ }
+
+ def test_hitl_entry_operator_init_summary(self) -> None:
+ """HITLEntryOperator hitl_summary includes base fields with OK defaults."""
+ op = HITLEntryOperator(
+ task_id="test",
+ subject="Enter data",
+ params={"name": Param("", type="string")},
+ )
+
+ assert op.hitl_summary == {
+ "subject": "Enter data",
+ "body": None,
+ "options": ["OK"],
+ "defaults": ["OK"],
+ "multiple": False,
+ "assigned_users": None,
+ "serialized_params": op.serialized_params,
+ }
+
+ def test_hitl_entry_operator_init_custom_options(self) -> None:
+ """HITLEntryOperator with explicit options and no defaults."""
+ op = HITLEntryOperator(
+ task_id="test",
+ subject="Confirm",
+ options=["OK", "Cancel"],
+ )
+
+ assert op.hitl_summary == {
+ "subject": "Confirm",
+ "body": None,
+ "options": ["OK", "Cancel"],
+ "defaults": None,
+ "multiple": False,
+ "assigned_users": None,
+ "serialized_params": None,
+ }
+
+ def test_execute_enriches_summary_with_timeout(self) -> None:
+ """execute() adds timeout_datetime; all other init keys remain."""
+ op = HITLOperator(
+ task_id="test",
+ subject="Review",
+ options=["OK"],
+ execution_timeout=datetime.timedelta(minutes=10),
+ )
+
+ with (
+ patch("airflow.providers.standard.operators.hitl.upsert_hitl_detail"),
+ patch.object(op, "defer"),
+ ):
+ op.execute({"task_instance": MagicMock(id=uuid4())}) # type: ignore[arg-type]
+
+ s = op.hitl_summary
+ # Validate the timeout value is a parseable ISO string
+ timeout_dt = datetime.datetime.fromisoformat(s["timeout_datetime"])
+ assert timeout_dt is not None
+
+ assert s == {
+ "subject": "Review",
+ "body": None,
+ "options": ["OK"],
+ "defaults": None,
+ "multiple": False,
+ "assigned_users": None,
+ "serialized_params": None,
+ "timeout_datetime": s["timeout_datetime"],
+ }
+
+ def test_execute_without_timeout(self) -> None:
+ """execute() sets timeout_datetime to None when no execution_timeout."""
+ op = HITLOperator(
+ task_id="test",
+ subject="Review",
+ options=["OK"],
+ )
+
+ with (
+ patch("airflow.providers.standard.operators.hitl.upsert_hitl_detail"),
+ patch.object(op, "defer"),
+ ):
+ op.execute({"task_instance": MagicMock(id=uuid4())}) # type: ignore[arg-type]
+
+ assert op.hitl_summary == {
+ "subject": "Review",
+ "body": None,
+ "options": ["OK"],
+ "defaults": None,
+ "multiple": False,
+ "assigned_users": None,
+ "serialized_params": None,
+ "timeout_datetime": None,
+ }
+
+ def test_hitl_operator_execute_complete_enriches_summary(self) -> None:
+ """execute_complete() adds response fields directly into hitl_summary."""
+ op = HITLOperator(
+ task_id="test",
+ subject="Review",
+ options=["1", "2"],
+ params={"input": 1},
+ )
+
+ responded_at = timezone.utcnow()
+ op.execute_complete(
+ context={},
+ event={
+ "chosen_options": ["1"],
+ "params_input": {"input": 1},
+ "responded_at": responded_at,
+ "responded_by_user": {"id": "u1", "name": "Alice"},
+ },
+ )
+
+ assert op.hitl_summary == {
+ "subject": "Review",
+ "body": None,
+ "options": ["1", "2"],
+ "defaults": None,
+ "multiple": False,
+ "assigned_users": None,
+ "serialized_params": op.serialized_params,
+ "chosen_options": ["1"],
+ "params_input": {"input": 1},
+ "responded_at": responded_at.isoformat(),
+ "responded_by_user": {"id": "u1", "name": "Alice"},
+ }
+
+ def test_hitl_operator_execute_complete_error_stores_error_type(self) -> None:
+ """execute_complete() stores error_type in hitl_summary on error events."""
+ op = HITLOperator(
+ task_id="test",
+ subject="Review",
+ options=["OK"],
+ )
+
+ with pytest.raises(HITLTimeoutError):
+ op.execute_complete(
+ context={},
+ event={"error": "timed out", "error_type": "timeout"},
+ )
+
+ assert op.hitl_summary == {
+ "subject": "Review",
+ "body": None,
+ "options": ["OK"],
+ "defaults": None,
+ "multiple": False,
+ "assigned_users": None,
+ "serialized_params": None,
+ "error_type": "timeout",
+ }
+
+ def test_approval_operator_execute_complete_approved(self) -> None:
+ """Approving sets approved=True in hitl_summary."""
+ op = ApprovalOperator(task_id="test", subject="Deploy?")
+
+ responded_at = timezone.utcnow()
+ op.execute_complete(
+ context={},
+ event={
+ "chosen_options": ["Approve"],
+ "params_input": {},
+ "responded_at": responded_at,
+ "responded_by_user": {"id": "u1", "name": "Alice"},
+ },
+ )
+
+ assert op.hitl_summary == {
+ "subject": "Deploy?",
+ "body": None,
+ "options": ["Approve", "Reject"],
+ "defaults": None,
+ "multiple": False,
+ "assigned_users": None,
+ "serialized_params": None,
+ "ignore_downstream_trigger_rules": False,
+ "fail_on_reject": False,
+ "chosen_options": ["Approve"],
+ "params_input": {},
+ "responded_at": responded_at.isoformat(),
+ "responded_by_user": {"id": "u1", "name": "Alice"},
+ "approved": True,
+ }
+
+ def test_approval_operator_execute_complete_rejected(self) -> None:
+ """Rejecting sets approved=False in hitl_summary."""
+ op = ApprovalOperator(task_id="test", subject="Deploy?")
+
+ responded_at = timezone.utcnow()
+ op.execute_complete(
+ context={},
+ event={
+ "chosen_options": ["Reject"],
+ "params_input": {},
+ "responded_at": responded_at,
+ "responded_by_user": {"id": "u1", "name": "Alice"},
+ },
+ )
+
+ assert op.hitl_summary == {
+ "subject": "Deploy?",
+ "body": None,
+ "options": ["Approve", "Reject"],
+ "defaults": None,
+ "multiple": False,
+ "assigned_users": None,
+ "serialized_params": None,
+ "ignore_downstream_trigger_rules": False,
+ "fail_on_reject": False,
+ "chosen_options": ["Reject"],
+ "params_input": {},
+ "responded_at": responded_at.isoformat(),
+ "responded_by_user": {"id": "u1", "name": "Alice"},
+ "approved": False,
+ }
+
+ def test_hitl_branch_operator_execute_complete_records_branches(
+ self, dag_maker: DagMaker, get_context_from_model_ti: Any
+ ) -> None:
+ """HITLBranchOperator stores branches_to_execute in hitl_summary."""
+ with dag_maker("hitl_summary_dag", serialized=True):
+ op = HITLBranchOperator(
+ task_id="choose",
+ subject="Choose",
+ options=["A", "B"],
+ options_mapping={"A": "task_a", "B": "task_b"},
+ )
+ op >> [EmptyOperator(task_id="task_a"), EmptyOperator(task_id="task_b")]
+
+ dr = dag_maker.create_dagrun()
+ ti = dr.get_task_instance("choose")
+
+ responded_at = timezone.utcnow()
+ with pytest.raises(DownstreamTasksSkipped):
+ op.execute_complete(
+ context=get_context_from_model_ti(ti, op),
+ event={
+ "chosen_options": ["A"],
+ "params_input": {},
+ "responded_at": responded_at,
+ "responded_by_user": {"id": "u1", "name": "Alice"},
+ },
+ )
+
+ assert op.hitl_summary == {
+ "subject": "Choose",
+ "body": None,
+ "options": ["A", "B"],
+ "defaults": None,
+ "multiple": False,
+ "assigned_users": None,
+ "serialized_params": None,
+ "options_mapping": {"A": "task_a", "B": "task_b"},
+ "chosen_options": ["A"],
+ "params_input": {},
+ "responded_at": responded_at.isoformat(),
+ "responded_by_user": {"id": "u1", "name": "Alice"},
+ "branches_to_execute": ["task_a"],
+ }
+
+ def test_full_lifecycle_approval(self) -> None:
+ """Verify exact hitl_summary at each stage: __init__ -> execute -> execute_complete."""
+ op = ApprovalOperator(
+ task_id="test",
+ subject="Release v2.0?",
+ body="Please approve the production deployment.",
+ execution_timeout=datetime.timedelta(minutes=30),
+ )
+
+ # -- After __init__: only base + approval keys --
+ assert op.hitl_summary == {
+ "subject": "Release v2.0?",
+ "body": "Please approve the production deployment.",
+ "options": ["Approve", "Reject"],
+ "defaults": None,
+ "multiple": False,
+ "assigned_users": None,
+ "serialized_params": None,
+ "ignore_downstream_trigger_rules": False,
+ "fail_on_reject": False,
+ }
+
+ # -- After execute (mocked defer): timeout_datetime added --
+ with (
+ patch("airflow.providers.standard.operators.hitl.upsert_hitl_detail"),
+ patch.object(op, "defer"),
+ ):
+ op.execute({"task_instance": MagicMock(id=uuid4())}) # type: ignore[arg-type]
+
+ s = op.hitl_summary
+ timeout_dt_str = s["timeout_datetime"]
+ assert timeout_dt_str is not None
+ datetime.datetime.fromisoformat(timeout_dt_str)
+
+ assert s == {
+ "subject": "Release v2.0?",
+ "body": "Please approve the production deployment.",
+ "options": ["Approve", "Reject"],
+ "defaults": None,
+ "multiple": False,
+ "assigned_users": None,
+ "serialized_params": None,
+ "ignore_downstream_trigger_rules": False,
+ "fail_on_reject": False,
+ "timeout_datetime": timeout_dt_str,
+ }
+
+ # -- After execute_complete: response + approved fields added --
+ responded_at = timezone.utcnow()
+ op.execute_complete(
+ context={},
+ event={
+ "chosen_options": ["Approve"],
+ "params_input": {},
+ "responded_at": responded_at,
+ "responded_by_user": {"id": "admin", "name": "Admin"},
+ },
+ )
+
+ assert s == {
+ "subject": "Release v2.0?",
+ "body": "Please approve the production deployment.",
+ "options": ["Approve", "Reject"],
+ "defaults": None,
+ "multiple": False,
+ "assigned_users": None,
+ "serialized_params": None,
+ "ignore_downstream_trigger_rules": False,
+ "fail_on_reject": False,
+ "timeout_datetime": timeout_dt_str,
+ "chosen_options": ["Approve"],
+ "params_input": {},
+ "responded_at": responded_at.isoformat(),
+ "responded_by_user": {"id": "admin", "name": "Admin"},
+ "approved": True,
+ }