This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ab1418ff94a Decouple remote logging config from core (#67056)
ab1418ff94a is described below

commit ab1418ff94af2e5175b023bb47443ea262f03bd9
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Thu Jun 4 19:39:58 2026 +0800

    Decouple remote logging config from core (#67056)
    
    * Decouple remote logging configuration from core airflow
    
    Fix type annotation
    
    * Align get_default_remote_conn_id with load_remote_conn_id
    
    * Make _build_remote_task_log_from_provider return RemoteLogIO without conn 
id
    
    * Split load_logging_config into _get_logging_config and 
_load_logging_config
    
    Decouple importing the [logging] logging_config_class dict from
    remote-handler resolution so each can run independently. The deprecated
    load_logging_config wrapper delegates to both and keeps the old return
    shape. Wire the lazy remote-handler load through the shared
    resolve_remote_task_log factory.
    
    * Add significant md
    
    * Fix: Respct ("logging", "remote_logging") in shared logging factory
    
    * Addressed Jens comments
---
 airflow-core/newsfragments/67056.significant.rst   | 101 +++++++
 airflow-core/src/airflow/logging_config.py         |  69 +++--
 airflow-core/src/airflow/provider.yaml.schema.json |  19 ++
 airflow-core/src/airflow/provider_info.schema.json |  19 ++
 airflow-core/src/airflow/providers_manager.py      |  54 ++++
 .../tests/unit/always/test_providers_manager.py    |  83 ++++++
 .../tests/unit/logging/test_logging_config.py      | 210 +++++++++++++++
 .../logging/src/airflow_shared/logging/factory.py  | 182 +++++++++++++
 shared/logging/tests/logging/test_factory.py       | 289 +++++++++++++++++++++
 task-sdk/src/airflow/sdk/log.py                    |  13 +-
 .../src/airflow/sdk/providers_manager_runtime.py   |  56 +++-
 .../task_sdk/test_providers_manager_runtime.py     |  70 ++++-
 12 files changed, 1134 insertions(+), 31 deletions(-)

diff --git a/airflow-core/newsfragments/67056.significant.rst 
b/airflow-core/newsfragments/67056.significant.rst
new file mode 100644
index 00000000000..266cb2c24e7
--- /dev/null
+++ b/airflow-core/newsfragments/67056.significant.rst
@@ -0,0 +1,101 @@
+Decouple remote logging resolution from ``airflow.logging_config``
+
+Remote task log handler resolution is now owned by the shared
+``airflow_shared.logging.factory`` module and applies a single, well-defined
+precedence rule. ``airflow.logging_config.load_logging_config`` is deprecated:
+loading the ``[logging] logging_config_class`` dict and resolving the remote
+handler are now two independent steps.
+
+**Resolution order (``resolve_remote_task_log``):**
+
+1. **User-defined ``[logging] logging_config_class``** — if the user has set
+   ``logging_config_class`` to a custom module path, and that module exports a
+   ``REMOTE_TASK_LOG`` (or ``DEFAULT_REMOTE_CONN_ID``) attribute, those values
+   win. Existing custom logging configs keep working unchanged.
+2. **ProvidersManager scheme dispatch** — the scheme of ``[logging]
+   remote_base_log_folder`` (e.g. ``s3``, ``gs``, ``wasb``) is looked up in the
+   provider yaml ``remote-logging:`` registry. The matching ``RemoteLogIO``
+   class is imported and instantiated via its ``from_config()`` classmethod.
+   The connection id comes from ``[logging] remote_log_conn_id`` (set
+   explicitly by the user; providers needing a backend default can read it
+   inside ``from_config``).
+3. **Legacy attr-path fallback** — if neither of the above produced a handler,
+   the resolver imports the default logging module
+   (``airflow.config_templates.airflow_local_settings``) and reads its
+   ``REMOTE_TASK_LOG`` / ``DEFAULT_REMOTE_CONN_ID`` attributes. This is the
+   per-scheme ``if/elif`` chain in ``airflow_local_settings.py`` and is
+   transitional — it will be removed once every in-tree provider exposes
+   ``from_config`` in Airflow 4.0.
+
+**``RemoteLogIO.from_config`` contract:**
+
+Provider remote-log handler classes opting into provider dispatch must expose
+a ``from_config`` classmethod. The shape is::
+
+    class MyRemoteLogIO(LoggingMixin):
+        @classmethod
+        def from_config(cls) -> "MyRemoteLogIO":
+            from airflow.providers.common.compat.sdk import conf
+
+            return cls(
+                base_log_folder=conf.get("logging", "base_log_folder"),
+                remote_base=conf.get("logging", "remote_base_log_folder"),
+                delete_local_copy=conf.getboolean("logging", 
"delete_local_logs"),
+                # backend-specific keys live in the provider's own config 
section
+                ...,
+            )
+
+Key properties:
+
+- Takes no arguments — the shared factory calls ``cls.from_config()`` with no
+  inputs. Providers read ``airflow.providers.common.compat.sdk.conf`` 
themselves and pick
+  the keys they care about.
+- Returns a fully instantiated ``RemoteLogIO`` (or ``RemoteLogStreamIO``).
+- Failures inside ``from_config`` are logged and treated as "no remote
+  handler" (the factory returns ``None`` and the legacy fallback runs); under
+  ``PYTEST_CURRENT_TEST`` the exception is re-raised so tests fail loudly.
+- Providers that don't yet implement ``from_config`` continue to work via the
+  legacy ``airflow_local_settings.py`` chain (step 3).
+
+**``airflow.logging_config`` API changes:**
+
+- ``_get_logging_config()`` — new private helper that imports and validates
+  the ``[logging] logging_config_class`` dict only. Does not touch remote
+  logging state.
+- ``_load_logging_config()`` — new private helper that calls
+  ``resolve_remote_task_log`` and caches the result on
+  ``_ActiveLoggingConfig``. Used lazily by ``get_remote_task_log`` and
+  ``get_default_remote_conn_id``.
+- ``load_logging_config()`` — deprecated. Emits ``DeprecationWarning`` and
+  delegates to both helpers; still returns ``(logging_config_dict,
+  logging_class_path)`` so existing callers keep working.
+
+**Behaviour changes:**
+
+- ``configure_logging`` no longer eagerly resolves the remote handler.
+  Resolution is now lazy and happens on the first call to
+  ``get_remote_task_log()`` / ``get_default_remote_conn_id()``.
+- Providers that registered a ``remote-logging:`` block but did not implement
+  ``from_config`` will be skipped with a warning; the legacy fallback path
+  takes over.
+
+* Types of change
+
+  * [ ] Dag changes
+  * [ ] Config changes
+  * [ ] API changes
+  * [ ] CLI changes
+  * [x] Behaviour changes
+  * [ ] Plugin changes
+  * [ ] Dependency changes
+  * [x] Code interface changes
+
+* Migration rules needed
+
+  * Replace direct calls to ``airflow.logging_config.load_logging_config()``
+    with ``_get_logging_config()`` (for the logging dict) and/or
+    ``_load_logging_config()`` (to prime the remote-handler cache).
+  * Provider remote-log handler classes should implement a no-argument
+    ``from_config`` classmethod that reads 
``airflow.providers.common.compat.sdk.conf``
+    and returns a configured instance. Until they do, resolution falls
+    through to the legacy ``airflow_local_settings.py`` chain.
diff --git a/airflow-core/src/airflow/logging_config.py 
b/airflow-core/src/airflow/logging_config.py
index 0da017d9b08..7f4ae605cb2 100644
--- a/airflow-core/src/airflow/logging_config.py
+++ b/airflow-core/src/airflow/logging_config.py
@@ -21,7 +21,7 @@ import logging
 import warnings
 from typing import TYPE_CHECKING, Any
 
-from airflow._shared.logging.remote import discover_remote_log_handler
+from airflow._shared.logging.factory import DEFAULT_LOGGING_CONFIG_PATH, 
resolve_remote_task_log
 from airflow._shared.module_loading import import_string
 from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException
@@ -49,25 +49,26 @@ class _ActiveLoggingConfig:
 
 def get_remote_task_log() -> RemoteLogIO | None:
     if not _ActiveLoggingConfig.logging_config_loaded:
-        load_logging_config()
+        _load_logging_config()
     return _ActiveLoggingConfig.remote_task_log
 
 
 def get_default_remote_conn_id() -> str | None:
+    if conn_id := conf.get("logging", "remote_log_conn_id", fallback=None):
+        return conn_id
+
     if not _ActiveLoggingConfig.logging_config_loaded:
-        load_logging_config()
+        _load_logging_config()
     return _ActiveLoggingConfig.default_remote_conn_id
 
 
-def load_logging_config() -> tuple[dict[str, Any], str]:
-    """Configure & Validate Airflow Logging."""
-    fallback = 
"airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
-    logging_class_path = conf.get("logging", "logging_config_class", 
fallback=fallback)
-
-    # Sometimes we end up with `""` as the value!
-    logging_class_path = logging_class_path or fallback
-
-    user_defined = logging_class_path != fallback
+def _get_logging_config() -> dict[str, Any]:
+    """Import and validate the ``[logging] logging_config_class`` dict."""
+    logging_class_path = (
+        conf.get("logging", "logging_config_class", 
fallback=DEFAULT_LOGGING_CONFIG_PATH)
+        or DEFAULT_LOGGING_CONFIG_PATH
+    )
+    user_defined = logging_class_path != DEFAULT_LOGGING_CONFIG_PATH
 
     try:
         logging_config = import_string(logging_class_path)
@@ -78,27 +79,51 @@ def load_logging_config() -> tuple[dict[str, Any], str]:
 
         if user_defined:
             log.info("Successfully imported user-defined logging config from 
%s", logging_class_path)
-
     except Exception as err:
-        # Import default logging configurations.
         raise ImportError(
             f"Unable to load {'custom ' if user_defined else ''}logging config 
from {logging_class_path} due "
             f"to: {type(err).__name__}:{err}"
         )
-    else:
-        # Load remote logging configuration using shared discovery logic
-        remote_task_log, default_remote_conn_id = discover_remote_log_handler(
-            logging_class_path, fallback, import_string
-        )
-        _ActiveLoggingConfig.set(remote_task_log, default_remote_conn_id)
 
-    return logging_config, logging_class_path
+    return logging_config
+
+
+def _load_logging_config() -> None:
+    """Load and cache the remote logging configuration from core config."""
+    from airflow.providers_manager import ProvidersManager
+
+    remote_task_log, default_remote_conn_id = resolve_remote_task_log(
+        conf=conf,
+        providers_manager=ProvidersManager(),
+        import_string=import_string,
+    )
+    _ActiveLoggingConfig.set(remote_task_log, default_remote_conn_id)
+
+
+def load_logging_config() -> tuple[dict[str, Any], str]:
+    """
+    Import the logging config dict and load the remote logging handler.
+
+    .. deprecated::
+        Use :func:`_get_logging_config` for the logging dict and
+        :func:`_load_logging_config` for remote handler setup.
+    """
+    warnings.warn(
+        "load_logging_config is deprecated; use _get_logging_config() for the 
logging dict "
+        "and _load_logging_config() for remote handler setup.",
+        DeprecationWarning,
+        stacklevel=2,
+    )
+    _load_logging_config()
+    return _get_logging_config(), conf.get(
+        "logging", "logging_config_class", fallback=DEFAULT_LOGGING_CONFIG_PATH
+    ) or DEFAULT_LOGGING_CONFIG_PATH
 
 
 def configure_logging():
     from airflow._shared.logging import configure_logging, init_log_folder, 
translate_config_values
 
-    logging_config, logging_class_path = load_logging_config()
+    logging_config = _get_logging_config()
     try:
         level: str = getattr(
             logging_config, "LOG_LEVEL", conf.get("logging", "logging_level", 
fallback="INFO")
diff --git a/airflow-core/src/airflow/provider.yaml.schema.json 
b/airflow-core/src/airflow/provider.yaml.schema.json
index 2f020b27330..50dacbc7963 100644
--- a/airflow-core/src/airflow/provider.yaml.schema.json
+++ b/airflow-core/src/airflow/provider.yaml.schema.json
@@ -536,6 +536,25 @@
                 "type": "string"
             }
         },
+        "remote-logging": {
+            "type": "array",
+            "description": "Remote logging IO handlers contributed by the 
provider. Each entry registers a RemoteLogIO implementation that 
ProvidersManager dispatches by URL scheme.",
+            "items": {
+                "type": "object",
+                "required": ["classpath", "scheme"],
+                "additionalProperties": false,
+                "properties": {
+                    "classpath": {
+                        "type": "string",
+                        "description": "Fully-qualified class name of the 
RemoteLogIO implementation."
+                    },
+                    "scheme": {
+                        "type": "string",
+                        "description": "URL scheme (without ://) of [logging] 
remote_base_log_folder that this handler claims."
+                    }
+                }
+            }
+        },
         "auth-backends": {
             "type": "array",
             "description": "API Auth Backend module names",
diff --git a/airflow-core/src/airflow/provider_info.schema.json 
b/airflow-core/src/airflow/provider_info.schema.json
index 86fc726a051..b3e9bf74b62 100644
--- a/airflow-core/src/airflow/provider_info.schema.json
+++ b/airflow-core/src/airflow/provider_info.schema.json
@@ -331,6 +331,25 @@
                 "type": "string"
             }
         },
+        "remote-logging": {
+            "type": "array",
+            "description": "Remote logging IO handlers contributed by the 
provider. Each entry registers a RemoteLogIO implementation that 
ProvidersManager dispatches by URL scheme.",
+            "items": {
+                "type": "object",
+                "required": ["classpath", "scheme"],
+                "additionalProperties": false,
+                "properties": {
+                    "classpath": {
+                        "type": "string",
+                        "description": "Fully-qualified class name of the 
RemoteLogIO implementation."
+                    },
+                    "scheme": {
+                        "type": "string",
+                        "description": "URL scheme (without ://) of [logging] 
remote_base_log_folder that this handler claims."
+                    }
+                }
+            }
+        },
         "auth-backends": {
             "type": "array",
             "description": "API Auth Backend module names",
diff --git a/airflow-core/src/airflow/providers_manager.py 
b/airflow-core/src/airflow/providers_manager.py
index 6fefcbc39b0..8e270209a8b 100644
--- a/airflow-core/src/airflow/providers_manager.py
+++ b/airflow-core/src/airflow/providers_manager.py
@@ -223,6 +223,14 @@ class NotificationInfo(NamedTuple):
     package_name: str
 
 
+class RemoteLoggingInfo(NamedTuple):
+    """Remote logging IO handler registered by a provider."""
+
+    classpath: str
+    scheme: str
+    package_name: str
+
+
 class PluginInfo(NamedTuple):
     """Plugin class, name and provider it comes from."""
 
@@ -432,6 +440,8 @@ class ProvidersManager(LoggingMixin):
         self._cli_command_provider_name_set: set[str] = set()
         self._extra_link_class_name_set: set[str] = set()
         self._logging_class_name_set: set[str] = set()
+        self._remote_logging_info_list: list[RemoteLoggingInfo] = []
+        self._remote_logging_by_scheme: dict[str, RemoteLoggingInfo] = {}
         self._auth_manager_class_name_set: set[str] = set()
         self._auth_manager_without_check_set: set[tuple[str, str]] = set()
         self._secrets_backend_class_name_set: set[str] = set()
@@ -571,6 +581,12 @@ class ProvidersManager(LoggingMixin):
         self.initialize_providers_list()
         self._discover_logging()
 
+    @provider_info_cache("remote_logging")
+    def initialize_providers_remote_logging(self):
+        """Lazy initialization of providers remote logging IO handlers."""
+        self.initialize_providers_list()
+        self._discover_remote_logging()
+
     @provider_info_cache("secrets_backends")
     def initialize_providers_secrets_backends(self):
         """Lazy initialization of providers secrets_backends information."""
@@ -1240,6 +1256,31 @@ class ProvidersManager(LoggingMixin):
                     if _correctness_check(provider_package, 
logging_class_name, provider):
                         self._logging_class_name_set.add(logging_class_name)
 
+    def _discover_remote_logging(self) -> None:
+        """Retrieve all remote logging IO handlers defined in the providers."""
+        for provider_package, provider in self._provider_dict.items():
+            entries = provider.data.get("remote-logging") or []
+            for entry in entries:
+                classpath = entry["classpath"]
+                if not _correctness_check(provider_package, classpath, 
provider):
+                    continue
+                info = RemoteLoggingInfo(
+                    classpath=classpath,
+                    scheme=entry["scheme"],
+                    package_name=provider_package,
+                )
+                if (existing := 
self._remote_logging_by_scheme.get(info.scheme)) is not None:
+                    log.warning(
+                        "Remote logging scheme '%s' is already registered by 
%s; ignoring "
+                        "duplicate registration from %s.",
+                        info.scheme,
+                        existing.package_name,
+                        info.package_name,
+                    )
+                    continue
+                self._remote_logging_info_list.append(info)
+                self._remote_logging_by_scheme[info.scheme] = info
+
     def _discover_secrets_backends(self) -> None:
         """Retrieve all secrets backends defined in the providers."""
         for provider_package, provider in self._provider_dict.items():
@@ -1450,6 +1491,17 @@ class ProvidersManager(LoggingMixin):
         self.initialize_providers_logging()
         return sorted(self._logging_class_name_set)
 
+    @property
+    def remote_logging_handlers(self) -> list[RemoteLoggingInfo]:
+        """Return all remote logging IO handlers contributed by providers."""
+        self.initialize_providers_remote_logging()
+        return list(self._remote_logging_info_list)
+
+    def remote_logging_handler_by_scheme(self, scheme: str) -> 
RemoteLoggingInfo | None:
+        """Return the remote logging IO handler registered for the given URL 
scheme, if any."""
+        self.initialize_providers_remote_logging()
+        return self._remote_logging_by_scheme.get(scheme)
+
     @property
     def secrets_backend_class_names(self) -> list[str]:
         """Returns set of secret backend class names."""
@@ -1532,6 +1584,8 @@ class ProvidersManager(LoggingMixin):
         self._field_behaviours.clear()
         self._extra_link_class_name_set.clear()
         self._logging_class_name_set.clear()
+        self._remote_logging_info_list.clear()
+        self._remote_logging_by_scheme.clear()
         self._auth_manager_class_name_set.clear()
         self._auth_manager_without_check_set.clear()
         self._secrets_backend_class_name_set.clear()
diff --git a/airflow-core/tests/unit/always/test_providers_manager.py 
b/airflow-core/tests/unit/always/test_providers_manager.py
index afa473e80a4..25e3774d4f7 100644
--- a/airflow-core/tests/unit/always/test_providers_manager.py
+++ b/airflow-core/tests/unit/always/test_providers_manager.py
@@ -37,6 +37,7 @@ from airflow.providers_manager import (
     PluginInfo,
     ProviderInfo,
     ProvidersManager,
+    RemoteLoggingInfo,
 )
 
 from tests_common.test_utils.markers import 
skip_if_force_lowest_dependencies_marker
@@ -47,6 +48,16 @@ if TYPE_CHECKING:
     from airflow.cli.cli_config import CLICommand
 
 
+class FakeRemoteLogIO:
+    """Importable stub used by remote-logging discovery tests."""
+
+    processors: tuple = ()
+
+    @classmethod
+    def from_config(cls):
+        return cls()
+
+
 def test_cleanup_providers_manager(cleanup_providers_manager):
     """Check the cleanup provider manager functionality."""
     provider_manager = ProvidersManager()
@@ -163,6 +174,78 @@ class TestProviderManager:
             ),
         )
 
