This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-0-test by this push:
     new 24722ba6079 Propagate AssetEvent through context when Consumer DAG 
triggering on AssetAlias (#50182) (#55534)
24722ba6079 is described below

commit 24722ba6079cd65e996366123f314c4584bed519
Author: Wei Lee <[email protected]>
AuthorDate: Sat Sep 13 07:05:46 2025 +0800

    Propagate AssetEvent through context when Consumer DAG triggering on 
AssetAlias (#50182) (#55534)
    
    Co-authored-by: Kevin Yang <[email protected]>
---
 .../src/airflow/jobs/scheduler_job_runner.py       | 22 ++++--
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 86 +++++++++++++++++++++-
 2 files changed, 100 insertions(+), 8 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 9d7fedbb50c..787209426f9 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -32,7 +32,7 @@ from functools import lru_cache, partial
 from itertools import groupby
 from typing import TYPE_CHECKING, Any, Callable
 
-from sqlalchemy import and_, delete, desc, exists, func, inspect, select, 
text, tuple_, update
+from sqlalchemy import and_, delete, desc, exists, func, inspect, or_, select, 
text, tuple_, update
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient, 
selectinload
 from sqlalchemy.orm.attributes import NO_VALUE
@@ -51,9 +51,11 @@ from airflow.jobs.job import Job, perform_heartbeat
 from airflow.models import Log
 from airflow.models.asset import (
     AssetActive,
+    AssetAliasModel,
     AssetDagRunQueue,
     AssetEvent,
     AssetModel,
+    DagScheduleAssetAliasReference,
     DagScheduleAssetReference,
     TaskOutletAssetReference,
 )
@@ -1639,14 +1641,22 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 )
                 .cte()
             )
+
             asset_events = session.scalars(
                 select(AssetEvent)
-                .join(
-                    DagScheduleAssetReference,
-                    AssetEvent.asset_id == DagScheduleAssetReference.asset_id,
-                )
                 .where(
-                    DagScheduleAssetReference.dag_id == dag.dag_id,
+                    or_(
+                        AssetEvent.asset_id.in_(
+                            select(DagScheduleAssetReference.asset_id).where(
+                                DagScheduleAssetReference.dag_id == dag.dag_id
+                            )
+                        ),
+                        AssetEvent.source_aliases.any(
+                            AssetAliasModel.consuming_dags.any(
+                                DagScheduleAssetAliasReference.dag_id == 
dag.dag_id
+                            )
+                        ),
+                    ),
                     AssetEvent.timestamp <= triggered_date,
                     AssetEvent.timestamp > 
func.coalesce(cte.c.previous_dag_run_run_after, date.min),
                 )
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 058504880af..f4a8ab8ad66 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -48,7 +48,13 @@ from airflow.executors.executor_loader import ExecutorLoader
 from airflow.executors.executor_utils import ExecutorName
 from airflow.jobs.job import Job, run_job
 from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
-from airflow.models.asset import AssetActive, AssetDagRunQueue, AssetEvent, 
AssetModel
+from airflow.models.asset import (
+    AssetActive,
+    AssetAliasModel,
+    AssetDagRunQueue,
+    AssetEvent,
+    AssetModel,
+)
 from airflow.models.backfill import Backfill, _create_backfill
 from airflow.models.dag import DAG, DagModel
 from airflow.models.dag_version import DagVersion
@@ -63,7 +69,7 @@ from airflow.models.taskinstance import TaskInstance
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.sdk import task
-from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.definitions.asset import Asset, AssetAlias
 from airflow.serialization.serialized_objects import LazyDeserializedDAG, 
SerializedDAG
 from airflow.timetables.base import DataInterval
 from airflow.traces.tracer import Trace
@@ -4110,6 +4116,82 @@ class TestSchedulerJob:
 
         assert created_run.creating_job_id == scheduler_job.id
 
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_alias_with_asset_event_attached(self, 
session, dag_maker):
+        """
+        Test Dag Run trigger on AssetAlias includes the corresponding 
AssetEvent in `consumed_asset_events`.
+        """
+
+        # Simulate an Asset created at runtime, and it is not an active asset
+        asset1 = Asset(uri="test://asset1", name="test_asset", 
group="test_group")
+        # Create an AssetAlias, and the Asset will be attached to this 
AssetAlias
+        asset_alias = AssetAlias(name="test_asset_alias_with_asset_event", 
group="test_group")
+
+        # Add it to the DB so the event can be created from this Asset
+        asm = AssetModel(name=asset1.name, uri=asset1.uri, group=asset1.group)
+        session.add(asm)
+
+        asam = AssetAliasModel(name=asset_alias.name, group=asset_alias.group)
+
+        # Simulate a Producer dag attach an asset event at runtime to an 
AssetAlias
+        # Don't use outlets here because the needs to associate an asset alias 
with an asset event in the association table
+        with dag_maker(dag_id="asset-alias-producer", 
start_date=timezone.utcnow(), session=session):
+            BashOperator(task_id="simulate-asset-alias-outlet", 
bash_command="echo 1")
+        dr = dag_maker.create_dagrun(run_id="asset-alias-producer-run")
+
+        asset1_id = 
session.query(AssetModel.id).filter_by(uri=asset1.uri).scalar()
+
+        # Create an AssetEvent, which is associated with the Asset, and it is 
attached to the AssetAlias
+        event = AssetEvent(
+            asset_id=asset1_id,
+            source_task_id="simulate-asset-alias-outlet",
+            source_dag_id=dr.dag_id,
+            source_run_id=dr.run_id,
+            source_map_index=-1,
+        )
+        # Attach the Asset and the AssetEvent to the Asset Alias
+        asam.assets.append(asm)
+        asam.asset_events.append(event)
+
+        session.add_all([asam, event])
+        session.flush()
+
+        # Create the Consumer DAG and Trigger it with scheduler
+        with dag_maker(dag_id="asset-alias-consumer", schedule=[asset_alias]):
+            pass
+        consumer_dag = dag_maker.dag
+
+        session = dag_maker.session
+        session.add_all(
+            [
+                AssetDagRunQueue(asset_id=asset1_id, 
target_dag_id=consumer_dag.dag_id),
+            ]
+        )
+        session.flush()
+
+        scheduler_job = Job(executor=self.null_exec)
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        with create_session() as session:
+            self.job_runner._create_dagruns_for_dags(session, session)
+
+        def dict_from_obj(obj):
+            """Get dict of column attrs from SqlAlchemy object."""
+            return {k.key: obj.__dict__.get(k) for k in 
obj.__mapper__.column_attrs}
+
+        created_run = session.query(DagRun).filter(DagRun.dag_id == 
consumer_dag.dag_id).one()
+        assert created_run.state == State.QUEUED
+        assert created_run.start_date is None
+
+        # The AssetEvent should be included in the consumed_asset_events when 
the consumer DAG is
+        # triggered on AssetAlias
+        assert list(map(dict_from_obj, created_run.consumed_asset_events)) == 
list(
+            map(dict_from_obj, [event])
+        )
+        assert created_run.data_interval_start is None
+        assert created_run.data_interval_end is None
+        assert created_run.creating_job_id == scheduler_job.id
+
     @pytest.mark.need_serialized_dag
     @pytest.mark.parametrize(
         "disable, enable",

Reply via email to