This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 33ee0b98b2 Add _request_timeout to KPO log fetch calls (#36297)
33ee0b98b2 is described below
commit 33ee0b98b2731ecdb27fc67d33e121948dd63c68
Author: Daniel Standish <[email protected]>
AuthorDate: Fri Dec 22 05:56:04 2023 -0800
Add _request_timeout to KPO log fetch calls (#36297)
When the `_request_timeout` parameter is not provided, the default is no timeout,
which means that a log read through the Kubernetes API may sometimes hang
forever. See more details
[here](https://github.com/kubernetes-client/python/blob/2270ff927e8b5e5340aa7f4cc023293fa4d57eb3/examples/watch/timeout-settings.md).
We add the timeout so that connections will be dropped, and there is
appropriate logic to resume after disconnect. One thing that's maybe not
perfect is we increment the "since_tim [...]
---
.../providers/cncf/kubernetes/utils/pod_manager.py | 48 ++++++++++++++++------
1 file changed, 35 insertions(+), 13 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 2f8f300096..e2d0efac83 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -37,7 +37,7 @@ from kubernetes.stream import stream as kubernetes_stream
from pendulum import DateTime
from pendulum.parsing.exceptions import ParserError
from typing_extensions import Literal
-from urllib3.exceptions import HTTPError as BaseHTTPError
+from urllib3.exceptions import HTTPError, TimeoutError
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.providers.cncf.kubernetes.pod_generator import PodDefaults
@@ -395,7 +395,7 @@ class PodManager(LoggingMixin):
:meta private:
"""
- def consume_logs(*, since_time: DateTime | None = None) -> DateTime |
None:
+ def consume_logs(*, since_time: DateTime | None = None) ->
tuple[DateTime | None, Exception | None]:
"""
Try to follow container logs until container completes.
@@ -404,7 +404,18 @@ class PodManager(LoggingMixin):
Returns the last timestamp observed in logs.
"""
+ exception = None
last_captured_timestamp = None
+ # We timeout connections after 30 minutes because otherwise they
can get
+ # stuck forever. The 30 is somewhat arbitrary.
+ # As a consequence, a TimeoutError will be raised no more than 30
minutes
+ # after starting read.
+ connection_timeout = 60 * 30
+ # We set a shorter read timeout because that helps reduce
*connection* timeouts
+ # (since the connection will be restarted periodically). And with
read timeout,
+ # we don't need to worry about either duplicate messages or losing
messages; we
+ # can safely resume from a few seconds later
+ read_timeout = 60 * 5
try:
logs = self.read_pod_logs(
pod=pod,
@@ -415,6 +426,7 @@ class PodManager(LoggingMixin):
),
follow=follow,
post_termination_timeout=post_termination_timeout,
+ _request_timeout=(connection_timeout, read_timeout),
)
message_to_log = None
message_timestamp = None
@@ -447,29 +459,37 @@ class PodManager(LoggingMixin):
self._progress_callback(line)
self.log.info("[%s] %s", container_name, message_to_log)
last_captured_timestamp = message_timestamp
- except BaseHTTPError:
+ except TimeoutError as e:
+ # in case of timeout, increment return time by 2 seconds to
avoid
+ # duplicate log entries
+ if val := (last_captured_timestamp or since_time):
+ return val.add(seconds=2), e
+ except HTTPError as e:
+ exception = e
self.log.exception(
"Reading of logs interrupted for container %r; will
retry.",
container_name,
)
- return last_captured_timestamp or since_time
+ return last_captured_timestamp or since_time, exception
# note: `read_pod_logs` follows the logs, so we shouldn't necessarily
*need* to
# loop as we do here. But in a long-running process we might
temporarily lose connectivity.
# So the looping logic is there to let us resume following the logs.
last_log_time = since_time
while True:
- last_log_time = consume_logs(since_time=last_log_time)
+ last_log_time, exc = consume_logs(since_time=last_log_time)
if not self.container_is_running(pod,
container_name=container_name):
return PodLoggingStatus(running=False,
last_log_time=last_log_time)
if not follow:
return PodLoggingStatus(running=True,
last_log_time=last_log_time)
else:
- self.log.warning(
- "Pod %s log read interrupted but container %s still
running",
- pod.metadata.name,
- container_name,
- )
+ # a timeout is a normal thing and we ignore it and resume
following logs
+ if not isinstance(exc, TimeoutError):
+ self.log.warning(
+ "Pod %s log read interrupted but container %s still
running",
+ pod.metadata.name,
+ container_name,
+ )
time.sleep(1)
def _reconcile_requested_log_containers(
@@ -610,6 +630,7 @@ class PodManager(LoggingMixin):
since_seconds: int | None = None,
follow=True,
post_termination_timeout: int = 120,
+ **kwargs,
) -> PodLogsConsumer:
"""Read log from the POD."""
additional_kwargs = {}
@@ -618,6 +639,7 @@ class PodManager(LoggingMixin):
if tail_lines:
additional_kwargs["tail_lines"] = tail_lines
+ additional_kwargs.update(**kwargs)
try:
logs = self._client.read_namespaced_pod_log(
@@ -629,7 +651,7 @@ class PodManager(LoggingMixin):
_preload_content=False,
**additional_kwargs,
)
- except BaseHTTPError:
+ except HTTPError:
self.log.exception("There was an error reading the kubernetes
API.")
raise
@@ -658,7 +680,7 @@ class PodManager(LoggingMixin):
return self._client.list_namespaced_event(
namespace=pod.metadata.namespace,
field_selector=f"involvedObject.name={pod.metadata.name}"
)
- except BaseHTTPError as e:
+ except HTTPError as e:
raise AirflowException(f"There was an error reading the kubernetes
API: {e}")
@tenacity.retry(stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_exponential(), reraise=True)
@@ -666,7 +688,7 @@ class PodManager(LoggingMixin):
"""Read POD information."""
try:
return self._client.read_namespaced_pod(pod.metadata.name,
pod.metadata.namespace)
- except BaseHTTPError as e:
+ except HTTPError as e:
raise AirflowException(f"There was an error reading the kubernetes
API: {e}")
def await_xcom_sidecar_container_start(self, pod: V1Pod) -> None: