ephraimbuddy commented on code in PR #47275: URL: https://github.com/apache/airflow/pull/47275#discussion_r2096164941
########## airflow-core/src/airflow/dag_processing/collection.py: ########## @@ -265,15 +265,17 @@ def _update_import_errors( # Add the errors of the processed files for filename, stacktrace in import_errors.items(): if (filename, bundle_name) in existing_import_error_files: - session.query(ParseImportError).where( - ParseImportError.filename == filename, ParseImportError.bundle_name == bundle_name - ).update( - { - "filename": filename, - "bundle_name": bundle_name, - "timestamp": utcnow(), - "stacktrace": stacktrace, - }, + session.execute( + update(ParseImportError) + .where(ParseImportError.filename == filename, ParseImportError.bundle_name == bundle_name) + .values( + { + "filename": filename, + "bundle_name": bundle_name, + "timestamp": utcnow(), + "stacktrace": stacktrace, Review Comment: `values()` accepts keyword arguments, so the dictionary is not needed here. ########## airflow-core/src/airflow/dag_processing/bundles/manager.py: ########## @@ -149,9 +151,8 @@ def parse_config(self) -> None: @provide_session def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None: self.log.debug("Syncing DAG bundles to the database") - stored = {b.name: b for b in session.query(DagBundleModel).all()} - active_bundle_names = set(self._bundle_config.keys()) - for name in active_bundle_names: + stored = {b.name: b for b in session.scalars(select(DagBundleModel)).all()} + for name in self._bundle_config.keys(): Review Comment: Looks like you are omitting the active_bundle_names filter? 
########## airflow-core/src/airflow/dag_processing/bundles/manager.py: ########## @@ -165,13 +166,12 @@ def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None: if inactive_bundle_names and active_bundle_names: new_bundle_name = sorted(active_bundle_names)[0] - updated_rows = ( - session.query(DagVersion) - .filter(DagVersion.bundle_name.in_(inactive_bundle_names)) - .update( - {DagVersion.bundle_name: new_bundle_name}, - synchronize_session=False, - ) + + updated_rows = session.execute( + update(DagVersion) + .where(DagVersion.bundle_name.in_(inactive_bundle_names)) + .values({DagVersion.bundle_name: new_bundle_name}) Review Comment: ```suggestion .values(bundle_name=new_bundle_name) ``` ########## airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py: ########## @@ -270,7 +270,7 @@ def set_xcom( if not run_id: raise HTTPException(status.HTTP_404_NOT_FOUND, f"Run with ID: `{run_id}` was not found") - dag_run_id = session.query(DagRun.id).filter_by(dag_id=dag_id, run_id=run_id).scalar() + dag_run_id = session.execute(DagRun.id).where(dag_id=dag_id, run_id=run_id).scalar() Review Comment: ```suggestion dag_run_id = session.scalar(select(DagRun.id).where(dag_id=dag_id, run_id=run_id)) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org