+    def test_providers_manager_register_remote_logging_by_scheme(self):
+        providers_manager = ProvidersManager()
+        providers_manager._provider_dict = LazyDictWithCache()
+        providers_manager._provider_dict["fake.remote.logging"] = ProviderInfo(
+            version="0.0.1",
+            data={
+                "remote-logging": [
+                    {
+                        "classpath": f"{__name__}.FakeRemoteLogIO",
+                        "scheme": "fake",
+                    }
+                ]
+            },
+        )
+        providers_manager._discover_remote_logging()
+        assert len(providers_manager._remote_logging_info_list) == 1
+        assert providers_manager._remote_logging_by_scheme["fake"] == 
RemoteLoggingInfo(
+            classpath=f"{__name__}.FakeRemoteLogIO",
+            scheme="fake",
+            package_name="fake.remote.logging",
+        )
+        assert "unknown" not in providers_manager._remote_logging_by_scheme
+
+    def 
test_providers_manager_register_remote_logging_duplicate_scheme_first_wins(self):
+        providers_manager = ProvidersManager()
+        providers_manager._provider_dict = LazyDictWithCache()
+        providers_manager._provider_dict["fake.remote.logging.first"] = 
ProviderInfo(
+            version="0.0.1",
+            data={
+                "remote-logging": [
+                    {
+                        "classpath": f"{__name__}.FakeRemoteLogIO",
+                        "scheme": "dup",
+                    }
+                ]
+            },
+        )
+        providers_manager._provider_dict["fake.remote.logging.second"] = 
ProviderInfo(
+            version="0.0.1",
+            data={
+                "remote-logging": [
+                    {
+                        "classpath": f"{__name__}.FakeRemoteLogIO",
+                        "scheme": "dup",
+                    }
+                ]
+            },
+        )
+        providers_manager._discover_remote_logging()
+        winner = providers_manager._remote_logging_by_scheme["dup"]
+        assert winner.package_name == "fake.remote.logging.first"
+        assert len(providers_manager._remote_logging_info_list) == 1
+        assert providers_manager._remote_logging_info_list[0].package_name == 
"fake.remote.logging.first"
+
+    def test_providers_manager_remote_logging_bad_class_filtered(self):
+        providers_manager = ProvidersManager()
+        providers_manager._provider_dict = LazyDictWithCache()
+        providers_manager._provider_dict["fake.remote.logging"] = ProviderInfo(
+            version="0.0.1",
+            data={
+                "remote-logging": [
+                    {
+                        "classpath": 
"fake.module.does.not.exist.FakeRemoteLogIO",
+                        "scheme": "bad",
+                    }
+                ]
+            },
+        )
+        providers_manager._discover_remote_logging()
+        assert "bad" not in providers_manager._remote_logging_by_scheme
+        assert providers_manager._remote_logging_info_list == []
+
     def test_connection_form_widgets(self, yaml_ui_metadata_counts):
         yaml_widgets, _ = yaml_ui_metadata_counts
         provider_manager = ProvidersManager()
