This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 05f935d2a6c utilize more information to deterministically generate OpenLineage run_id (#43936)
05f935d2a6c is described below

commit 05f935d2a6c6f4bcd34f0f3d0e7c7f715b55b250
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Mon Nov 25 14:21:34 2024 +0100

    utilize more information to deterministically generate OpenLineage run_id (#43936)
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
---
 .../providers/dbt/cloud/utils/openlineage.py       |  1 +
 .../providers/openlineage/plugins/adapter.py       |  3 +-
 .../providers/openlineage/plugins/listener.py      | 11 ++++
 .../providers/openlineage/plugins/macros.py        |  1 +
 .../airflow/providers/openlineage/utils/utils.py   | 34 +++++++++++--
 .../tests/openlineage/plugins/test_adapter.py      | 23 +++++++++
 .../tests/openlineage/plugins/test_execution.py    |  1 -
 .../tests/openlineage/plugins/test_listener.py     | 59 +++++++++-------------
 8 files changed, 91 insertions(+), 42 deletions(-)

diff --git a/providers/src/airflow/providers/dbt/cloud/utils/openlineage.py 
b/providers/src/airflow/providers/dbt/cloud/utils/openlineage.py
index 41b1f9635fd..470c153cf0f 100644
--- a/providers/src/airflow/providers/dbt/cloud/utils/openlineage.py
+++ b/providers/src/airflow/providers/dbt/cloud/utils/openlineage.py
@@ -156,6 +156,7 @@ def generate_openlineage_events_from_dbt_cloud_run(
             task_id=operator.task_id,
             logical_date=_get_logical_date(task_instance),
             try_number=_get_try_number(task_instance),
+            map_index=task_instance.map_index,
         )
 
         parent_job = ParentRunMetadata(
diff --git a/providers/src/airflow/providers/openlineage/plugins/adapter.py 
b/providers/src/airflow/providers/openlineage/plugins/adapter.py
index 688f2d65a54..535618529c1 100644
--- a/providers/src/airflow/providers/openlineage/plugins/adapter.py
+++ b/providers/src/airflow/providers/openlineage/plugins/adapter.py
@@ -129,11 +129,12 @@ class OpenLineageAdapter(LoggingMixin):
         task_id: str,
         try_number: int,
         logical_date: datetime,
+        map_index: int,
     ):
         return str(
             generate_static_uuid(
                 instant=logical_date,
-                
data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}".encode(),
+                
data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}.{map_index}".encode(),
             )
         )
 
diff --git a/providers/src/airflow/providers/openlineage/plugins/listener.py 
b/providers/src/airflow/providers/openlineage/plugins/listener.py
index 7feece437a6..6a539ea27f4 100644
--- a/providers/src/airflow/providers/openlineage/plugins/listener.py
+++ b/providers/src/airflow/providers/openlineage/plugins/listener.py
@@ -42,6 +42,7 @@ from airflow.providers.openlineage.utils.utils import (
     get_user_provided_run_facets,
     is_operator_disabled,
     is_selective_lineage_enabled,
+    is_ti_rescheduled_already,
     print_warning,
 )
 from airflow.settings import configure_orm
@@ -134,6 +135,11 @@ class OpenLineageListener:
             # we return here because Airflow 2.3 needs task from deferred state
             if task_instance.next_method is not None:
                 return
