dingo4dev commented on code in PR #62501:
URL: https://github.com/apache/airflow/pull/62501#discussion_r3302227467


##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -553,14 +571,46 @@ def _queue_dagrun_if_needed(dag: DagModel) -> str | None:
             cls.logger().debug("consuming dag ids %s", queued_dag_ids)
 
     @classmethod
-    def _queue_dagruns_nonpartitioned_postgres(
-        cls, asset_id: int, dags_to_queue: set[DagModel], session: Session
+    def _queue_dagruns_nonpartitioned_mysql(
+        cls, asset_id: int, dags_to_queue: set[DagModel], event: AssetEvent, 
session: Session
+    ) -> None:
+        from sqlalchemy import case
+        from sqlalchemy.dialects.mysql import insert
+
+        values = [{"target_dag_id": dag.dag_id} for dag in dags_to_queue]
+        stmt = insert(AssetDagRunQueue).values(asset_id=asset_id, 
created_at=event.timestamp)
+
+        update_stmt = stmt.on_duplicate_key_update(
+            created_at=case(
+                (stmt.inserted.created_at >= AssetDagRunQueue.created_at, 
stmt.inserted.created_at),
+                else_=AssetDagRunQueue.created_at,
+            )
+        )
+        session.execute(update_stmt, values)
+
+    @classmethod
+    def _queue_dagruns_nonpartitioned_conflict_update(
+        cls,
+        asset_id: int,
+        dags_to_queue: set[DagModel],
+        event: AssetEvent,
+        session: Session,
+        dialect_name: str,
     ) -> None:
-        from sqlalchemy.dialects.postgresql import insert
+        """Handle ON CONFLICT DO UPDATE upsert for dialects that support it 
(postgresql, sqlite)."""
+        if dialect_name == "postgresql":
+            from sqlalchemy.dialects.postgresql import insert
+        else:
+            from sqlalchemy.dialects.sqlite import insert

Review Comment:
   These imports are intentionally kept local because they are database dialect 
specific, it is only invoked when that backend is in use. we'd like to avoid 
importing unmatched SQLAlchemy code when running different database



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to