ephraimbuddy commented on code in PR #47275:
URL: https://github.com/apache/airflow/pull/47275#discussion_r2096164941


##########
airflow-core/src/airflow/dag_processing/collection.py:
##########
@@ -265,15 +265,17 @@ def _update_import_errors(
     # Add the errors of the processed files
     for filename, stacktrace in import_errors.items():
         if (filename, bundle_name) in existing_import_error_files:
-            session.query(ParseImportError).where(
-                ParseImportError.filename == filename, 
ParseImportError.bundle_name == bundle_name
-            ).update(
-                {
-                    "filename": filename,
-                    "bundle_name": bundle_name,
-                    "timestamp": utcnow(),
-                    "stacktrace": stacktrace,
-                },
+            session.execute(
+                update(ParseImportError)
+                .where(ParseImportError.filename == filename, 
ParseImportError.bundle_name == bundle_name)
+                .values(
+                    {
+                        "filename": filename,
+                        "bundle_name": bundle_name,
+                        "timestamp": utcnow(),
+                        "stacktrace": stacktrace,

Review Comment:
   `values()` accepts keyword arguments, so there is no need to pass a dictionary here.



##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -149,9 +151,8 @@ def parse_config(self) -> None:
     @provide_session
     def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None:
         self.log.debug("Syncing DAG bundles to the database")
-        stored = {b.name: b for b in session.query(DagBundleModel).all()}
-        active_bundle_names = set(self._bundle_config.keys())
-        for name in active_bundle_names:
+        stored = {b.name: b for b in 
session.scalars(select(DagBundleModel)).all()}
+        for name in self._bundle_config.keys():

Review Comment:
   Looks like you are omitting the active_bundle_names filter?



##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -165,13 +166,12 @@ def sync_bundles_to_db(self, *, session: Session = 
NEW_SESSION) -> None:
 
         if inactive_bundle_names and active_bundle_names:
             new_bundle_name = sorted(active_bundle_names)[0]
-            updated_rows = (
-                session.query(DagVersion)
-                .filter(DagVersion.bundle_name.in_(inactive_bundle_names))
-                .update(
-                    {DagVersion.bundle_name: new_bundle_name},
-                    synchronize_session=False,
-                )
+
+            updated_rows = session.execute(
+                update(DagVersion)
+                .where(DagVersion.bundle_name.in_(inactive_bundle_names))
+                .values({DagVersion.bundle_name: new_bundle_name})

Review Comment:
   ```suggestion
                   .values(bundle_name=new_bundle_name)
   ```



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py:
##########
@@ -270,7 +270,7 @@ def set_xcom(
     if not run_id:
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"Run with ID: 
`{run_id}` was not found")
 
-    dag_run_id = session.query(DagRun.id).filter_by(dag_id=dag_id, 
run_id=run_id).scalar()
+    dag_run_id = session.execute(DagRun.id).where(dag_id=dag_id, 
run_id=run_id).scalar()

Review Comment:
   ```suggestion
       dag_run_id = session.scalar(select(DagRun.id).filter_by(dag_id=dag_id, run_id=run_id))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to