This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new be1dc88a00d Add retries for `_write_logs` method in `KubernetesPodOperator` (#64471)
be1dc88a00d is described below

commit be1dc88a00d7847cf12e89ab8d2fb8f5933321e7
Author: Daniel Wolf <[email protected]>
AuthorDate: Wed Apr 1 14:49:25 2026 +0200

    Add retries for `_write_logs` method in `KubernetesPodOperator` (#64471)
    
    * Add retries for `_write_logs` method in `KubernetesPodOperator`
    
    * Reset changes to timezone
---
 .../providers/cncf/kubernetes/operators/pod.py     | 61 ++++++++++++---------
 .../unit/cncf/kubernetes/operators/test_pod.py     | 64 +++++++++++++++++++++-
 2 files changed, 98 insertions(+), 27 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index 04a9de539b5..67494995f0a 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -101,6 +101,8 @@ if TYPE_CHECKING:
     from airflow.providers.cncf.kubernetes.secret import Secret
     from airflow.sdk import Context
 
+log = logging.getLogger(__name__)
+
 alphanum_lower = string.ascii_lowercase + string.digits
 
 KUBE_CONFIG_ENV_VAR = "KUBECONFIG"
@@ -971,7 +973,14 @@ class KubernetesPodOperator(BaseOperator):
 
             if event["status"] in ("error", "failed", "timeout", "success"):
                 if self.get_logs:
-                    self._write_logs(self.pod, follow=follow, 
since_time=last_log_time)
+                    try:
+                        self._write_logs(self.pod, follow=follow, 
since_time=last_log_time)
+                    except (HTTPError, ApiException) as e:
+                        self.log.warning(
+                            "Reading of logs interrupted with error %r. "
+                            "Set log level to DEBUG for traceback.",
+                            e if not isinstance(e, ApiException) else e.reason,
+                        )
 
                 for callback in self.callbacks:
                     callback.on_pod_completion(
@@ -1034,32 +1043,32 @@ class KubernetesPodOperator(BaseOperator):
                 result=result,
             )
 
+    @tenacity.retry(
+        stop=tenacity.stop_after_attempt(3),
+        wait=tenacity.wait_exponential(max=15),
+        retry=tenacity.retry_if_exception_type((HTTPError, ApiException)),
+        before_sleep=tenacity.before_sleep_log(log, logging.WARNING),
+        reraise=True,
+    )
     def _write_logs(self, pod: k8s.V1Pod, follow: bool = False, since_time: 
DateTime | None = None) -> None:
-        try:
-            since_seconds = (
-                math.ceil((datetime.datetime.now(tz=datetime.timezone.utc) - 
since_time).total_seconds())
-                if since_time
-                else None
-            )
-            logs = self.client.read_namespaced_pod_log(
-                name=pod.metadata.name,
-                namespace=pod.metadata.namespace,
-                container=self.base_container_name,
-                follow=follow,
-                timestamps=False,
-                since_seconds=since_seconds,
-                _preload_content=False,
-            )
-            for raw_line in logs:
-                line = raw_line.decode("utf-8", 
errors="backslashreplace").rstrip("\n")
-                if line:
-                    self.log.info("[%s] logs: %s", self.base_container_name, 
line)
-        except (HTTPError, ApiException) as e:
-            self.log.warning(
-                "Reading of logs interrupted with error %r; will retry. "
-                "Set log level to DEBUG for traceback.",
-                e if not isinstance(e, ApiException) else e.reason,
-            )
+        since_seconds = (
+            math.ceil((datetime.datetime.now(tz=datetime.timezone.utc) - 
since_time).total_seconds())
+            if since_time
+            else None
+        )
+        logs = self.client.read_namespaced_pod_log(
+            name=pod.metadata.name,
+            namespace=pod.metadata.namespace,
+            container=self.base_container_name,
+            follow=follow,
+            timestamps=False,
+            since_seconds=since_seconds,
+            _preload_content=False,
+        )
+        for raw_line in logs:
+            line = raw_line.decode("utf-8", 
errors="backslashreplace").rstrip("\n")
+            if line:
+                self.log.info("[%s] logs: %s", self.base_container_name, line)
 
     def post_complete_action(
         self, *, pod: k8s.V1Pod, remote_pod: k8s.V1Pod, context: Context, 
result: dict | None, **kwargs
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index bc9742688ac..412e0e08a63 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -25,6 +25,7 @@ from unittest.mock import MagicMock, mock_open, patch
 
 import pendulum
 import pytest
+import tenacity
 from kubernetes.client import ApiClient, V1Pod, V1PodSecurityContext, 
V1PodStatus, models as k8s
 from kubernetes.client.exceptions import ApiException
 
@@ -2764,7 +2765,7 @@ class TestKubernetesPodOperatorAsync:
     @patch(HOOK_CLASS)
     @patch(KUB_OP_PATH.format("pod_manager"))
     def test_async_write_logs_handler_api_exception(
-        self, mock_manager, mocked_hook, mock_extract_xcom, 
post_complete_action, mocked_client
+        self, mock_manager, mocked_hook, mock_extract_xcom, mocked_client, 
post_complete_action
     ):
         mocked_client.read_namespaced_pod_log.side_effect = 
ApiException(status=404)
         mock_manager.await_pod_completion.side_effect = 
ApiException(status=404)
@@ -2777,9 +2778,70 @@ class TestKubernetesPodOperatorAsync:
             get_logs=True,
             deferrable=True,
         )
+        # Patch tenacity wait to avoid real delays from _write_logs retries
+        k._write_logs.retry.wait = tenacity.wait_none()
         self.run_pod_async(k)
         post_complete_action.assert_not_called()
 
+    @patch(KUB_OP_PATH.format("post_complete_action"))
+    @patch(KUB_OP_PATH.format("client"))
+    @patch(HOOK_CLASS)
+    @patch(KUB_OP_PATH.format("pod_manager"))
+    def test_write_logs_retries_on_api_exception(
+        self, mock_manager, mocked_hook, mocked_client, post_complete_action
+    ):
+        """Test that _write_logs retries on ApiException and succeeds on 
subsequent attempt."""
+        test_logs = b"log line\n"
+        mocked_client.read_namespaced_pod_log.side_effect = [
+            ApiException(status=500),
+            [test_logs],
+        ]
+        mock_manager.await_pod_completion.return_value = k8s.V1Pod(
+            metadata=k8s.V1ObjectMeta(name=TEST_NAME, namespace=TEST_NAMESPACE)
+        )
+        mocked_hook.return_value.get_pod.return_value = k8s.V1Pod(
+            metadata=k8s.V1ObjectMeta(name=TEST_NAME, namespace=TEST_NAMESPACE)
+        )
+        k = KubernetesPodOperator(
+            task_id="task",
+            get_logs=True,
+            deferrable=True,
+        )
+        # Patch tenacity wait to avoid real delays in tests
+        k._write_logs.retry.wait = tenacity.wait_none()
+        self.run_pod_async(k)
+        assert mocked_client.read_namespaced_pod_log.call_count == 2
+        post_complete_action.assert_called_once()
+
+    @patch(KUB_OP_PATH.format("post_complete_action"))
+    @patch(KUB_OP_PATH.format("client"))
+    @patch(HOOK_CLASS)
+    @patch(KUB_OP_PATH.format("pod_manager"))
+    def test_write_logs_gives_up_after_max_retries(
+        self, mock_manager, mocked_hook, mocked_client, post_complete_action, 
caplog
+    ):
+        """Test that _write_logs gives up after 3 failed attempts and 
trigger_reentry catches the error."""
+        mocked_client.read_namespaced_pod_log.side_effect = 
ApiException(status=500)
+        mock_manager.await_pod_completion.return_value = k8s.V1Pod(
+            metadata=k8s.V1ObjectMeta(name=TEST_NAME, namespace=TEST_NAMESPACE)
+        )
+        mocked_hook.return_value.get_pod.return_value = k8s.V1Pod(
+            metadata=k8s.V1ObjectMeta(name=TEST_NAME, namespace=TEST_NAMESPACE)
+        )
+        k = KubernetesPodOperator(
+            task_id="task",
+            get_logs=True,
+            deferrable=True,
+        )
+        # Patch tenacity wait to avoid real delays in tests
+        k._write_logs.retry.wait = tenacity.wait_none()
+        self.run_pod_async(k)
+        # 3 attempts (stop_after_attempt(3))
+        assert mocked_client.read_namespaced_pod_log.call_count > 1
+        # trigger_reentry catches the error and continues; 
post_complete_action still called via _clean
+        post_complete_action.assert_called_once()
+        assert "Reading of logs interrupted with error" in caplog.text
+
     @pytest.mark.parametrize(
         ("log_pod_spec_on_failure", "expect_match"),
         [

Reply via email to