+
+            if is_ti_rescheduled_already(task_instance):
+                self.log.debug("Skipping this instance of rescheduled task - 
START event was emitted already")
+                return
+
             parent_run_id = self.adapter.build_dag_run_id(
                 dag_id=dag.dag_id,
                 logical_date=dagrun.logical_date,
@@ -143,11 +149,13 @@ class OpenLineageListener:
                 logical_date = task_instance.logical_date
             else:
                 logical_date = task_instance.execution_date
+
             task_uuid = self.adapter.build_task_instance_run_id(
                 dag_id=dag.dag_id,
                 task_id=task.task_id,
                 try_number=task_instance.try_number,
                 logical_date=logical_date,
+                map_index=task_instance.map_index,
             )
             event_type = RunState.RUNNING.value.lower()
             operator_name = task.task_type.lower()
@@ -231,6 +239,7 @@ class OpenLineageListener:
                 task_id=task.task_id,
                 try_number=_get_try_number_success(task_instance),
                 logical_date=logical_date,
+                map_index=task_instance.map_index,
             )
             event_type = RunState.COMPLETE.value.lower()
             operator_name = task.task_type.lower()
@@ -329,11 +338,13 @@ class OpenLineageListener:
                 logical_date = task_instance.logical_date
             else:
                 logical_date = task_instance.execution_date
+
             task_uuid = self.adapter.build_task_instance_run_id(
                 dag_id=dag.dag_id,
                 task_id=task.task_id,
                 try_number=task_instance.try_number,
                 logical_date=logical_date,
+                map_index=task_instance.map_index,
             )
             event_type = RunState.FAIL.value.lower()
             operator_name = task.task_type.lower()
diff --git a/providers/src/airflow/providers/openlineage/plugins/macros.py 
b/providers/src/airflow/providers/openlineage/plugins/macros.py
index 69af17321e4..fd5194d3f32 100644
--- a/providers/src/airflow/providers/openlineage/plugins/macros.py
+++ b/providers/src/airflow/providers/openlineage/plugins/macros.py
@@ -67,6 +67,7 @@ def lineage_run_id(task_instance: TaskInstance):
         task_id=task_instance.task_id,
         try_number=task_instance.try_number,
         logical_date=logical_date,
+        map_index=task_instance.map_index,
     )
 
 
diff --git a/providers/src/airflow/providers/openlineage/utils/utils.py 
b/providers/src/airflow/providers/openlineage/utils/utils.py
index 622f45bd65b..eb1d5984d32 100644
--- a/providers/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/src/airflow/providers/openlineage/utils/utils.py
@@ -30,6 +30,7 @@ import attrs
 from deprecated import deprecated
 from openlineage.client.utils import RedactMixin
 from packaging.version import Version
