This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 060eeb73d36 Rename latest_version to version in bundle orm / code 
clarification (#45720)
060eeb73d36 is described below

commit 060eeb73d3636f7ff692acb6c61c7ff948c3574f
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Thu Jan 16 15:24:08 2025 -0800

    Rename latest_version to version in bundle orm / code clarification (#45720)
    
    1. Did a little work to try to make the code a little clearer in the dag 
processor where we compare versions.  Instead of "current" and "new", which is 
confusing, I call them pre-refresh and after-refresh (`pre_refresh_version` and 
`version_after_refresh`), which makes the logic a little more intelligible.  
Also I provide a var `previously_seen` to show the intention of the 
`bundle.name in self._bundle_versions` check.
    
    2. I propose (and do so here) renaming `latest_version` to `version` in 
the bundle ORM model (`DagBundleModel`), for reasons similar to 
https://github.com/apache/airflow/pull/45719.  I think it makes sense to think 
of the ORM object as _itself_ representing the latest or current or last 
refreshed version.  So the latest version cannot itself _have_ a latest 
version -- the latest version just has a version.  That is the conceptual 
issue.  In any event, by saying less in the name, we can let the docs explain 
the nuance.
    
    ---------
    
    Co-authored-by: Jed Cunningham 
<66968678+jedcunning...@users.noreply.github.com>
---
 airflow/dag_processing/manager.py                  | 24 +++++++++++++---------
 .../versions/0050_3_0_0_add_dagbundlemodel.py      |  2 +-
 airflow/models/dagbundle.py                        |  4 ++--
 docs/apache-airflow/img/airflow_erd.sha256         |  2 +-
 docs/apache-airflow/img/airflow_erd.svg            |  6 +++---
 tests/dag_processing/test_manager.py               |  4 ++--
 6 files changed, 23 insertions(+), 19 deletions(-)

diff --git a/airflow/dag_processing/manager.py 
b/airflow/dag_processing/manager.py
index 8d2cf782349..4a2418a7f53 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -656,15 +656,16 @@ class DagFileProcessorManager:
                 bundle.initialize()
             # TODO: AIP-66 test to make sure we get a fresh record from the db 
and it's not cached
             with create_session() as session:
-                bundle_model = session.get(DagBundleModel, bundle.name)
+                bundle_model: DagBundleModel = session.get(DagBundleModel, 
bundle.name)
                 elapsed_time_since_refresh = (
                     now - (bundle_model.last_refreshed or timezone.utc_epoch())
                 ).total_seconds()
-                current_version = bundle.get_current_version()
+                pre_refresh_version = bundle.get_current_version()
+                previously_seen = bundle.name in self._bundle_versions
                 if (
                     elapsed_time_since_refresh < bundle.refresh_interval
-                    and bundle_model.latest_version == current_version
-                    and bundle.name in self._bundle_versions
+                    and bundle_model.version == pre_refresh_version
+                    and previously_seen
                 ):
                     self.log.info("Not time to refresh %s", bundle.name)
                     continue
@@ -677,17 +678,20 @@ class DagFileProcessorManager:
 
                 bundle_model.last_refreshed = now
 
-                new_version = bundle.get_current_version()
+                version_after_refresh = bundle.get_current_version()
                 if bundle.supports_versioning:
-                    # We can short-circuit the rest of the refresh if the 
version hasn't changed
-                    # and we've already fully "refreshed" this bundle before 
in this dag processor.
-                    if current_version == new_version and bundle.name in 
self._bundle_versions:
+                    # We can short-circuit the rest of this if (1) bundle was 
seen before by
+                    # this dag processor and (2) the version of the bundle did 
not change
+                    # after refreshing it
+                    if previously_seen and pre_refresh_version == 
version_after_refresh:
                         self.log.debug("Bundle %s version not changed after 
refresh", bundle.name)
                         continue
 
-                    bundle_model.latest_version = new_version
+                    bundle_model.version = version_after_refresh
 
-                    self.log.info("Version changed for %s, new version: %s", 
bundle.name, new_version)
+                    self.log.info(
+                        "Version changed for %s, new version: %s", 
bundle.name, version_after_refresh
+                    )
 
             bundle_file_paths = self._find_files_in_bundle(bundle)
 
diff --git a/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py 
b/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py
index 2a8a7a6d99e..054e6eb03c7 100644
--- a/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py
+++ b/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py
@@ -43,7 +43,7 @@ def upgrade():
         "dag_bundle",
         sa.Column("name", sa.String(length=250), nullable=False),
         sa.Column("active", sa.Boolean(), nullable=True),
-        sa.Column("latest_version", sa.String(length=200), nullable=True),
+        sa.Column("version", sa.String(length=200), nullable=True),
         sa.Column("last_refreshed", UtcDateTime(timezone=True), nullable=True),
         sa.PrimaryKeyConstraint("name", name=op.f("dag_bundle_pkey")),
     )
diff --git a/airflow/models/dagbundle.py b/airflow/models/dagbundle.py
index 08429db0b0b..b07e84f3bee 100644
--- a/airflow/models/dagbundle.py
+++ b/airflow/models/dagbundle.py
@@ -29,14 +29,14 @@ class DagBundleModel(Base):
     We track the following information about each bundle, as it can be useful 
for
     informational purposes and for debugging:
      - active: Is the bundle currently found in configuration?
-     - latest_version: The latest version Airflow has seen for the bundle.
+     - version: The latest version Airflow has seen for the bundle.
      - last_refreshed: When the bundle was last refreshed.
     """
 
     __tablename__ = "dag_bundle"
     name = Column(StringID(), primary_key=True)
     active = Column(Boolean, default=True)
-    latest_version = Column(String(200), nullable=True)
+    version = Column(String(200), nullable=True)
     last_refreshed = Column(UtcDateTime, nullable=True)
 
     def __init__(self, *, name: str):
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 
b/docs/apache-airflow/img/airflow_erd.sha256
index 99c908819cc..5626cf9708b 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-a877009126cad78bdd6336ac298e42d76dbb29dc88d5ecb9e5344f95dfe9c2b7
\ No newline at end of file
+cb858681fdc7a596db20c1c5dbf93812fd011a6df1e0b5322a49a51c8476bb93
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg 
b/docs/apache-airflow/img/airflow_erd.svg
index 198c05c1147..3fa6369924c 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -1410,9 +1410,9 @@
 <text text-anchor="start" x="133.5" y="-1873.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
 <text text-anchor="start" x="138.5" y="-1873.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
 <polygon fill="none" stroke="black" points="30.5,-1839 30.5,-1864 282.5,-1864 
282.5,-1839 30.5,-1839"/>
-<text text-anchor="start" x="35.5" y="-1848.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">latest_version</text>
-<text text-anchor="start" x="132.5" y="-1848.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="137.5" y="-1848.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(200)]</text>
+<text text-anchor="start" x="35.5" y="-1848.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">version</text>
+<text text-anchor="start" x="87.5" y="-1848.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="92.5" y="-1848.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(200)]</text>
 </g>
 <!-- dag -->
 <g id="node24" class="node">
diff --git a/tests/dag_processing/test_manager.py 
b/tests/dag_processing/test_manager.py
index 1f10ed3eb1b..42e97512779 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -888,7 +888,7 @@ class TestDagFileProcessorManager:
                     # will believe another processor had seen a new version
                     with create_session() as session:
                         bundletwo_model = session.get(DagBundleModel, 
"bundletwo")
-                        bundletwo_model.latest_version = "123"
+                        bundletwo_model.version = "123"
 
                 bundletwo.refresh.side_effect = _update_bundletwo_version
                 manager = DagFileProcessorManager(max_runs=2)
@@ -922,7 +922,7 @@ class TestDagFileProcessorManager:
 
         with create_session() as session:
             model = session.get(DagBundleModel, "bundleone")
-            assert model.latest_version == "123"
+            assert model.version == "123"
 
 
 class TestDagFileProcessorAgent:

Reply via email to