dingo4dev commented on code in PR #62501:
URL: https://github.com/apache/airflow/pull/62501#discussion_r3417761758


##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -136,6 +137,40 @@ def _add_one(asset: SerializedAsset) -> AssetModel:
 
         return [_add_one(a) for a in assets]
 
+    @classmethod
+    def create_asset_event(cls, *, event_kwargs: dict, session: Session) -> 
AssetEvent:
+        """
+        Persist an :class:`AssetEvent` row and return it, bound to *session*.
+
+        For non-SQLite backends a short-lived independent session is used so
+        that the row is committed (and therefore visible to the scheduler's
+        session) before the caller continues.  SQLite does not support
+        concurrent connections, so the event is added directly to the caller's
+        *session* and flushed instead.
+        """
+        if get_dialect_name(session) == "sqlite":
+            # SQLite cannot have two concurrent connections to the same file, 
so
+            # opening a second session would deadlock.  Add directly and flush 
so
+            # the object gets an id without committing the outer transaction.
+            asset_event = AssetEvent(**event_kwargs)
+            session.add(asset_event)
+            session.flush()
+            return asset_event
+
+        # Create a short-lived session to populate asset event in db.
+        # This is to ensure the asset event is committed and visible to other 
sessions.
+        # e.g. Scheduler's session when it looks for new asset events to 
trigger dags via ADRQ.
+        # Use ``scoped=False`` to get a truly independent session with its own 
connection/transaction.
+        with create_session(scoped=False) as ae_session:
+            _asset_event = AssetEvent(**event_kwargs)
+            ae_session.add(_asset_event)
+            ae_session.flush()
+            asset_event_id = _asset_event.id
+
+        # Re-load the now-committed AssetEvent into the caller's session so 
that
+        # subsequent relationship operations work correctly.
+        return session.get_one(AssetEvent, asset_event_id)

Review Comment:
   added newsfragement and PR comment to remind durability tradeoff



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to