hkc-8010 commented on code in PR #66854:
URL: https://github.com/apache/airflow/pull/66854#discussion_r3267306164


##########
airflow-core/tests/unit/assets/test_manager.py:
##########
@@ -162,6 +163,90 @@ def test_register_asset_change_with_alias(
         )
         assert session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 2
 
+    def test_register_asset_change_with_alias_no_lazy_load(
+        self, session, mock_task_instance, testing_dag_bundle
+    ):
+        """Regression: alias-event association must use a direct INSERT, not ORM .append().
+
+        ORM .append() lazy-loads the entire asset_events collection before writing.
+        On long-running deployments with thousands of past events, this query runs
+        while the task_instance row lock is held in ti_update_state, causing idle-in-transaction
+        pile-up that exhausts API server memory and triggers OOMKill.
+        """
+        from sqlalchemy import insert as sa_insert

Review Comment:
   Done. Moved `insert` into the top-level `from sqlalchemy import delete, func, insert, select` import and dropped the `sa_insert` alias. Nothing in this file collides with the name `insert`, so the alias served no purpose.
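As the docstring above says, `.append()` on a lazy-loaded collection first reads every linked event and only then writes one row, whereas a direct INSERT into the association table writes that row without reading anything. The sketch below emulates both access patterns with stdlib `sqlite3` so the O(n) read is visible. The `alias_event` table and the `link_via_*` helpers are hypothetical stand-ins, not Airflow's schema or the SQL that SQLAlchemy actually emits.

```python
import sqlite3


def make_db(n_past_events: int) -> sqlite3.Connection:
    """Build an in-memory stand-in for an alias/event association table.

    Table and column names are hypothetical, not Airflow's real schema.
    """
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE alias_event (alias_id INTEGER, event_id INTEGER)")
    conn.executemany(
        "INSERT INTO alias_event (alias_id, event_id) VALUES (1, ?)",
        ((i,) for i in range(n_past_events)),
    )
    conn.commit()
    return conn


def link_via_collection(conn: sqlite3.Connection, alias_id: int, event_id: int) -> int:
    """Emulate appending to a lazy-loaded collection: load every existing row, then write.

    Returns the number of rows read before the single INSERT.
    """
    existing = conn.execute(
        "SELECT event_id FROM alias_event WHERE alias_id = ?", (alias_id,)
    ).fetchall()
    conn.execute(
        "INSERT INTO alias_event (alias_id, event_id) VALUES (?, ?)", (alias_id, event_id)
    )
    return len(existing)


def link_via_direct_insert(conn: sqlite3.Connection, alias_id: int, event_id: int) -> int:
    """Write the association row directly; no existing rows are read (always returns 0)."""
    conn.execute(
        "INSERT INTO alias_event (alias_id, event_id) VALUES (?, ?)", (alias_id, event_id)
    )
    return 0


conn = make_db(5_000)
print(link_via_collection(conn, 1, 5_000))     # 5000
print(link_via_direct_insert(conn, 1, 5_001))  # 0
```

The rows read by the collection path grow with the alias's event history. The direct INSERT stays constant, which matters because in `ti_update_state` that read happens while the row lock is held.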



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -457,6 +457,12 @@ def ti_update_state(
                 extra=json.dumps({"host_name": hostname}) if hostname else None,
             )
         )
+        # Commit the TI state update now to release the task_instance row lock before
+        # running asset-event queries. Asset registration can hold the lock for seconds
+        # under high concurrency (many aliases with large event histories), causing
+        # idle-in-transaction pile-up that exhausts API server memory and triggers OOMKill.
+        # The task outcome is durable from this point on.
+        session.commit()

Review Comment:
   The early `session.commit()` is still needed even with the direct-INSERT fix in manager.py. That change only eliminates the O(n) lazy-load SELECT on the alias-event table. `register_asset_changes_in_db` also queries scheduled DAGs and inserts `AssetDagRunQueue` rows. Without the early commit, all of that work would run while the task_instance row lock is still held and cause the same idle-in-transaction pile-up.
   
   As for the silent-drop concern: swallowing the exception here is intentional. By the time `register_asset_changes_in_db` runs, the TI state update is already committed and durable. Returning HTTP 500 at this point would cause the task-SDK worker to retry a state update for a task that has already completed successfully, which is worse than a silent failure. I've expanded the comments on both blocks to make this design intent explicit.
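The commit-first, best-effort-follow-up flow described above can be sketched as follows. This is a minimal illustration using stdlib `sqlite3` in place of Airflow's SQLAlchemy session. `update_state_then_register`, the `register` callback, and the `task_instance`/`queue` tables are hypothetical, not the code in `task_instances.py`. SQLite has no row-level locks, so the sketch shows only the control flow: the state change is committed first, and a failure in the follow-up work is rolled back and logged instead of being raised to the caller.

```python
import logging
import sqlite3
from typing import Callable

log = logging.getLogger(__name__)


def update_state_then_register(
    conn: sqlite3.Connection,
    ti_id: int,
    new_state: str,
    register: Callable[[sqlite3.Connection], None],
) -> str:
    """Hypothetical handler: commit the state change, then run best-effort follow-up work."""
    conn.execute("UPDATE task_instance SET state = ? WHERE id = ?", (new_state, ti_id))
    # Commit first so the outcome is durable (and, in a real database, the row
    # lock is released) before any slow follow-up queries run.
    conn.commit()
    try:
        register(conn)
        conn.commit()
    except Exception:
        # Undo only the follow-up work; the state update is already committed.
        # Swallow the error so the caller is not prompted to retry a state change
        # that already succeeded.
        conn.rollback()
        log.exception("Follow-up registration failed for TI %s; state already committed", ti_id)
    return new_state


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (id INTEGER PRIMARY KEY, state TEXT)")
conn.execute("CREATE TABLE queue (ti_id INTEGER)")
conn.execute("INSERT INTO task_instance VALUES (1, 'running')")
conn.commit()


def failing_register(c: sqlite3.Connection) -> None:
    c.execute("INSERT INTO queue VALUES (1)")
    raise RuntimeError("simulated registration failure")


update_state_then_register(conn, 1, "success", failing_register)
# The state change survives; the half-done follow-up insert is rolled back.
print(conn.execute("SELECT state FROM task_instance WHERE id = 1").fetchone()[0])  # success
print(conn.execute("SELECT COUNT(*) FROM queue").fetchone()[0])  # 0
```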



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
