This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f504ba7e310 fix(APDR): add asset events to partitioned DagRun (#61433)
f504ba7e310 is described below
commit f504ba7e3107a098eb191ef02da099f49248e4b3
Author: Wei Lee <[email protected]>
AuthorDate: Mon Feb 16 09:00:47 2026 +0800
fix(APDR): add asset events to partitioned DagRun (#61433)
* test: simplify test_partitioned_dag_run_with_customized_mapper
* test(scheduler_job): improve APDR test cases to include
consumed_asset_event validation
* fix(APDR): add asset events to partitioned DagRun
* fixup! fix(APDR): add asset events to partitioned DagRun
* fixup! test: simplify test_partitioned_dag_run_with_customized_mapper
* fixup! test(scheduler_job): improve APDR test cases to include
consumed_asset_event validation
* fixup! test(scheduler_job): improve APDR test cases to include
consumed_asset_event validation
* fixup! test(scheduler_job): improve APDR test cases to include
consumed_asset_event validation
---
.../src/airflow/jobs/scheduler_job_runner.py | 7 +++
airflow-core/tests/unit/jobs/test_scheduler_job.py | 64 +++++++++-------------
2 files changed, 34 insertions(+), 37 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 24ff17015f7..d592b551d95 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1848,6 +1848,13 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
creating_job_id=self.job.id,
session=session,
)
+ asset_events = session.scalars(
+ select(AssetEvent).where(
+            PartitionedAssetKeyLog.asset_partition_dag_run_id == apdr.id,
+ PartitionedAssetKeyLog.asset_event_id == AssetEvent.id,
+ )
+ )
+ dag_run.consumed_asset_events.extend(asset_events)
session.flush()
apdr.created_dag_run_id = dag_run.id
session.flush()
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index a6ce9ade535..f48ad908e4d 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -8810,46 +8810,15 @@ def test_partitioned_dag_run_with_customized_mapper(
runner = SchedulerJobRunner(
job=Job(job_type=SchedulerJobRunner.job_type),
executors=[MockExecutor(do_update=False)]
)
-
-    with dag_maker(dag_id="asset-event-producer", schedule=None, session=session) as dag:
- EmptyOperator(task_id="hi", outlets=[asset_1])
-
-    dr = dag_maker.create_dagrun(partition_key="this-is-not-key-1-before-mapped", session=session)
- [ti] = dr.get_task_instances(session=session)
- session.commit()
-
- serialized_outlets = dag.get_task("hi").outlets
with custom_partition_mapper_patch():
- TaskInstance.register_asset_changes_in_db(
- ti=ti,
- task_outlets=[o.asprofile() for o in serialized_outlets],
- outlet_events=[],
+ apdr = _produce_and_register_asset_event(
+ dag_id="asset-event-producer",
+ asset=asset_1,
+ partition_key="this-is-not-key-1-before-mapped",
session=session,
+ dag_maker=dag_maker,
+ expected_partition_key="key-1",
)
- session.commit()
-
- event = session.scalar(
- select(AssetEvent).where(
- AssetEvent.source_dag_id == dag.dag_id,
- AssetEvent.source_run_id == dr.run_id,
- )
- )
- assert event is not None
- assert event.partition_key == "this-is-not-key-1-before-mapped"
-
- apdr = session.scalar(
- select(AssetPartitionDagRun)
- .join(
- PartitionedAssetKeyLog,
-            PartitionedAssetKeyLog.asset_partition_dag_run_id == AssetPartitionDagRun.id,
- )
- .where(PartitionedAssetKeyLog.asset_event_id == event.id)
- )
- assert apdr is not None
- assert apdr.created_dag_run_id is None
- assert apdr.partition_key == "key-1"
-
- with custom_partition_mapper_patch():
         partition_dags = runner._create_dagruns_for_partitioned_asset_dags(session=session)
session.refresh(apdr)
     # Since asset event for Asset(name="asset-2") with key "key-1" has not yet been created,
@@ -8858,6 +8827,13 @@ def test_partitioned_dag_run_with_customized_mapper(
assert len(partition_dags) == 1
assert partition_dags == {"asset-event-consumer"}
+    dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id))
+ assert dag_run is not None
+ asset_event = dag_run.consumed_asset_events[0]
+ assert asset_event.source_task_id == "hi"
+ assert asset_event.source_dag_id == "asset-event-producer"
+ assert asset_event.source_run_id == "test"
+
@pytest.mark.need_serialized_dag
@pytest.mark.usefixtures("clear_asset_partition_rows")
@@ -8931,6 +8907,13 @@ def test_consumer_dag_listen_to_two_partitioned_asset(
assert len(partition_dags) == 1
assert partition_dags == {"asset-event-consumer"}
+    dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id))
+ assert dag_run is not None
+ for asset_event in dag_run.consumed_asset_events:
+ assert asset_event.source_task_id == "hi"
+ assert "asset-event-producer-" in asset_event.source_dag_id
+ assert asset_event.source_run_id == "test"
+
@pytest.mark.need_serialized_dag
@pytest.mark.usefixtures("clear_asset_partition_rows")
@@ -8995,3 +8978,10 @@ def test_consumer_dag_listen_to_two_partitioned_asset_with_key_1_mapper(
assert apdr.created_dag_run_id is not None
assert len(partition_dags) == 1
assert partition_dags == {"asset-event-consumer"}
+
+    dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id))
+ assert dag_run is not None
+ for asset_event in dag_run.consumed_asset_events:
+ assert asset_event.source_task_id == "hi"
+ assert "asset-event-producer-" in asset_event.source_dag_id
+ assert asset_event.source_run_id == "test"