This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 823b86e006d Fix remote_task_handler_kwargs passing handler params to
RemoteLogIO (#65957)
823b86e006d is described below
commit 823b86e006d30f118093e0ae76d25aecabd92ecb
Author: Sadha Chilukoori <[email protected]>
AuthorDate: Tue May 5 12:07:11 2026 -0700
Fix remote_task_handler_kwargs passing handler params to RemoteLogIO
(#65957)
* Fix remote_task_handler_kwargs passing handler params to RemoteLogIO
Split remote_task_handler_kwargs into handler-level params (max_bytes,
backup_count, delay) and IO-level params before the provider if/elif
chain. Handler params go to DEFAULT_LOGGING_CONFIG["handlers"]["task"];
IO params go to the RemoteLogIO constructor.
Previously, all kwargs were passed to RemoteLogIO (causing a TypeError
for attrs-based classes, whose generated __init__ rejects unexpected
keyword arguments) and then reset to {}, so handler params
never reached FileTaskHandler.
closes: #58770
* Derive FileTaskHandler params dynamically via inspect instead of
hardcoding them
Address review feedback: use inspect.signature(FileTaskHandler.__init__)
to determine which remote_task_handler_kwargs belong to the file handler
vs the RemoteLogIO constructor. Remove dead _io_kwargs = {} reset lines.
* Move inspect and FileTaskHandler imports to top level
---------
Co-authored-by: Sadha Chilukoori <[email protected]>
---
.../config_templates/airflow_local_settings.py | 35 ++++---
.../test_airflow_local_settings.py | 107 ++++++++++++++++-----
2 files changed, 103 insertions(+), 39 deletions(-)
diff --git
a/airflow-core/src/airflow/config_templates/airflow_local_settings.py
b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
index 04ca2a32f97..7072d2dd6ea 100644
--- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py
+++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
@@ -19,12 +19,14 @@
from __future__ import annotations
+import inspect
import os
from typing import TYPE_CHECKING, Any, cast
from urllib.parse import urlsplit
from airflow.configuration import conf
from airflow.exceptions import AirflowException
+from airflow.utils.log.file_task_handler import FileTaskHandler
if TYPE_CHECKING:
from airflow.logging.remote import RemoteLogIO, RemoteLogStreamIO
@@ -159,7 +161,13 @@ if REMOTE_LOGGING:
"logging/remote_task_handler_kwargs must be a JSON object (a
python dict), we got "
f"{type(remote_task_handler_kwargs)}"
)
- _handler_kwargs = cast("dict[str, Any]", remote_task_handler_kwargs)
+ _all_kwargs = cast("dict[str, Any]", remote_task_handler_kwargs)
+ _fth_params =
frozenset(inspect.signature(FileTaskHandler.__init__).parameters) - {
+ "self",
+ "base_log_folder",
+ }
+ _file_handler_kwargs = {k: v for k, v in _all_kwargs.items() if k in
_fth_params}
+ _io_kwargs = {k: v for k, v in _all_kwargs.items() if k not in _fth_params}
delete_local_copy = conf.getboolean("logging", "delete_local_logs")
if remote_base_log_folder.startswith("s3://"):
@@ -174,10 +182,9 @@ if REMOTE_LOGGING:
"remote_base": remote_base_log_folder,
"delete_local_copy": delete_local_copy,
}
- | _handler_kwargs,
+ | _io_kwargs,
)
)
- _handler_kwargs = {}
elif remote_base_log_folder.startswith("cloudwatch://"):
from airflow.providers.amazon.aws.log.cloudwatch_task_handler import
CloudWatchRemoteLogIO
@@ -193,10 +200,10 @@ if REMOTE_LOGGING:
"delete_local_copy": delete_local_copy,
"log_group_arn": url_parts.netloc + url_parts.path,
}
- | _handler_kwargs,
+ | _io_kwargs,
)
)
- _handler_kwargs = {}
+
elif remote_base_log_folder.startswith("gs://"):
from airflow.providers.google.cloud.log.gcs_task_handler import
GCSRemoteLogIO
@@ -212,10 +219,10 @@ if REMOTE_LOGGING:
"delete_local_copy": delete_local_copy,
"gcp_key_path": key_path,
}
- | _handler_kwargs,
+ | _io_kwargs,
)
)
- _handler_kwargs = {}
+
elif remote_base_log_folder.startswith("wasb"):
from airflow.providers.microsoft.azure.log.wasb_task_handler import
WasbRemoteLogIO
@@ -236,10 +243,10 @@ if REMOTE_LOGGING:
"delete_local_copy": delete_local_copy,
"wasb_container": wasb_log_container,
}
- | _handler_kwargs,
+ | _io_kwargs,
)
)
- _handler_kwargs = {}
+
elif remote_base_log_folder.startswith("stackdriver://"):
key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH",
fallback=None)
# stackdriver:///airflow-tasks => airflow-tasks
@@ -267,10 +274,10 @@ if REMOTE_LOGGING:
"remote_base": remote_base_log_folder,
"delete_local_copy": delete_local_copy,
}
- | _handler_kwargs,
+ | _io_kwargs,
)
)
- _handler_kwargs = {}
+
elif remote_base_log_folder.startswith("hdfs://"):
from airflow.providers.apache.hdfs.log.hdfs_task_handler import
HdfsRemoteLogIO
@@ -284,10 +291,10 @@ if REMOTE_LOGGING:
"remote_base": urlsplit(remote_base_log_folder).path,
"delete_local_copy": delete_local_copy,
}
- | _handler_kwargs,
+ | _io_kwargs,
)
)
- _handler_kwargs = {}
+
elif ELASTICSEARCH_HOST:
from airflow.providers.elasticsearch.log.es_task_handler import
ElasticsearchRemoteLogIO
@@ -370,4 +377,4 @@ if REMOTE_LOGGING:
"section 'elasticsearch' if you are using Elasticsearch. In the
other case, "
"'remote_base_log_folder' option in the 'logging' section."
)
- DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(_handler_kwargs)
+ DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(_file_handler_kwargs)
diff --git
a/airflow-core/tests/unit/config_templates/test_airflow_local_settings.py
b/airflow-core/tests/unit/config_templates/test_airflow_local_settings.py
index 025dbdf571f..d131efa0026 100644
--- a/airflow-core/tests/unit/config_templates/test_airflow_local_settings.py
+++ b/airflow-core/tests/unit/config_templates/test_airflow_local_settings.py
@@ -19,15 +19,30 @@
from __future__ import annotations
import importlib
+import inspect
import json
from unittest import mock
import pytest
from airflow.config_templates import airflow_local_settings
+from airflow.utils.log.file_task_handler import FileTaskHandler
from tests_common.test_utils.config import conf_vars
+REMOTE_IO_PROVIDERS = [
+ ("s3://bucket/path",
"airflow.providers.amazon.aws.log.s3_task_handler.S3RemoteLogIO"),
+ ("wasb-logs",
"airflow.providers.microsoft.azure.log.wasb_task_handler.WasbRemoteLogIO"),
+ ("gs://bucket/path",
"airflow.providers.google.cloud.log.gcs_task_handler.GCSRemoteLogIO"),
+ (
+ "cloudwatch://arn:aws:logs:us-east-1:0:log-group:foo",
+
"airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudWatchRemoteLogIO",
+ ),
+ ("oss://bucket/path",
"airflow.providers.alibaba.cloud.log.oss_task_handler.OSSRemoteLogIO"),
+ ("hdfs://host/path",
"airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsRemoteLogIO"),
+]
+REMOTE_IO_IDS = ["s3", "wasb", "gcs", "cloudwatch", "oss", "hdfs"]
+
@pytest.fixture
def restore_local_settings():
@@ -35,42 +50,84 @@ def restore_local_settings():
importlib.reload(airflow_local_settings)
[email protected](
- ("remote_base", "remote_io_path"),
- [
- ("s3://bucket/path",
"airflow.providers.amazon.aws.log.s3_task_handler.S3RemoteLogIO"),
- ("wasb-logs",
"airflow.providers.microsoft.azure.log.wasb_task_handler.WasbRemoteLogIO"),
- ("gs://bucket/path",
"airflow.providers.google.cloud.log.gcs_task_handler.GCSRemoteLogIO"),
- (
- "cloudwatch://arn:aws:logs:us-east-1:0:log-group:foo",
-
"airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudWatchRemoteLogIO",
- ),
- ("oss://bucket/path",
"airflow.providers.alibaba.cloud.log.oss_task_handler.OSSRemoteLogIO"),
- ("hdfs://host/path",
"airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsRemoteLogIO"),
- ],
- ids=["s3", "wasb", "gcs", "cloudwatch", "oss", "hdfs"],
-)
-def test_remote_task_handler_kwargs_not_leaked_to_local_task_handler(
- remote_base, remote_io_path, restore_local_settings
-):
- """Verify remote_task_handler_kwargs are passed to RemoteLogIO and not
leaked to FileTaskHandler."""
[email protected](("remote_base", "remote_io_path"),
REMOTE_IO_PROVIDERS, ids=REMOTE_IO_IDS)
+def test_io_kwargs_forwarded_to_remote_log_io(remote_base, remote_io_path,
restore_local_settings):
+ """IO-level kwargs reach the RemoteLogIO constructor and don't leak into
the handler config."""
pytest.importorskip(remote_io_path.rsplit(".", 1)[0])
- user_kwargs = {"remote_base": "ignored", "custom_key": "v"}
+ io_kwargs = {"remote_base": "ignored", "custom_key": "v"}
with (
mock.patch(remote_io_path) as mock_remote_io,
conf_vars(
{
("logging", "remote_logging"): "True",
("logging", "remote_base_log_folder"): remote_base,
- ("logging", "remote_task_handler_kwargs"):
json.dumps(user_kwargs),
+ ("logging", "remote_task_handler_kwargs"):
json.dumps(io_kwargs),
}
),
):
importlib.reload(airflow_local_settings)
task_cfg =
airflow_local_settings.DEFAULT_LOGGING_CONFIG["handlers"]["task"]
- for k in user_kwargs:
- assert k not in task_cfg, f"{k!r} leaked into task handler for
{remote_base}"
+ for k in io_kwargs:
+ assert k not in task_cfg, f"IO kwarg {k!r} leaked into task
handler config"
- # Verify kwargs were passed to REMOTE_TASK_LOG
- for k, v in user_kwargs.items():
+ for k, v in io_kwargs.items():
assert mock_remote_io.call_args.kwargs[k] == v
+
+
[email protected](("remote_base", "remote_io_path"),
REMOTE_IO_PROVIDERS, ids=REMOTE_IO_IDS)
+def test_handler_kwargs_reach_file_task_handler(remote_base, remote_io_path,
restore_local_settings):
+ """Handler-level kwargs (max_bytes, backup_count, delay) reach the
FileTaskHandler config."""
+ pytest.importorskip(remote_io_path.rsplit(".", 1)[0])
+ handler_kwargs = {"max_bytes": 5_000_000, "backup_count": 5}
+ with (
+ mock.patch(remote_io_path) as mock_remote_io,
+ conf_vars(
+ {
+ ("logging", "remote_logging"): "True",
+ ("logging", "remote_base_log_folder"): remote_base,
+ ("logging", "remote_task_handler_kwargs"):
json.dumps(handler_kwargs),
+ }
+ ),
+ ):
+ importlib.reload(airflow_local_settings)
+ task_cfg =
airflow_local_settings.DEFAULT_LOGGING_CONFIG["handlers"]["task"]
+ for k, v in handler_kwargs.items():
+ assert task_cfg[k] == v, f"Handler kwarg {k!r} not found in task
handler config"
+
+ for k in handler_kwargs:
+ assert k not in mock_remote_io.call_args.kwargs, (
+ f"Handler kwarg {k!r} leaked into RemoteLogIO constructor"
+ )
+
+
[email protected](("remote_base", "remote_io_path"),
REMOTE_IO_PROVIDERS, ids=REMOTE_IO_IDS)
+def test_mixed_kwargs_split_correctly(remote_base, remote_io_path,
restore_local_settings):
+ """When both handler and IO kwargs are present, each goes to the right
place."""
+ pytest.importorskip(remote_io_path.rsplit(".", 1)[0])
+ mixed_kwargs = {"max_bytes": 5_000_000, "backup_count": 5,
"custom_io_key": "val"}
+ with (
+ mock.patch(remote_io_path) as mock_remote_io,
+ conf_vars(
+ {
+ ("logging", "remote_logging"): "True",
+ ("logging", "remote_base_log_folder"): remote_base,
+ ("logging", "remote_task_handler_kwargs"):
json.dumps(mixed_kwargs),
+ }
+ ),
+ ):
+ importlib.reload(airflow_local_settings)
+ task_cfg =
airflow_local_settings.DEFAULT_LOGGING_CONFIG["handlers"]["task"]
+
+ assert task_cfg["max_bytes"] == 5_000_000
+ assert task_cfg["backup_count"] == 5
+ assert "custom_io_key" not in task_cfg
+
+ assert mock_remote_io.call_args.kwargs["custom_io_key"] == "val"
+ assert "max_bytes" not in mock_remote_io.call_args.kwargs
+ assert "backup_count" not in mock_remote_io.call_args.kwargs
+
+
+def test_file_handler_params_introspected_correctly():
+ """The introspected FileTaskHandler params include the expected kwargs."""
+ init_params = set(inspect.signature(FileTaskHandler.__init__).parameters)
- {"self", "base_log_folder"}
+ assert {"max_bytes", "backup_count", "delay"} <= init_params