This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 33ee0b98b2 Add _request_timeout to KPO log fetch calls (#36297)
33ee0b98b2 is described below

commit 33ee0b98b2731ecdb27fc67d33e121948dd63c68
Author: Daniel Standish <[email protected]>
AuthorDate: Fri Dec 22 05:56:04 2023 -0800

    Add _request_timeout to KPO log fetch calls (#36297)
    
    When the `_request_timeout` parameter is not provided, the default is no
    timeout, which means that a log read through the Kubernetes API may
    sometimes hang forever.  See more details
    [here](https://github.com/kubernetes-client/python/blob/2270ff927e8b5e5340aa7f4cc023293fa4d57eb3/examples/watch/timeout-settings.md).
    We add the timeout so that connections will be dropped, and there is
    appropriate logic to resume after a disconnect.  One thing that's maybe not
    perfect is we increment the "since_tim [...]
---
 .../providers/cncf/kubernetes/utils/pod_manager.py | 48 ++++++++++++++++------
 1 file changed, 35 insertions(+), 13 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py 
b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 2f8f300096..e2d0efac83 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -37,7 +37,7 @@ from kubernetes.stream import stream as kubernetes_stream
 from pendulum import DateTime
 from pendulum.parsing.exceptions import ParserError
 from typing_extensions import Literal
-from urllib3.exceptions import HTTPError as BaseHTTPError
+from urllib3.exceptions import HTTPError, TimeoutError
 
 from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.providers.cncf.kubernetes.pod_generator import PodDefaults
@@ -395,7 +395,7 @@ class PodManager(LoggingMixin):
         :meta private:
         """
 
-        def consume_logs(*, since_time: DateTime | None = None) -> DateTime | 
None:
+        def consume_logs(*, since_time: DateTime | None = None) -> 
tuple[DateTime | None, Exception | None]:
             """
             Try to follow container logs until container completes.
 
@@ -404,7 +404,18 @@ class PodManager(LoggingMixin):
 
             Returns the last timestamp observed in logs.
             """
+            exception = None
             last_captured_timestamp = None
+            # We timeout connections after 30 minutes because otherwise they 
can get
+            # stuck forever. The 30 is somewhat arbitrary.
+            # As a consequence, a TimeoutError will be raised no more than 30 
minutes
+            # after starting read.
+            connection_timeout = 60 * 30
+            # We set a shorter read timeout because that helps reduce 
*connection* timeouts
+            # (since the connection will be restarted periodically). And with 
read timeout,
+            # we don't need to worry about either duplicate messages or losing 
messages; we
+            # can safely resume from a few seconds later
+            read_timeout = 60 * 5
             try:
                 logs = self.read_pod_logs(
                     pod=pod,
@@ -415,6 +426,7 @@ class PodManager(LoggingMixin):
                     ),
                     follow=follow,
                     post_termination_timeout=post_termination_timeout,
+                    _request_timeout=(connection_timeout, read_timeout),
                 )
                 message_to_log = None
                 message_timestamp = None
@@ -447,29 +459,37 @@ class PodManager(LoggingMixin):
                             self._progress_callback(line)
                     self.log.info("[%s] %s", container_name, message_to_log)
                     last_captured_timestamp = message_timestamp
-            except BaseHTTPError:
+            except TimeoutError as e:
+                # in case of timeout, increment return time by 2 seconds to 
avoid
+                # duplicate log entries
+                if val := (last_captured_timestamp or since_time):
+                    return val.add(seconds=2), e
+            except HTTPError as e:
+                exception = e
                 self.log.exception(
                     "Reading of logs interrupted for container %r; will 
retry.",
                     container_name,
                 )
-            return last_captured_timestamp or since_time
+            return last_captured_timestamp or since_time, exception
 
         # note: `read_pod_logs` follows the logs, so we shouldn't necessarily 
*need* to
         # loop as we do here. But in a long-running process we might 
temporarily lose connectivity.
         # So the looping logic is there to let us resume following the logs.
         last_log_time = since_time
         while True:
-            last_log_time = consume_logs(since_time=last_log_time)
+            last_log_time, exc = consume_logs(since_time=last_log_time)
             if not self.container_is_running(pod, 
container_name=container_name):
                 return PodLoggingStatus(running=False, 
last_log_time=last_log_time)
             if not follow:
                 return PodLoggingStatus(running=True, 
last_log_time=last_log_time)
             else:
-                self.log.warning(
-                    "Pod %s log read interrupted but container %s still 
running",
-                    pod.metadata.name,
-                    container_name,
-                )
+                # a timeout is a normal thing and we ignore it and resume 
following logs
+                if not isinstance(exc, TimeoutError):
+                    self.log.warning(
+                        "Pod %s log read interrupted but container %s still 
running",
+                        pod.metadata.name,
+                        container_name,
+                    )
                 time.sleep(1)
 
     def _reconcile_requested_log_containers(
@@ -610,6 +630,7 @@ class PodManager(LoggingMixin):
         since_seconds: int | None = None,
         follow=True,
         post_termination_timeout: int = 120,
+        **kwargs,
     ) -> PodLogsConsumer:
         """Read log from the POD."""
         additional_kwargs = {}
@@ -618,6 +639,7 @@ class PodManager(LoggingMixin):
 
         if tail_lines:
             additional_kwargs["tail_lines"] = tail_lines
+        additional_kwargs.update(**kwargs)
 
         try:
             logs = self._client.read_namespaced_pod_log(
@@ -629,7 +651,7 @@ class PodManager(LoggingMixin):
                 _preload_content=False,
                 **additional_kwargs,
             )
-        except BaseHTTPError:
+        except HTTPError:
             self.log.exception("There was an error reading the kubernetes 
API.")
             raise
 
@@ -658,7 +680,7 @@ class PodManager(LoggingMixin):
             return self._client.list_namespaced_event(
                 namespace=pod.metadata.namespace, 
field_selector=f"involvedObject.name={pod.metadata.name}"
             )
-        except BaseHTTPError as e:
+        except HTTPError as e:
             raise AirflowException(f"There was an error reading the kubernetes 
API: {e}")
 
     @tenacity.retry(stop=tenacity.stop_after_attempt(3), 
wait=tenacity.wait_exponential(), reraise=True)
@@ -666,7 +688,7 @@ class PodManager(LoggingMixin):
         """Read POD information."""
         try:
             return self._client.read_namespaced_pod(pod.metadata.name, 
pod.metadata.namespace)
-        except BaseHTTPError as e:
+        except HTTPError as e:
             raise AirflowException(f"There was an error reading the kubernetes 
API: {e}")
 
     def await_xcom_sidecar_container_start(self, pod: V1Pod) -> None:

Reply via email to