+from sqlalchemy import exists
 
 from airflow import __version__ as AIRFLOW_VERSION
 from airflow.exceptions import (
@@ -37,7 +38,7 @@ from airflow.exceptions import (
 )
 
 # TODO: move this maybe to Airflow's logic?
-from airflow.models import DAG, BaseOperator, DagRun, MappedOperator
+from airflow.models import DAG, BaseOperator, DagRun, MappedOperator, 
TaskReschedule
 from airflow.providers.openlineage import __version__ as 
OPENLINEAGE_PROVIDER_VERSION, conf
 from airflow.providers.openlineage.plugins.facets import (
     AirflowDagRunFacet,
@@ -53,6 +54,7 @@ from airflow.providers.openlineage.utils.selective_enable 
import (
     is_dag_lineage_enabled,
     is_task_lineage_enabled,
 )
+from airflow.sensors.base import BaseSensorOperator
 from airflow.serialization.serialized_objects import SerializedBaseOperator
 from airflow.utils.context import AirflowContextDeprecationWarning
 from airflow.utils.log.secrets_masker import (
@@ -62,6 +64,7 @@ from airflow.utils.log.secrets_masker import (
     should_hide_value_for_key,
 )
 from airflow.utils.module_loading import import_string
+from airflow.utils.session import NEW_SESSION, provide_session
 
 if TYPE_CHECKING:
     from openlineage.client.event_v2 import Dataset as OpenLineageDataset
@@ -184,6 +187,28 @@ def is_selective_lineage_enabled(obj: DAG | BaseOperator | 
MappedOperator) -> bo
         raise TypeError("is_selective_lineage_enabled can only be used on DAG 
or Operator objects")
 
 
+@provide_session
+def is_ti_rescheduled_already(ti: TaskInstance, session=NEW_SESSION):
+    if not isinstance(ti.task, BaseSensorOperator):
+        return False
+
+    if not ti.task.reschedule:
+        return False
+
+    return (
+        session.query(
+            exists().where(
+                TaskReschedule.dag_id == ti.dag_id,
+                TaskReschedule.task_id == ti.task_id,
+                TaskReschedule.run_id == ti.run_id,
+                TaskReschedule.map_index == ti.map_index,
+                TaskReschedule.try_number == ti.try_number,
+            )
+        ).scalar()
+        is True
+    )
+
+
 class InfoJsonEncodable(dict):
     """
     Airflow objects might not be json-encodable overall.
@@ -217,6 +242,7 @@ class InfoJsonEncodable(dict):
             self,
             **{field: InfoJsonEncodable._cast_basic_types(getattr(self, 
field)) for field in self._fields},
         )
+        del self.obj
 
     @staticmethod
     def _cast_basic_types(value):
@@ -677,11 +703,11 @@ def print_warning(log):
         def wrapper(*args, **kwargs):
             try:
                 return f(*args, **kwargs)
-            except Exception as e:
+            except Exception:
                 log.warning(
-                    "Note: exception below is being caught: it's printed for 
visibility. However OpenLineage events aren't being emitted. If you see that, 
task has completed successfully despite not getting OL events."
+                    "OpenLineage event emission failed. Exception below is 
being caught: it's printed for visibility. This has no impact on actual task 
execution status.",
+                    exc_info=True,
                 )
-                log.warning(e)
 
         return wrapper
 
diff --git a/providers/tests/openlineage/plugins/test_adapter.py 
b/providers/tests/openlineage/plugins/test_adapter.py
index 576f6df830c..a9787a9399a 100644
--- a/providers/tests/openlineage/plugins/test_adapter.py
+++ b/providers/tests/openlineage/plugins/test_adapter.py
@@ -900,6 +900,7 @@ def test_build_task_instance_run_id_is_valid_uuid():
         task_id="task_id",
         try_number=1,
         logical_date=datetime.datetime.now(),
+        map_index=-1,
     )
     uuid_result = uuid.UUID(result)
     assert uuid_result
@@ -912,28 +913,50 @@ def 
test_build_task_instance_run_id_same_input_gives_same_result():
         task_id="task1",
         try_number=1,
         logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+        map_index=-1,
     )
     result2 = OpenLineageAdapter.build_task_instance_run_id(
         dag_id="dag1",
         task_id="task1",
         try_number=1,
         logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+        map_index=-1,
     )
     assert result1 == result2
 
 
+def 
test_build_task_instance_run_id_different_map_index_gives_different_result():
+    result1 = OpenLineageAdapter.build_task_instance_run_id(
+        dag_id="dag1",
+        task_id="task1",
+        try_number=1,
+        logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+        map_index=1,
+    )
+    result2 = OpenLineageAdapter.build_task_instance_run_id(
+        dag_id="dag1",
+        task_id="task1",
+        try_number=1,
+        logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+        map_index=2,
+    )
+    assert result1 != result2
+
+
 def test_build_task_instance_run_id_different_inputs_gives_different_results():
     result1 = OpenLineageAdapter.build_task_instance_run_id(
         dag_id="dag1",
         task_id="task1",
         try_number=1,
         logical_date=datetime.datetime.now(),
+        map_index=-1,
     )
     result2 = OpenLineageAdapter.build_task_instance_run_id(
         dag_id="dag2",
         task_id="task2",
         try_number=2,
         logical_date=datetime.datetime.now(),
+        map_index=-1,
     )
     assert result1 != result2
 
diff --git a/providers/tests/openlineage/plugins/test_execution.py 
b/providers/tests/openlineage/plugins/test_execution.py
index 58105481212..26aa09078cb 100644
--- a/providers/tests/openlineage/plugins/test_execution.py
+++ b/providers/tests/openlineage/plugins/test_execution.py
@@ -63,7 +63,6 @@ def get_sorted_events(event_dir: str) -> list[str]:
 
 def has_value_in_events(events, chain, value):
     x = [get_from_nullable_chain(event, chain) for event in events]
-    log.error(x)
     y = [z == value for z in x]
     return any(y)
 
diff --git a/providers/tests/openlineage/plugins/test_listener.py 
b/providers/tests/openlineage/plugins/test_listener.py
index d53c23c8b6d..cbc4436ef01 100644
--- a/providers/tests/openlineage/plugins/test_listener.py
+++ b/providers/tests/openlineage/plugins/test_listener.py
@@ -189,13 +189,12 @@ def _create_listener_and_task_instance() -> 
tuple[OpenLineageListener, TaskInsta
     """
 
     def mock_dag_id(dag_id, logical_date):
-        return f"{logical_date}.{dag_id}"
+        return f"{logical_date.isoformat()}.{dag_id}"
 
-    def mock_task_id(dag_id, task_id, try_number, logical_date):
-        return f"{logical_date}.{dag_id}.{task_id}.{try_number}"
+    def mock_task_id(dag_id, task_id, try_number, logical_date, map_index):
+        return 
f"{logical_date.isoformat()}.{dag_id}.{task_id}.{try_number}.{map_index}"
 
     listener = OpenLineageListener()
-    listener.log = mock.Mock()
     listener.extractor_manager = mock.Mock()
 
     metadata = mock.Mock()
@@ -216,22 +215,25 @@ def _create_listener_and_task_instance() -> 
tuple[OpenLineageListener, TaskInsta
     task_instance.dag_run.data_interval_start = None
     task_instance.dag_run.data_interval_end = None
     if AIRFLOW_V_3_0_PLUS:
-        task_instance.dag_run.logical_date = "2020-01-01T01:01:01"
+        task_instance.dag_run.logical_date = dt.datetime(2020, 1, 1, 1, 1, 1)
     else:
-        task_instance.dag_run.execution_date = "2020-01-01T01:01:01"
+        task_instance.dag_run.execution_date = dt.datetime(2020, 1, 1, 1, 1, 1)
     task_instance.task = mock.Mock()
     task_instance.task.task_id = "task_id"
     task_instance.task.dag = mock.Mock()
     task_instance.task.dag.dag_id = "dag_id"
     task_instance.task.dag.description = "Test DAG Description"
     task_instance.task.dag.owner = "Test Owner"
+    task_instance.task.inlets = []
+    task_instance.task.outlets = []
     task_instance.dag_id = "dag_id"
     task_instance.run_id = "dag_run_run_id"
     task_instance.try_number = 1
     task_instance.state = State.RUNNING
     task_instance.start_date = dt.datetime(2023, 1, 1, 13, 1, 1)
     task_instance.end_date = dt.datetime(2023, 1, 3, 13, 1, 1)
-    task_instance.logical_date = "2020-01-01T01:01:01"
+    task_instance.logical_date = dt.datetime(2020, 1, 1, 1, 1, 1)
+    task_instance.map_index = -1
     task_instance.next_method = None  # Ensure this is None to reach start_task
 
     return listener, task_instance
@@ -258,8 +260,8 @@ def test_adapter_start_task_is_called_with_proper_arguments(
     correctly passed to the adapter. It also verifies that custom facets and 
Airflow run facets are
     correctly retrieved and included in the call. This ensures that all 
relevant data, including custom
     and Airflow-specific metadata, is accurately conveyed to the adapter 
during the initialization of a task,
-    reflecting the comprehensive tracking of task execution contexts.
-    """
+    reflecting the comprehensive tracking of task execution contexts."""
+
     listener, task_instance = _create_listener_and_task_instance()
     mock_get_job_name.return_value = "job_name"
     mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1}
@@ -269,7 +271,7 @@ def test_adapter_start_task_is_called_with_proper_arguments(
 
     listener.on_task_instance_running(None, task_instance, None)
     listener.adapter.start_task.assert_called_once_with(
-        run_id="2020-01-01T01:01:01.dag_id.task_id.1",
+        run_id="2020-01-01T01:01:01.dag_id.task_id.1.-1",
         job_name="job_name",
         job_description="Test DAG Description",
         event_time="2023-01-01T13:01:01",
@@ -291,7 +293,6 @@ def test_adapter_start_task_is_called_with_proper_arguments(
 
 @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True)
 
@mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled")
[email protected]("airflow.providers.openlineage.plugins.listener.OpenLineageAdapter")
 
@mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet")
 
@mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets")
 @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name")
@@ -300,7 +301,6 @@ def test_adapter_fail_task_is_called_with_proper_arguments(
     mock_get_job_name,
     mock_get_user_provided_run_facets,
     mock_get_airflow_run_facet,
-    mocked_adapter,
     mock_disabled,
     mock_debug_mode,
 ):
@@ -312,17 +312,9 @@ def test_adapter_fail_task_is_called_with_proper_arguments(
     failure events, thus confirming that the adapter's failure handling is 
functioning as expected.
     """
 
-    def mock_dag_id(dag_id, logical_date):
-        return f"{logical_date}.{dag_id}"
-
-    def mock_task_id(dag_id, task_id, try_number, logical_date):
-        return f"{logical_date}.{dag_id}.{task_id}.{try_number}"
-
     listener, task_instance = _create_listener_and_task_instance()
-    task_instance.logical_date = "2020-01-01T01:01:01"
+    task_instance.logical_date = dt.datetime(2020, 1, 1, 1, 1, 1)
     mock_get_job_name.return_value = "job_name"
-    mocked_adapter.build_dag_run_id.side_effect = mock_dag_id
-    mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
     mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
     mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}}
     mock_disabled.return_value = False
@@ -339,7 +331,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments(
         job_name="job_name",
         parent_job_name="dag_id",
         parent_run_id="2020-01-01T01:01:01.dag_id",
-        run_id="2020-01-01T01:01:01.dag_id.task_id.1",
+        run_id="2020-01-01T01:01:01.dag_id.task_id.1.-1",
         task=listener.extractor_manager.extract_metadata(),
         run_facets={
             "custom_user_facet": 2,
@@ -372,16 +364,8 @@ def 
test_adapter_complete_task_is_called_with_proper_arguments(
     during the task's lifecycle events.
     """
 
-    def mock_dag_id(dag_id, logical_date):
-        return f"{logical_date}.{dag_id}"
-
-    def mock_task_id(dag_id, task_id, try_number, logical_date):
-        return f"{logical_date}.{dag_id}.{task_id}.{try_number}"
-
     listener, task_instance = _create_listener_and_task_instance()
     mock_get_job_name.return_value = "job_name"
-    listener.adapter.build_dag_run_id.side_effect = mock_dag_id
-    listener.adapter.build_task_instance_run_id.side_effect = mock_task_id
     mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
     mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}}
     mock_disabled.return_value = False
@@ -396,7 +380,7 @@ def 
test_adapter_complete_task_is_called_with_proper_arguments(
         job_name="job_name",
         parent_job_name="dag_id",
         parent_run_id="2020-01-01T01:01:01.dag_id",
-        run_id=f"2020-01-01T01:01:01.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}",
+        
run_id=f"2020-01-01T01:01:01.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}.-1",
         task=listener.extractor_manager.extract_metadata(),
         run_facets={
             "custom_user_facet": 2,
@@ -419,8 +403,9 @@ def 
test_on_task_instance_running_correctly_calls_openlineage_adapter_run_id_met
     listener.adapter.build_task_instance_run_id.assert_called_once_with(
         dag_id="dag_id",
         task_id="task_id",
-        logical_date="2020-01-01T01:01:01",
+        logical_date=dt.datetime(2020, 1, 1, 1, 1, 1),
         try_number=1,
+        map_index=-1,
     )
 
 
@@ -441,8 +426,9 @@ def 
test_on_task_instance_failed_correctly_calls_openlineage_adapter_run_id_meth
     listener.adapter.build_task_instance_run_id.assert_called_once_with(
         dag_id="dag_id",
         task_id="task_id",
-        logical_date="2020-01-01T01:01:01",
+        logical_date=dt.datetime(2020, 1, 1, 1, 1, 1),
         try_number=1,
+        map_index=-1,
     )
 
 
@@ -459,8 +445,9 @@ def 
test_on_task_instance_success_correctly_calls_openlineage_adapter_run_id_met
     listener.adapter.build_task_instance_run_id.assert_called_once_with(
         dag_id="dag_id",
         task_id="task_id",
-        logical_date="2020-01-01T01:01:01",
+        logical_date=dt.datetime(2020, 1, 1, 1, 1, 1),
         try_number=EXPECTED_TRY_NUMBER_1,
+        map_index=-1,
     )
 
 
@@ -701,8 +688,8 @@ class TestOpenLineageSelectiveEnable:
             run_id=run_id,
             **triggered_by_kwargs,
         )  # type: ignore
-        self.task_instance_1 = TaskInstance(self.task_1, run_id=run_id)
-        self.task_instance_2 = TaskInstance(self.task_2, run_id=run_id)
+        self.task_instance_1 = TaskInstance(self.task_1, run_id=run_id, 
map_index=-1)
+        self.task_instance_2 = TaskInstance(self.task_2, run_id=run_id, 
map_index=-1)
         self.task_instance_1.dag_run = self.task_instance_2.dag_run = 
self.dagrun
 
     @pytest.mark.parametrize(

Reply via email to