uranusjr commented on code in PR #68389:
URL: https://github.com/apache/airflow/pull/68389#discussion_r3411830500
##########
airflow-core/src/airflow/models/serialized_dag.py:
##########
@@ -815,14 +834,22 @@ def read_all_dags(cls, *, session: Session = NEW_SESSION) -> dict[str, Serialize
:param session: ORM Session
:returns: a dict of DAGs read from database
"""
- latest_serialized_dag_subquery = (
- select(cls.dag_id, func.max(cls.created_at).label("max_created")).group_by(cls.dag_id).subquery()
+ # Pick the serialized DAG of the max version_number per dag_id (deterministic and unique,
+ # unlike max(created_at) which can tie). Note: this also fixes the join condition, which
+ # previously used Python ``and`` (short-circuiting to a single clause) instead of ``&``.
+ latest_version_subquery = (
+ select(DagVersion.dag_id, func.max(DagVersion.version_number).label("max_version"))
+ .join(cls, cls.dag_version_id == DagVersion.id)
+ .group_by(DagVersion.dag_id)
+ .subquery()
)
serialized_dags = session.scalars(
- select(cls).join(
- latest_serialized_dag_subquery,
- (cls.dag_id == latest_serialized_dag_subquery.c.dag_id)
- and (cls.created_at == latest_serialized_dag_subquery.c.max_created),
+ select(cls)
+ .join(DagVersion, cls.dag_version_id == DagVersion.id)
+ .join(
+ latest_version_subquery,
+ (DagVersion.dag_id == latest_version_subquery.c.dag_id)
+ & (DagVersion.version_number == latest_version_subquery.c.max_version),
)
)
Review Comment:
Not sure, but it may be worth extracting this logic somewhere, since it now
appears in two places in this file.
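
For context on the join-condition fix mentioned in the code comment of the hunk: Python's ``and`` cannot be overloaded and always evaluates to one of its two operands, whereas ``&`` dispatches to ``__and__`` and can build a combined expression. The sketch below illustrates that with toy classes. ``Column`` and ``Clause`` are hypothetical stand-ins written for this example, not SQLAlchemy's classes, and the falsy ``__bool__`` is an assumption chosen to mimic a comparison between two different columns.

```python
class Clause:
    """Toy SQL fragment (hypothetical stand-in, not a SQLAlchemy class)."""

    def __init__(self, sql: str) -> None:
        self.sql = sql

    def __and__(self, other: "Clause") -> "Clause":
        # ``&`` is overloadable, so it can combine both clauses.
        return Clause(f"{self.sql} AND {other.sql}")

    def __bool__(self) -> bool:
        # Assumption for this sketch: the clause is falsy. If it were truthy,
        # ``and`` would return the right operand instead; either way only
        # one of the two clauses survives.
        return False


class Column:
    """Toy column whose ``==`` builds a Clause instead of returning a bool."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __eq__(self, other: "Column") -> Clause:  # type: ignore[override]
        return Clause(f"{self.name} = {other.name}")

    # Defining __eq__ removes the default __hash__, so restore it.
    __hash__ = object.__hash__


dag_id, latest_dag_id = Column("sd.dag_id"), Column("latest.dag_id")
created_at, max_created = Column("sd.created_at"), Column("latest.max_created")

# ``and`` short-circuits and hands back a single operand.
with_and = (dag_id == latest_dag_id) and (created_at == max_created)
# ``&`` calls Clause.__and__ and keeps both conditions.
with_amp = (dag_id == latest_dag_id) & (created_at == max_created)

print(with_and.sql)  # sd.dag_id = latest.dag_id
print(with_amp.sql)  # sd.dag_id = latest.dag_id AND sd.created_at = latest.max_created
```

In this toy the ``and`` version silently drops the second condition, which is the kind of single-clause join the code comment describes; if the lookup is extracted into a shared helper, the ``&`` form would only need to be written once.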