This is an automated email from the ASF dual-hosted git repository. eladkal pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 2b0bfea837 Add startup_check_interval_seconds to PodManager's await_pod_start (#34231) 2b0bfea837 is described below commit 2b0bfea8374ec0f0289763b064d6425df6d4270f Author: Sebastian Telsemeyer <101300634+stelsemeyer-...@users.noreply.github.com> AuthorDate: Thu Nov 2 04:51:18 2023 +1100 Add startup_check_interval_seconds to PodManager's await_pod_start (#34231) * add startup_check_interval_seconds * change default value in method * fix static checks, add missing param, fix typo * default is 1s * fix outdated docs * add test to check time.sleep is called with specific value * add more documentation * rephrase * add startup_check_interval_seconds * change default value in method * fix static checks, add missing param, fix typo * default is 1s * fix outdated docs * add test to check time.sleep is called with specific value * add more documentation * rephrase * add sleep in else clause * Update airflow/providers/cncf/kubernetes/triggers/pod.py --------- Co-authored-by: eladkal <45845474+elad...@users.noreply.github.com> Co-authored-by: Hussein Awala <huss...@awala.fr> --- airflow/providers/cncf/kubernetes/operators/pod.py | 10 +++++++++- airflow/providers/cncf/kubernetes/triggers/pod.py | 11 ++++++++--- .../providers/cncf/kubernetes/utils/pod_manager.py | 7 +++++-- .../operators/cloud/kubernetes_engine.rst | 8 ++++++++ .../cncf/kubernetes/utils/test_pod_manager.py | 22 ++++++++++++++++++++++ 5 files changed, 52 insertions(+), 6 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index d85257f4aa..56abd98350 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -181,6 +181,7 @@ class KubernetesPodOperator(BaseOperator): during the next try. If False, always create a new pod for each try. :param labels: labels to apply to the Pod. 
(templated) :param startup_timeout_seconds: timeout in seconds to startup the pod. + :param startup_check_interval_seconds: interval in seconds to check if the pod has already started :param get_logs: get the stdout of the base container as logs of the tasks. :param container_logs: list of containers whose logs will be published to stdout Takes a sequence of containers, a single container name or True. If True, @@ -293,6 +294,7 @@ class KubernetesPodOperator(BaseOperator): labels: dict | None = None, reattach_on_restart: bool = True, startup_timeout_seconds: int = 120, + startup_check_interval_seconds: int = 1, get_logs: bool = True, container_logs: Iterable[str] | str | Literal[True] = BASE_CONTAINER_NAME, image_pull_policy: str | None = None, @@ -357,6 +359,7 @@ class KubernetesPodOperator(BaseOperator): self.arguments = arguments or [] self.labels = labels or {} self.startup_timeout_seconds = startup_timeout_seconds + self.startup_check_interval_seconds = startup_check_interval_seconds self.env_vars = convert_env_vars(env_vars) if env_vars else [] if pod_runtime_info_envs: self.env_vars.extend([convert_pod_runtime_info_env(p) for p in pod_runtime_info_envs]) @@ -559,7 +562,11 @@ class KubernetesPodOperator(BaseOperator): def await_pod_start(self, pod: k8s.V1Pod): try: - self.pod_manager.await_pod_start(pod=pod, startup_timeout=self.startup_timeout_seconds) + self.pod_manager.await_pod_start( + pod=pod, + startup_timeout=self.startup_timeout_seconds, + startup_check_interval=self.startup_check_interval_seconds, + ) except PodLaunchFailedException: if self.log_events_on_failure: for event in self.pod_manager.read_pod_events(pod).items: @@ -654,6 +661,7 @@ class KubernetesPodOperator(BaseOperator): poll_interval=self.poll_interval, get_logs=self.get_logs, startup_timeout=self.startup_timeout_seconds, + startup_check_interval=self.startup_check_interval_seconds, base_container_name=self.base_container_name, on_finish_action=self.on_finish_action.value, ), diff --git 
a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py index 9dc0896be7..5eda424276 100644 --- a/airflow/providers/cncf/kubernetes/triggers/pod.py +++ b/airflow/providers/cncf/kubernetes/triggers/pod.py @@ -83,6 +83,7 @@ class KubernetesPodTrigger(BaseTrigger): in_cluster: bool | None = None, get_logs: bool = True, startup_timeout: int = 120, + startup_check_interval: int = 1, on_finish_action: str = "delete_pod", should_delete_pod: bool | None = None, ): @@ -98,6 +99,7 @@ class KubernetesPodTrigger(BaseTrigger): self.in_cluster = in_cluster self.get_logs = get_logs self.startup_timeout = startup_timeout + self.startup_check_interval = startup_check_interval if should_delete_pod is not None: warnings.warn( @@ -183,9 +185,12 @@ class KubernetesPodTrigger(BaseTrigger): } ) return - - self.log.info("Sleeping for %s seconds.", self.poll_interval) - await asyncio.sleep(self.poll_interval) + else: + self.log.info("Sleeping for %s seconds.", self.startup_check_interval) + await asyncio.sleep(self.startup_check_interval) + else: + self.log.info("Sleeping for %s seconds.", self.poll_interval) + await asyncio.sleep(self.poll_interval) else: yield TriggerEvent( { diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index edbc910980..8e8fbe132d 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -334,13 +334,16 @@ class PodManager(LoggingMixin): """Launch the pod asynchronously.""" return self.run_pod_async(pod) - def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> None: + def await_pod_start( + self, pod: V1Pod, startup_timeout: int = 120, startup_check_interval: int = 1 + ) -> None: """ Wait for the pod to reach phase other than ``Pending``. 
:param pod: :param startup_timeout: Timeout (in seconds) for startup of the pod (if pod is pending for too long, fails task) + :param startup_check_interval: Interval (in seconds) between checks :return: """ curr_time = time.time() @@ -355,7 +358,7 @@ class PodManager(LoggingMixin): "Check the pod events in kubernetes to determine why." ) raise PodLaunchFailedException(msg) - time.sleep(1) + time.sleep(startup_check_interval) def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingStatus: warnings.warn( diff --git a/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst b/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst index 8b51cdf06c..74c396c985 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst @@ -131,6 +131,14 @@ Private clusters have two unique endpoint values: ``privateEndpoint``, which is sets the external IP address as the endpoint by default. If you prefer to use the internal IP as the endpoint, you need to set ``use_internal_ip`` parameter to ``True``. +Using with Autopilot (serverless) cluster +''''''''''''''''''''''''''''''''''''''''' + +When running on a serverless cluster like GKE Autopilot, the pod startup can sometimes take longer due to a cold start. +During the pod startup, the status is checked at regular short intervals and warning messages are emitted if the pod +has not yet started. You can increase this interval length via the ``startup_check_interval_seconds`` parameter, with +a recommended value of 60 seconds. 
+ Use of XCom ''''''''''' diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index d0e5408801..5e4721a448 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -374,6 +374,28 @@ class TestPodManager: startup_timeout=0, ) + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.time.sleep") + def test_start_pod_startup_interval_seconds(self, mock_time_sleep): + pod_info_pending = mock.MagicMock(**{"status.phase": PodPhase.PENDING}) + pod_info_succeeded = mock.MagicMock(**{"status.phase": PodPhase.SUCCEEDED}) + + def pod_state_gen(): + yield pod_info_pending + yield pod_info_pending + while True: + yield pod_info_succeeded + + self.mock_kube_client.read_namespaced_pod.side_effect = pod_state_gen() + startup_check_interval = 10 # Any value is fine, as time.sleep is mocked to do nothing + mock_pod = MagicMock() + self.pod_manager.await_pod_start( + pod=mock_pod, + startup_timeout=60, # Never hit, any value is fine, as time.sleep is mocked to do nothing + startup_check_interval=startup_check_interval, + ) + mock_time_sleep.assert_called_with(startup_check_interval) + assert mock_time_sleep.call_count == 2 + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running") def test_container_is_running(self, container_is_running_mock): mock_pod = MagicMock()