This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b0bfea837 Add startup_check_interval_seconds to PodManager's await_pod_start (#34231)
2b0bfea837 is described below

commit 2b0bfea8374ec0f0289763b064d6425df6d4270f
Author: Sebastian Telsemeyer <101300634+stelsemeyer-...@users.noreply.github.com>
AuthorDate: Thu Nov 2 04:51:18 2023 +1100

    Add startup_check_interval_seconds to PodManager's await_pod_start (#34231)
    
    * add startup_check_interval_seconds
    
    * change default value in method
    
    * fix static checks, add missing param, fix typo
    
    * default is 1s
    
    * fix outdated docs
    
    * add test to check time.sleep is called with specific value
    
    * add more documentation
    
    * rephrase
    
    * add startup_check_interval_seconds
    
    * change default value in method
    
    * fix static checks, add missing param, fix typo
    
    * default is 1s
    
    * fix outdated docs
    
    * add test to check time.sleep is called with specific value
    
    * add more documentation
    
    * rephrase
    
    * add sleep in else clause
    
    * Update airflow/providers/cncf/kubernetes/triggers/pod.py
    
    ---------
    
    Co-authored-by: eladkal <45845474+elad...@users.noreply.github.com>
    Co-authored-by: Hussein Awala <huss...@awala.fr>
---
 airflow/providers/cncf/kubernetes/operators/pod.py | 10 +++++++++-
 airflow/providers/cncf/kubernetes/triggers/pod.py  | 11 ++++++++---
 .../providers/cncf/kubernetes/utils/pod_manager.py |  7 +++++--
 .../operators/cloud/kubernetes_engine.rst          |  8 ++++++++
 .../cncf/kubernetes/utils/test_pod_manager.py      | 22 ++++++++++++++++++++++
 5 files changed, 52 insertions(+), 6 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index d85257f4aa..56abd98350 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -181,6 +181,7 @@ class KubernetesPodOperator(BaseOperator):
         during the next try. If False, always create a new pod for each try.
     :param labels: labels to apply to the Pod. (templated)
     :param startup_timeout_seconds: timeout in seconds to startup the pod.
+    :param startup_check_interval_seconds: interval in seconds to check if the pod has already started
     :param get_logs: get the stdout of the base container as logs of the tasks.
     :param container_logs: list of containers whose logs will be published to stdout
         Takes a sequence of containers, a single container name or True. If True,
@@ -293,6 +294,7 @@ class KubernetesPodOperator(BaseOperator):
         labels: dict | None = None,
         reattach_on_restart: bool = True,
         startup_timeout_seconds: int = 120,
+        startup_check_interval_seconds: int = 1,
         get_logs: bool = True,
         container_logs: Iterable[str] | str | Literal[True] = BASE_CONTAINER_NAME,
         image_pull_policy: str | None = None,
@@ -357,6 +359,7 @@ class KubernetesPodOperator(BaseOperator):
         self.arguments = arguments or []
         self.labels = labels or {}
         self.startup_timeout_seconds = startup_timeout_seconds
+        self.startup_check_interval_seconds = startup_check_interval_seconds
         self.env_vars = convert_env_vars(env_vars) if env_vars else []
         if pod_runtime_info_envs:
             self.env_vars.extend([convert_pod_runtime_info_env(p) for p in pod_runtime_info_envs])
@@ -559,7 +562,11 @@ class KubernetesPodOperator(BaseOperator):
 
     def await_pod_start(self, pod: k8s.V1Pod):
         try:
-            self.pod_manager.await_pod_start(pod=pod, startup_timeout=self.startup_timeout_seconds)
+            self.pod_manager.await_pod_start(
+                pod=pod,
+                startup_timeout=self.startup_timeout_seconds,
+                startup_check_interval=self.startup_check_interval_seconds,
+            )
         except PodLaunchFailedException:
             if self.log_events_on_failure:
                 for event in self.pod_manager.read_pod_events(pod).items:
@@ -654,6 +661,7 @@ class KubernetesPodOperator(BaseOperator):
                 poll_interval=self.poll_interval,
                 get_logs=self.get_logs,
                 startup_timeout=self.startup_timeout_seconds,
+                startup_check_interval=self.startup_check_interval_seconds,
                 base_container_name=self.base_container_name,
                 on_finish_action=self.on_finish_action.value,
             ),
diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py
index 9dc0896be7..5eda424276 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -83,6 +83,7 @@ class KubernetesPodTrigger(BaseTrigger):
         in_cluster: bool | None = None,
         get_logs: bool = True,
         startup_timeout: int = 120,
+        startup_check_interval: int = 1,
         on_finish_action: str = "delete_pod",
         should_delete_pod: bool | None = None,
     ):
@@ -98,6 +99,7 @@ class KubernetesPodTrigger(BaseTrigger):
         self.in_cluster = in_cluster
         self.get_logs = get_logs
         self.startup_timeout = startup_timeout
+        self.startup_check_interval = startup_check_interval
 
         if should_delete_pod is not None:
             warnings.warn(
@@ -183,9 +185,12 @@ class KubernetesPodTrigger(BaseTrigger):
                                 }
                             )
                             return
-
-                    self.log.info("Sleeping for %s seconds.", self.poll_interval)
-                    await asyncio.sleep(self.poll_interval)
+                        else:
+                            self.log.info("Sleeping for %s seconds.", self.startup_check_interval)
+                            await asyncio.sleep(self.startup_check_interval)
+                    else:
+                        self.log.info("Sleeping for %s seconds.", self.poll_interval)
+                        await asyncio.sleep(self.poll_interval)
                 else:
                     yield TriggerEvent(
                         {
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index edbc910980..8e8fbe132d 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -334,13 +334,16 @@ class PodManager(LoggingMixin):
         """Launch the pod asynchronously."""
         return self.run_pod_async(pod)
 
-    def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> None:
+    def await_pod_start(
+        self, pod: V1Pod, startup_timeout: int = 120, startup_check_interval: int = 1
+    ) -> None:
         """
         Wait for the pod to reach phase other than ``Pending``.
 
         :param pod:
         :param startup_timeout: Timeout (in seconds) for startup of the pod
             (if pod is pending for too long, fails task)
+        :param startup_check_interval: Interval (in seconds) between checks
         :return:
         """
         curr_time = time.time()
@@ -355,7 +358,7 @@ class PodManager(LoggingMixin):
                     "Check the pod events in kubernetes to determine why."
                 )
                 raise PodLaunchFailedException(msg)
-            time.sleep(1)
+            time.sleep(startup_check_interval)
 
     def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingStatus:
         warnings.warn(
diff --git a/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst b/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst
index 8b51cdf06c..74c396c985 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst
@@ -131,6 +131,14 @@ Private clusters have two unique endpoint values: ``privateEndpoint``, which is
 sets the external IP address as the endpoint by default. If you prefer to use the internal IP as the
 endpoint, you need to set ``use_internal_ip`` parameter to ``True``.
 
+Using with Autopilot (serverless) cluster
+'''''''''''''''''''''''''''''''''''''''''
+
+When running on serverless cluster like GKE Autopilot, the pod startup can sometimes take longer due to cold start.
+During the pod startup, the status is checked in regular short intervals and warning messages are emitted if the pod
+has not yet started. You can increase this interval length via the ``startup_check_interval_seconds`` parameter, with
+recommendation of 60 seconds.
+
 Use of XCom
 '''''''''''
 
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index d0e5408801..5e4721a448 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -374,6 +374,28 @@ class TestPodManager:
                 startup_timeout=0,
             )
 
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.time.sleep")
+    def test_start_pod_startup_interval_seconds(self, mock_time_sleep):
+        pod_info_pending = mock.MagicMock(**{"status.phase": PodPhase.PENDING})
+        pod_info_succeeded = mock.MagicMock(**{"status.phase": PodPhase.SUCCEEDED})
+
+        def pod_state_gen():
+            yield pod_info_pending
+            yield pod_info_pending
+            while True:
+                yield pod_info_succeeded
+
+        self.mock_kube_client.read_namespaced_pod.side_effect = pod_state_gen()
+        startup_check_interval = 10  # Any value is fine, as time.sleep is mocked to do nothing
+        mock_pod = MagicMock()
+        self.pod_manager.await_pod_start(
+            pod=mock_pod,
+            startup_timeout=60,  # Never hit, any value is fine, as time.sleep is mocked to do nothing
+            startup_check_interval=startup_check_interval,
+        )
+        mock_time_sleep.assert_called_with(startup_check_interval)
+        assert mock_time_sleep.call_count == 2
+
     @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
     def test_container_is_running(self, container_is_running_mock):
         mock_pod = MagicMock()

Reply via email to