This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ab1418ff94a Decouple remote logging config from core (#67056)
ab1418ff94a is described below
commit ab1418ff94af2e5175b023bb47443ea262f03bd9
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Thu Jun 4 19:39:58 2026 +0800
Decouple remote logging config from core (#67056)
* Decouple remote logging configuration from core airflow
Fix type annotation
* Align get_default_remote_conn_id with load_remote_conn_id
* Make _build_remote_task_log_from_provider return RemoteLogIO without conn
id
* Split load_logging_config into _get_logging_config and
_load_logging_config
Decouple importing the [logging] logging_config_class dict from
remote-handler resolution so each can run independently. The deprecated
load_logging_config wrapper delegates to both and keeps the old return
shape. Wire the lazy remote-handler load through the shared
resolve_remote_task_log factory.
* Add significant newsfragment
* Fix: Respect ("logging", "remote_logging") in shared logging factory
* Address Jens' review comments
---
airflow-core/newsfragments/67056.significant.rst | 101 +++++++
airflow-core/src/airflow/logging_config.py | 69 +++--
airflow-core/src/airflow/provider.yaml.schema.json | 19 ++
airflow-core/src/airflow/provider_info.schema.json | 19 ++
airflow-core/src/airflow/providers_manager.py | 54 ++++
.../tests/unit/always/test_providers_manager.py | 83 ++++++
.../tests/unit/logging/test_logging_config.py | 210 +++++++++++++++
.../logging/src/airflow_shared/logging/factory.py | 182 +++++++++++++
shared/logging/tests/logging/test_factory.py | 289 +++++++++++++++++++++
task-sdk/src/airflow/sdk/log.py | 13 +-
.../src/airflow/sdk/providers_manager_runtime.py | 56 +++-
.../task_sdk/test_providers_manager_runtime.py | 70 ++++-
12 files changed, 1134 insertions(+), 31 deletions(-)
diff --git a/airflow-core/newsfragments/67056.significant.rst
b/airflow-core/newsfragments/67056.significant.rst
new file mode 100644
index 00000000000..266cb2c24e7
--- /dev/null
+++ b/airflow-core/newsfragments/67056.significant.rst
@@ -0,0 +1,101 @@
+Decouple remote logging resolution from ``airflow.logging_config``
+
+Remote task log handler resolution is now owned by the shared
+``airflow_shared.logging.factory`` module and applies a single, well-defined
+precedence rule. ``airflow.logging_config.load_logging_config`` is deprecated:
+loading the ``[logging] logging_config_class`` dict and resolving the remote
+handler are now two independent steps.
+
+**Resolution order (``resolve_remote_task_log``):**
+
+1. **User-defined ``[logging] logging_config_class``** — if the user has set
+ ``logging_config_class`` to a custom module path, and that module exports a
+ ``REMOTE_TASK_LOG`` (or ``DEFAULT_REMOTE_CONN_ID``) attribute, those values
+ win. Existing custom logging configs keep working unchanged.
+2. **ProvidersManager scheme dispatch** — the scheme of ``[logging]
+ remote_base_log_folder`` (e.g. ``s3``, ``gs``, ``wasb``) is looked up in the
+ provider yaml ``remote-logging:`` registry. The matching ``RemoteLogIO``
+ class is imported and instantiated via its ``from_config()`` classmethod.
+ The connection id comes from ``[logging] remote_log_conn_id``, which the
+ user must set explicitly when one is needed; provider dispatch supplies no
+ default. Providers that want a backend-specific default can resolve one
+ inside ``from_config``.
+3. **Legacy attr-path fallback** — if neither of the above produced a handler,
+ the resolver imports the default logging module
+ (``airflow.config_templates.airflow_local_settings``) and reads its
+ ``REMOTE_TASK_LOG`` / ``DEFAULT_REMOTE_CONN_ID`` attributes. This is the
+ per-scheme ``if/elif`` chain in ``airflow_local_settings.py``. It is
+ transitional and will be removed in Airflow 4.0, once every in-tree
+ provider exposes ``from_config``.
+
+**``RemoteLogIO.from_config`` contract:**
+
+Provider remote-log handler classes opting into provider dispatch must expose
+a ``from_config`` classmethod. The shape is::
+
+    class MyRemoteLogIO(LoggingMixin):
+        @classmethod
+        def from_config(cls) -> "MyRemoteLogIO":
+            from airflow.providers.common.compat.sdk import conf
+
+            return cls(
+                base_log_folder=conf.get("logging", "base_log_folder"),
+                remote_base=conf.get("logging", "remote_base_log_folder"),
+                delete_local_copy=conf.getboolean("logging", "delete_local_logs"),
+                # backend-specific keys live in the provider's own config section
+                ...,
+            )
+
+Key properties:
+
+- Takes no arguments — the shared factory calls ``cls.from_config()`` with no
+ inputs. Providers read ``airflow.providers.common.compat.sdk.conf``
+ themselves and pick the keys they care about.
+- Returns a fully instantiated ``RemoteLogIO`` (or ``RemoteLogStreamIO``).
+- Failures inside ``from_config`` are logged and treated as "no remote
+ handler" (the factory returns ``None`` and the legacy fallback runs); under
+ ``PYTEST_CURRENT_TEST`` the exception is re-raised so tests fail loudly.
+- Providers that don't yet implement ``from_config`` continue to work via the
+ legacy ``airflow_local_settings.py`` chain (step 3).
+
+**``airflow.logging_config`` API changes:**
+
+- ``_get_logging_config()`` — new private helper that imports and validates
+ the ``[logging] logging_config_class`` dict only. Does not touch remote
+ logging state.
+- ``_load_logging_config()`` — new private helper that calls
+ ``resolve_remote_task_log`` and caches the result on
+ ``_ActiveLoggingConfig``. Used lazily by ``get_remote_task_log`` and
+ ``get_default_remote_conn_id``.
+- ``load_logging_config()`` — deprecated. Emits ``DeprecationWarning`` and
+ delegates to both helpers; still returns ``(logging_config_dict,
+ logging_class_path)`` so existing callers keep working.
+
+**Behaviour changes:**
+
+- ``configure_logging`` no longer eagerly resolves the remote handler.
+ Resolution is now lazy and happens on the first call to
+ ``get_remote_task_log()`` / ``get_default_remote_conn_id()``.
+- Providers that registered a ``remote-logging:`` block but did not implement
+ ``from_config`` will be skipped with a warning; the legacy fallback path
+ takes over.
+
+* Types of change
+
+ * [ ] Dag changes
+ * [ ] Config changes
+ * [ ] API changes
+ * [ ] CLI changes
+ * [x] Behaviour changes
+ * [ ] Plugin changes
+ * [ ] Dependency changes
+ * [x] Code interface changes
+
+* Migration rules needed
+
+ * Replace direct calls to ``airflow.logging_config.load_logging_config()``
+ with ``_get_logging_config()`` (for the logging dict) and/or
+ ``_load_logging_config()`` (to prime the remote-handler cache).
+ * Provider remote-log handler classes should implement a no-argument
+ ``from_config`` classmethod that reads
+ ``airflow.providers.common.compat.sdk.conf`` and returns a configured
+ instance. Until they do, resolution falls through to the legacy
+ ``airflow_local_settings.py`` chain.
diff --git a/airflow-core/src/airflow/logging_config.py
b/airflow-core/src/airflow/logging_config.py
index 0da017d9b08..7f4ae605cb2 100644
--- a/airflow-core/src/airflow/logging_config.py
+++ b/airflow-core/src/airflow/logging_config.py
@@ -21,7 +21,7 @@ import logging
import warnings
from typing import TYPE_CHECKING, Any
-from airflow._shared.logging.remote import discover_remote_log_handler
+from airflow._shared.logging.factory import DEFAULT_LOGGING_CONFIG_PATH,
resolve_remote_task_log
from airflow._shared.module_loading import import_string
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
@@ -49,25 +49,26 @@ class _ActiveLoggingConfig:
def get_remote_task_log() -> RemoteLogIO | None:
if not _ActiveLoggingConfig.logging_config_loaded:
- load_logging_config()
+ _load_logging_config()
return _ActiveLoggingConfig.remote_task_log
def get_default_remote_conn_id() -> str | None:
+ if conn_id := conf.get("logging", "remote_log_conn_id", fallback=None):
+ return conn_id
+
if not _ActiveLoggingConfig.logging_config_loaded:
- load_logging_config()
+ _load_logging_config()
return _ActiveLoggingConfig.default_remote_conn_id
-def load_logging_config() -> tuple[dict[str, Any], str]:
- """Configure & Validate Airflow Logging."""
- fallback =
"airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
- logging_class_path = conf.get("logging", "logging_config_class",
fallback=fallback)
-
- # Sometimes we end up with `""` as the value!
- logging_class_path = logging_class_path or fallback
-
- user_defined = logging_class_path != fallback
+def _get_logging_config() -> dict[str, Any]:
+ """Import and validate the ``[logging] logging_config_class`` dict."""
+ logging_class_path = (
+ conf.get("logging", "logging_config_class",
fallback=DEFAULT_LOGGING_CONFIG_PATH)
+ or DEFAULT_LOGGING_CONFIG_PATH
+ )
+ user_defined = logging_class_path != DEFAULT_LOGGING_CONFIG_PATH
try:
logging_config = import_string(logging_class_path)
@@ -78,27 +79,51 @@ def load_logging_config() -> tuple[dict[str, Any], str]:
if user_defined:
log.info("Successfully imported user-defined logging config from
%s", logging_class_path)
-
except Exception as err:
- # Import default logging configurations.
raise ImportError(
f"Unable to load {'custom ' if user_defined else ''}logging config
from {logging_class_path} due "
f"to: {type(err).__name__}:{err}"
)
- else:
- # Load remote logging configuration using shared discovery logic
- remote_task_log, default_remote_conn_id = discover_remote_log_handler(
- logging_class_path, fallback, import_string
- )
- _ActiveLoggingConfig.set(remote_task_log, default_remote_conn_id)
- return logging_config, logging_class_path
+ return logging_config
+
+
+def _load_logging_config() -> None:
+ """Load and cache the remote logging configuration from core config."""
+ from airflow.providers_manager import ProvidersManager
+
+ remote_task_log, default_remote_conn_id = resolve_remote_task_log(
+ conf=conf,
+ providers_manager=ProvidersManager(),
+ import_string=import_string,
+ )
+ _ActiveLoggingConfig.set(remote_task_log, default_remote_conn_id)
+
+
+def load_logging_config() -> tuple[dict[str, Any], str]:
+ """
+ Import the logging config dict and load the remote logging handler.
+
+ .. deprecated::
+ Use :func:`_get_logging_config` for the logging dict and
+ :func:`_load_logging_config` for remote handler setup.
+ """
+ warnings.warn(
+ "load_logging_config is deprecated; use _get_logging_config() for the
logging dict "
+ "and _load_logging_config() for remote handler setup.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ _load_logging_config()
+ return _get_logging_config(), conf.get(
+ "logging", "logging_config_class", fallback=DEFAULT_LOGGING_CONFIG_PATH
+ ) or DEFAULT_LOGGING_CONFIG_PATH
def configure_logging():
from airflow._shared.logging import configure_logging, init_log_folder,
translate_config_values
- logging_config, logging_class_path = load_logging_config()
+ logging_config = _get_logging_config()
try:
level: str = getattr(
logging_config, "LOG_LEVEL", conf.get("logging", "logging_level",
fallback="INFO")
diff --git a/airflow-core/src/airflow/provider.yaml.schema.json
b/airflow-core/src/airflow/provider.yaml.schema.json
index 2f020b27330..50dacbc7963 100644
--- a/airflow-core/src/airflow/provider.yaml.schema.json
+++ b/airflow-core/src/airflow/provider.yaml.schema.json
@@ -536,6 +536,25 @@
"type": "string"
}
},
+ "remote-logging": {
+ "type": "array",
+ "description": "Remote logging IO handlers contributed by the
provider. Each entry registers a RemoteLogIO implementation that
ProvidersManager dispatches by URL scheme.",
+ "items": {
+ "type": "object",
+ "required": ["classpath", "scheme"],
+ "additionalProperties": false,
+ "properties": {
+ "classpath": {
+ "type": "string",
+ "description": "Fully-qualified class name of the
RemoteLogIO implementation."
+ },
+ "scheme": {
+ "type": "string",
+ "description": "URL scheme (without ://) of [logging]
remote_base_log_folder that this handler claims."
+ }
+ }
+ }
+ },
"auth-backends": {
"type": "array",
"description": "API Auth Backend module names",
diff --git a/airflow-core/src/airflow/provider_info.schema.json
b/airflow-core/src/airflow/provider_info.schema.json
index 86fc726a051..b3e9bf74b62 100644
--- a/airflow-core/src/airflow/provider_info.schema.json
+++ b/airflow-core/src/airflow/provider_info.schema.json
@@ -331,6 +331,25 @@
"type": "string"
}
},
+ "remote-logging": {
+ "type": "array",
+ "description": "Remote logging IO handlers contributed by the
provider. Each entry registers a RemoteLogIO implementation that
ProvidersManager dispatches by URL scheme.",
+ "items": {
+ "type": "object",
+ "required": ["classpath", "scheme"],
+ "additionalProperties": false,
+ "properties": {
+ "classpath": {
+ "type": "string",
+ "description": "Fully-qualified class name of the
RemoteLogIO implementation."
+ },
+ "scheme": {
+ "type": "string",
+ "description": "URL scheme (without ://) of [logging]
remote_base_log_folder that this handler claims."
+ }
+ }
+ }
+ },
"auth-backends": {
"type": "array",
"description": "API Auth Backend module names",
diff --git a/airflow-core/src/airflow/providers_manager.py
b/airflow-core/src/airflow/providers_manager.py
index 6fefcbc39b0..8e270209a8b 100644
--- a/airflow-core/src/airflow/providers_manager.py
+++ b/airflow-core/src/airflow/providers_manager.py
@@ -223,6 +223,14 @@ class NotificationInfo(NamedTuple):
package_name: str
+class RemoteLoggingInfo(NamedTuple):
+ """Remote logging IO handler registered by a provider."""
+
+ classpath: str
+ scheme: str
+ package_name: str
+
+
class PluginInfo(NamedTuple):
"""Plugin class, name and provider it comes from."""
@@ -432,6 +440,8 @@ class ProvidersManager(LoggingMixin):
self._cli_command_provider_name_set: set[str] = set()
self._extra_link_class_name_set: set[str] = set()
self._logging_class_name_set: set[str] = set()
+ self._remote_logging_info_list: list[RemoteLoggingInfo] = []
+ self._remote_logging_by_scheme: dict[str, RemoteLoggingInfo] = {}
self._auth_manager_class_name_set: set[str] = set()
self._auth_manager_without_check_set: set[tuple[str, str]] = set()
self._secrets_backend_class_name_set: set[str] = set()
@@ -571,6 +581,12 @@ class ProvidersManager(LoggingMixin):
self.initialize_providers_list()
self._discover_logging()
+ @provider_info_cache("remote_logging")
+ def initialize_providers_remote_logging(self):
+ """Lazy initialization of providers remote logging IO handlers."""
+ self.initialize_providers_list()
+ self._discover_remote_logging()
+
@provider_info_cache("secrets_backends")
def initialize_providers_secrets_backends(self):
"""Lazy initialization of providers secrets_backends information."""
@@ -1240,6 +1256,31 @@ class ProvidersManager(LoggingMixin):
if _correctness_check(provider_package,
logging_class_name, provider):
self._logging_class_name_set.add(logging_class_name)
+ def _discover_remote_logging(self) -> None:
+ """Retrieve all remote logging IO handlers defined in the providers."""
+ for provider_package, provider in self._provider_dict.items():
+ entries = provider.data.get("remote-logging") or []
+ for entry in entries:
+ classpath = entry["classpath"]
+ if not _correctness_check(provider_package, classpath,
provider):
+ continue
+ info = RemoteLoggingInfo(
+ classpath=classpath,
+ scheme=entry["scheme"],
+ package_name=provider_package,
+ )
+ if (existing :=
self._remote_logging_by_scheme.get(info.scheme)) is not None:
+ log.warning(
+ "Remote logging scheme '%s' is already registered by
%s; ignoring "
+ "duplicate registration from %s.",
+ info.scheme,
+ existing.package_name,
+ info.package_name,
+ )
+ continue
+ self._remote_logging_info_list.append(info)
+ self._remote_logging_by_scheme[info.scheme] = info
+
def _discover_secrets_backends(self) -> None:
"""Retrieve all secrets backends defined in the providers."""
for provider_package, provider in self._provider_dict.items():
@@ -1450,6 +1491,17 @@ class ProvidersManager(LoggingMixin):
self.initialize_providers_logging()
return sorted(self._logging_class_name_set)
+ @property
+ def remote_logging_handlers(self) -> list[RemoteLoggingInfo]:
+ """Return all remote logging IO handlers contributed by providers."""
+ self.initialize_providers_remote_logging()
+ return list(self._remote_logging_info_list)
+
+ def remote_logging_handler_by_scheme(self, scheme: str) ->
RemoteLoggingInfo | None:
+ """Return the remote logging IO handler registered for the given URL
scheme, if any."""
+ self.initialize_providers_remote_logging()
+ return self._remote_logging_by_scheme.get(scheme)
+
@property
def secrets_backend_class_names(self) -> list[str]:
"""Returns set of secret backend class names."""
@@ -1532,6 +1584,8 @@ class ProvidersManager(LoggingMixin):
self._field_behaviours.clear()
self._extra_link_class_name_set.clear()
self._logging_class_name_set.clear()
+ self._remote_logging_info_list.clear()
+ self._remote_logging_by_scheme.clear()
self._auth_manager_class_name_set.clear()
self._auth_manager_without_check_set.clear()
self._secrets_backend_class_name_set.clear()
diff --git a/airflow-core/tests/unit/always/test_providers_manager.py
b/airflow-core/tests/unit/always/test_providers_manager.py
index afa473e80a4..25e3774d4f7 100644
--- a/airflow-core/tests/unit/always/test_providers_manager.py
+++ b/airflow-core/tests/unit/always/test_providers_manager.py
@@ -37,6 +37,7 @@ from airflow.providers_manager import (
PluginInfo,
ProviderInfo,
ProvidersManager,
+ RemoteLoggingInfo,
)
from tests_common.test_utils.markers import
skip_if_force_lowest_dependencies_marker
@@ -47,6 +48,16 @@ if TYPE_CHECKING:
from airflow.cli.cli_config import CLICommand
+class FakeRemoteLogIO:
+ """Importable stub used by remote-logging discovery tests."""
+
+ processors: tuple = ()
+
+ @classmethod
+ def from_config(cls):
+ return cls()
+
+
def test_cleanup_providers_manager(cleanup_providers_manager):
"""Check the cleanup provider manager functionality."""
provider_manager = ProvidersManager()
@@ -163,6 +174,78 @@ class TestProviderManager:
),
)
+ def test_providers_manager_register_remote_logging_by_scheme(self):
+ providers_manager = ProvidersManager()
+ providers_manager._provider_dict = LazyDictWithCache()
+ providers_manager._provider_dict["fake.remote.logging"] = ProviderInfo(
+ version="0.0.1",
+ data={
+ "remote-logging": [
+ {
+ "classpath": f"{__name__}.FakeRemoteLogIO",
+ "scheme": "fake",
+ }
+ ]
+ },
+ )
+ providers_manager._discover_remote_logging()
+ assert len(providers_manager._remote_logging_info_list) == 1
+ assert providers_manager._remote_logging_by_scheme["fake"] ==
RemoteLoggingInfo(
+ classpath=f"{__name__}.FakeRemoteLogIO",
+ scheme="fake",
+ package_name="fake.remote.logging",
+ )
+ assert "unknown" not in providers_manager._remote_logging_by_scheme
+
+ def
test_providers_manager_register_remote_logging_duplicate_scheme_first_wins(self):
+ providers_manager = ProvidersManager()
+ providers_manager._provider_dict = LazyDictWithCache()
+ providers_manager._provider_dict["fake.remote.logging.first"] =
ProviderInfo(
+ version="0.0.1",
+ data={
+ "remote-logging": [
+ {
+ "classpath": f"{__name__}.FakeRemoteLogIO",
+ "scheme": "dup",
+ }
+ ]
+ },
+ )
+ providers_manager._provider_dict["fake.remote.logging.second"] =
ProviderInfo(
+ version="0.0.1",
+ data={
+ "remote-logging": [
+ {
+ "classpath": f"{__name__}.FakeRemoteLogIO",
+ "scheme": "dup",
+ }
+ ]
+ },
+ )
+ providers_manager._discover_remote_logging()
+ winner = providers_manager._remote_logging_by_scheme["dup"]
+ assert winner.package_name == "fake.remote.logging.first"
+ assert len(providers_manager._remote_logging_info_list) == 1
+ assert providers_manager._remote_logging_info_list[0].package_name ==
"fake.remote.logging.first"
+
+ def test_providers_manager_remote_logging_bad_class_filtered(self):
+ providers_manager = ProvidersManager()
+ providers_manager._provider_dict = LazyDictWithCache()
+ providers_manager._provider_dict["fake.remote.logging"] = ProviderInfo(
+ version="0.0.1",
+ data={
+ "remote-logging": [
+ {
+ "classpath":
"fake.module.does.not.exist.FakeRemoteLogIO",
+ "scheme": "bad",
+ }
+ ]
+ },
+ )
+ providers_manager._discover_remote_logging()
+ assert "bad" not in providers_manager._remote_logging_by_scheme
+ assert providers_manager._remote_logging_info_list == []
+
def test_connection_form_widgets(self, yaml_ui_metadata_counts):
yaml_widgets, _ = yaml_ui_metadata_counts
provider_manager = ProvidersManager()
diff --git a/airflow-core/tests/unit/logging/test_logging_config.py
b/airflow-core/tests/unit/logging/test_logging_config.py
new file mode 100644
index 00000000000..04449a67419
--- /dev/null
+++ b/airflow-core/tests/unit/logging/test_logging_config.py
@@ -0,0 +1,210 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import sys
+from types import SimpleNamespace
+from unittest import mock
+
+import pytest
+
+from airflow._shared.logging.factory import DEFAULT_LOGGING_CONFIG_PATH
+from airflow.logging_config import (
+ _ActiveLoggingConfig,
+ _get_logging_config,
+ _load_logging_config,
+ get_default_remote_conn_id,
+ get_remote_task_log,
+ load_logging_config,
+)
+
+
[email protected](autouse=True)
+def _reset_active_logging_config(monkeypatch):
+ monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", False,
raising=False)
+ monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", None,
raising=False)
+ monkeypatch.setattr(_ActiveLoggingConfig, "default_remote_conn_id", None,
raising=False)
+
+
+class TestGetLoggingConfig:
+ def test_returns_default_logging_dict(self):
+ from airflow.config_templates.airflow_local_settings import
DEFAULT_LOGGING_CONFIG
+
+ config = _get_logging_config()
+ assert config == DEFAULT_LOGGING_CONFIG
+
+ def test_user_defined_dict_is_imported(self, monkeypatch):
+ fake_module = "fake_user_logging_module_for_test"
+ custom = {"version": 1, "marker": "user-defined"}
+ monkeypatch.setitem(sys.modules, fake_module,
SimpleNamespace(LOGGING_CONFIG=custom))
+ with mock.patch("airflow.logging_config.conf") as mocked_conf:
+ mocked_conf.get.return_value = f"{fake_module}.LOGGING_CONFIG"
+ config = _get_logging_config()
+ assert config is custom
+
+ def test_empty_string_falls_back_to_default(self):
+ from airflow.config_templates.airflow_local_settings import
DEFAULT_LOGGING_CONFIG
+
+ with mock.patch("airflow.logging_config.conf") as mocked_conf:
+ mocked_conf.get.return_value = ""
+ config = _get_logging_config()
+ assert config == DEFAULT_LOGGING_CONFIG
+
+ @pytest.mark.parametrize(
+ "logging_class_path",
+ [pytest.param("", id="empty-string"), pytest.param(None, id="none")],
+ )
+ def test_falsy_logging_class_path_falls_back_to_default(self,
logging_class_path):
+ from airflow.config_templates.airflow_local_settings import
DEFAULT_LOGGING_CONFIG
+
+ with mock.patch("airflow.logging_config.conf") as mocked_conf:
+ mocked_conf.get.return_value = logging_class_path
+ config = _get_logging_config()
+ assert config == DEFAULT_LOGGING_CONFIG
+
+ def test_raises_import_error_when_path_invalid(self):
+ with mock.patch("airflow.logging_config.conf") as mocked_conf:
+ mocked_conf.get.return_value = "nonexistent.module.LOGGING"
+ with pytest.raises(ImportError, match="Unable to load custom
logging config"):
+ _get_logging_config()
+
+ def test_raises_import_error_when_value_not_dict(self, monkeypatch):
+ fake_module = "fake_user_logging_module_not_dict"
+ monkeypatch.setitem(sys.modules, fake_module,
SimpleNamespace(LOGGING_CONFIG="not-a-dict"))
+ with mock.patch("airflow.logging_config.conf") as mocked_conf:
+ mocked_conf.get.return_value = f"{fake_module}.LOGGING_CONFIG"
+ with pytest.raises(ImportError, match="Logging Config should be of
dict type"):
+ _get_logging_config()
+
+
+class TestLoadLoggingConfigPrivate:
+ def test_caches_resolved_remote_handler_and_conn_id(self):
+ sentinel_handler = object()
+ with (
+ mock.patch("airflow.logging_config.resolve_remote_task_log") as
mock_resolve,
+ mock.patch("airflow.providers_manager.ProvidersManager") as
mock_pm,
+ ):
+ mock_resolve.return_value = (sentinel_handler, "my_conn")
+ _load_logging_config()
+
+ assert _ActiveLoggingConfig.logging_config_loaded is True
+ assert _ActiveLoggingConfig.remote_task_log is sentinel_handler
+ assert _ActiveLoggingConfig.default_remote_conn_id == "my_conn"
+ # resolve_remote_task_log must be called with the core providers
manager.
+ mock_resolve.assert_called_once()
+ assert mock_resolve.call_args.kwargs["providers_manager"] is
mock_pm.return_value
+
+ def test_caches_none_when_resolver_returns_nothing(self):
+ with (
+ mock.patch("airflow.logging_config.resolve_remote_task_log") as
mock_resolve,
+ mock.patch("airflow.providers_manager.ProvidersManager"),
+ ):
+ mock_resolve.return_value = (None, None)
+ _load_logging_config()
+
+ assert _ActiveLoggingConfig.logging_config_loaded is True
+ assert _ActiveLoggingConfig.remote_task_log is None
+ assert _ActiveLoggingConfig.default_remote_conn_id is None
+
+
+class TestLoadLoggingConfigDeprecated:
+ def test_emits_deprecation_warning_and_returns_tuple(self):
+ sentinel_handler = object()
+ with (
+ mock.patch("airflow.logging_config.resolve_remote_task_log") as
mock_resolve,
+ mock.patch("airflow.providers_manager.ProvidersManager"),
+ ):
+ mock_resolve.return_value = (sentinel_handler, "my_conn")
+ with pytest.warns(DeprecationWarning, match="load_logging_config
is deprecated"):
+ logging_config, logging_class_path = load_logging_config()
+
+ assert isinstance(logging_config, dict)
+ assert logging_class_path == DEFAULT_LOGGING_CONFIG_PATH
+ # The deprecated wrapper still primes the remote handler cache.
+ assert _ActiveLoggingConfig.remote_task_log is sentinel_handler
+
+ def test_returns_user_logging_class_path(self, monkeypatch):
+ fake_module = "fake_user_logging_module_deprecated"
+ custom = {"version": 1}
+ custom_path = f"{fake_module}.LOGGING_CONFIG"
+ monkeypatch.setitem(sys.modules, fake_module,
SimpleNamespace(LOGGING_CONFIG=custom))
+
+ with (
+ mock.patch("airflow.logging_config.conf") as mocked_conf,
+ mock.patch("airflow.logging_config.resolve_remote_task_log") as
mock_resolve,
+ mock.patch("airflow.providers_manager.ProvidersManager"),
+ ):
+ mocked_conf.get.return_value = custom_path
+ mock_resolve.return_value = (None, None)
+ with pytest.warns(DeprecationWarning, match="load_logging_config
is deprecated"):
+ _, logging_class_path = load_logging_config()
+
+ assert logging_class_path == custom_path
+
+
+class TestGetRemoteTaskLog:
+ def test_returns_cached_value_without_reload(self):
+ sentinel = object()
+ _ActiveLoggingConfig.set(sentinel, None)
+ with mock.patch("airflow.logging_config._load_logging_config") as
mock_load:
+ result = get_remote_task_log()
+ assert result is sentinel
+ mock_load.assert_not_called()
+
+ def test_triggers_load_when_not_loaded(self):
+ sentinel = object()
+
+ def _fake_load():
+ _ActiveLoggingConfig.set(sentinel, None)
+
+ with mock.patch("airflow.logging_config._load_logging_config",
side_effect=_fake_load) as mock_load:
+ result = get_remote_task_log()
+ mock_load.assert_called_once()
+ assert result is sentinel
+
+
+class TestGetDefaultRemoteConnId:
+ def test_prefers_explicit_conf_value(self):
+ with (
+ mock.patch("airflow.logging_config.conf") as mocked_conf,
+ mock.patch("airflow.logging_config._load_logging_config") as
mock_load,
+ ):
+ mocked_conf.get.return_value = "explicit_conn"
+ assert get_default_remote_conn_id() == "explicit_conn"
+ mock_load.assert_not_called()
+
+ def test_falls_back_to_cached_default(self):
+ def _fake_load():
+ _ActiveLoggingConfig.set(None, "cached_conn")
+
+ with (
+ mock.patch("airflow.logging_config.conf") as mocked_conf,
+ mock.patch("airflow.logging_config._load_logging_config",
side_effect=_fake_load),
+ ):
+ mocked_conf.get.return_value = None
+ assert get_default_remote_conn_id() == "cached_conn"
+
+ def test_skips_reload_when_already_loaded(self):
+ _ActiveLoggingConfig.set(None, "cached_conn")
+ with (
+ mock.patch("airflow.logging_config.conf") as mocked_conf,
+ mock.patch("airflow.logging_config._load_logging_config") as
mock_load,
+ ):
+ mocked_conf.get.return_value = None
+ assert get_default_remote_conn_id() == "cached_conn"
+ mock_load.assert_not_called()
diff --git a/shared/logging/src/airflow_shared/logging/factory.py
b/shared/logging/src/airflow_shared/logging/factory.py
new file mode 100644
index 00000000000..acd016d6273
--- /dev/null
+++ b/shared/logging/src/airflow_shared/logging/factory.py
@@ -0,0 +1,182 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Build a remote logging IO handler from a provider-supplied registry.
+
+This module owns the runtime-independent logic for resolving and instantiating
+``RemoteLogIO`` handlers. Both ``airflow.logging_config`` (core) and
+``airflow.sdk.log._load_logging_config`` (Task SDK) call
+:func:`resolve_remote_task_log` with their own ``ProvidersManager`` instance
and
+``import_string`` implementation; that function in turn delegates to
+:func:`_build_remote_task_log_from_provider` for the provider-dispatch step
and to
+:func:`airflow._shared.logging.remote.discover_remote_log_handler` for the
+user-defined ``logging_config_class`` and legacy fallbacks.
+"""
+
+from __future__ import annotations
+
+import logging
+import os
+from collections.abc import Callable
+from typing import TYPE_CHECKING, Any
+from urllib.parse import urlsplit
+
+from .remote import discover_remote_log_handler
+
+if TYPE_CHECKING:
+ from airflow.configuration import AirflowConfigParser
+ from airflow.logging.remote import RemoteLogIO
+ from airflow.providers_manager import ProvidersManager, RemoteLoggingInfo
as _CoreRemoteLoggingInfo
+ from airflow.sdk.configuration import AirflowSDKConfigParser
+ from airflow.sdk.providers_manager_runtime import (
+ ProvidersManagerTaskRuntime,
+ RemoteLoggingInfo as _SDKRemoteLoggingInfo,
+ )
+
+log = logging.getLogger(__name__)
+
+DEFAULT_LOGGING_CONFIG_PATH =
"airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
+# Default ``[logging] logging_config_class`` value when the user has not
overridden it.
+
+
+def _instantiate_handler(
+    info: _CoreRemoteLoggingInfo | _SDKRemoteLoggingInfo,
+    *,
+    import_string: Callable[[str], Any],
+) -> Any | None:
+    """
+    Import ``info.classpath`` and build a handler through its ``from_config`` classmethod.
+
+    Problems are logged and reported as ``None`` so that a broken provider cannot
+    stop logging setup. The exceptions are import errors and errors raised by
+    ``from_config``: when ``PYTEST_CURRENT_TEST`` is present in the environment,
+    these are re-raised so that tests fail loudly.
+
+    :param info: Provider registration carrying ``classpath`` and ``package_name``.
+    :param import_string: ``import_string`` callable from the caller's runtime.
+    :returns: Whatever ``from_config()`` returned, or ``None`` when the class could
+        not be imported, lacks ``from_config``, or ``from_config`` failed.
+    """
+    try:
+        handler_cls = import_string(info.classpath)
+    except Exception as exc:
+        log.info("Remote task logs will not be available; failed to import %s: %s", info.classpath, exc)
+        if "PYTEST_CURRENT_TEST" in os.environ:
+            raise
+        return None
+
+    if (build := getattr(handler_cls, "from_config", None)) is None:
+        log.warning(
+            "Provider %s registered %s as a remote logging handler, but the class does not "
+            "implement classmethod 'from_config'. The handler will be skipped.",
+            info.package_name,
+            info.classpath,
+        )
+        return None
+
+    try:
+        handler = build()
+    except Exception as exc:
+        log.info("Remote task logs will not be available; %s.from_config raised: %s", info.classpath, exc)
+        if "PYTEST_CURRENT_TEST" in os.environ:
+            raise
+        return None
+    return handler
+
+
+def _build_remote_task_log_from_provider(
+    *,
+    remote_base_log_folder: str | None,
+    providers_manager: ProvidersManager | ProvidersManagerTaskRuntime,
+    import_string: Callable[[str], Any],
+) -> RemoteLogIO | None:
+    """
+    Pick the provider-registered remote log handler for a folder's URL scheme.
+
+    Only routing happens here. The scheme of ``remote_base_log_folder`` selects a
+    provider registration. That provider's ``from_config`` classmethod reads any
+    backend-specific configuration it needs.
+
+    No default connection id is produced on this path. Users who need one set
+    ``[logging] remote_log_conn_id`` explicitly. Providers wanting a
+    backend-specific default can derive it inside ``from_config``.
+
+    :param remote_base_log_folder: Value of ``[logging] remote_base_log_folder``;
+        its URL scheme is the dispatch key.
+    :param providers_manager: Core ``ProvidersManager`` or Task SDK
+        ``ProvidersManagerTaskRuntime``; only ``remote_logging_handler_by_scheme``
+        is called.
+    :param import_string: ``import_string`` callable from the caller's runtime.
+    :returns: The instantiated handler, or ``None`` when the folder is unset, has no
+        URL scheme, no provider claims the scheme, or instantiation is skipped.
+    """
+    # An empty scheme (unset folder or plain filesystem path) means "no lookup at all".
+    scheme = urlsplit(remote_base_log_folder).scheme if remote_base_log_folder else ""
+    registration = providers_manager.remote_logging_handler_by_scheme(scheme) if scheme else None
+    if registration is None:
+        return None
+    return _instantiate_handler(registration, import_string=import_string)
+
+
+def resolve_remote_task_log(
+    *,
+    conf: AirflowConfigParser | AirflowSDKConfigParser,
+    providers_manager: ProvidersManager | ProvidersManagerTaskRuntime,
+    import_string: Callable[[str], Any],
+) -> tuple[RemoteLogIO | None, str | None]:
+    """
+    Resolve the active remote log handler via the three-tier precedence rule.
+
+    Order:
+
+    1. A user-defined ``logging_config_class`` module that exports
+       ``REMOTE_TASK_LOG`` (or ``DEFAULT_REMOTE_CONN_ID``) wins. This preserves
+       existing custom configs unchanged.
+    2. ProvidersManager dispatch (provider yaml ``remote-logging:`` blocks), only
+       when ``[logging] remote_logging`` is enabled. The conn id returned on this
+       path is ``[logging] remote_log_conn_id`` as configured.
+    3. Legacy ``discover_remote_log_handler`` lookup (the configured
+       ``logging_config_class`` with the default logging module as fallback path).
+       This tier is transitional while ``airflow_local_settings.py`` still carries
+       the per-scheme if/elif chain. It will be removed when every provider has
+       migrated. For a user-defined config class, tier 1 already ran this exact
+       lookup, so its result is reused instead of repeating the discovery.
+
+    :param conf: AirflowConfigParser-like object exposing ``get`` and ``getboolean``.
+    :param providers_manager: Caller's ``ProvidersManager`` /
+        ``ProvidersManagerTaskRuntime``.
+    :param import_string: ``import_string`` callable from the caller's runtime.
+    :returns: ``(remote_task_log, default_remote_conn_id)``. Either element may
+        be ``None`` when no path produces a result.
+    """
+    logging_class_path = (
+        conf.get("logging", "logging_config_class", fallback=DEFAULT_LOGGING_CONFIG_PATH)
+        or DEFAULT_LOGGING_CONFIG_PATH
+    )
+
+    # Tier 1: user-defined logging config module. Keep the result. For a user-defined
+    # path, tier 3 would call discover_remote_log_handler with exactly the same
+    # arguments, re-importing the module for nothing.
+    user_result: tuple[RemoteLogIO | None, str | None] | None = None
+    if logging_class_path != DEFAULT_LOGGING_CONFIG_PATH:
+        remote_task_log, default_remote_conn_id = discover_remote_log_handler(
+            logging_class_path, DEFAULT_LOGGING_CONFIG_PATH, import_string
+        )
+        user_result = (remote_task_log, default_remote_conn_id)
+        if remote_task_log is not None or default_remote_conn_id is not None:
+            return user_result
+
+    # Tier 2: provider dispatch keyed on the remote_base_log_folder URL scheme.
+    if conf.getboolean("logging", "remote_logging", fallback=False):
+        remote_task_log = _build_remote_task_log_from_provider(
+            remote_base_log_folder=conf.get("logging", "remote_base_log_folder", fallback=None),
+            providers_manager=providers_manager,
+            import_string=import_string,
+        )
+        if remote_task_log is not None:
+            return remote_task_log, conf.get("logging", "remote_log_conn_id", fallback=None)
+
+    # Tier 3: legacy attribute discovery (already computed above for user-defined paths).
+    if user_result is not None:
+        return user_result
+    return discover_remote_log_handler(logging_class_path, DEFAULT_LOGGING_CONFIG_PATH, import_string)
diff --git a/shared/logging/tests/logging/test_factory.py
b/shared/logging/tests/logging/test_factory.py
new file mode 100644
index 00000000000..fa81d7fa1c6
--- /dev/null
+++ b/shared/logging/tests/logging/test_factory.py
@@ -0,0 +1,289 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from types import SimpleNamespace
+from typing import Any
+
+import pytest
+
+from airflow_shared.logging.factory import (
+ DEFAULT_LOGGING_CONFIG_PATH,
+ _build_remote_task_log_from_provider,
+ resolve_remote_task_log,
+)
+
+
+class FakeProvidersManager:
+    """
+    Stub providers manager exposing only ``remote_logging_handler_by_scheme``.
+
+    That method is the only one the factory calls on the ``ProvidersManager`` /
+    ``ProvidersManagerTaskRuntime`` it receives.
+    """
+
+    def __init__(self, by_scheme: dict[str, Any] | None = None) -> None:
+        # ``None`` (or an empty mapping) models a registry where no provider claims any scheme.
+        self._by_scheme = by_scheme or {}
+
+    def remote_logging_handler_by_scheme(self, scheme: str) -> Any | None:
+        """Return the registration for *scheme*, or ``None`` when no provider claims it."""
+        return self._by_scheme.get(scheme)
+
+
+class ExplodingProvidersManager:
+    """Tripwire providers manager: any scheme lookup fails the running test immediately."""
+
+    def remote_logging_handler_by_scheme(self, scheme: str):  # pragma: no cover - sentinel
+        # Reaching this means the code under test consulted provider dispatch when it must not.
+        pytest.fail("lookup must not run")
+
+
+class FakeRemoteLogIO:
+    """Registered-handler stub whose ``from_config`` records that it was invoked."""
+
+    # Set to True by ``from_config``; the autouse ``_reset_fake`` fixture clears it around each test.
+    last_called: bool = False
+
+    def __init__(self):
+        pass
+
+    @classmethod
+    def from_config(cls):
+        instance = cls()
+        FakeRemoteLogIO.last_called = True
+        return instance
+
+
+class FakeRemoteLogIONoFactory:
+    """Stub missing the from_config classmethod."""
+
+
+# Fake "import path" -> class table, keyed by the lower-cased class name.
+_LOOKUP: dict[str, Any] = {
+    stub.__name__.lower(): stub for stub in (FakeRemoteLogIO, FakeRemoteLogIONoFactory)
+}
+
+
+def _fake_import_string(path: str) -> Any:
+    """Resolve a bare stub name (case-insensitively) instead of importing a real module."""
+    key = path.lower()
+    return _LOOKUP[key]
+
+
+def _info(
+    *,
+    classpath: str = "fakeremotelogio",
+    scheme: str = "fake",
+    package_name: str = "fake-provider",
+) -> SimpleNamespace:
+    """Build a duck-typed stand-in for a ``RemoteLoggingInfo`` registration."""
+    fields = {"classpath": classpath, "scheme": scheme, "package_name": package_name}
+    return SimpleNamespace(**fields)
+
+
+@pytest.fixture(autouse=True)
+def _reset_fake():
+    """Reset ``FakeRemoteLogIO.last_called`` before and after every test in this module."""
+    FakeRemoteLogIO.last_called = False
+    yield
+    FakeRemoteLogIO.last_called = False
+
+
+def test_provider_dispatch_resolves_by_scheme():
+    """A registered scheme is routed to its class and built via ``from_config``."""
+    manager = FakeProvidersManager({"fake": _info(scheme="fake")})
+
+    handler = _build_remote_task_log_from_provider(
+        remote_base_log_folder="fake://bucket/path",
+        providers_manager=manager,
+        import_string=_fake_import_string,
+    )
+
+    assert isinstance(handler, FakeRemoteLogIO)
+    assert FakeRemoteLogIO.last_called is True
+
+
+def test_provider_dispatch_returns_none_when_scheme_unknown():
+    """A scheme that no provider registered yields no handler."""
+    result = _build_remote_task_log_from_provider(
+        remote_base_log_folder="unknown://x",
+        providers_manager=FakeProvidersManager(),
+        import_string=_fake_import_string,
+    )
+    assert result is None
+
+
+def test_provider_dispatch_returns_none_when_remote_base_unset():
+    """An unset remote_base_log_folder short-circuits before any provider lookup."""
+    result = _build_remote_task_log_from_provider(
+        remote_base_log_folder=None,
+        providers_manager=ExplodingProvidersManager(),
+        import_string=_fake_import_string,
+    )
+    assert result is None
+
+
+def test_provider_dispatch_returns_none_when_no_scheme_in_url():
+    """A plain filesystem path has no URL scheme, so no provider lookup happens."""
+    result = _build_remote_task_log_from_provider(
+        remote_base_log_folder="/local/path/no/scheme",
+        providers_manager=ExplodingProvidersManager(),
+        import_string=_fake_import_string,
+    )
+    assert result is None
+
+
+def test_provider_dispatch_skips_handler_without_factory():
+    """A registered class lacking ``from_config`` is skipped rather than instantiated."""
+    info = _info(classpath="fakeremotelogionofactory")
+    handler = _build_remote_task_log_from_provider(
+        remote_base_log_folder="fake://b",
+        providers_manager=FakeProvidersManager({"fake": info}),
+        import_string=_fake_import_string,
+    )
+    assert handler is None
+
+
+def test_provider_dispatch_reraises_import_error_under_pytest():
+    """Import failures propagate while ``PYTEST_CURRENT_TEST`` is set (pytest sets it per test)."""
+
+    def failing_import_string(path: str) -> Any:
+        raise ImportError(f"cannot import {path}")
+
+    with pytest.raises(ImportError, match="cannot import fakeremotelogio"):
+        _build_remote_task_log_from_provider(
+            remote_base_log_folder="fake://b",
+            providers_manager=FakeProvidersManager({"fake": _info()}),
+            import_string=failing_import_string,
+        )
+
+
+def test_provider_dispatch_reraises_from_config_error_under_pytest():
+    """Errors raised by ``from_config`` propagate while running under pytest."""
+
+    class RaisingRemoteLogIO:
+        @classmethod
+        def from_config(cls):
+            raise RuntimeError("from_config boom")
+
+    with pytest.raises(RuntimeError, match="from_config boom"):
+        _build_remote_task_log_from_provider(
+            remote_base_log_folder="fake://b",
+            providers_manager=FakeProvidersManager({"fake": _info()}),
+            import_string=lambda path: RaisingRemoteLogIO,
+        )
+
+
+# ---------------------------------------------------------------------------
+# Tests for resolve_remote_task_log (three-tier precedence orchestrator)
+# ---------------------------------------------------------------------------
+
+# Value returned for the ``logging_config_class`` import; the tests never inspect its contents.
+_FAKE_LOGGING_DICT: dict[str, Any] = {"version": 1}
+
+
+class FakeConf:
+    """
+    In-memory stand-in for the Airflow config parser, keyed by ``(section, key)``.
+
+    Implements just ``get`` and ``getboolean``, the two methods
+    ``resolve_remote_task_log`` calls, and honours their ``fallback`` keyword.
+    """
+
+    def __init__(self, values: dict[tuple[str, str], Any] | None = None) -> None:
+        self._values = values or {}
+
+    def get(self, section, key, **kwargs):
+        """Return the stored value, or ``fallback`` (default ``None``) when the key is absent."""
+        return self._values.get((section, key), kwargs.get("fallback"))
+
+    def getboolean(self, section, key, **kwargs):
+        """Coerce the stored value (or ``fallback``) to bool using config-style string parsing."""
+        value = self._values.get((section, key), kwargs.get("fallback"))
+        if isinstance(value, bool):
+            return value
+        if isinstance(value, str):
+            return value.lower() in ("1", "true", "yes", "on")
+        return bool(value)
+
+
+def test_resolve_user_defined_module_wins(monkeypatch):
+    """If the user's logging_config_class module exposes REMOTE_TASK_LOG, it wins."""
+    import sys
+
+    user_path = "fake.user.module.DEFAULT_LOGGING_CONFIG"
+    sentinel_remote = object()
+    user_module = SimpleNamespace(REMOTE_TASK_LOG=sentinel_remote, DEFAULT_REMOTE_CONN_ID="user_conn")
+
+    def fake_import_string(path: str):
+        if path == user_path:
+            return _FAKE_LOGGING_DICT
+        if path == "fake.user.module":
+            return user_module
+        raise AssertionError(f"unexpected import: {path}")
+
+    # The parent module may be resolved through sys.modules or import_string; expose it
+    # both ways. monkeypatch undoes the sys.modules change even if the test fails, and
+    # never clobbers a pre-existing entry.
+    monkeypatch.setitem(sys.modules, "fake.user.module", user_module)
+
+    handler, conn_id = resolve_remote_task_log(
+        conf=FakeConf({("logging", "logging_config_class"): user_path}),
+        providers_manager=ExplodingProvidersManager(),  # must not be consulted
+        import_string=fake_import_string,
+    )
+
+    assert handler is sentinel_remote
+    assert conn_id == "user_conn"
+
+
+def test_resolve_falls_through_to_provider_dispatch():
+    """Default logging_class_path → user_defined is False → provider dispatch runs."""
+    conf = FakeConf(
+        {
+            ("logging", "remote_base_log_folder"): "fake://bucket",
+            ("logging", "remote_logging"): True,
+        }
+    )
+    manager = FakeProvidersManager({"fake": _info(scheme="fake")})
+
+    handler, conn_id = resolve_remote_task_log(
+        conf=conf, providers_manager=manager, import_string=_fake_import_string
+    )
+
+    assert isinstance(handler, FakeRemoteLogIO)
+    # remote_log_conn_id is unset in FakeConf, so the fallback ``None`` comes back.
+    assert conn_id is None
+
+
+def test_resolve_skips_provider_dispatch_when_remote_logging_disabled(monkeypatch):
+    """``[logging] remote_logging = False`` must short-circuit provider dispatch."""
+    import sys
+
+    sentinel_remote = object()
+
+    def fake_import_string(path: str):
+        if path == DEFAULT_LOGGING_CONFIG_PATH:
+            return _FAKE_LOGGING_DICT
+        raise AssertionError(f"unexpected import: {path}")
+
+    # Stand in for airflow_local_settings so the legacy attr-path tier finds REMOTE_TASK_LOG.
+    # monkeypatch restores the previous sys.modules entry (or removes the stub) afterwards.
+    parent_module = DEFAULT_LOGGING_CONFIG_PATH.rsplit(".", 1)[0]
+    monkeypatch.setitem(
+        sys.modules,
+        parent_module,
+        SimpleNamespace(REMOTE_TASK_LOG=sentinel_remote, DEFAULT_REMOTE_CONN_ID=None),
+    )
+
+    # ExplodingProvidersManager fails the test if provider dispatch runs.
+    handler, conn_id = resolve_remote_task_log(
+        conf=FakeConf(
+            {
+                ("logging", "remote_base_log_folder"): "fake://bucket",
+                ("logging", "remote_logging"): False,
+            }
+        ),
+        providers_manager=ExplodingProvidersManager(),
+        import_string=fake_import_string,
+    )
+
+    assert handler is sentinel_remote
+    assert conn_id is None
+
+
+def test_resolve_falls_back_to_legacy_attr_path_when_dispatch_returns_none(monkeypatch):
+    """When provider dispatch yields nothing, the legacy module-attr path is consulted."""
+    import sys
+
+    sentinel_remote = object()
+
+    def fake_import_string(path: str):
+        if path == DEFAULT_LOGGING_CONFIG_PATH:
+            return _FAKE_LOGGING_DICT
+        raise AssertionError(f"unexpected import: {path}")
+
+    # Stand in for airflow_local_settings; monkeypatch restores sys.modules afterwards.
+    parent_module = DEFAULT_LOGGING_CONFIG_PATH.rsplit(".", 1)[0]
+    monkeypatch.setitem(
+        sys.modules,
+        parent_module,
+        SimpleNamespace(REMOTE_TASK_LOG=sentinel_remote, DEFAULT_REMOTE_CONN_ID="legacy_conn"),
+    )
+
+    handler, conn_id = resolve_remote_task_log(
+        conf=FakeConf(
+            {
+                ("logging", "remote_base_log_folder"): "unknown://b",
+                ("logging", "remote_logging"): True,
+            }
+        ),
+        providers_manager=FakeProvidersManager(),  # empty registry
+        import_string=fake_import_string,
+    )
+
+    assert handler is sentinel_remote
+    assert conn_id == "legacy_conn"
diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py
index 9542baa4046..7391335b7f5 100644
--- a/task-sdk/src/airflow/sdk/log.py
+++ b/task-sdk/src/airflow/sdk/log.py
@@ -173,16 +173,15 @@ def init_log_file(local_relative_path: str) -> Path:
def _load_logging_config() -> None:
"""Load and cache the remote logging configuration from SDK config."""
- from airflow.sdk._shared.logging.remote import discover_remote_log_handler
+ from airflow.sdk._shared.logging.factory import resolve_remote_task_log
from airflow.sdk._shared.module_loading import import_string
from airflow.sdk.configuration import conf
+ from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime
- fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
- logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback)
-
- # Load remote logging configuration using shared discovery logic
- remote_task_log, default_remote_conn_id = discover_remote_log_handler(
- logging_class_path, fallback, import_string
+ # The shared factory implements the precedence: the user logging_config_class module
+ # first, then provider dispatch on the remote_base_log_folder URL scheme, then the
+ # legacy module attributes. Core calls the same factory, so both runtimes agree.
+ remote_task_log, default_remote_conn_id = resolve_remote_task_log(
+ conf=conf,
+ providers_manager=ProvidersManagerTaskRuntime(),
+ import_string=import_string,
)
_ActiveLoggingConfig.set(remote_task_log, default_remote_conn_id)
diff --git a/task-sdk/src/airflow/sdk/providers_manager_runtime.py
b/task-sdk/src/airflow/sdk/providers_manager_runtime.py
index e28ed3fe14a..ca4d4a94732 100644
--- a/task-sdk/src/airflow/sdk/providers_manager_runtime.py
+++ b/task-sdk/src/airflow/sdk/providers_manager_runtime.py
@@ -24,7 +24,7 @@ import inspect
import traceback
import warnings
from collections.abc import Callable, MutableMapping
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, NamedTuple
from urllib.parse import SplitResult
import structlog
@@ -55,6 +55,14 @@ if TYPE_CHECKING:
log = structlog.getLogger(__name__)
+class RemoteLoggingInfo(NamedTuple):
+ """Remote logging IO handler registered by a provider."""
+
+ # Dotted import path of the handler class; the shared factory instantiates it via ``from_config``.
+ classpath: str
+ # URL scheme of ``[logging] remote_base_log_folder`` that routes to this handler.
+ scheme: str
+ # Provider package that contributed the registration (used in duplicate-scheme warnings).
+ package_name: str
+
+
def _correctness_check(provider_package: str, class_name: str, provider_info:
ProviderInfo) -> Any:
"""
Perform coherence check on provider classes.
@@ -150,6 +158,8 @@ class ProvidersManagerTaskRuntime(LoggingMixin):
# Keeps dict of hooks keyed by connection type. They are lazy
evaluated at access time
self._hooks_lazy_dict: LazyDictWithCache[str, HookInfo | Callable] =
LazyDictWithCache()
self._plugins_set: set[PluginInfo] = set()
+ self._remote_logging_info_list: list[RemoteLoggingInfo] = []
+ self._remote_logging_by_scheme: dict[str, RemoteLoggingInfo] = {}
self._provider_schema_validator =
_create_provider_info_schema_validator()
self._init_airflow_core_hooks()
# Populated by initialize_provider_configs(); holds
provider-contributed config sections.
@@ -214,6 +224,12 @@ class ProvidersManagerTaskRuntime(LoggingMixin):
self.initialize_providers_list()
self._discover_plugins()
+ @provider_info_cache("remote_logging")
+ def initialize_providers_remote_logging(self):
+ """Lazy initialization of providers remote logging IO handlers."""
+ # _discover_remote_logging walks self._provider_dict, which initialize_providers_list()
+ # is expected to populate. provider_info_cache presumably makes this run once per
+ # manager; confirm against its definition.
+ self.initialize_providers_list()
+ self._discover_remote_logging()
+
@provider_info_cache("taskflow_decorators")
def initialize_providers_taskflow_decorator(self):
"""Lazy initialization of providers taskflow decorators."""
@@ -534,6 +550,31 @@ class ProvidersManagerTaskRuntime(LoggingMixin):
)
)
+    def _discover_remote_logging(self) -> None:
+        """
+        Retrieve all remote logging IO handlers defined in the providers.
+
+        Each ``remote-logging`` entry of a provider maps a URL ``scheme`` to a handler
+        ``classpath``. The first provider to register a scheme wins, and later
+        registrations of the same scheme are logged and ignored. The duplicate check
+        runs before the class is validated, so a registration that would be discarded
+        anyway never has its class checked or imported.
+        """
+        for provider_package, provider in self._provider_dict.items():
+            for entry in provider.data.get("remote-logging") or []:
+                classpath = entry["classpath"]
+                scheme = entry["scheme"]
+                if (existing := self._remote_logging_by_scheme.get(scheme)) is not None:
+                    log.warning(
+                        "Remote logging scheme '%s' is already registered by %s; ignoring "
+                        "duplicate registration from %s.",
+                        scheme,
+                        existing.package_name,
+                        provider_package,
+                    )
+                    continue
+                # A falsy result means the class is unusable (e.g. its classpath does not import).
+                if not _correctness_check(provider_package, classpath, provider):
+                    continue
+                info = RemoteLoggingInfo(
+                    classpath=classpath,
+                    scheme=scheme,
+                    package_name=provider_package,
+                )
+                self._remote_logging_info_list.append(info)
+                self._remote_logging_by_scheme[scheme] = info
+
def _discover_taskflow_decorators(self) -> None:
for name, info in self._provider_dict.items():
for taskflow_decorator in info.data.get("task-decorators", []):
@@ -611,6 +652,17 @@ class ProvidersManagerTaskRuntime(LoggingMixin):
self.initialize_providers_plugins()
return sorted(self._plugins_set, key=lambda x: x.plugin_class)
+    @property
+    def remote_logging_handlers(self) -> list[RemoteLoggingInfo]:
+        """Provider-contributed remote logging IO handlers in registration order, as a fresh list."""
+        self.initialize_providers_remote_logging()
+        return [*self._remote_logging_info_list]
+
+    def remote_logging_handler_by_scheme(self, scheme: str) -> RemoteLoggingInfo | None:
+        """Look up the handler registered for URL *scheme*; ``None`` when no provider claims it."""
+        self.initialize_providers_remote_logging()
+        registry = self._remote_logging_by_scheme
+        return registry.get(scheme)
+
@property
def provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
self.initialize_provider_configs()
@@ -640,6 +692,8 @@ class ProvidersManagerTaskRuntime(LoggingMixin):
self._hook_provider_dict.clear()
self._hooks_lazy_dict.clear()
self._plugins_set.clear()
+ self._remote_logging_info_list.clear()
+ self._remote_logging_by_scheme.clear()
self._asset_uri_handlers.clear()
self._asset_factories.clear()
self._asset_to_openlineage_converters.clear()
diff --git a/task-sdk/tests/task_sdk/test_providers_manager_runtime.py
b/task-sdk/tests/task_sdk/test_providers_manager_runtime.py
index 1cae21d53c7..1f22ff6e73f 100644
--- a/task-sdk/tests/task_sdk/test_providers_manager_runtime.py
+++ b/task-sdk/tests/task_sdk/test_providers_manager_runtime.py
@@ -31,7 +31,7 @@ from airflow.sdk._shared.providers_discovery import (
LazyDictWithCache,
ProviderInfo,
)
-from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime
+from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime,
RemoteLoggingInfo
from tests_common.test_utils.markers import
skip_if_force_lowest_dependencies_marker, skip_if_not_on_main
from tests_common.test_utils.paths import AIRFLOW_ROOT_PATH
@@ -39,6 +39,16 @@ from tests_common.test_utils.paths import AIRFLOW_ROOT_PATH
PY313 = sys.version_info >= (3, 13)
+class FakeRemoteLogIO:
+ """Importable stub used by remote-logging discovery tests."""
+
+ # NOTE(review): presumably mirrors a ``processors`` attribute that real RemoteLogIO
+ # handlers expose; confirm it is still needed by discovery.
+ processors: tuple = ()
+
+ # Minimal factory so the stub satisfies the ``from_config`` contract of remote log handlers.
+ @classmethod
+ def from_config(cls):
+ return cls()
+
+
def test_cleanup_providers_manager_runtime(cleanup_providers_manager):
"""Check the cleanup provider manager functionality."""
provider_manager = ProvidersManagerTaskRuntime()
@@ -277,3 +287,61 @@ class TestProvidersManagerRuntime:
assert conf.get("test_sdk_provider", "test_option") ==
"provider-default"
finally:
conf.invalidate_cache()
+
+    def test_register_remote_logging_by_scheme(self):
+        classpath = f"{__name__}.FakeRemoteLogIO"
+        manager = ProvidersManagerTaskRuntime()
+        manager._provider_dict = LazyDictWithCache()
+        manager._provider_dict["fake.remote.logging"] = ProviderInfo(
+            version="0.0.1",
+            data={"remote-logging": [{"classpath": classpath, "scheme": "fake"}]},
+        )
+
+        manager._discover_remote_logging()
+
+        expected = RemoteLoggingInfo(
+            classpath=classpath,
+            scheme="fake",
+            package_name="fake.remote.logging",
+        )
+        assert len(manager._remote_logging_info_list) == 1
+        assert manager._remote_logging_by_scheme["fake"] == expected
+
+    def test_register_remote_logging_duplicate_scheme_first_wins(self):
+        manager = ProvidersManagerTaskRuntime()
+        manager._provider_dict = LazyDictWithCache()
+        # Two providers claim the same scheme; insertion order decides the winner.
+        for package in ("fake.remote.logging.first", "fake.remote.logging.second"):
+            manager._provider_dict[package] = ProviderInfo(
+                version="0.0.1",
+                data={"remote-logging": [{"classpath": f"{__name__}.FakeRemoteLogIO", "scheme": "dup"}]},
+            )
+
+        manager._discover_remote_logging()
+
+        winner = manager._remote_logging_by_scheme["dup"]
+        assert winner.package_name == "fake.remote.logging.first"
+        registered = manager._remote_logging_info_list
+        assert len(registered) == 1
+        assert registered[0].package_name == "fake.remote.logging.first"
+
+    def test_register_remote_logging_bad_class_filtered(self):
+        broken_entry = {
+            "classpath": "fake.module.does.not.exist.FakeRemoteLogIO",
+            "scheme": "bad",
+        }
+        manager = ProvidersManagerTaskRuntime()
+        manager._provider_dict = LazyDictWithCache()
+        manager._provider_dict["fake.remote.logging"] = ProviderInfo(
+            version="0.0.1",
+            data={"remote-logging": [broken_entry]},
+        )
+
+        manager._discover_remote_logging()
+
+        # An unimportable classpath must not be registered under any view.
+        assert "bad" not in manager._remote_logging_by_scheme
+        assert manager._remote_logging_info_list == []