This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b13129fcd20 Fix Alibaba OSS provider: configurable endpoint and task
handler log … (#66479)
b13129fcd20 is described below
commit b13129fcd2063a794be3a8a65f28efdd42a57242
Author: HasanGzc <[email protected]>
AuthorDate: Tue May 12 01:30:24 2026 +0300
Fix Alibaba OSS provider: configurable endpoint and task handler log …
(#66479)
* Fix Alibaba OSS provider: configurable endpoint and task handler log
reading
Co-authored-by: Cursor <[email protected]>
* Simplify endpoint test to use MagicMock instead of Connection object
---------
Co-authored-by: Cursor <[email protected]>
---
.../airflow/providers/alibaba/cloud/hooks/oss.py | 3 ++-
.../alibaba/cloud/log/oss_task_handler.py | 4 ++--
.../tests/unit/alibaba/cloud/hooks/test_oss.py | 26 ++++++++++++++++++++++
.../alibaba/cloud/log/test_oss_task_handler.py | 21 +++++++++++++++++
4 files changed, 51 insertions(+), 3 deletions(-)
diff --git a/providers/alibaba/src/airflow/providers/alibaba/cloud/hooks/oss.py b/providers/alibaba/src/airflow/providers/alibaba/cloud/hooks/oss.py
index c1e7b5b2924..b7cec0e00ae 100644
--- a/providers/alibaba/src/airflow/providers/alibaba/cloud/hooks/oss.py
+++ b/providers/alibaba/src/airflow/providers/alibaba/cloud/hooks/oss.py
@@ -97,7 +97,8 @@ class OSSHook(BaseHook):
def _get_client(self) -> oss.Client:
config = oss.config.load_default()
config.region = self.region
- config.endpoint = f"oss-{self.region}.aliyuncs.com"
+ extra_config = self.oss_conn.extra_dejson
+ config.endpoint = extra_config.get("endpoint", f"oss-{self.region}.aliyuncs.com")
config.credentials_provider = self.get_credential()
return oss.Client(config)
diff --git a/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py b/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py
index 4a50538fc9a..d09261b3df9 100644
--- a/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py
+++ b/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py
@@ -245,11 +245,11 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
log_relative_path = self._render_filename(ti, try_number)
remote_loc = log_relative_path
- if not self.oss_log_exists(remote_loc):
+ if not self.io.oss_log_exists(remote_loc):
return super()._read(ti, try_number, metadata)
# If OSS remote file exists, we do not fetch logs from task instance
# local machine even if there are errors reading remote logs, as
# returned remote_log will contain error messages.
- remote_log = self.oss_read(remote_loc, return_error=True)
+ remote_log = self.io.oss_read(remote_loc, return_error=True)
log = f"*** Reading remote log from {remote_loc}.\n{remote_log}\n"
return log, {"end_of_log": True}
diff --git a/providers/alibaba/tests/unit/alibaba/cloud/hooks/test_oss.py b/providers/alibaba/tests/unit/alibaba/cloud/hooks/test_oss.py
index 19d95cae4e7..3860dd35ed5 100644
--- a/providers/alibaba/tests/unit/alibaba/cloud/hooks/test_oss.py
+++ b/providers/alibaba/tests/unit/alibaba/cloud/hooks/test_oss.py
@@ -198,3 +198,29 @@ class TestOSSHook:
def test_get_default_region(self):
assert self.hook.get_default_region() == "mock_region"
+
+ @mock.patch(OSS_STRING.format("oss.config.load_default"))
+ def test_get_client_uses_default_endpoint(self, mock_load_default):
+ mock_config = mock.MagicMock()
+ mock_load_default.return_value = mock_config
+
+ self.hook._get_client()
+
+ assert mock_config.endpoint == f"oss-{self.hook.region}.aliyuncs.com"
+
+ @mock.patch(OSS_STRING.format("oss.config.load_default"))
+ def test_get_client_uses_custom_endpoint_from_connection(self, mock_load_default):
+ mock_config = mock.MagicMock()
+ mock_load_default.return_value = mock_config
+
+ custom_ep = "oss-eu-central-1-internal.aliyuncs.com"
+
+ mock_conn = mock.MagicMock()
+ mock_conn.extra_dejson = {"endpoint": custom_ep}
+
+ self.hook.oss_conn = mock_conn
+ self.hook.get_credential = mock.MagicMock()
+
+ self.hook._get_client()
+
+ assert mock_config.endpoint == custom_ep
diff --git a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py
index cfdcdcab480..24131d44346 100644
--- a/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py
+++ b/providers/alibaba/tests/unit/alibaba/cloud/log/test_oss_task_handler.py
@@ -208,3 +208,24 @@ class TestOSSTaskHandler:
def test_filename_template_for_backward_compatibility(self):
# filename_template arg support for running the latest provider on airflow 2
OSSTaskHandler(self.base_log_folder, self.oss_log_folder, filename_template=None)
+
+ @mock.patch(OSS_TASK_HANDLER_STRING.format("OSSRemoteLogIO.oss_read"))
+ @mock.patch(OSS_TASK_HANDLER_STRING.format("OSSRemoteLogIO.oss_log_exists"))
+ def test_read_calls_oss_methods_via_io(self, mock_log_exists, mock_oss_read):
+ mock_log_exists.return_value = True
+ mock_oss_read.return_value = "mock log content"
+
+ log, metadata = self.oss_task_handler._read(self.ti, self.ti.try_number)
+
+ mock_log_exists.assert_called_once()
+ mock_oss_read.assert_called_once()
+ assert "mock log content" in log
+ assert metadata == {"end_of_log": True}
+
+ @mock.patch(OSS_TASK_HANDLER_STRING.format("OSSRemoteLogIO.oss_log_exists"))
+ def test_read_falls_back_to_local_when_no_remote_log(self, mock_log_exists):
+ mock_log_exists.return_value = False
+
+ self.oss_task_handler._read(self.ti, self.ti.try_number)
+
+ mock_log_exists.assert_called_once()