Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-13 Thread via GitHub


ephraimbuddy merged PR #45532:
URL: https://github.com/apache/airflow/pull/45532


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-11 Thread via GitHub


jedcunningham closed pull request #45532: AIP-66: Add support for parsing DAG bundles
URL: https://github.com/apache/airflow/pull/45532


Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-10 Thread via GitHub


jedcunningham closed pull request #45532: AIP-66: Add support for parsing DAG bundles
URL: https://github.com/apache/airflow/pull/45532


Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-09 Thread via GitHub


jedcunningham closed pull request #45532: AIP-66: Add support for parsing DAG bundles
URL: https://github.com/apache/airflow/pull/45532


Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-09 Thread via GitHub


jedcunningham merged PR #45371:
URL: https://github.com/apache/airflow/pull/45371


Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-09 Thread via GitHub


jedcunningham commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1909160053


##
airflow/models/dag.py:
##
@@ -1873,7 +1887,10 @@ def sync_to_db(self, session=NEW_SESSION):
 
         :return: None
         """
-        self.bulk_write_to_db([self], session=session)
+        # TODO: AIP-66 should this be in the model?

Review Comment:
   We can look at this later 👍.



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-08 Thread via GitHub


jedcunningham closed pull request #45371: AIP-66: Add support for parsing DAG bundles
URL: https://github.com/apache/airflow/pull/45371


Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-07 Thread via GitHub


jedcunningham commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1906473784


##
airflow/dag_processing/collection.py:
##
@@ -74,11 +74,14 @@
 log = logging.getLogger(__name__)
 
 
-def _create_orm_dags(dags: Iterable[MaybeSerializedDAG], *, session: Session) -> Iterator[DagModel]:
+def _create_orm_dags(
+    bundle_name: str, dags: Iterable[MaybeSerializedDAG], *, session: Session

Review Comment:
   I guess it's not much different than fileloc. Likely it was the way I was populating it on the dags - I was probably not doing it in the ideal spot.



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-07 Thread via GitHub


dstandish commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1906162355


##
airflow/dag_processing/manager.py:
##
@@ -631,25 +646,60 @@ def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
             session.delete(request)
         return filelocs
 
-    def _refresh_dag_dir(self) -> bool:
-        """Refresh file paths from dag dir if we haven't done it for too long."""
-        now = time.monotonic()
-        elapsed_time_since_refresh = now - self.last_dag_dir_refresh_time
-        if elapsed_time_since_refresh <= self.dag_dir_list_interval:
-            return False
+    def _refresh_dag_bundles(self):
+        """Refresh DAG bundles, if required."""
+        now = timezone.utcnow()
 
-        # Build up a list of Python files that could contain DAGs
-        self.log.info("Searching for files in %s", self._dag_directory)
-        self._file_paths = list_py_file_paths(self._dag_directory)
-        self.last_dag_dir_refresh_time = now
-        self.log.info("There are %s files in %s", len(self._file_paths), self._dag_directory)
-        self.set_file_paths(self._file_paths)
+        self.log.info("Refreshing DAG bundles")
+
+        for bundle in self._dag_bundles:

Review Comment:
   sure



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-07 Thread via GitHub


dstandish commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1906162084


##
airflow/dag_processing/collection.py:
##
@@ -74,11 +74,14 @@
 log = logging.getLogger(__name__)
 
 
-def _create_orm_dags(dags: Iterable[MaybeSerializedDAG], *, session: Session) -> Iterator[DagModel]:
+def _create_orm_dags(
+    bundle_name: str, dags: Iterable[MaybeSerializedDAG], *, session: Session

Review Comment:
   why does it feel weird?



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-07 Thread via GitHub


jedcunningham commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1906150874


##
airflow/dag_processing/collection.py:
##
@@ -74,11 +74,14 @@
 log = logging.getLogger(__name__)
 
 
-def _create_orm_dags(dags: Iterable[MaybeSerializedDAG], *, session: Session) -> Iterator[DagModel]:
+def _create_orm_dags(
+    bundle_name: str, dags: Iterable[MaybeSerializedDAG], *, session: Session

Review Comment:
   An earlier POC had that, but shoving it on a `DAG` (or derivative object) felt a little weird to me. But definitely open to refactors here.
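To make the trade-off concrete, here is a minimal sketch of the explicit-parameter approach the diff takes: the caller knows which bundle it just parsed and passes that name once, so the parsed DAG objects never need to carry it. `ParsedDag`, `DagRow`, and `create_orm_dags` are hypothetical stand-ins (not Airflow's `DAG`, `DagModel`, or `_create_orm_dags`), and there is no session or database involved.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class ParsedDag:
    """Hypothetical stand-in for a parsed DAG; note it carries no bundle info."""

    dag_id: str
    fileloc: str


@dataclass
class DagRow:
    """Hypothetical stand-in for an ORM row such as DagModel."""

    dag_id: str
    fileloc: str
    bundle_name: str


def create_orm_dags(bundle_name: str, dags: Iterable[ParsedDag]) -> Iterator[DagRow]:
    """Stamp every new row with the bundle the caller parsed it from.

    The bundle name is supplied once by the caller instead of being
    attached to each DAG object beforehand.
    """
    for dag in dags:
        yield DagRow(dag_id=dag.dag_id, fileloc=dag.fileloc, bundle_name=bundle_name)
```

The cost of this shape is one extra positional argument threaded through the call chain; the alternative (setting an attribute on each DAG before handing it over) keeps signatures unchanged but makes the DAG object responsible for knowing where it was loaded from.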



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-07 Thread via GitHub


jedcunningham commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1906149582


##
airflow/dag_processing/manager.py:
##
@@ -99,6 +102,13 @@ class DagFileStat:
 log = logging.getLogger("airflow.processor_manager")
 
 
+class DagFileInfo(NamedTuple):
+    """Information about a DAG file."""
+
+    path: str

Review Comment:
   Left a comment, but there will be some further work in this area so I'll try and make sure it's more obvious then.
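For context on why `DagFileInfo` pairs a path with its bundle: the refresh loop elsewhere in this PR builds entries as `DagFileInfo(path=path, bundle_name=bundle_model.name)` and swaps out one bundle's files at a time. A minimal sketch of that pattern follows; the `bundle_name` field is inferred from that later usage (this hunk only shows `path`), and `replace_bundle_files` is a hypothetical helper, not a function in the PR.

```python
from __future__ import annotations

from typing import NamedTuple


class DagFileInfo(NamedTuple):
    """Information about a DAG file: where it lives and which bundle owns it."""

    path: str
    bundle_name: str


def replace_bundle_files(
    file_paths: list[DagFileInfo], bundle_name: str, new_paths: list[str]
) -> list[DagFileInfo]:
    """Drop every entry owned by bundle_name, then append its freshly listed files.

    Entries belonging to other bundles are kept untouched and in their original order.
    """
    kept = [f for f in file_paths if f.bundle_name != bundle_name]
    kept.extend(DagFileInfo(path=p, bundle_name=bundle_name) for p in new_paths)
    return kept
```

Because two bundles can both contain a file at the same relative path, keying on the `(path, bundle_name)` pair rather than `path` alone is what keeps one bundle's refresh from clobbering another's entries.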



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-07 Thread via GitHub


jedcunningham commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1906146290


##
airflow/dag_processing/manager.py:
##
@@ -572,6 +583,7 @@ def _read_from_direct_scheduler_conn(self, conn: MultiprocessingConnection) -> b
 
     def _refresh_requested_filelocs(self) -> None:
         """Refresh filepaths from dag dir as requested by users via APIs."""
+        return  # TODO: AIP-66 make bundle aware!

Review Comment:
   Done.



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-07 Thread via GitHub


jedcunningham commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1906146200


##
airflow/dag_processing/manager.py:
##
@@ -631,25 +646,60 @@ def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
             session.delete(request)
         return filelocs
 
-    def _refresh_dag_dir(self) -> bool:
-        """Refresh file paths from dag dir if we haven't done it for too long."""
-        now = time.monotonic()
-        elapsed_time_since_refresh = now - self.last_dag_dir_refresh_time
-        if elapsed_time_since_refresh <= self.dag_dir_list_interval:
-            return False
+    def _refresh_dag_bundles(self):
+        """Refresh DAG bundles, if required."""
+        now = timezone.utcnow()
 
-        # Build up a list of Python files that could contain DAGs
-        self.log.info("Searching for files in %s", self._dag_directory)
-        self._file_paths = list_py_file_paths(self._dag_directory)
-        self.last_dag_dir_refresh_time = now
-        self.log.info("There are %s files in %s", len(self._file_paths), self._dag_directory)
-        self.set_file_paths(self._file_paths)
+        self.log.info("Refreshing DAG bundles")
+
+        for bundle in self._dag_bundles:

Review Comment:
   I'm slightly hesitant to refactor that right now. Let's do a follow up.



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-07 Thread via GitHub


jedcunningham commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1906145684


##
airflow/dag_processing/manager.py:
##
@@ -631,25 +646,60 @@ def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
             session.delete(request)
         return filelocs
 
-    def _refresh_dag_dir(self) -> bool:
-        """Refresh file paths from dag dir if we haven't done it for too long."""
-        now = time.monotonic()
-        elapsed_time_since_refresh = now - self.last_dag_dir_refresh_time
-        if elapsed_time_since_refresh <= self.dag_dir_list_interval:
-            return False
+    def _refresh_dag_bundles(self):
+        """Refresh DAG bundles, if required."""
+        now = timezone.utcnow()
 
-        # Build up a list of Python files that could contain DAGs
-        self.log.info("Searching for files in %s", self._dag_directory)
-        self._file_paths = list_py_file_paths(self._dag_directory)
-        self.last_dag_dir_refresh_time = now
-        self.log.info("There are %s files in %s", len(self._file_paths), self._dag_directory)
-        self.set_file_paths(self._file_paths)
+        self.log.info("Refreshing DAG bundles")
+
+        for bundle in self._dag_bundles:
+            # TODO: AIP-66 test to make sure we get a fresh record from the db and it's not cached
+            with create_session() as session:
+                bundle_model = session.query(DagBundleModel).get(bundle.name)
+                elapsed_time_since_refresh = (
+                    now - (bundle_model.last_refreshed or timezone.utc_epoch())
+                ).total_seconds()
+                if not elapsed_time_since_refresh > bundle.refresh_interval:
+                    # or bundle_model.version != bundle.get_current_version():
+                    self.log.info("Not time to refresh %s", bundle.name)
+                    continue
 
-        try:
-            self.log.debug("Removing old import errors")
-            self.clear_nonexistent_import_errors()
-        except Exception:
-            self.log.exception("Error removing old import errors")
+                # TODO: AIP-66 locking / dealing with multiple processors

Review Comment:
   I added another check that I think covers it for now.



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-07 Thread via GitHub


jedcunningham commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1906145369


##
airflow/dag_processing/manager.py:
##
@@ -631,25 +646,60 @@ def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
             session.delete(request)
         return filelocs
 
-    def _refresh_dag_dir(self) -> bool:
-        """Refresh file paths from dag dir if we haven't done it for too long."""
-        now = time.monotonic()
-        elapsed_time_since_refresh = now - self.last_dag_dir_refresh_time
-        if elapsed_time_since_refresh <= self.dag_dir_list_interval:
-            return False
+    def _refresh_dag_bundles(self):
+        """Refresh DAG bundles, if required."""
+        now = timezone.utcnow()
 
-        # Build up a list of Python files that could contain DAGs
-        self.log.info("Searching for files in %s", self._dag_directory)
-        self._file_paths = list_py_file_paths(self._dag_directory)
-        self.last_dag_dir_refresh_time = now
-        self.log.info("There are %s files in %s", len(self._file_paths), self._dag_directory)
-        self.set_file_paths(self._file_paths)
+        self.log.info("Refreshing DAG bundles")
+
+        for bundle in self._dag_bundles:
+            # TODO: AIP-66 test to make sure we get a fresh record from the db and it's not cached
+            with create_session() as session:
+                bundle_model = session.query(DagBundleModel).get(bundle.name)
+                elapsed_time_since_refresh = (
+                    now - (bundle_model.last_refreshed or timezone.utc_epoch())
+                ).total_seconds()
+                if not elapsed_time_since_refresh > bundle.refresh_interval:
+                    # or bundle_model.version != bundle.get_current_version():
+                    self.log.info("Not time to refresh %s", bundle.name)
+                    continue
 
-        try:
-            self.log.debug("Removing old import errors")
-            self.clear_nonexistent_import_errors()
-        except Exception:
-            self.log.exception("Error removing old import errors")
+                # TODO: AIP-66 locking / dealing with multiple processors
+                self.log.info("Time to refresh %s", bundle.name)
+                old_version = bundle.get_current_version()
+                bundle.refresh()
+                bundle_model.last_refreshed = now
+
+                if old_version != bundle.get_current_version():
+                    self.log.info(
+                        "Version changed for %s, new version: %s", bundle.name, bundle.get_current_version()
+                    )
+                bundle_file_paths = self._refresh_dag_dir(bundle)
+                # remove all files from the bundle, then add the new ones
+                self._file_paths = [f for f in self._file_paths if f.bundle_name != bundle_model.name]
+                self._file_paths.extend(
+                    DagFileInfo(path=path, bundle_name=bundle_model.name) for path in bundle_file_paths
+                )
+
+                try:
+                    self.log.debug("Removing old import errors")
+                    self.clear_nonexistent_import_errors()
+                except Exception:
+                    self.log.exception("Error removing old import errors")
+
+                self._bundle_versions[bundle_model.name] = bundle.get_current_version()
+                self.log.info("Found %s files for bundle %s", len(bundle_file_paths), bundle.name)
+                # TODO: AIP-66 detect if version changed and update accordingly
+
+    def _refresh_dag_dir(self, bundle: BaseDagBundle) -> list[str]:

Review Comment:
   No, not really :)



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-07 Thread via GitHub


jedcunningham commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1906078545


##
airflow/dag_processing/manager.py:
##
@@ -631,25 +646,60 @@ def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
             session.delete(request)
         return filelocs
 
-    def _refresh_dag_dir(self) -> bool:
-        """Refresh file paths from dag dir if we haven't done it for too long."""
-        now = time.monotonic()
-        elapsed_time_since_refresh = now - self.last_dag_dir_refresh_time
-        if elapsed_time_since_refresh <= self.dag_dir_list_interval:
-            return False
+    def _refresh_dag_bundles(self):
+        """Refresh DAG bundles, if required."""
+        now = timezone.utcnow()
 
-        # Build up a list of Python files that could contain DAGs
-        self.log.info("Searching for files in %s", self._dag_directory)
-        self._file_paths = list_py_file_paths(self._dag_directory)
-        self.last_dag_dir_refresh_time = now
-        self.log.info("There are %s files in %s", len(self._file_paths), self._dag_directory)
-        self.set_file_paths(self._file_paths)
+        self.log.info("Refreshing DAG bundles")
+
+        for bundle in self._dag_bundles:
+            # TODO: AIP-66 test to make sure we get a fresh record from the db and it's not cached
+            with create_session() as session:
+                bundle_model = session.query(DagBundleModel).get(bundle.name)

Review Comment:
   This does get a single instance, but it's sqla 1.4 style, which is now deprecated. Muscle memory for me :)



##
airflow/dag_processing/manager.py:
##
@@ -631,25 +646,60 @@ def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
             session.delete(request)
         return filelocs
 
-    def _refresh_dag_dir(self) -> bool:
-        """Refresh file paths from dag dir if we haven't done it for too long."""
-        now = time.monotonic()
-        elapsed_time_since_refresh = now - self.last_dag_dir_refresh_time
-        if elapsed_time_since_refresh <= self.dag_dir_list_interval:
-            return False
+    def _refresh_dag_bundles(self):
+        """Refresh DAG bundles, if required."""
+        now = timezone.utcnow()
 
-        # Build up a list of Python files that could contain DAGs
-        self.log.info("Searching for files in %s", self._dag_directory)
-        self._file_paths = list_py_file_paths(self._dag_directory)
-        self.last_dag_dir_refresh_time = now
-        self.log.info("There are %s files in %s", len(self._file_paths), self._dag_directory)
-        self.set_file_paths(self._file_paths)
+        self.log.info("Refreshing DAG bundles")
+
+        for bundle in self._dag_bundles:
+            # TODO: AIP-66 test to make sure we get a fresh record from the db and it's not cached
+            with create_session() as session:
+                bundle_model = session.query(DagBundleModel).get(bundle.name)

Review Comment:
   Switching this up, thanks.



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-07 Thread via GitHub


dstandish commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1905828344


##
airflow/dag_processing/manager.py:
##
@@ -631,25 +646,60 @@ def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
             session.delete(request)
         return filelocs
 
-    def _refresh_dag_dir(self) -> bool:
-        """Refresh file paths from dag dir if we haven't done it for too long."""
-        now = time.monotonic()
-        elapsed_time_since_refresh = now - self.last_dag_dir_refresh_time
-        if elapsed_time_since_refresh <= self.dag_dir_list_interval:
-            return False
+    def _refresh_dag_bundles(self):
+        """Refresh DAG bundles, if required."""
+        now = timezone.utcnow()
 
-        # Build up a list of Python files that could contain DAGs
-        self.log.info("Searching for files in %s", self._dag_directory)
-        self._file_paths = list_py_file_paths(self._dag_directory)
-        self.last_dag_dir_refresh_time = now
-        self.log.info("There are %s files in %s", len(self._file_paths), self._dag_directory)
-        self.set_file_paths(self._file_paths)
+        self.log.info("Refreshing DAG bundles")
+
+        for bundle in self._dag_bundles:
+            # TODO: AIP-66 test to make sure we get a fresh record from the db and it's not cached
+            with create_session() as session:
+                bundle_model = session.query(DagBundleModel).get(bundle.name)
+                elapsed_time_since_refresh = (
+                    now - (bundle_model.last_refreshed or timezone.utc_epoch())
+                ).total_seconds()
+                if not elapsed_time_since_refresh > bundle.refresh_interval:
+                    # or bundle_model.version != bundle.get_current_version():
+                    self.log.info("Not time to refresh %s", bundle.name)
+                    continue
 
-        try:
-            self.log.debug("Removing old import errors")
-            self.clear_nonexistent_import_errors()
-        except Exception:
-            self.log.exception("Error removing old import errors")
+                # TODO: AIP-66 locking / dealing with multiple processors
+                self.log.info("Time to refresh %s", bundle.name)
+                old_version = bundle.get_current_version()
+                bundle.refresh()
+                bundle_model.last_refreshed = now
+
+                if old_version != bundle.get_current_version():
+                    self.log.info(
+                        "Version changed for %s, new version: %s", bundle.name, bundle.get_current_version()
+                    )
+                bundle_file_paths = self._refresh_dag_dir(bundle)
+                # remove all files from the bundle, then add the new ones
+                self._file_paths = [f for f in self._file_paths if f.bundle_name != bundle_model.name]
+                self._file_paths.extend(
+                    DagFileInfo(path=path, bundle_name=bundle_model.name) for path in bundle_file_paths
+                )
+
+                try:
+                    self.log.debug("Removing old import errors")
+                    self.clear_nonexistent_import_errors()
+                except Exception:
+                    self.log.exception("Error removing old import errors")
+
+                self._bundle_versions[bundle_model.name] = bundle.get_current_version()
+                self.log.info("Found %s files for bundle %s", len(bundle_file_paths), bundle.name)
+                # TODO: AIP-66 detect if version changed and update accordingly
+
+    def _refresh_dag_dir(self, bundle: BaseDagBundle) -> list[str]:

Review Comment:
   seems like some duplicative logging in this function
   
   is the name of this function still good, in a dag bundle world?



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-07 Thread via GitHub


dstandish commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1905827238


##
airflow/dag_processing/manager.py:
##
@@ -631,25 +646,60 @@ def _get_priority_filelocs(cls, session: Session = 
NEW_SESSION):
 session.delete(request)
 return filelocs
 
-def _refresh_dag_dir(self) -> bool:
-"""Refresh file paths from dag dir if we haven't done it for too long."""
-now = time.monotonic()
-elapsed_time_since_refresh = now - self.last_dag_dir_refresh_time
-if elapsed_time_since_refresh <= self.dag_dir_list_interval:
-return False
+def _refresh_dag_bundles(self):
+"""Refresh DAG bundles, if required."""
+now = timezone.utcnow()
 
-# Build up a list of Python files that could contain DAGs
-self.log.info("Searching for files in %s", self._dag_directory)
-self._file_paths = list_py_file_paths(self._dag_directory)
-self.last_dag_dir_refresh_time = now
-self.log.info("There are %s files in %s", len(self._file_paths), self._dag_directory)
-self.set_file_paths(self._file_paths)
+self.log.info("Refreshing DAG bundles")
+
+for bundle in self._dag_bundles:

Review Comment:
   This section is very long (the body of the loop) and it's hard to understand what it's really doing.
   
   For sure, you can leave it if you want. It won't be the only place in the codebase like this.
   
   But if you want to make it more easily understandable, one way to do it might be to make some inner functions that do specific things, and call them. Or there might be some other means.



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-07 Thread via GitHub


dstandish commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1905824226


##
airflow/dag_processing/manager.py:
##
@@ -631,25 +646,60 @@ def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
 session.delete(request)
 return filelocs
 
-def _refresh_dag_dir(self) -> bool:
-"""Refresh file paths from dag dir if we haven't done it for too long."""
-now = time.monotonic()
-elapsed_time_since_refresh = now - self.last_dag_dir_refresh_time
-if elapsed_time_since_refresh <= self.dag_dir_list_interval:
-return False
+def _refresh_dag_bundles(self):
+"""Refresh DAG bundles, if required."""
+now = timezone.utcnow()
 
-# Build up a list of Python files that could contain DAGs
-self.log.info("Searching for files in %s", self._dag_directory)
-self._file_paths = list_py_file_paths(self._dag_directory)
-self.last_dag_dir_refresh_time = now
-self.log.info("There are %s files in %s", len(self._file_paths), self._dag_directory)
-self.set_file_paths(self._file_paths)
+self.log.info("Refreshing DAG bundles")
+
+for bundle in self._dag_bundles:
+# TODO: AIP-66 test to make sure we get a fresh record from the db and it's not cached
+with create_session() as session:
+bundle_model = session.query(DagBundleModel).get(bundle.name)
+elapsed_time_since_refresh = (
+now - (bundle_model.last_refreshed or timezone.utc_epoch())
+).total_seconds()
+if not elapsed_time_since_refresh > bundle.refresh_interval:
+# or bundle_model.version != bundle.get_current_version():
+self.log.info("Not time to refresh %s", bundle.name)
+continue
 
-try:
-self.log.debug("Removing old import errors")
-self.clear_nonexistent_import_errors()
-except Exception:
-self.log.exception("Error removing old import errors")
+# TODO: AIP-66 locking / dealing with multiple processors

Review Comment:
   Not sure exactly what this TODO means.
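One possible reading of "locking / dealing with multiple processors", offered as an assumption rather than what the author meant: when several DAG processors share one database, they should not all refresh the same bundle at once. A common pattern is an atomic conditional UPDATE that "claims" the refresh; only the processor whose UPDATE matched a row proceeds. The sketch uses the stdlib `sqlite3` module and a hypothetical `dag_bundle(name, last_refreshed)` table storing epoch seconds; on a server database one could instead lock the row with `SELECT ... FOR UPDATE`.

```python
import sqlite3


def try_claim_refresh(conn: sqlite3.Connection, name: str, now: float, interval: float) -> bool:
    """Atomically claim the right to refresh bundle ``name``.

    The UPDATE only matches when the bundle is due (more than ``interval``
    seconds since the last refresh). The database applies it atomically, so
    concurrent callers cannot both see rowcount == 1 for the same interval.
    """
    cur = conn.execute(
        "UPDATE dag_bundle SET last_refreshed = ? "
        "WHERE name = ? AND (last_refreshed IS NULL OR last_refreshed < ?)",
        (now, name, now - interval),
    )
    conn.commit()
    return cur.rowcount == 1


# Hypothetical schema, set up in memory for illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_bundle (name TEXT PRIMARY KEY, last_refreshed REAL)")
conn.execute("INSERT INTO dag_bundle (name) VALUES ('dags-folder')")
conn.commit()
```

One design caveat: the timestamp advances when the claim is made, so a refresh that fails afterwards is not retried until the next interval unless the failure path resets it.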



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-07 Thread via GitHub


dstandish commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1905823700


##
airflow/dag_processing/manager.py:
##
@@ -631,25 +646,60 @@ def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
 session.delete(request)
 return filelocs
 
-def _refresh_dag_dir(self) -> bool:
-"""Refresh file paths from dag dir if we haven't done it for too long."""
-now = time.monotonic()
-elapsed_time_since_refresh = now - self.last_dag_dir_refresh_time
-if elapsed_time_since_refresh <= self.dag_dir_list_interval:
-return False
+def _refresh_dag_bundles(self):
+"""Refresh DAG bundles, if required."""
+now = timezone.utcnow()
 
-# Build up a list of Python files that could contain DAGs
-self.log.info("Searching for files in %s", self._dag_directory)
-self._file_paths = list_py_file_paths(self._dag_directory)
-self.last_dag_dir_refresh_time = now
-self.log.info("There are %s files in %s", len(self._file_paths), self._dag_directory)
-self.set_file_paths(self._file_paths)
+self.log.info("Refreshing DAG bundles")
+
+for bundle in self._dag_bundles:
+# TODO: AIP-66 test to make sure we get a fresh record from the db and it's not cached
+with create_session() as session:
+bundle_model = session.query(DagBundleModel).get(bundle.name)

Review Comment:
   Is this querying all of them? I'm not sure. But you can just get an object by its PK with `session.get(DagBundleModel, bundle.name)`, I think.
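For reference, `Query.get()` is already a primary-key lookup rather than a scan of every row, but SQLAlchemy treats it as legacy in 2.0, where `Session.get(Model, pk)` is the replacement. Both check the session's identity map before emitting SQL, which is why opening a new session per lookup matters for the "fresh record" TODO: a new session starts with an empty identity map. The model below is a hypothetical, trimmed-down stand-in for `DagBundleModel`.

```python
from sqlalchemy import Column, DateTime, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class DagBundleModel(Base):
    """Hypothetical, trimmed-down stand-in for Airflow's DagBundleModel."""

    __tablename__ = "dag_bundle"
    name = Column(String(250), primary_key=True)
    last_refreshed = Column(DateTime, nullable=True)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([DagBundleModel(name="dags-folder"), DagBundleModel(name="other")])
    session.commit()


def get_bundle(name: str):
    # A new Session has an empty identity map, so get() issues a primary-key
    # SELECT instead of returning a previously loaded (possibly stale) object.
    with Session(engine) as session:
        return session.get(DagBundleModel, name)
```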



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-07 Thread via GitHub


dstandish commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1905817946


##
airflow/dag_processing/manager.py:
##
@@ -99,6 +102,13 @@ class DagFileStat:
 log = logging.getLogger("airflow.processor_manager")
 
 
+class DagFileInfo(NamedTuple):
+"""Information about a DAG file."""
+
+path: str

Review Comment:
   We should always try, whether through the variable name or the docstring, to make clear whether this path is absolute or relative to the bundle root.
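A minimal sketch of what that could look like, with illustrative field names (`rel_path`, `bundle_path`) that are assumptions, not the PR's definition: the name and a comment state that the stored path is relative to the bundle root, and the absolute path is derived rather than stored separately.

```python
from pathlib import Path
from typing import NamedTuple


class DagFileInfo(NamedTuple):
    """Information about a DAG file (illustrative sketch, not Airflow's definition)."""

    rel_path: Path  # path of the file RELATIVE to the bundle root
    bundle_name: str
    bundle_path: Path  # ABSOLUTE path of the bundle root on this machine

    @property
    def absolute_path(self) -> Path:
        """Absolute location of the file, derived from the bundle root."""
        return self.bundle_path / self.rel_path
```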



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-07 Thread via GitHub


dstandish commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1905807565


##
airflow/dag_processing/manager.py:
##
@@ -572,6 +583,7 @@ def _read_from_direct_scheduler_conn(self, conn: MultiprocessingConnection) -> b
 
 def _refresh_requested_filelocs(self) -> None:
 """Refresh filepaths from dag dir as requested by users via APIs."""
+return  # TODO: AIP-66 make bundle aware!

Review Comment:
   This TODO should have more info about what it means. I don't know what you're talking about, and you might forget :)



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-07 Thread via GitHub


dstandish commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1905804851


##
airflow/models/dag.py:
##
@@ -1873,7 +1887,10 @@ def sync_to_db(self, session=NEW_SESSION):
 
 :return: None
 """
-self.bulk_write_to_db([self], session=session)
+# TODO: AIP-66 should this be in the model?

Review Comment:
   You mean, like, on the DAG object, so we don't have to pass it along everywhere? Yes, I think so.



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-07 Thread via GitHub


dstandish commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1905803723


##
airflow/models/dag.py:
##
@@ -1847,7 +1859,9 @@ def bulk_write_to_db(
 from airflow.dag_processing.collection import AssetModelOperation, DagModelOperation
 
 log.info("Sync %s DAGs", len(dags))
-dag_op = DagModelOperation({dag.dag_id: dag for dag in dags})  # type: ignore[misc]
+dag_op = DagModelOperation(
+bundle_name=bundle_name, bundle_version=bundle_version, dags={dag.dag_id: dag for dag in dags}

Review Comment:
   ```suggestion
   bundle_name=bundle_name, bundle_version=bundle_version, dags={d.dag_id: d for d in dags}
   ```



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-07 Thread via GitHub


dstandish commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1905801545


##
airflow/dag_processing/collection.py:
##
@@ -74,11 +74,14 @@
 log = logging.getLogger(__name__)
 
 
-def _create_orm_dags(dags: Iterable[MaybeSerializedDAG], *, session: Session) -> Iterator[DagModel]:
+def _create_orm_dags(
+bundle_name: str, dags: Iterable[MaybeSerializedDAG], *, session: Session

Review Comment:
   Seems it might make sense to just add the bundle info on the DAG itself instead of having to pass it separately. Just a thought.
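A small sketch of that alternative, with hypothetical names throughout: rather than threading `bundle_name` and `bundle_version` through `bulk_write_to_db`, `DagModelOperation` and `_create_orm_dags`, the parser could stamp the bundle info onto each DAG object once, and downstream code reads it from there.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class SketchDag:
    """Hypothetical stand-in for a DAG that carries its own bundle info."""

    dag_id: str
    bundle_name: str | None = None
    bundle_version: str | None = None


def stamp_bundle_info(dags, bundle_name: str, bundle_version: str | None):
    """Attach bundle info once, right after parsing a file from the bundle."""
    for dag in dags:
        dag.bundle_name = bundle_name
        dag.bundle_version = bundle_version
    return dags


def create_orm_rows(dags) -> list[dict]:
    """Downstream code no longer needs separate bundle arguments."""
    return [
        {"dag_id": d.dag_id, "bundle_name": d.bundle_name, "bundle_version": d.bundle_version}
        for d in dags
    ]
```

One thing to check with this approach: if the serialized form of the DAG is what gets written to the database, the bundle fields would need to survive serialization too.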



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-06 Thread via GitHub


jedcunningham commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1904525192


##
airflow/dag_processing/manager.py:
##
@@ -631,25 +646,60 @@ def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
 session.delete(request)
 return filelocs
 
-def _refresh_dag_dir(self) -> bool:
-"""Refresh file paths from dag dir if we haven't done it for too long."""
-now = time.monotonic()
-elapsed_time_since_refresh = now - self.last_dag_dir_refresh_time
-if elapsed_time_since_refresh <= self.dag_dir_list_interval:
-return False
+def _refresh_dag_bundles(self):
+"""Refresh DAG bundles, if required."""
+now = timezone.utcnow()
 
-# Build up a list of Python files that could contain DAGs
-self.log.info("Searching for files in %s", self._dag_directory)
-self._file_paths = list_py_file_paths(self._dag_directory)
-self.last_dag_dir_refresh_time = now
-self.log.info("There are %s files in %s", len(self._file_paths), self._dag_directory)
-self.set_file_paths(self._file_paths)
+self.log.info("Refreshing DAG bundles")
+
+for bundle in self._dag_bundles:
+# TODO: AIP-66 test to make sure we get a fresh record from the db and it's not cached
+with create_session() as session:
+bundle_model = session.query(DagBundleModel).get(bundle.name)

Review Comment:
   We used to: https://github.com/apache/airflow/pull/45371#discussion_r1901498321
   
   But I think longer term having our own session will be useful for locking anyway.



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-06 Thread via GitHub


ephraimbuddy commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1904515815


##
airflow/dag_processing/manager.py:
##
@@ -631,25 +646,60 @@ def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
 session.delete(request)
 return filelocs
 
-def _refresh_dag_dir(self) -> bool:
-"""Refresh file paths from dag dir if we haven't done it for too long."""
-now = time.monotonic()
-elapsed_time_since_refresh = now - self.last_dag_dir_refresh_time
-if elapsed_time_since_refresh <= self.dag_dir_list_interval:
-return False
+def _refresh_dag_bundles(self):
+"""Refresh DAG bundles, if required."""
+now = timezone.utcnow()
 
-# Build up a list of Python files that could contain DAGs
-self.log.info("Searching for files in %s", self._dag_directory)
-self._file_paths = list_py_file_paths(self._dag_directory)
-self.last_dag_dir_refresh_time = now
-self.log.info("There are %s files in %s", len(self._file_paths), self._dag_directory)
-self.set_file_paths(self._file_paths)
+self.log.info("Refreshing DAG bundles")
+
+for bundle in self._dag_bundles:
+# TODO: AIP-66 test to make sure we get a fresh record from the db and it's not cached
+with create_session() as session:
+bundle_model = session.query(DagBundleModel).get(bundle.name)

Review Comment:
   Should we have a `DagBundleModel.get_bundle` method instead of using `create_session` here?



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-03 Thread via GitHub


jedcunningham commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1901938254


##
airflow/dag_processing/manager.py:
##
@@ -99,6 +102,13 @@ class DagFileStat:
 log = logging.getLogger("airflow.processor_manager")
 
 
+class DagFilePath(NamedTuple):

Review Comment:
   Went with that name for now :)



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-03 Thread via GitHub


jedcunningham commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1901927901


##
airflow/models/dagbundle.py:
##
@@ -41,3 +47,8 @@ class DagBundleModel(Base):
 
 def __init__(self, *, name: str):
 self.name = name
+
+@staticmethod
+@provide_session
+def get(name: str, session: Session = NEW_SESSION) -> DagBundleModel:
+return session.query(DagBundleModel).get(name)

Review Comment:
   In fact, we'd likely need to do this sooner rather than later anyway 👍



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-03 Thread via GitHub


jedcunningham commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1901548009


##
airflow/models/dagbundle.py:
##
@@ -41,3 +47,8 @@ class DagBundleModel(Base):
 
 def __init__(self, *, name: str):
 self.name = name
+
+@staticmethod
+@provide_session
+def get(name: str, session: Session = NEW_SESSION) -> DagBundleModel:
+return session.query(DagBundleModel).get(name)

Review Comment:
   Good call. I'll refactor this tomorrow morning.



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-03 Thread via GitHub


jedcunningham commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1901546199


##
airflow/dag_processing/manager.py:
##
@@ -99,6 +102,13 @@ class DagFileStat:
 log = logging.getLogger("airflow.processor_manager")
 
 
+class DagFilePath(NamedTuple):

Review Comment:
   Yes, probably. I know @dstandish had a suggestion too, but I can't recall what it was.



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-02 Thread via GitHub


uranusjr commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1901498321


##
airflow/models/dagbundle.py:
##
@@ -41,3 +47,8 @@ class DagBundleModel(Base):
 
 def __init__(self, *, name: str):
 self.name = name
+
+@staticmethod
+@provide_session
+def get(name: str, session: Session = NEW_SESSION) -> DagBundleModel:
+return session.query(DagBundleModel).get(name)

Review Comment:
   ```suggestion
   @classmethod
   @provide_session
   def get(cls, name: str, session: Session = NEW_SESSION) -> DagBundleModel:
       return session.get(cls, name)
   ```
   
   Is this function useful? It looks like the only advantage is that we don’t need to create a session explicitly. But I only see this function used once, in which case just calling `create_session` when needed is actually less code…
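To make that trade-off concrete, here is a simplified sketch of the two styles using hypothetical stand-ins (`FakeSession`, `create_session_sketch`, `provide_session_sketch`), not Airflow's actual `create_session`/`provide_session`, which also handle commit and rollback. The decorator's only job is to open and close a session when the caller did not pass one; the decorator order mirrors the suggestion above, with `@classmethod` outermost.

```python
import functools
import inspect
from contextlib import contextmanager


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy session."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


@contextmanager
def create_session_sketch():
    """Simplified create_session()-style context manager (no commit/rollback)."""
    session = FakeSession()
    try:
        yield session
    finally:
        session.close()


def provide_session_sketch(func):
    """Inject a fresh session only when the caller did not pass one."""
    sig = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if sig.bind_partial(*args, **kwargs).arguments.get("session") is not None:
            return func(*args, **kwargs)
        with create_session_sketch() as session:
            kwargs["session"] = session
            return func(*args, **kwargs)

    return wrapper


class BundleLookup:
    @classmethod
    @provide_session_sketch
    def get(cls, name, session=None):
        # A real model would do: return session.get(cls, name)
        return name, session
```

With a single call site, `with create_session_sketch() as session:` followed by the lookup is about as short as defining the decorated helper.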



Re: [PR] AIP-66: Add support for parsing DAG bundles [airflow]

2025-01-02 Thread via GitHub


uranusjr commented on code in PR #45371:
URL: https://github.com/apache/airflow/pull/45371#discussion_r1901494803


##
airflow/dag_processing/manager.py:
##
@@ -99,6 +102,13 @@ class DagFileStat:
 log = logging.getLogger("airflow.processor_manager")
 
 
+class DagFilePath(NamedTuple):

Review Comment:
   Should this be named something like DagFileInfo instead?


