This is an automated email from the ASF dual-hosted git repository.

jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 41f20d5f6c5 Remote logging fix (#68370)
41f20d5f6c5 is described below

commit 41f20d5f6c5504c62cffe09f24abfbd88edd252d
Author: Andrei Leib <[email protected]>
AuthorDate: Mon Jun 15 23:25:45 2026 -0400

    Remote logging fix (#68370)
---
 .../src/airflow/sdk/execution_time/supervisor.py   |  2 +-
 .../task_sdk/execution_time/test_supervisor.py     | 32 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index c4b9c72420a..d4617efbbae 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -1180,10 +1180,10 @@ def _fetch_remote_logging_conn(conn_id: str, client: Client) -> Connection | Non
         from airflow.sdk.definitions.connection import Connection
 
         result: Connection | None = Connection(**conn_result.model_dump(exclude={"type"}, by_alias=True))
+        _REMOTE_LOGGING_CONN_CACHE[conn_id] = result
     else:
         result = None
 
-    _REMOTE_LOGGING_CONN_CACHE[conn_id] = result
     return result
 
 
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 57ac9f39b50..48932581954 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -61,6 +61,7 @@ from airflow.sdk.api.datamodels._generated import (
     AssetEventResponse,
     AssetProfile,
     AssetResponse,
+    ConnectionResponse,
     DagRun,
     DagRunState,
     DagRunType,
@@ -3819,6 +3820,37 @@ def test_remote_logging_conn_caches_connection_not_client(monkeypatch):
         assert all(ref() is None for ref in clients), "Client instances should be garbage collected"
 
 
+def test_fetch_remote_logging_conn_does_not_cache_none_result(mocker):
+    """Test that connection caching doesn't cache failed lookups as None."""
+    conn_id = "test_conn"
+    client = mocker.Mock()
+    mocker.patch.object(supervisor, "ensure_secrets_backend_loaded", return_value=[])
+    mocker.patch.dict(supervisor._REMOTE_LOGGING_CONN_CACHE, {}, clear=True)
+    client.connections.get.side_effect = [
+        ErrorResponse(error=ErrorType.PERMISSION_DENIED),
+        ConnectionResponse(
+            conn_id=conn_id,
+            conn_type="example",
+            host=None,
+            schema_=None,
+            login=None,
+            password=None,
+            port=None,
+            extra=None,
+        ),
+    ]
+
+    assert supervisor._fetch_remote_logging_conn(conn_id, client) is None
+    assert conn_id not in supervisor._REMOTE_LOGGING_CONN_CACHE
+
+    second_call_result = supervisor._fetch_remote_logging_conn(conn_id, client)
+    assert second_call_result is not None
+    assert second_call_result.conn_id == conn_id
+    assert supervisor._REMOTE_LOGGING_CONN_CACHE[conn_id] is not None
+    # The first call resulted in None and was not cached, so the second fetch calls the API again.
+    assert client.connections.get.call_count == 2
+
+
 def test_process_log_messages_from_subprocess(monkeypatch, caplog):
     from airflow.sdk._shared.logging.structlog import PER_LOGGER_LEVELS
 

Reply via email to