This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f504ba7e310 fix(APDR): add asset events to partitioned DagRun (#61433)
f504ba7e310 is described below

commit f504ba7e3107a098eb191ef02da099f49248e4b3
Author: Wei Lee <[email protected]>
AuthorDate: Mon Feb 16 09:00:47 2026 +0800

    fix(APDR): add asset events to partitioned DagRun (#61433)
    
    * test: simplify test_partitioned_dag_run_with_customized_mapper
    
    * test(scheduler_job): improve APDR test cases to include consumed_asset_event validation
    
    * fix(APDR): add asset events to partitioned DagRun
    
    * fixup! fix(APDR): add asset events to partitioned DagRun
    
    * fixup! test: simplify test_partitioned_dag_run_with_customized_mapper
    
    * fixup! test(scheduler_job): improve APDR test cases to include consumed_asset_event validation
    
    * fixup! test(scheduler_job): improve APDR test cases to include consumed_asset_event validation
    
    * fixup! test(scheduler_job): improve APDR test cases to include consumed_asset_event validation
---
 .../src/airflow/jobs/scheduler_job_runner.py       |  7 +++
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 64 +++++++++-------------
 2 files changed, 34 insertions(+), 37 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 24ff17015f7..d592b551d95 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1848,6 +1848,13 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 creating_job_id=self.job.id,
                 session=session,
             )
+            asset_events = session.scalars(
+                select(AssetEvent).where(
+                    PartitionedAssetKeyLog.asset_partition_dag_run_id == apdr.id,
+                    PartitionedAssetKeyLog.asset_event_id == AssetEvent.id,
+                )
+            )
+            dag_run.consumed_asset_events.extend(asset_events)
             session.flush()
             apdr.created_dag_run_id = dag_run.id
             session.flush()
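
For clarity: the added query joins AssetEvent to PartitionedAssetKeyLog
implicitly through the WHERE clause. An equivalent explicit-join form (a
sketch only; the committed code is above) would be:

    asset_events = session.scalars(
        select(AssetEvent)
        .join(
            PartitionedAssetKeyLog,
            PartitionedAssetKeyLog.asset_event_id == AssetEvent.id,
        )
        .where(PartitionedAssetKeyLog.asset_partition_dag_run_id == apdr.id)
    )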
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index a6ce9ade535..f48ad908e4d 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -8810,46 +8810,15 @@ def test_partitioned_dag_run_with_customized_mapper(
     runner = SchedulerJobRunner(
         job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
     )
-
-    with dag_maker(dag_id="asset-event-producer", schedule=None, session=session) as dag:
-        EmptyOperator(task_id="hi", outlets=[asset_1])
-
-    dr = dag_maker.create_dagrun(partition_key="this-is-not-key-1-before-mapped", session=session)
-    [ti] = dr.get_task_instances(session=session)
-    session.commit()
-
-    serialized_outlets = dag.get_task("hi").outlets
     with custom_partition_mapper_patch():
-        TaskInstance.register_asset_changes_in_db(
-            ti=ti,
-            task_outlets=[o.asprofile() for o in serialized_outlets],
-            outlet_events=[],
+        apdr = _produce_and_register_asset_event(
+            dag_id="asset-event-producer",
+            asset=asset_1,
+            partition_key="this-is-not-key-1-before-mapped",
             session=session,
+            dag_maker=dag_maker,
+            expected_partition_key="key-1",
         )
-    session.commit()
-
-    event = session.scalar(
-        select(AssetEvent).where(
-            AssetEvent.source_dag_id == dag.dag_id,
-            AssetEvent.source_run_id == dr.run_id,
-        )
-    )
-    assert event is not None
-    assert event.partition_key == "this-is-not-key-1-before-mapped"
-
-    apdr = session.scalar(
-        select(AssetPartitionDagRun)
-        .join(
-            PartitionedAssetKeyLog,
-            PartitionedAssetKeyLog.asset_partition_dag_run_id == AssetPartitionDagRun.id,
-        )
-        .where(PartitionedAssetKeyLog.asset_event_id == event.id)
-    )
-    assert apdr is not None
-    assert apdr.created_dag_run_id is None
-    assert apdr.partition_key == "key-1"
-
-    with custom_partition_mapper_patch():
         partition_dags = runner._create_dagruns_for_partitioned_asset_dags(session=session)
     session.refresh(apdr)
     # Since asset event for Asset(name="asset-2") with key "key-1" has not yet been created,
@@ -8858,6 +8827,13 @@ def test_partitioned_dag_run_with_customized_mapper(
     assert len(partition_dags) == 1
     assert partition_dags == {"asset-event-consumer"}
 
+    dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id))
+    assert dag_run is not None
+    asset_event = dag_run.consumed_asset_events[0]
+    assert asset_event.source_task_id == "hi"
+    assert asset_event.source_dag_id == "asset-event-producer"
+    assert asset_event.source_run_id == "test"
+
 
 @pytest.mark.need_serialized_dag
 @pytest.mark.usefixtures("clear_asset_partition_rows")
@@ -8931,6 +8907,13 @@ def test_consumer_dag_listen_to_two_partitioned_asset(
     assert len(partition_dags) == 1
     assert partition_dags == {"asset-event-consumer"}
 
+    dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id))
+    assert dag_run is not None
+    for asset_event in dag_run.consumed_asset_events:
+        assert asset_event.source_task_id == "hi"
+        assert "asset-event-producer-" in asset_event.source_dag_id
+        assert asset_event.source_run_id == "test"
+
 
 @pytest.mark.need_serialized_dag
 @pytest.mark.usefixtures("clear_asset_partition_rows")
@@ -8995,3 +8978,10 @@ def test_consumer_dag_listen_to_two_partitioned_asset_with_key_1_mapper(
     assert apdr.created_dag_run_id is not None
     assert len(partition_dags) == 1
     assert partition_dags == {"asset-event-consumer"}
+
+    dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id))
+    assert dag_run is not None
+    for asset_event in dag_run.consumed_asset_events:
+        assert asset_event.source_task_id == "hi"
+        assert "asset-event-producer-" in asset_event.source_dag_id
+        assert asset_event.source_run_id == "test"
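
The refactored tests above delegate the producer-side setup to a shared
_produce_and_register_asset_event helper whose body is not part of this
diff. Judging from the inlined code it replaces in the first hunk, it
plausibly looks like the following reconstruction (not the committed
implementation; names and imports are those already used in
test_scheduler_job.py):

    def _produce_and_register_asset_event(
        *, dag_id, asset, partition_key, session, dag_maker, expected_partition_key
    ):
        # Reconstruction based on the removed inline test code above.
        with dag_maker(dag_id=dag_id, schedule=None, session=session) as dag:
            EmptyOperator(task_id="hi", outlets=[asset])

        dr = dag_maker.create_dagrun(partition_key=partition_key, session=session)
        [ti] = dr.get_task_instances(session=session)
        session.commit()

        serialized_outlets = dag.get_task("hi").outlets
        TaskInstance.register_asset_changes_in_db(
            ti=ti,
            task_outlets=[o.asprofile() for o in serialized_outlets],
            outlet_events=[],
            session=session,
        )
        session.commit()

        # The raw event keeps the producer's original partition key ...
        event = session.scalar(
            select(AssetEvent).where(
                AssetEvent.source_dag_id == dag.dag_id,
                AssetEvent.source_run_id == dr.run_id,
            )
        )
        assert event is not None
        assert event.partition_key == partition_key

        # ... while the AssetPartitionDagRun carries the mapped key.
        apdr = session.scalar(
            select(AssetPartitionDagRun)
            .join(
                PartitionedAssetKeyLog,
                PartitionedAssetKeyLog.asset_partition_dag_run_id
                == AssetPartitionDagRun.id,
            )
            .where(PartitionedAssetKeyLog.asset_event_id == event.id)
        )
        assert apdr is not None
        assert apdr.created_dag_run_id is None
        assert apdr.partition_key == expected_partition_key
        return apdr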