diff --git a/airflow-core/tests/unit/logging/test_logging_config.py 
b/airflow-core/tests/unit/logging/test_logging_config.py
new file mode 100644
index 00000000000..04449a67419
--- /dev/null
+++ b/airflow-core/tests/unit/logging/test_logging_config.py
@@ -0,0 +1,210 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import sys
+from types import SimpleNamespace
+from unittest import mock
+
+import pytest
+
+from airflow._shared.logging.factory import DEFAULT_LOGGING_CONFIG_PATH
+from airflow.logging_config import (
+    _ActiveLoggingConfig,
+    _get_logging_config,
+    _load_logging_config,
+    get_default_remote_conn_id,
+    get_remote_task_log,
+    load_logging_config,
+)
+
+
[email protected](autouse=True)
+def _reset_active_logging_config(monkeypatch):
+    monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", False, 
raising=False)
+    monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", None, 
raising=False)
+    monkeypatch.setattr(_ActiveLoggingConfig, "default_remote_conn_id", None, 
raising=False)
+
+
+class TestGetLoggingConfig:
+    def test_returns_default_logging_dict(self):
+        from airflow.config_templates.airflow_local_settings import 
DEFAULT_LOGGING_CONFIG
+
+        config = _get_logging_config()
+        assert config == DEFAULT_LOGGING_CONFIG
+
+    def test_user_defined_dict_is_imported(self, monkeypatch):
+        fake_module = "fake_user_logging_module_for_test"
+        custom = {"version": 1, "marker": "user-defined"}
+        monkeypatch.setitem(sys.modules, fake_module, 
SimpleNamespace(LOGGING_CONFIG=custom))
+        with mock.patch("airflow.logging_config.conf") as mocked_conf:
+            mocked_conf.get.return_value = f"{fake_module}.LOGGING_CONFIG"
+            config = _get_logging_config()
+        assert config is custom
+
+    def test_empty_string_falls_back_to_default(self):
+        from airflow.config_templates.airflow_local_settings import 
DEFAULT_LOGGING_CONFIG
+
+        with mock.patch("airflow.logging_config.conf") as mocked_conf:
+            mocked_conf.get.return_value = ""
+            config = _get_logging_config()
+        assert config == DEFAULT_LOGGING_CONFIG
+
+    @pytest.mark.parametrize(
+        "logging_class_path",
+        [pytest.param("", id="empty-string"), pytest.param(None, id="none")],
+    )
+    def test_falsy_logging_class_path_falls_back_to_default(self, 
logging_class_path):
+        from airflow.config_templates.airflow_local_settings import 
DEFAULT_LOGGING_CONFIG
+
+        with mock.patch("airflow.logging_config.conf") as mocked_conf:
+            mocked_conf.get.return_value = logging_class_path
+            config = _get_logging_config()
+        assert config == DEFAULT_LOGGING_CONFIG
+
+    def test_raises_import_error_when_path_invalid(self):
+        with mock.patch("airflow.logging_config.conf") as mocked_conf:
+            mocked_conf.get.return_value = "nonexistent.module.LOGGING"
+            with pytest.raises(ImportError, match="Unable to load custom 
logging config"):
+                _get_logging_config()
+
+    def test_raises_import_error_when_value_not_dict(self, monkeypatch):
+        fake_module = "fake_user_logging_module_not_dict"
+        monkeypatch.setitem(sys.modules, fake_module, 
SimpleNamespace(LOGGING_CONFIG="not-a-dict"))
+        with mock.patch("airflow.logging_config.conf") as mocked_conf:
+            mocked_conf.get.return_value = f"{fake_module}.LOGGING_CONFIG"
+            with pytest.raises(ImportError, match="Logging Config should be of 
dict type"):
+                _get_logging_config()
+
+
+class TestLoadLoggingConfigPrivate:
+    def test_caches_resolved_remote_handler_and_conn_id(self):
+        sentinel_handler = object()
+        with (
+            mock.patch("airflow.logging_config.resolve_remote_task_log") as 
mock_resolve,
+            mock.patch("airflow.providers_manager.ProvidersManager") as 
mock_pm,
+        ):
+            mock_resolve.return_value = (sentinel_handler, "my_conn")
+            _load_logging_config()
+
+        assert _ActiveLoggingConfig.logging_config_loaded is True
+        assert _ActiveLoggingConfig.remote_task_log is sentinel_handler
+        assert _ActiveLoggingConfig.default_remote_conn_id == "my_conn"
+        # resolve_remote_task_log must be called with the core providers 
manager.
+        mock_resolve.assert_called_once()
+        assert mock_resolve.call_args.kwargs["providers_manager"] is 
mock_pm.return_value
+
+    def test_caches_none_when_resolver_returns_nothing(self):
+        with (
+            mock.patch("airflow.logging_config.resolve_remote_task_log") as 
mock_resolve,
+            mock.patch("airflow.providers_manager.ProvidersManager"),
+        ):
+            mock_resolve.return_value = (None, None)
+            _load_logging_config()
+
+        assert _ActiveLoggingConfig.logging_config_loaded is True
+        assert _ActiveLoggingConfig.remote_task_log is None
+        assert _ActiveLoggingConfig.default_remote_conn_id is None
+
+
+class TestLoadLoggingConfigDeprecated:
+    def test_emits_deprecation_warning_and_returns_tuple(self):
+        sentinel_handler = object()
+        with (
+            mock.patch("airflow.logging_config.resolve_remote_task_log") as 
mock_resolve,
+            mock.patch("airflow.providers_manager.ProvidersManager"),
+        ):
+            mock_resolve.return_value = (sentinel_handler, "my_conn")
+            with pytest.warns(DeprecationWarning, match="load_logging_config 
is deprecated"):
+                logging_config, logging_class_path = load_logging_config()
+
+        assert isinstance(logging_config, dict)
+        assert logging_class_path == DEFAULT_LOGGING_CONFIG_PATH
+        # The deprecated wrapper still primes the remote handler cache.
+        assert _ActiveLoggingConfig.remote_task_log is sentinel_handler
+
+    def test_returns_user_logging_class_path(self, monkeypatch):
+        fake_module = "fake_user_logging_module_deprecated"
+        custom = {"version": 1}
+        custom_path = f"{fake_module}.LOGGING_CONFIG"
+        monkeypatch.setitem(sys.modules, fake_module, 
SimpleNamespace(LOGGING_CONFIG=custom))
+
+        with (
+            mock.patch("airflow.logging_config.conf") as mocked_conf,
+            mock.patch("airflow.logging_config.resolve_remote_task_log") as 
mock_resolve,
+            mock.patch("airflow.providers_manager.ProvidersManager"),
+        ):
+            mocked_conf.get.return_value = custom_path
+            mock_resolve.return_value = (None, None)
+            with pytest.warns(DeprecationWarning, match="load_logging_config 
is deprecated"):
+                _, logging_class_path = load_logging_config()
+
+        assert logging_class_path == custom_path
+
+
+class TestGetRemoteTaskLog:
+    def test_returns_cached_value_without_reload(self):
+        sentinel = object()
+        _ActiveLoggingConfig.set(sentinel, None)
+        with mock.patch("airflow.logging_config._load_logging_config") as 
mock_load:
+            result = get_remote_task_log()
+        assert result is sentinel
+        mock_load.assert_not_called()
+
+    def test_triggers_load_when_not_loaded(self):
+        sentinel = object()
+
+        def _fake_load():
+            _ActiveLoggingConfig.set(sentinel, None)
+
+        with mock.patch("airflow.logging_config._load_logging_config", 
side_effect=_fake_load) as mock_load:
+            result = get_remote_task_log()
+        mock_load.assert_called_once()
+        assert result is sentinel
+
+
+class TestGetDefaultRemoteConnId:
+    def test_prefers_explicit_conf_value(self):
+        with (
+            mock.patch("airflow.logging_config.conf") as mocked_conf,
+            mock.patch("airflow.logging_config._load_logging_config") as 
mock_load,
+        ):
+            mocked_conf.get.return_value = "explicit_conn"
+            assert get_default_remote_conn_id() == "explicit_conn"
+        mock_load.assert_not_called()
+
+    def test_falls_back_to_cached_default(self):
+        def _fake_load():
+            _ActiveLoggingConfig.set(None, "cached_conn")
+
+        with (
+            mock.patch("airflow.logging_config.conf") as mocked_conf,
+            mock.patch("airflow.logging_config._load_logging_config", 
side_effect=_fake_load),
+        ):
+            mocked_conf.get.return_value = None
+            assert get_default_remote_conn_id() == "cached_conn"
+
+    def test_skips_reload_when_already_loaded(self):
+        _ActiveLoggingConfig.set(None, "cached_conn")
+        with (
+            mock.patch("airflow.logging_config.conf") as mocked_conf,
+            mock.patch("airflow.logging_config._load_logging_config") as 
mock_load,
+        ):
+            mocked_conf.get.return_value = None
+            assert get_default_remote_conn_id() == "cached_conn"
+        mock_load.assert_not_called()
diff --git a/shared/logging/src/airflow_shared/logging/factory.py 
b/shared/logging/src/airflow_shared/logging/factory.py
new file mode 100644
index 00000000000..acd016d6273
--- /dev/null
+++ b/shared/logging/src/airflow_shared/logging/factory.py
@@ -0,0 +1,182 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Build a remote logging IO handler from a provider-supplied registry.
+
+This module owns the runtime-independent logic for resolving and instantiating
+``RemoteLogIO`` handlers. Both ``airflow.logging_config`` (core) and
+``airflow.sdk.log._load_logging_config`` (Task SDK) call
+:func:`resolve_remote_task_log` with their own ``ProvidersManager`` instance 
and
+``import_string`` implementation; that function in turn delegates to
+:func:`_build_remote_task_log_from_provider` for the provider-dispatch step 
and to
+:func:`airflow._shared.logging.remote.discover_remote_log_handler` for the
+user-defined ``logging_config_class`` and legacy fallbacks.
+"""
+
+from __future__ import annotations
+
+import logging
+import os
+from collections.abc import Callable
+from typing import TYPE_CHECKING, Any
+from urllib.parse import urlsplit
+
+from .remote import discover_remote_log_handler
+
+if TYPE_CHECKING:
+    from airflow.configuration import AirflowConfigParser
+    from airflow.logging.remote import RemoteLogIO
+    from airflow.providers_manager import ProvidersManager, RemoteLoggingInfo 
as _CoreRemoteLoggingInfo
+    from airflow.sdk.configuration import AirflowSDKConfigParser
+    from airflow.sdk.providers_manager_runtime import (
+        ProvidersManagerTaskRuntime,
+        RemoteLoggingInfo as _SDKRemoteLoggingInfo,
+    )
+
+log = logging.getLogger(__name__)
+
+DEFAULT_LOGGING_CONFIG_PATH = 
"airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
+# Default ``[logging] logging_config_class`` value when the user has not 
overridden it.
+
+
+def _instantiate_handler(
+    info: _CoreRemoteLoggingInfo | _SDKRemoteLoggingInfo,
+    *,
+    import_string: Callable[[str], Any],
+) -> Any | None:
+    try:
+        cls = import_string(info.classpath)
+    except Exception as err:
+        log.info(
+            "Remote task logs will not be available; failed to import %s: %s",
+            info.classpath,
+            err,
+        )
+        if "PYTEST_CURRENT_TEST" in os.environ:
+            raise
+        return None
+
+    factory = getattr(cls, "from_config", None)
+    if factory is None:
+        log.warning(
+            "Provider %s registered %s as a remote logging handler, but the 
class does not "
+            "implement classmethod 'from_config'. The handler will be 
skipped.",
+            info.package_name,
+            info.classpath,
+        )
+        return None
+
+    try:
+        return factory()
+    except Exception as err:
+        log.info(
+            "Remote task logs will not be available; %s.from_config raised: 
%s",
+            info.classpath,
+            err,
+        )
+        if "PYTEST_CURRENT_TEST" in os.environ:
+            raise
+        return None
+
+
+def _build_remote_task_log_from_provider(
+    *,
+    remote_base_log_folder: str | None,
+    providers_manager: ProvidersManager | ProvidersManagerTaskRuntime,
+    import_string: Callable[[str], Any],
+) -> RemoteLogIO | None:
+    """
+    Resolve a remote logging IO handler from its URL scheme.
+
+    The shared layer only routes to the right provider class; each provider's
+    ``from_config`` classmethod imports ``conf`` itself and reads its own
+    backend-specific keys.
+
+    Provider dispatch does not produce a default conn id. Users who need one
+    set ``[logging] remote_log_conn_id`` explicitly; providers that want a
+    backend-specific default can read from their own hook inside 
``from_config``.
+
+    :param remote_base_log_folder: Value of ``[logging] 
remote_base_log_folder``.
+        Its URL scheme is the dispatch key.
+    :param providers_manager: The ``ProvidersManager`` (core) or
+        ``ProvidersManagerTaskRuntime`` (Task SDK) whose
+        ``remote_logging_handler_by_scheme`` method resolves the scheme.
+    :param import_string: ``import_string`` callable from the caller's runtime.
+    :returns: The instantiated handler, or ``None`` when no provider claims the
+        scheme.
+    """
+    if not remote_base_log_folder:
+        return None
+
+    if not (scheme := urlsplit(remote_base_log_folder).scheme):
+        return None
+
+    if (info := providers_manager.remote_logging_handler_by_scheme(scheme)) is 
None:
+        return None
+
+    return _instantiate_handler(info, import_string=import_string)
+
+
+def resolve_remote_task_log(
+    *,
+    conf: AirflowConfigParser | AirflowSDKConfigParser,
+    providers_manager: ProvidersManager | ProvidersManagerTaskRuntime,
+    import_string: Callable[[str], Any],
+) -> tuple[RemoteLogIO | None, str | None]:
+    """
+    Resolve the active remote log handler via the three-tier precedence rule.
+
+    Order:
+
+    1. A user-defined ``logging_config_class`` module that exports
+       ``REMOTE_TASK_LOG`` (or ``DEFAULT_REMOTE_CONN_ID``) wins — preserves
+       existing custom configs unchanged.
+    2. ProvidersManager dispatch (provider yaml ``remote-logging:`` blocks).
+    3. Legacy attr-path against the default logging module. Transitional while
+       ``airflow_local_settings.py`` still carries the per-scheme if/elif 
chain;
+       removed when every provider has migrated.
+
+    :param conf: AirflowConfigParser-like object exposing ``get``.
+    :param providers_manager: Caller's ``ProvidersManager`` /
+        ``ProvidersManagerTaskRuntime``.
+    :param import_string: ``import_string`` callable from the caller's runtime.
+    :returns: ``(remote_task_log, default_remote_conn_id)``. Either element may
+        be ``None`` when no path produces a result.
+    """
+    logging_class_path = (
+        conf.get("logging", "logging_config_class", 
fallback=DEFAULT_LOGGING_CONFIG_PATH)
+        or DEFAULT_LOGGING_CONFIG_PATH
+    )
+    user_defined = logging_class_path != DEFAULT_LOGGING_CONFIG_PATH
+
+    if user_defined:
+        remote_task_log, default_remote_conn_id = discover_remote_log_handler(
+            logging_class_path, DEFAULT_LOGGING_CONFIG_PATH, import_string
+        )
+        if remote_task_log is not None or default_remote_conn_id is not None:
+            return remote_task_log, default_remote_conn_id
+
+    if conf.getboolean("logging", "remote_logging", fallback=False):
+        remote_task_log = _build_remote_task_log_from_provider(
+            remote_base_log_folder=conf.get("logging", 
"remote_base_log_folder", fallback=None),
+            providers_manager=providers_manager,
+            import_string=import_string,
+        )
+        if remote_task_log is not None:
+            return remote_task_log, conf.get("logging", "remote_log_conn_id", 
fallback=None)
+
+    return discover_remote_log_handler(logging_class_path, 
DEFAULT_LOGGING_CONFIG_PATH, import_string)
diff --git a/shared/logging/tests/logging/test_factory.py 
b/shared/logging/tests/logging/test_factory.py
new file mode 100644
index 00000000000..fa81d7fa1c6
--- /dev/null
+++ b/shared/logging/tests/logging/test_factory.py
@@ -0,0 +1,289 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from types import SimpleNamespace
+from typing import Any
+
+import pytest
+
+from airflow_shared.logging.factory import (
+    DEFAULT_LOGGING_CONFIG_PATH,
+    _build_remote_task_log_from_provider,
+    resolve_remote_task_log,
+)
+
+
+class FakeProvidersManager:
+    """Stub manager satisfying the ProvidersManagerLike protocol."""
+
+    def __init__(self, by_scheme: dict[str, Any] | None = None) -> None:
+        self._by_scheme = by_scheme or {}
+
+    def remote_logging_handler_by_scheme(self, scheme: str):
+        return self._by_scheme.get(scheme)
+
+
+class ExplodingProvidersManager:
+    """Stub manager whose lookup must never be called by the test."""
+
+    def remote_logging_handler_by_scheme(self, scheme: str):  # pragma: no 
cover - sentinel
+        pytest.fail("lookup must not run")
+
+
+class FakeRemoteLogIO:
+    """Importable stub used as the registered handler class."""
+
+    last_called: bool = False
+
+    def __init__(self):
+        pass
+
+    @classmethod
+    def from_config(cls):
+        FakeRemoteLogIO.last_called = True
+        return cls()
+
+
+class FakeRemoteLogIONoFactory:
+    """Stub missing the from_config classmethod."""
+
+
+_LOOKUP: dict[str, Any] = {
+    "fakeremotelogio": FakeRemoteLogIO,
+    "fakeremotelogionofactory": FakeRemoteLogIONoFactory,
+}
+
+
+def _fake_import_string(path: str) -> Any:
+    # Tests pass simple names like "fakeremotelogio"; route them through a dict
+    # to avoid needing real importable modules.
+    return _LOOKUP[path.lower()]
+
+
+def _info(
+    *,
+    classpath: str = "fakeremotelogio",
+    scheme: str = "fake",
+    package_name: str = "fake-provider",
+) -> SimpleNamespace:
+    return SimpleNamespace(
+        classpath=classpath,
+        scheme=scheme,
+        package_name=package_name,
+    )
+
+
[email protected](autouse=True)
+def _reset_fake():
+    FakeRemoteLogIO.last_called = False
+    yield
+    FakeRemoteLogIO.last_called = False
+
+
+def test_provider_dispatch_resolves_by_scheme():
+    info = _info(scheme="fake")
+    handler = _build_remote_task_log_from_provider(
+        remote_base_log_folder="fake://bucket/path",
+        providers_manager=FakeProvidersManager({"fake": info}),
+        import_string=_fake_import_string,
+    )
+    assert isinstance(handler, FakeRemoteLogIO)
+    assert FakeRemoteLogIO.last_called is True
+
+
+def test_provider_dispatch_returns_none_when_scheme_unknown():
+    handler = _build_remote_task_log_from_provider(
+        remote_base_log_folder="unknown://x",
+        providers_manager=FakeProvidersManager(),
+        import_string=_fake_import_string,
+    )
+    assert handler is None
+
+
+def test_provider_dispatch_returns_none_when_remote_base_unset():
+    handler = _build_remote_task_log_from_provider(
+        remote_base_log_folder=None,
+        providers_manager=ExplodingProvidersManager(),
+        import_string=_fake_import_string,
+    )
+    assert handler is None
+
+
+def test_provider_dispatch_returns_none_when_no_scheme_in_url():
+    handler = _build_remote_task_log_from_provider(
+        remote_base_log_folder="/local/path/no/scheme",
+        providers_manager=ExplodingProvidersManager(),
+        import_string=_fake_import_string,
+    )
+    assert handler is None
+
+
+def test_provider_dispatch_skips_handler_without_factory():
+    info = _info(classpath="fakeremotelogionofactory")
+    handler = _build_remote_task_log_from_provider(
+        remote_base_log_folder="fake://b",
+        providers_manager=FakeProvidersManager({"fake": info}),
+        import_string=_fake_import_string,
+    )
+    assert handler is None
+
+
+# ---------------------------------------------------------------------------
+# Tests for resolve_remote_task_log (three-tier precedence orchestrator)
+# ---------------------------------------------------------------------------
+
+_FAKE_LOGGING_DICT: dict[str, Any] = {"version": 1}
+
+
+class FakeConf:
+    """Minimal ConfLike stub keyed by ``(section, key)``."""
+
+    def __init__(self, values: dict[tuple[str, str], Any] | None = None) -> 
None:
+        self._values = values or {}
+
+    def get(self, section, key, **kwargs):
+        return self._values.get((section, key), kwargs.get("fallback"))
+
+    def getboolean(self, section, key, **kwargs):
+        value = self._values.get((section, key), kwargs.get("fallback"))
+        if isinstance(value, bool):
+            return value
+        if isinstance(value, str):
+            return value.lower() in ("1", "true", "yes", "on")
+        return bool(value)
+
+
+def test_resolve_user_defined_module_wins():
+    """If the user's logging_config_class module exposes REMOTE_TASK_LOG, it 
wins."""
+    user_path = "fake.user.module.DEFAULT_LOGGING_CONFIG"
+    sentinel_remote = object()
+
+    def fake_import_string(path: str):
+        if path == user_path:
+            return _FAKE_LOGGING_DICT
+        if path == "fake.user.module":
+            return SimpleNamespace(REMOTE_TASK_LOG=sentinel_remote, 
DEFAULT_REMOTE_CONN_ID="user_conn")
+        raise AssertionError(f"unexpected import: {path}")
+
+    import sys
+
+    sys.modules["fake.user.module"] = SimpleNamespace(
+        REMOTE_TASK_LOG=sentinel_remote, DEFAULT_REMOTE_CONN_ID="user_conn"
+    )
+    try:
+        handler, conn_id = resolve_remote_task_log(
+            conf=FakeConf({("logging", "logging_config_class"): user_path}),
+            providers_manager=ExplodingProvidersManager(),  # must not be 
consulted
+            import_string=fake_import_string,
+        )
+    finally:
+        sys.modules.pop("fake.user.module", None)
+
+    assert handler is sentinel_remote
+    assert conn_id == "user_conn"
+
+
+def test_resolve_falls_through_to_provider_dispatch():
+    """Default logging_class_path → user_defined is False → provider dispatch 
runs."""
+    info = _info(scheme="fake")
+    handler, conn_id = resolve_remote_task_log(
+        conf=FakeConf(
+            {
+                ("logging", "remote_base_log_folder"): "fake://bucket",
+                ("logging", "remote_logging"): True,
+            }
+        ),
+        providers_manager=FakeProvidersManager({"fake": info}),
+        import_string=_fake_import_string,
+    )
+    assert isinstance(handler, FakeRemoteLogIO)
+    assert conn_id is None
+
+
+def test_resolve_skips_provider_dispatch_when_remote_logging_disabled():
+    """``[logging] remote_logging = False`` must short-circuit provider 
dispatch."""
+    sentinel_remote = object()
+
+    def fake_import_string(path: str):
+        if path == DEFAULT_LOGGING_CONFIG_PATH:
+            return _FAKE_LOGGING_DICT
+        raise AssertionError(f"unexpected import: {path}")
+
+    import sys
+
+    parent_module = DEFAULT_LOGGING_CONFIG_PATH.rsplit(".", 1)[0]
+    saved = sys.modules.get(parent_module)
+    sys.modules[parent_module] = 
SimpleNamespace(REMOTE_TASK_LOG=sentinel_remote, DEFAULT_REMOTE_CONN_ID=None)
+    try:
+        # ExplodingProvidersManager fails the test if provider dispatch runs.
+        handler, conn_id = resolve_remote_task_log(
+            conf=FakeConf(
+                {
+                    ("logging", "remote_base_log_folder"): "fake://bucket",
+                    ("logging", "remote_logging"): False,
+                }
+            ),
+            providers_manager=ExplodingProvidersManager(),
+            import_string=fake_import_string,
+        )
+    finally:
+        if saved is not None:
+            sys.modules[parent_module] = saved
+        else:
+            sys.modules.pop(parent_module, None)
+
+    assert handler is sentinel_remote
+    assert conn_id is None
+
+
+def test_resolve_falls_back_to_legacy_attr_path_when_dispatch_returns_none():
+    """When provider dispatch yields nothing, the legacy module-attr path is 
consulted."""
+    sentinel_remote = object()
+
+    def fake_import_string(path: str):
+        if path == DEFAULT_LOGGING_CONFIG_PATH:
+            return _FAKE_LOGGING_DICT
+        raise AssertionError(f"unexpected import: {path}")
+
+    import sys
+
+    parent_module = DEFAULT_LOGGING_CONFIG_PATH.rsplit(".", 1)[0]
+    saved = sys.modules.get(parent_module)
+    sys.modules[parent_module] = SimpleNamespace(
+        REMOTE_TASK_LOG=sentinel_remote, DEFAULT_REMOTE_CONN_ID="legacy_conn"
+    )
+    try:
+        handler, conn_id = resolve_remote_task_log(
+            conf=FakeConf(
+                {
+                    ("logging", "remote_base_log_folder"): "unknown://b",
+                    ("logging", "remote_logging"): True,
+                }
+            ),
+            providers_manager=FakeProvidersManager(),  # empty registry
+            import_string=fake_import_string,
+        )
+    finally:
+        if saved is not None:
+            sys.modules[parent_module] = saved
+        else:
+            sys.modules.pop(parent_module, None)
+
+    assert handler is sentinel_remote
+    assert conn_id == "legacy_conn"
diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py
index 9542baa4046..7391335b7f5 100644
--- a/task-sdk/src/airflow/sdk/log.py
+++ b/task-sdk/src/airflow/sdk/log.py
@@ -173,16 +173,15 @@ def init_log_file(local_relative_path: str) -> Path:
 
 def _load_logging_config() -> None:
     """Load and cache the remote logging configuration from SDK config."""
-    from airflow.sdk._shared.logging.remote import discover_remote_log_handler
+    from airflow.sdk._shared.logging.factory import resolve_remote_task_log
     from airflow.sdk._shared.module_loading import import_string
     from airflow.sdk.configuration import conf
+    from airflow.sdk.providers_manager_runtime import 
ProvidersManagerTaskRuntime
 
-    fallback = 
"airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
-    logging_class_path = conf.get("logging", "logging_config_class", 
fallback=fallback)
-
-    # Load remote logging configuration using shared discovery logic
-    remote_task_log, default_remote_conn_id = discover_remote_log_handler(
-        logging_class_path, fallback, import_string
+    remote_task_log, default_remote_conn_id = resolve_remote_task_log(
+        conf=conf,
+        providers_manager=ProvidersManagerTaskRuntime(),
+        import_string=import_string,
     )
     _ActiveLoggingConfig.set(remote_task_log, default_remote_conn_id)
 
diff --git a/task-sdk/src/airflow/sdk/providers_manager_runtime.py 
b/task-sdk/src/airflow/sdk/providers_manager_runtime.py
index e28ed3fe14a..ca4d4a94732 100644
--- a/task-sdk/src/airflow/sdk/providers_manager_runtime.py
+++ b/task-sdk/src/airflow/sdk/providers_manager_runtime.py
@@ -24,7 +24,7 @@ import inspect
 import traceback
 import warnings
 from collections.abc import Callable, MutableMapping
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, NamedTuple
 from urllib.parse import SplitResult
 
 import structlog
@@ -55,6 +55,14 @@ if TYPE_CHECKING:
 log = structlog.getLogger(__name__)
 
 
+class RemoteLoggingInfo(NamedTuple):
+    """Remote logging IO handler registered by a provider."""
+
+    classpath: str
+    scheme: str
+    package_name: str
+
+
 def _correctness_check(provider_package: str, class_name: str, provider_info: 
ProviderInfo) -> Any:
     """
     Perform coherence check on provider classes.
@@ -150,6 +158,8 @@ class ProvidersManagerTaskRuntime(LoggingMixin):
         # Keeps dict of hooks keyed by connection type. They are lazy 
evaluated at access time
         self._hooks_lazy_dict: LazyDictWithCache[str, HookInfo | Callable] = 
LazyDictWithCache()
         self._plugins_set: set[PluginInfo] = set()
+        self._remote_logging_info_list: list[RemoteLoggingInfo] = []
+        self._remote_logging_by_scheme: dict[str, RemoteLoggingInfo] = {}
         self._provider_schema_validator = 
_create_provider_info_schema_validator()
         self._init_airflow_core_hooks()
         # Populated by initialize_provider_configs(); holds 
provider-contributed config sections.
@@ -214,6 +224,12 @@ class ProvidersManagerTaskRuntime(LoggingMixin):
         self.initialize_providers_list()
         self._discover_plugins()
 
+    @provider_info_cache("remote_logging")
+    def initialize_providers_remote_logging(self):
+        """Lazy initialization of providers remote logging IO handlers."""
+        self.initialize_providers_list()
+        self._discover_remote_logging()
+
     @provider_info_cache("taskflow_decorators")
     def initialize_providers_taskflow_decorator(self):
         """Lazy initialization of providers taskflow decorators."""
@@ -534,6 +550,31 @@ class ProvidersManagerTaskRuntime(LoggingMixin):
                     )
                 )
 
+    def _discover_remote_logging(self) -> None:
+        """Retrieve all remote logging IO handlers defined in the providers."""
+        for provider_package, provider in self._provider_dict.items():
+            entries = provider.data.get("remote-logging") or []
+            for entry in entries:
+                classpath = entry["classpath"]
+                if not _correctness_check(provider_package, classpath, 
provider):
+                    continue
+                info = RemoteLoggingInfo(
+                    classpath=classpath,
+                    scheme=entry["scheme"],
+                    package_name=provider_package,
+                )
+                if (existing := 
self._remote_logging_by_scheme.get(info.scheme)) is not None:
+                    log.warning(
+                        "Remote logging scheme '%s' is already registered by 
%s; ignoring "
+                        "duplicate registration from %s.",
+                        info.scheme,
+                        existing.package_name,
+                        info.package_name,
+                    )
+                    continue
+                self._remote_logging_info_list.append(info)
+                self._remote_logging_by_scheme[info.scheme] = info
+
     def _discover_taskflow_decorators(self) -> None:
         for name, info in self._provider_dict.items():
             for taskflow_decorator in info.data.get("task-decorators", []):
@@ -611,6 +652,17 @@ class ProvidersManagerTaskRuntime(LoggingMixin):
         self.initialize_providers_plugins()
         return sorted(self._plugins_set, key=lambda x: x.plugin_class)
 
+    @property
+    def remote_logging_handlers(self) -> list[RemoteLoggingInfo]:
+        """Return all remote logging IO handlers contributed by providers."""
+        self.initialize_providers_remote_logging()
+        return list(self._remote_logging_info_list)
+
+    def remote_logging_handler_by_scheme(self, scheme: str) -> 
RemoteLoggingInfo | None:
+        """Return the remote logging IO handler registered for the given URL 
scheme, if any."""
+        self.initialize_providers_remote_logging()
+        return self._remote_logging_by_scheme.get(scheme)
+
     @property
     def provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
         self.initialize_provider_configs()
@@ -640,6 +692,8 @@ class ProvidersManagerTaskRuntime(LoggingMixin):
         self._hook_provider_dict.clear()
         self._hooks_lazy_dict.clear()
         self._plugins_set.clear()
+        self._remote_logging_info_list.clear()
+        self._remote_logging_by_scheme.clear()
         self._asset_uri_handlers.clear()
         self._asset_factories.clear()
         self._asset_to_openlineage_converters.clear()
diff --git a/task-sdk/tests/task_sdk/test_providers_manager_runtime.py 
b/task-sdk/tests/task_sdk/test_providers_manager_runtime.py
index 1cae21d53c7..1f22ff6e73f 100644
--- a/task-sdk/tests/task_sdk/test_providers_manager_runtime.py
+++ b/task-sdk/tests/task_sdk/test_providers_manager_runtime.py
@@ -31,7 +31,7 @@ from airflow.sdk._shared.providers_discovery import (
     LazyDictWithCache,
     ProviderInfo,
 )
-from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime
+from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime, 
RemoteLoggingInfo
 
 from tests_common.test_utils.markers import 
skip_if_force_lowest_dependencies_marker, skip_if_not_on_main
 from tests_common.test_utils.paths import AIRFLOW_ROOT_PATH
@@ -39,6 +39,16 @@ from tests_common.test_utils.paths import AIRFLOW_ROOT_PATH
 PY313 = sys.version_info >= (3, 13)
 
 
+class FakeRemoteLogIO:
+    """Importable stub used by remote-logging discovery tests."""
+
+    processors: tuple = ()
+
+    @classmethod
+    def from_config(cls):
+        return cls()
+
+
 def test_cleanup_providers_manager_runtime(cleanup_providers_manager):
     """Check the cleanup provider manager functionality."""
     provider_manager = ProvidersManagerTaskRuntime()
@@ -277,3 +287,61 @@ class TestProvidersManagerRuntime:
             assert conf.get("test_sdk_provider", "test_option") == 
"provider-default"
         finally:
             conf.invalidate_cache()
+
+    def test_register_remote_logging_by_scheme(self):
+        providers_manager = ProvidersManagerTaskRuntime()
+        providers_manager._provider_dict = LazyDictWithCache()
+        providers_manager._provider_dict["fake.remote.logging"] = ProviderInfo(
+            version="0.0.1",
+            data={
+                "remote-logging": [
+                    {
+                        "classpath": f"{__name__}.FakeRemoteLogIO",
+                        "scheme": "fake",
+                    }
+                ]
+            },
+        )
+        providers_manager._discover_remote_logging()
+        assert len(providers_manager._remote_logging_info_list) == 1
+        assert providers_manager._remote_logging_by_scheme["fake"] == 
RemoteLoggingInfo(
+            classpath=f"{__name__}.FakeRemoteLogIO",
+            scheme="fake",
+            package_name="fake.remote.logging",
+        )
+
+    def test_register_remote_logging_duplicate_scheme_first_wins(self):
+        providers_manager = ProvidersManagerTaskRuntime()
+        providers_manager._provider_dict = LazyDictWithCache()
+        providers_manager._provider_dict["fake.remote.logging.first"] = 
ProviderInfo(
+            version="0.0.1",
+            data={"remote-logging": [{"classpath": 
f"{__name__}.FakeRemoteLogIO", "scheme": "dup"}]},
+        )
+        providers_manager._provider_dict["fake.remote.logging.second"] = 
ProviderInfo(
+            version="0.0.1",
+            data={"remote-logging": [{"classpath": 
f"{__name__}.FakeRemoteLogIO", "scheme": "dup"}]},
+        )
+        providers_manager._discover_remote_logging()
+        assert providers_manager._remote_logging_by_scheme["dup"].package_name 
== (
+            "fake.remote.logging.first"
+        )
+        assert len(providers_manager._remote_logging_info_list) == 1
+        assert providers_manager._remote_logging_info_list[0].package_name == 
"fake.remote.logging.first"
+
+    def test_register_remote_logging_bad_class_filtered(self):
+        providers_manager = ProvidersManagerTaskRuntime()
+        providers_manager._provider_dict = LazyDictWithCache()
+        providers_manager._provider_dict["fake.remote.logging"] = ProviderInfo(
+            version="0.0.1",
+            data={
+                "remote-logging": [
+                    {
+                        "classpath": 
"fake.module.does.not.exist.FakeRemoteLogIO",
+                        "scheme": "bad",
+                    }
+                ]
+            },
+        )
+        providers_manager._discover_remote_logging()
+        assert "bad" not in providers_manager._remote_logging_by_scheme
+        assert providers_manager._remote_logging_info_list == []

Reply via email to