This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fc36f89a665 feat: Add OpenLineage support for HITL operators (#61867)
fc36f89a665 is described below

commit fc36f89a665cba1ab9696765cb88fe927d1fa400
Author: Kacper Muda <[email protected]>
AuthorDate: Mon Feb 16 00:41:30 2026 +0100

    feat: Add OpenLineage support for HITL operators (#61867)
---
 .../airflow/providers/openlineage/utils/utils.py   |   1 +
 .../tests/unit/openlineage/utils/test_utils.py     |   4 +
 .../airflow/providers/standard/operators/hitl.py   |  31 ++
 .../tests/unit/standard/operators/test_hitl.py     | 461 ++++++++++++++++++++-
 4 files changed, 495 insertions(+), 2 deletions(-)

diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index 0ad0f0e88bc..6c6f5c6c670 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -861,6 +861,7 @@ class TaskInfo(InfoJsonEncodable):
         "postoperator",  # SQLInsertRowsOperator
         "table_name_with_schema",  # SQLInsertRowsOperator
         "column_names",  # SQLInsertRowsOperator
+        "hitl_summary",  # All HITLOperator based operators
     ]
     casts = {
         "operator_class": lambda task: task.task_type,
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
index cb3eaee8218..914b928a50e 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
@@ -2474,6 +2474,7 @@ def test_task_info_af3():
             self.tol = "tol"  # SQLValueCheckOperator
             self.trigger_dag_id = "trigger_dag_id"  # TriggerDagRunOperator
             self.trigger_run_id = "trigger_run_id"  # TriggerDagRunOperator
+            self.hitl_summary = "hitl_summary"  # HITLOperator
             super().__init__(*args, **kwargs)
 
     with DAG(
@@ -2512,6 +2513,7 @@ def test_task_info_af3():
         "downstream_task_ids": "['task_1']",
         "execution_timeout": None,
         "executor_config": {},
+        "hitl_summary": "hitl_summary",
         "ignore_first_depends_on_past": False,
         "inlets": "[{'uri': 'uri1', 'extra': {'a': 1}}]",
         "mapped": False,
@@ -2599,6 +2601,7 @@ def test_task_info_af2():
             self.tol = "tol"  # SQLValueCheckOperator
             self.trigger_dag_id = "trigger_dag_id"  # TriggerDagRunOperator
             self.trigger_run_id = "trigger_run_id"  # TriggerDagRunOperator
+            self.hitl_summary = "hitl_summary"  # HITLOperator
             super().__init__(*args, **kwargs)
 
     with DAG(
@@ -2637,6 +2640,7 @@ def test_task_info_af2():
         "downstream_task_ids": "['task_1']",
         "execution_timeout": None,
         "executor_config": {},
+        "hitl_summary": "hitl_summary",
         "ignore_first_depends_on_past": True,
         "is_setup": False,
         "is_teardown": False,
diff --git a/providers/standard/src/airflow/providers/standard/operators/hitl.py b/providers/standard/src/airflow/providers/standard/operators/hitl.py
index 842b448b08e..fc999bf8cd2 100644
--- a/providers/standard/src/airflow/providers/standard/operators/hitl.py
+++ b/providers/standard/src/airflow/providers/standard/operators/hitl.py
@@ -101,6 +101,17 @@ class HITLOperator(BaseOperator):
         self.validate_params()
         self.validate_defaults()
 
+        # HITL summary for the use of listeners; subclasses can extend it.
+        self.hitl_summary: dict[str, Any] = {
+            "subject": self.subject,
+            "body": self.body,
+            "options": self.options,
+            "defaults": self.defaults,
+            "multiple": self.multiple,
+            "assigned_users": self.assigned_users,
+            "serialized_params": self.serialized_params or None,
+        }
+
     def validate_options(self) -> None:
         """
         Validate the `options` attribute of the instance.
@@ -156,6 +167,9 @@ class HITLOperator(BaseOperator):
         else:
             timeout_datetime = None
 
+        # Enrich summary with runtime info
+        self.hitl_summary["timeout_datetime"] = timeout_datetime.isoformat() 
if timeout_datetime else None
+
         self.log.info("Waiting for response")
         for notifier in self.notifiers:
             notifier(context)
@@ -181,12 +195,23 @@ class HITLOperator(BaseOperator):
 
     def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:
         if "error" in event:
+            self.hitl_summary["error_type"] = event["error_type"]
             self.process_trigger_event_error(event)
 
         chosen_options = event["chosen_options"]
         params_input = event["params_input"] or {}
         self.validate_chosen_options(chosen_options)
         self.validate_params_input(params_input)
+
+        self.hitl_summary.update(
+            {
+                "chosen_options": chosen_options,
+                "params_input": params_input,
+                "responded_at": event["responded_at"].isoformat(),
+                "responded_by_user": event["responded_by_user"],
+            }
+        )
+
         return HITLTriggerEventSuccessPayload(
             chosen_options=chosen_options,
             params_input=params_input,
@@ -356,10 +381,14 @@ class ApprovalOperator(HITLOperator, SkipMixin):
             **kwargs,
         )
 
+        self.hitl_summary["ignore_downstream_trigger_rules"] = 
self.ignore_downstream_trigger_rules
+        self.hitl_summary["fail_on_reject"] = self.fail_on_reject
+
     def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:
         ret = super().execute_complete(context=context, event=event)
 
         chosen_option = ret["chosen_options"][0]
+        self.hitl_summary["approved"] = chosen_option == self.APPROVE
         if chosen_option == self.APPROVE:
             self.log.info("Approved. Proceeding with downstream tasks...")
             return ret
@@ -413,6 +442,7 @@ class HITLBranchOperator(HITLOperator, BranchMixIn):
         super().__init__(**kwargs)
         self.options_mapping = options_mapping or {}
         self.validate_options_mapping()
+        self.hitl_summary["options_mapping"] = self.options_mapping
 
     def validate_options_mapping(self) -> None:
         """
@@ -447,6 +477,7 @@ class HITLBranchOperator(HITLOperator, BranchMixIn):
 
         # Map options to task IDs using the mapping, fallback to original 
option
         chosen_options = [self.options_mapping.get(option, option) for option 
in chosen_options]
+        self.hitl_summary["branches_to_execute"] = chosen_options
         return self.do_branch(context=context, 
branches_to_execute=chosen_options)
 
 
diff --git a/providers/standard/tests/unit/standard/operators/test_hitl.py b/providers/standard/tests/unit/standard/operators/test_hitl.py
index 108a6935b38..e4bc4414750 100644
--- a/providers/standard/tests/unit/standard/operators/test_hitl.py
+++ b/providers/standard/tests/unit/standard/operators/test_hitl.py
@@ -16,7 +16,7 @@
 # under the License.
 from __future__ import annotations
 
-from uuid import UUID
+from uuid import UUID, uuid4
 
 import pytest
 
@@ -27,7 +27,7 @@ if not AIRFLOW_V_3_1_PLUS:
 
 import datetime
 from typing import TYPE_CHECKING, Any
-from unittest.mock import MagicMock
+from unittest.mock import MagicMock, patch
 from urllib.parse import parse_qs, urlparse
 
 import pytest
@@ -871,3 +871,460 @@ class TestHITLBranchOperator:
                 options=["Approve", "Reject"],
                 options_mapping={"NotAnOption": "publish"},
             )
+
+
+class TestHITLSummaryForListeners:
+    """Verify hitl_summary dict at all lifecycle stages: __init__, execute, 
execute_complete."""
+
+    def test_hitl_operator_init_all_fields(self) -> None:
+        """hitl_summary is exactly the expected dict after __init__ with all 
fields set."""
+        op = HITLOperator(
+            task_id="test",
+            subject="Please review",
+            body="Details here",
+            options=["Yes", "No"],
+            defaults=["Yes"],
+            multiple=False,
+            assigned_users=HITLUser(id="u1", name="Alice"),
+            params={"env": Param("prod", type="string", description="Target 
env")},
+        )
+
+        assert op.hitl_summary == {
+            "subject": "Please review",
+            "body": "Details here",
+            "options": ["Yes", "No"],
+            "defaults": ["Yes"],
+            "multiple": False,
+            "assigned_users": [{"id": "u1", "name": "Alice"}],
+            "serialized_params": op.serialized_params,
+        }
+
+    def test_hitl_operator_init_minimal(self) -> None:
+        """hitl_summary with only required fields; optional ones default to 
None."""
+        op = HITLOperator(
+            task_id="test",
+            subject="Review",
+            options=["A", "B"],
+        )
+
+        assert op.hitl_summary == {
+            "subject": "Review",
+            "body": None,
+            "options": ["A", "B"],
+            "defaults": None,
+            "multiple": False,
+            "assigned_users": None,
+            "serialized_params": None,
+        }
+
+    def test_approval_operator_init_summary(self) -> None:
+        """ApprovalOperator hitl_summary includes base + approval-specific 
fields."""
+        op = ApprovalOperator(
+            task_id="test",
+            subject="Deploy?",
+            ignore_downstream_trigger_rules=True,
+            fail_on_reject=True,
+        )
+
+        assert op.hitl_summary == {
+            "subject": "Deploy?",
+            "body": None,
+            "options": ["Approve", "Reject"],
+            "defaults": None,
+            "multiple": False,
+            "assigned_users": None,
+            "serialized_params": None,
+            "ignore_downstream_trigger_rules": True,
+            "fail_on_reject": True,
+        }
+
+    def test_approval_operator_init_defaults(self) -> None:
+        """ApprovalOperator with default settings."""
+        op = ApprovalOperator(task_id="test", subject="Deploy?")
+
+        assert op.hitl_summary == {
+            "subject": "Deploy?",
+            "body": None,
+            "options": ["Approve", "Reject"],
+            "defaults": None,
+            "multiple": False,
+            "assigned_users": None,
+            "serialized_params": None,
+            "ignore_downstream_trigger_rules": False,
+            "fail_on_reject": False,
+        }
+
+    def test_hitl_branch_operator_init_with_mapping(self) -> None:
+        """HITLBranchOperator hitl_summary includes base + options_mapping."""
+        op = HITLBranchOperator(
+            task_id="test",
+            subject="Choose",
+            options=["A", "B"],
+            options_mapping={"A": "task_a", "B": "task_b"},
+        )
+
+        assert op.hitl_summary == {
+            "subject": "Choose",
+            "body": None,
+            "options": ["A", "B"],
+            "defaults": None,
+            "multiple": False,
+            "assigned_users": None,
+            "serialized_params": None,
+            "options_mapping": {"A": "task_a", "B": "task_b"},
+        }
+
+    def test_hitl_branch_operator_init_without_mapping(self) -> None:
+        """HITLBranchOperator stores empty dict for options_mapping when not 
provided."""
+        op = HITLBranchOperator(
+            task_id="test",
+            subject="Choose",
+            options=["A", "B"],
+        )
+
+        assert op.hitl_summary == {
+            "subject": "Choose",
+            "body": None,
+            "options": ["A", "B"],
+            "defaults": None,
+            "multiple": False,
+            "assigned_users": None,
+            "serialized_params": None,
+            "options_mapping": {},
+        }
+
+    def test_hitl_entry_operator_init_summary(self) -> None:
+        """HITLEntryOperator hitl_summary includes base fields with OK 
defaults."""
+        op = HITLEntryOperator(
+            task_id="test",
+            subject="Enter data",
+            params={"name": Param("", type="string")},
+        )
+
+        assert op.hitl_summary == {
+            "subject": "Enter data",
+            "body": None,
+            "options": ["OK"],
+            "defaults": ["OK"],
+            "multiple": False,
+            "assigned_users": None,
+            "serialized_params": op.serialized_params,
+        }
+
+    def test_hitl_entry_operator_init_custom_options(self) -> None:
+        """HITLEntryOperator with explicit options and no defaults."""
+        op = HITLEntryOperator(
+            task_id="test",
+            subject="Confirm",
+            options=["OK", "Cancel"],
+        )
+
+        assert op.hitl_summary == {
+            "subject": "Confirm",
+            "body": None,
+            "options": ["OK", "Cancel"],
+            "defaults": None,
+            "multiple": False,
+            "assigned_users": None,
+            "serialized_params": None,
+        }
+
+    def test_execute_enriches_summary_with_timeout(self) -> None:
+        """execute() adds timeout_datetime; all other init keys remain."""
+        op = HITLOperator(
+            task_id="test",
+            subject="Review",
+            options=["OK"],
+            execution_timeout=datetime.timedelta(minutes=10),
+        )
+
+        with (
+            
patch("airflow.providers.standard.operators.hitl.upsert_hitl_detail"),
+            patch.object(op, "defer"),
+        ):
+            op.execute({"task_instance": MagicMock(id=uuid4())})  # type: 
ignore[arg-type]
+
+        s = op.hitl_summary
+        # Validate the timeout value is a parseable ISO string
+        timeout_dt = datetime.datetime.fromisoformat(s["timeout_datetime"])
+        assert timeout_dt is not None
+
+        assert s == {
+            "subject": "Review",
+            "body": None,
+            "options": ["OK"],
+            "defaults": None,
+            "multiple": False,
+            "assigned_users": None,
+            "serialized_params": None,
+            "timeout_datetime": s["timeout_datetime"],
+        }
+
+    def test_execute_without_timeout(self) -> None:
+        """execute() sets timeout_datetime to None when no 
execution_timeout."""
+        op = HITLOperator(
+            task_id="test",
+            subject="Review",
+            options=["OK"],
+        )
+
+        with (
+            
patch("airflow.providers.standard.operators.hitl.upsert_hitl_detail"),
+            patch.object(op, "defer"),
+        ):
+            op.execute({"task_instance": MagicMock(id=uuid4())})  # type: 
ignore[arg-type]
+
+        assert op.hitl_summary == {
+            "subject": "Review",
+            "body": None,
+            "options": ["OK"],
+            "defaults": None,
+            "multiple": False,
+            "assigned_users": None,
+            "serialized_params": None,
+            "timeout_datetime": None,
+        }
+
+    def test_hitl_operator_execute_complete_enriches_summary(self) -> None:
+        """execute_complete() adds response fields directly into 
hitl_summary."""
+        op = HITLOperator(
+            task_id="test",
+            subject="Review",
+            options=["1", "2"],
+            params={"input": 1},
+        )
+
+        responded_at = timezone.utcnow()
+        op.execute_complete(
+            context={},
+            event={
+                "chosen_options": ["1"],
+                "params_input": {"input": 1},
+                "responded_at": responded_at,
+                "responded_by_user": {"id": "u1", "name": "Alice"},
+            },
+        )
+
+        assert op.hitl_summary == {
+            "subject": "Review",
+            "body": None,
+            "options": ["1", "2"],
+            "defaults": None,
+            "multiple": False,
+            "assigned_users": None,
+            "serialized_params": op.serialized_params,
+            "chosen_options": ["1"],
+            "params_input": {"input": 1},
+            "responded_at": responded_at.isoformat(),
+            "responded_by_user": {"id": "u1", "name": "Alice"},
+        }
+
+    def test_hitl_operator_execute_complete_error_stores_error_type(self) -> 
None:
+        """execute_complete() stores error_type in hitl_summary on error 
events."""
+        op = HITLOperator(
+            task_id="test",
+            subject="Review",
+            options=["OK"],
+        )
+
+        with pytest.raises(HITLTimeoutError):
+            op.execute_complete(
+                context={},
+                event={"error": "timed out", "error_type": "timeout"},
+            )
+
+        assert op.hitl_summary == {
+            "subject": "Review",
+            "body": None,
+            "options": ["OK"],
+            "defaults": None,
+            "multiple": False,
+            "assigned_users": None,
+            "serialized_params": None,
+            "error_type": "timeout",
+        }
+
+    def test_approval_operator_execute_complete_approved(self) -> None:
+        """Approving sets approved=True in hitl_summary."""
+        op = ApprovalOperator(task_id="test", subject="Deploy?")
+
+        responded_at = timezone.utcnow()
+        op.execute_complete(
+            context={},
+            event={
+                "chosen_options": ["Approve"],
+                "params_input": {},
+                "responded_at": responded_at,
+                "responded_by_user": {"id": "u1", "name": "Alice"},
+            },
+        )
+
+        assert op.hitl_summary == {
+            "subject": "Deploy?",
+            "body": None,
+            "options": ["Approve", "Reject"],
+            "defaults": None,
+            "multiple": False,
+            "assigned_users": None,
+            "serialized_params": None,
+            "ignore_downstream_trigger_rules": False,
+            "fail_on_reject": False,
+            "chosen_options": ["Approve"],
+            "params_input": {},
+            "responded_at": responded_at.isoformat(),
+            "responded_by_user": {"id": "u1", "name": "Alice"},
+            "approved": True,
+        }
+
+    def test_approval_operator_execute_complete_rejected(self) -> None:
+        """Rejecting sets approved=False in hitl_summary."""
+        op = ApprovalOperator(task_id="test", subject="Deploy?")
+
+        responded_at = timezone.utcnow()
+        op.execute_complete(
+            context={},
+            event={
+                "chosen_options": ["Reject"],
+                "params_input": {},
+                "responded_at": responded_at,
+                "responded_by_user": {"id": "u1", "name": "Alice"},
+            },
+        )
+
+        assert op.hitl_summary == {
+            "subject": "Deploy?",
+            "body": None,
+            "options": ["Approve", "Reject"],
+            "defaults": None,
+            "multiple": False,
+            "assigned_users": None,
+            "serialized_params": None,
+            "ignore_downstream_trigger_rules": False,
+            "fail_on_reject": False,
+            "chosen_options": ["Reject"],
+            "params_input": {},
+            "responded_at": responded_at.isoformat(),
+            "responded_by_user": {"id": "u1", "name": "Alice"},
+            "approved": False,
+        }
+
+    def test_hitl_branch_operator_execute_complete_records_branches(
+        self, dag_maker: DagMaker, get_context_from_model_ti: Any
+    ) -> None:
+        """HITLBranchOperator stores branches_to_execute in hitl_summary."""
+        with dag_maker("hitl_summary_dag", serialized=True):
+            op = HITLBranchOperator(
+                task_id="choose",
+                subject="Choose",
+                options=["A", "B"],
+                options_mapping={"A": "task_a", "B": "task_b"},
+            )
+            op >> [EmptyOperator(task_id="task_a"), 
EmptyOperator(task_id="task_b")]
+
+        dr = dag_maker.create_dagrun()
+        ti = dr.get_task_instance("choose")
+
+        responded_at = timezone.utcnow()
+        with pytest.raises(DownstreamTasksSkipped):
+            op.execute_complete(
+                context=get_context_from_model_ti(ti, op),
+                event={
+                    "chosen_options": ["A"],
+                    "params_input": {},
+                    "responded_at": responded_at,
+                    "responded_by_user": {"id": "u1", "name": "Alice"},
+                },
+            )
+
+        assert op.hitl_summary == {
+            "subject": "Choose",
+            "body": None,
+            "options": ["A", "B"],
+            "defaults": None,
+            "multiple": False,
+            "assigned_users": None,
+            "serialized_params": None,
+            "options_mapping": {"A": "task_a", "B": "task_b"},
+            "chosen_options": ["A"],
+            "params_input": {},
+            "responded_at": responded_at.isoformat(),
+            "responded_by_user": {"id": "u1", "name": "Alice"},
+            "branches_to_execute": ["task_a"],
+        }
+
+    def test_full_lifecycle_approval(self) -> None:
+        """Verify exact hitl_summary at each stage: __init__ -> execute -> 
execute_complete."""
+        op = ApprovalOperator(
+            task_id="test",
+            subject="Release v2.0?",
+            body="Please approve the production deployment.",
+            execution_timeout=datetime.timedelta(minutes=30),
+        )
+
+        # -- After __init__: only base + approval keys --
+        assert op.hitl_summary == {
+            "subject": "Release v2.0?",
+            "body": "Please approve the production deployment.",
+            "options": ["Approve", "Reject"],
+            "defaults": None,
+            "multiple": False,
+            "assigned_users": None,
+            "serialized_params": None,
+            "ignore_downstream_trigger_rules": False,
+            "fail_on_reject": False,
+        }
+
+        # -- After execute (mocked defer): timeout_datetime added --
+        with (
+            
patch("airflow.providers.standard.operators.hitl.upsert_hitl_detail"),
+            patch.object(op, "defer"),
+        ):
+            op.execute({"task_instance": MagicMock(id=uuid4())})  # type: 
ignore[arg-type]
+
+        s = op.hitl_summary
+        timeout_dt_str = s["timeout_datetime"]
+        assert timeout_dt_str is not None
+        datetime.datetime.fromisoformat(timeout_dt_str)
+
+        assert s == {
+            "subject": "Release v2.0?",
+            "body": "Please approve the production deployment.",
+            "options": ["Approve", "Reject"],
+            "defaults": None,
+            "multiple": False,
+            "assigned_users": None,
+            "serialized_params": None,
+            "ignore_downstream_trigger_rules": False,
+            "fail_on_reject": False,
+            "timeout_datetime": timeout_dt_str,
+        }
+
+        # -- After execute_complete: response + approved fields added --
+        responded_at = timezone.utcnow()
+        op.execute_complete(
+            context={},
+            event={
+                "chosen_options": ["Approve"],
+                "params_input": {},
+                "responded_at": responded_at,
+                "responded_by_user": {"id": "admin", "name": "Admin"},
+            },
+        )
+
+        assert s == {
+            "subject": "Release v2.0?",
+            "body": "Please approve the production deployment.",
+            "options": ["Approve", "Reject"],
+            "defaults": None,
+            "multiple": False,
+            "assigned_users": None,
+            "serialized_params": None,
+            "ignore_downstream_trigger_rules": False,
+            "fail_on_reject": False,
+            "timeout_datetime": timeout_dt_str,
+            "chosen_options": ["Approve"],
+            "params_input": {},
+            "responded_at": responded_at.isoformat(),
+            "responded_by_user": {"id": "admin", "name": "Admin"},
+            "approved": True,
+        }

Reply via email to