This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-0-test by this push:
new 24722ba6079 Propagate AssetEvent through context when Consumer DAG
triggering on AssetAlias (#50182) (#55534)
24722ba6079 is described below
commit 24722ba6079cd65e996366123f314c4584bed519
Author: Wei Lee <[email protected]>
AuthorDate: Sat Sep 13 07:05:46 2025 +0800
Propagate AssetEvent through context when Consumer DAG triggering on
AssetAlias (#50182) (#55534)
Co-authored-by: Kevin Yang <[email protected]>
---
.../src/airflow/jobs/scheduler_job_runner.py | 22 ++++--
airflow-core/tests/unit/jobs/test_scheduler_job.py | 86 +++++++++++++++++++++-
2 files changed, 100 insertions(+), 8 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 9d7fedbb50c..787209426f9 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -32,7 +32,7 @@ from functools import lru_cache, partial
from itertools import groupby
from typing import TYPE_CHECKING, Any, Callable
-from sqlalchemy import and_, delete, desc, exists, func, inspect, select, text, tuple_, update
+from sqlalchemy import and_, delete, desc, exists, func, inspect, or_, select, text, tuple_, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient,
selectinload
from sqlalchemy.orm.attributes import NO_VALUE
@@ -51,9 +51,11 @@ from airflow.jobs.job import Job, perform_heartbeat
from airflow.models import Log
from airflow.models.asset import (
AssetActive,
+ AssetAliasModel,
AssetDagRunQueue,
AssetEvent,
AssetModel,
+ DagScheduleAssetAliasReference,
DagScheduleAssetReference,
TaskOutletAssetReference,
)
@@ -1639,14 +1641,22 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
.cte()
)
+
asset_events = session.scalars(
select(AssetEvent)
- .join(
- DagScheduleAssetReference,
- AssetEvent.asset_id == DagScheduleAssetReference.asset_id,
- )
.where(
- DagScheduleAssetReference.dag_id == dag.dag_id,
+ or_(
+ AssetEvent.asset_id.in_(
+ select(DagScheduleAssetReference.asset_id).where(
+ DagScheduleAssetReference.dag_id == dag.dag_id
+ )
+ ),
+ AssetEvent.source_aliases.any(
+ AssetAliasModel.consuming_dags.any(
+                            DagScheduleAssetAliasReference.dag_id == dag.dag_id
+ )
+ ),
+ ),
AssetEvent.timestamp <= triggered_date,
                AssetEvent.timestamp > func.coalesce(cte.c.previous_dag_run_run_after, date.min),
)
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 058504880af..f4a8ab8ad66 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -48,7 +48,13 @@ from airflow.executors.executor_loader import ExecutorLoader
from airflow.executors.executor_utils import ExecutorName
from airflow.jobs.job import Job, run_job
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
-from airflow.models.asset import AssetActive, AssetDagRunQueue, AssetEvent, AssetModel
+from airflow.models.asset import (
+ AssetActive,
+ AssetAliasModel,
+ AssetDagRunQueue,
+ AssetEvent,
+ AssetModel,
+)
from airflow.models.backfill import Backfill, _create_backfill
from airflow.models.dag import DAG, DagModel
from airflow.models.dag_version import DagVersion
@@ -63,7 +69,7 @@ from airflow.models.taskinstance import TaskInstance
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import task
-from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.definitions.asset import Asset, AssetAlias
from airflow.serialization.serialized_objects import LazyDeserializedDAG,
SerializedDAG
from airflow.timetables.base import DataInterval
from airflow.traces.tracer import Trace
@@ -4110,6 +4116,82 @@ class TestSchedulerJob:
assert created_run.creating_job_id == scheduler_job.id
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_alias_with_asset_event_attached(self,
session, dag_maker):
+ """
+ Test Dag Run trigger on AssetAlias includes the corresponding
AssetEvent in `consumed_asset_events`.
+ """
+
+ # Simulate an Asset created at runtime, and it is not an active asset
+ asset1 = Asset(uri="test://asset1", name="test_asset",
group="test_group")
+ # Create an AssetAlias, and the Asset will be attached to this
AssetAlias
+ asset_alias = AssetAlias(name="test_asset_alias_with_asset_event",
group="test_group")
+
+ # Add it to the DB so the event can be created from this Asset
+ asm = AssetModel(name=asset1.name, uri=asset1.uri, group=asset1.group)
+ session.add(asm)
+
+ asam = AssetAliasModel(name=asset_alias.name, group=asset_alias.group)
+
+ # Simulate a Producer dag attach an asset event at runtime to an
AssetAlias
+        # Don't use outlets here because the test needs to associate an asset alias with an asset event in the association table
+ with dag_maker(dag_id="asset-alias-producer",
start_date=timezone.utcnow(), session=session):
+ BashOperator(task_id="simulate-asset-alias-outlet",
bash_command="echo 1")
+ dr = dag_maker.create_dagrun(run_id="asset-alias-producer-run")
+
+        asset1_id = session.query(AssetModel.id).filter_by(uri=asset1.uri).scalar()
+
+ # Create an AssetEvent, which is associated with the Asset, and it is
attached to the AssetAlias
+ event = AssetEvent(
+ asset_id=asset1_id,
+ source_task_id="simulate-asset-alias-outlet",
+ source_dag_id=dr.dag_id,
+ source_run_id=dr.run_id,
+ source_map_index=-1,
+ )
+ # Attach the Asset and the AssetEvent to the Asset Alias
+ asam.assets.append(asm)
+ asam.asset_events.append(event)
+
+ session.add_all([asam, event])
+ session.flush()
+
+ # Create the Consumer DAG and Trigger it with scheduler
+ with dag_maker(dag_id="asset-alias-consumer", schedule=[asset_alias]):
+ pass
+ consumer_dag = dag_maker.dag
+
+ session = dag_maker.session
+ session.add_all(
+ [
+ AssetDagRunQueue(asset_id=asset1_id,
target_dag_id=consumer_dag.dag_id),
+ ]
+ )
+ session.flush()
+
+ scheduler_job = Job(executor=self.null_exec)
+ self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+ with create_session() as session:
+ self.job_runner._create_dagruns_for_dags(session, session)
+
+ def dict_from_obj(obj):
+ """Get dict of column attrs from SqlAlchemy object."""
+ return {k.key: obj.__dict__.get(k) for k in
obj.__mapper__.column_attrs}
+
+ created_run = session.query(DagRun).filter(DagRun.dag_id ==
consumer_dag.dag_id).one()
+ assert created_run.state == State.QUEUED
+ assert created_run.start_date is None
+
+ # The AssetEvent should be included in the consumed_asset_events when
the consumer DAG is
+ # triggered on AssetAlias
+ assert list(map(dict_from_obj, created_run.consumed_asset_events)) ==
list(
+ map(dict_from_obj, [event])
+ )
+ assert created_run.data_interval_start is None
+ assert created_run.data_interval_end is None
+ assert created_run.creating_job_id == scheduler_job.id
+
@pytest.mark.need_serialized_dag
@pytest.mark.parametrize(
"disable, enable",