This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new be1dc88a00d Add retries for `_write_logs` method in
`KubernetesPodOperator` (#64471)
be1dc88a00d is described below
commit be1dc88a00d7847cf12e89ab8d2fb8f5933321e7
Author: Daniel Wolf <[email protected]>
AuthorDate: Wed Apr 1 14:49:25 2026 +0200
Add retries for `_write_logs` method in `KubernetesPodOperator` (#64471)
* Add retries for `_write_logs` method in `KubernetesPodOperator`
* Reset changes to timezone
---
.../providers/cncf/kubernetes/operators/pod.py | 62 ++++++++++++---------
.../unit/cncf/kubernetes/operators/test_pod.py | 64 +++++++++++++++++++++-
2 files changed, 99 insertions(+), 27 deletions(-)
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index 04a9de539b5..67494995f0a 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -101,6 +101,8 @@ if TYPE_CHECKING:
from airflow.providers.cncf.kubernetes.secret import Secret
from airflow.sdk import Context
+log = logging.getLogger(__name__)
+
alphanum_lower = string.ascii_lowercase + string.digits
KUBE_CONFIG_ENV_VAR = "KUBECONFIG"
@@ -971,7 +973,15 @@ class KubernetesPodOperator(BaseOperator):
if event["status"] in ("error", "failed", "timeout", "success"):
if self.get_logs:
- self._write_logs(self.pod, follow=follow,
since_time=last_log_time)
+ try:
+ self._write_logs(self.pod, follow=follow,
since_time=last_log_time)
+ except (HTTPError, ApiException) as e:
+ self.log.warning(
+ "Reading of logs interrupted with error %r. "
+ "Set log level to DEBUG for traceback.",
+ e if not isinstance(e, ApiException) else e.reason,
+ )
+ self.log.debug("Traceback for interrupted log read:", exc_info=True)
for callback in self.callbacks:
callback.on_pod_completion(
@@ -1034,32 +1044,32 @@ class KubernetesPodOperator(BaseOperator):
result=result,
)
+ @tenacity.retry(
+ stop=tenacity.stop_after_attempt(3),
+ wait=tenacity.wait_exponential(max=15),
+ retry=tenacity.retry_if_exception_type((HTTPError, ApiException)),
+ before_sleep=tenacity.before_sleep_log(log, logging.WARNING),
+ reraise=True,
+ )
def _write_logs(self, pod: k8s.V1Pod, follow: bool = False, since_time:
DateTime | None = None) -> None:
- try:
- since_seconds = (
- math.ceil((datetime.datetime.now(tz=datetime.timezone.utc) -
since_time).total_seconds())
- if since_time
- else None
- )
- logs = self.client.read_namespaced_pod_log(
- name=pod.metadata.name,
- namespace=pod.metadata.namespace,
- container=self.base_container_name,
- follow=follow,
- timestamps=False,
- since_seconds=since_seconds,
- _preload_content=False,
- )
- for raw_line in logs:
- line = raw_line.decode("utf-8",
errors="backslashreplace").rstrip("\n")
- if line:
- self.log.info("[%s] logs: %s", self.base_container_name,
line)
- except (HTTPError, ApiException) as e:
- self.log.warning(
- "Reading of logs interrupted with error %r; will retry. "
- "Set log level to DEBUG for traceback.",
- e if not isinstance(e, ApiException) else e.reason,
- )
+ since_seconds = (
+ math.ceil((datetime.datetime.now(tz=datetime.timezone.utc) -
since_time).total_seconds())
+ if since_time
+ else None
+ )
+ logs = self.client.read_namespaced_pod_log(
+ name=pod.metadata.name,
+ namespace=pod.metadata.namespace,
+ container=self.base_container_name,
+ follow=follow,
+ timestamps=False,
+ since_seconds=since_seconds,
+ _preload_content=False,
+ )
+ for raw_line in logs:
+ line = raw_line.decode("utf-8",
errors="backslashreplace").rstrip("\n")
+ if line:
+ self.log.info("[%s] logs: %s", self.base_container_name, line)
def post_complete_action(
self, *, pod: k8s.V1Pod, remote_pod: k8s.V1Pod, context: Context,
result: dict | None, **kwargs
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index bc9742688ac..412e0e08a63 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -25,6 +25,7 @@ from unittest.mock import MagicMock, mock_open, patch
import pendulum
import pytest
+import tenacity
from kubernetes.client import ApiClient, V1Pod, V1PodSecurityContext,
V1PodStatus, models as k8s
from kubernetes.client.exceptions import ApiException
@@ -2764,7 +2765,7 @@ class TestKubernetesPodOperatorAsync:
@patch(HOOK_CLASS)
@patch(KUB_OP_PATH.format("pod_manager"))
def test_async_write_logs_handler_api_exception(
- self, mock_manager, mocked_hook, mock_extract_xcom,
post_complete_action, mocked_client
+ self, mock_manager, mocked_hook, mock_extract_xcom, mocked_client,
post_complete_action, monkeypatch
):
mocked_client.read_namespaced_pod_log.side_effect =
ApiException(status=404)
mock_manager.await_pod_completion.side_effect =
ApiException(status=404)
@@ -2777,9 +2778,70 @@ class TestKubernetesPodOperatorAsync:
get_logs=True,
deferrable=True,
)
+ # Disable _write_logs retry backoff for this test only (monkeypatch restores the shared Retrying)
+ monkeypatch.setattr(k._write_logs.retry, "wait", tenacity.wait_none())
self.run_pod_async(k)
post_complete_action.assert_not_called()
+ @patch(KUB_OP_PATH.format("post_complete_action"))
+ @patch(KUB_OP_PATH.format("client"))
+ @patch(HOOK_CLASS)
+ @patch(KUB_OP_PATH.format("pod_manager"))
+ def test_write_logs_retries_on_api_exception(
+ self, mock_manager, mocked_hook, mocked_client, post_complete_action, monkeypatch
+ ):
+ """Test that _write_logs retries on ApiException and succeeds on
subsequent attempt."""
+ test_logs = b"log line\n"
+ mocked_client.read_namespaced_pod_log.side_effect = [
+ ApiException(status=500),
+ [test_logs],
+ ]
+ mock_manager.await_pod_completion.return_value = k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(name=TEST_NAME, namespace=TEST_NAMESPACE)
+ )
+ mocked_hook.return_value.get_pod.return_value = k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(name=TEST_NAME, namespace=TEST_NAMESPACE)
+ )
+ k = KubernetesPodOperator(
+ task_id="task",
+ get_logs=True,
+ deferrable=True,
+ )
+ # Disable retry backoff for this test only (monkeypatch restores the shared Retrying)
+ monkeypatch.setattr(k._write_logs.retry, "wait", tenacity.wait_none())
+ self.run_pod_async(k)
+ assert mocked_client.read_namespaced_pod_log.call_count == 2
+ post_complete_action.assert_called_once()
+
+ @patch(KUB_OP_PATH.format("post_complete_action"))
+ @patch(KUB_OP_PATH.format("client"))
+ @patch(HOOK_CLASS)
+ @patch(KUB_OP_PATH.format("pod_manager"))
+ def test_write_logs_gives_up_after_max_retries(
+ self, mock_manager, mocked_hook, mocked_client, post_complete_action,
caplog, monkeypatch
+ ):
+ """Test that _write_logs gives up after 3 failed attempts and
trigger_reentry catches the error."""
+ mocked_client.read_namespaced_pod_log.side_effect =
ApiException(status=500)
+ mock_manager.await_pod_completion.return_value = k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(name=TEST_NAME, namespace=TEST_NAMESPACE)
+ )
+ mocked_hook.return_value.get_pod.return_value = k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(name=TEST_NAME, namespace=TEST_NAMESPACE)
+ )
+ k = KubernetesPodOperator(
+ task_id="task",
+ get_logs=True,
+ deferrable=True,
+ )
+ # Disable retry backoff for this test only (monkeypatch restores the shared Retrying)
+ monkeypatch.setattr(k._write_logs.retry, "wait", tenacity.wait_none())
+ self.run_pod_async(k)
+ # 3 attempts (stop_after_attempt(3))
+ assert mocked_client.read_namespaced_pod_log.call_count == 3
+ # trigger_reentry catches the error and continues;
post_complete_action still called via _clean
+ post_complete_action.assert_called_once()
+ assert "Reading of logs interrupted with error" in caplog.text
+
@pytest.mark.parametrize(
("log_pod_spec_on_failure", "expect_match"),
[