potiuk commented on code in PR #66491:
URL: https://github.com/apache/airflow/pull/66491#discussion_r3215213646


##########
airflow-core/src/airflow/dag_processing/bundles/base.py:
##########
@@ -313,9 +329,15 @@ def path(self) -> Path:
         """
 
     @abstractmethod
-    def get_current_version(self) -> str | None:
+    def get_current_version(self) -> str | BundleVersion | None:

Review Comment:
   Heads up on the migration story: the in-tree `GitDagBundle` 
(`providers/git/src/airflow/providers/git/bundles/git.py:318`) has 
`supports_versioning = True` and returns a bare `str` from 
`get_current_version()`. Once this PR lands, every git-bundle deployment hits 
the deprecation path on each refresh tick.
   
   Two options worth considering:
   1. Migrate `GitDagBundle` in this PR: basically
`return BundleVersion(version=repo.head.commit.hexsha)` at `git.py:322`. A
one-line change that makes the in-tree bundle a model citizen of the new API.
   2. If you'd rather keep this PR scoped, open a tracking issue and link it 
here so the deprecation noise is visible work, not just background log spam.



##########
airflow-core/src/airflow/models/dag_version.py:
##########
@@ -53,6 +53,7 @@ class DagVersion(Base):
     dag_model = relationship("DagModel", back_populates="dag_versions")
     bundle_name: Mapped[str | None] = mapped_column(StringID(), nullable=True)
     bundle_version: Mapped[str | None] = mapped_column(StringID(), nullable=True)
+    version_data: Mapped[dict | None] = mapped_column(sa.JSON, nullable=True)

Review Comment:
   Style nit: the project convention everywhere else (including this PR's own
migration, at line 25 of the alembic file) is `sa.JSON()` with parens. Both
work, since SQLAlchemy accepts either a type class or an instance, but for
consistency:
   
   ```python
   version_data: Mapped[dict | None] = mapped_column(sa.JSON(), nullable=True)
   ```



##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -740,4 +741,6 @@ def dag_reserialize(args, session: Session = NEW_SESSION) -> None:
             continue
         bundle.initialize()
         dag_bag = BundleDagBag(bundle.path, bundle_path=bundle.path, bundle_name=bundle.name)
-        sync_bag_to_db(dag_bag, bundle.name, bundle_version=bundle.get_current_version(), session=session)
+        result = bundle.get_current_version()
+        version = result.version if isinstance(result, BundleVersion) else result

Review Comment:
   The CLI re-implements the unpack inline and skips the `DeprecationWarning` 
that `DagFileProcessorManager._unpack_bundle_version` emits for legacy string 
returns. Two divergent code paths going forward (one warns, one doesn't), and a 
legacy bundle flowing through `dag reserialize` won't get the deprecation 
signal.
   
   Suggestion: lift `_unpack_bundle_version` to `BaseDagBundle` (e.g. as a 
`get_version_and_data()` instance method) or to module scope in `bundles.base`, 
and call it from both this CLI site and the manager.



##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -740,4 +741,6 @@ def dag_reserialize(args, session: Session = NEW_SESSION) -> None:
             continue
         bundle.initialize()
         dag_bag = BundleDagBag(bundle.path, bundle_path=bundle.path, bundle_name=bundle.name)
-        sync_bag_to_db(dag_bag, bundle.name, bundle_version=bundle.get_current_version(), session=session)
+        result = bundle.get_current_version()

Review Comment:
   Test gap: this new unpack path (and its legacy-string fallback) isn't 
covered. The existing `_unpack_bundle_version` tests cover the helper itself, 
but not this CLI call site. Worth a small unit test of `dag_reserialize` 
against (a) a bundle returning `BundleVersion`, (b) a bundle returning a bare 
string — the path most likely to break silently if someone refactors the 
manager helper without remembering this CLI duplicate.



##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -753,7 +782,9 @@ def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
             if bundle.supports_versioning:
                 # we will also check the version of the bundle to see if another DAG processor has seen
                 # a new version
-                pre_refresh_version = self._bundle_versions.get(bundle.name) or bundle.get_current_version()
+                pre_refresh_version = self._bundle_versions.get(bundle.name)
+                if pre_refresh_version is None:
+                    pre_refresh_version, _ = self._unpack_bundle_version(bundle.get_current_version(), bundle)

Review Comment:
   Subtle semantic change worth a one-line code comment: the previous code was
`self._bundle_versions.get(bundle.name) or bundle.get_current_version()`
(falsy check); the new code uses an `is None` check. The two behave the same
for every non-empty cached value in the current `dict[str, str | None]` shape,
but the new form tightens the contract: an empty-string version returned by a
bundle and cached in `_bundle_versions` is now treated as the "seen" version
instead of as missing (the old `or` would have fallen through to a fresh
`get_current_version()` call). That's probably the right behaviour, but a
comment explaining the deliberate switch would help future readers.



##########
airflow-core/src/airflow/dag_processing/bundles/base.py:
##########
@@ -229,6 +229,22 @@ def remove_stale_bundle_versions(self):
             self._remove_stale_bundle_versions_for_bundle(bundle_name=bundle.name)
 
 
+@dataclass(frozen=True)
+class BundleVersion:
+    """
+    Version information returned by a DAG bundle.
+
+    Bundles return this from ``get_current_version()`` to provide both a version
+    identifier and optional structured data (e.g., a manifest) atomically.
+
+    :param version: A string identifier for this bundle version (e.g., git SHA, content hash).
+    :param data: Optional structured data associated with this version (e.g., S3 manifest).
+    """
+
+    version: str
+    data: dict | None = None

Review Comment:
   Two small things on this field:
   
   - `dict | None` is loose for a JSON-backed column. `dict[str, Any] | None` 
(or `Mapping[str, Any] | None`) reflects what your serialize/deserialize path 
actually constrains and helps callers reason about what's safe to put in here.
   - `frozen=True` freezes attributes but not contents: `bv.data["new_key"] = "..."`
still works. Worth a one-line note in the docstring that mutating `data` after
construction is undefined, or wrapping it in `types.MappingProxyType` if you
want (shallow) read-only access.



-- 
This is an automated message from the Apache Git Service.