This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 68b3b7b468 Simplify KPO multi container log reconciliation logic (#35450)
68b3b7b468 is described below

commit 68b3b7b4683c8e06098dfa8820be18f253d55f47
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Nov 7 10:22:24 2023 -0800

    Simplify KPO multi container log reconciliation logic (#35450)

    Easier to follow this way.
---
 airflow/providers/cncf/kubernetes/operators/pod.py |  2 +-
 .../providers/cncf/kubernetes/utils/pod_manager.py | 69 ++++++++++++++--------
 .../cncf/kubernetes/operators/test_pod.py          |  4 +-
 .../cncf/kubernetes/utils/test_pod_manager.py      |  2 +-
 4 files changed, 46 insertions(+), 31 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index 5b58b1bedb..58e54a72d3 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -611,7 +611,7 @@ class KubernetesPodOperator(BaseOperator):
             if self.get_logs:
                 self.pod_manager.fetch_requested_container_logs(
                     pod=self.pod,
-                    container_logs=self.container_logs,
+                    containers=self.container_logs,
                     follow_logs=True,
                 )
             if not self.get_logs or (
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 2d5abec0e9..75ff82fc2f 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -381,6 +381,8 @@ class PodManager(LoggingMixin):
         Between when the pod starts and logs being available, there might be a delay due to CSR not approved
         and signed yet. In such situation, ApiException is thrown. This is why we are retrying on this
         specific exception.
+
+        :meta private:
         """
 
         @tenacity.retry(
@@ -476,53 +478,68 @@ class PodManager(LoggingMixin):
             else:  # follow requested, but container is done
                 break
 
-    def fetch_requested_container_logs(
-        self, pod: V1Pod, container_logs: Iterable[str] | str | Literal[True], follow_logs=False
-    ) -> None:
-        """
-        Follow the logs of containers in the specified pod and publish it to airflow logging.
-
-        Returns when all the containers exit.
-        """
-        all_containers = self.get_container_names(pod)
-        if all_containers:
-            if isinstance(container_logs, str):
+    def _reconcile_requested_log_containers(
+        self, requested: Iterable[str] | str | bool, actual: list[str], pod_name
+    ) -> list[str]:
+        """Return actual containers based on requested."""
+        containers_to_log = []
+        if actual:
+            if isinstance(requested, str):
                 # fetch logs only for requested container if only one container is provided
-                if container_logs in all_containers:
-                    self.fetch_container_logs(pod=pod, container_name=container_logs, follow=follow_logs)
+                if requested in actual:
+                    containers_to_log.append(requested)
                 else:
                     self.log.error(
                         "container %s whose logs were requested not found in the pod %s",
-                        container_logs,
-                        pod.metadata.name,
+                        requested,
+                        pod_name,
                     )
-            elif isinstance(container_logs, bool):
+            elif isinstance(requested, bool):
                 # if True is provided, get logs for all the containers
-                if container_logs is True:
-                    for container_name in all_containers:
-                        self.fetch_container_logs(pod=pod, container_name=container_name, follow=follow_logs)
+                if requested is True:
+                    containers_to_log.extend(actual)
                 else:
                     self.log.error(
                         "False is not a valid value for container_logs",
                     )
             else:
                 # if a sequence of containers are provided, iterate for every container in the pod
-                if isinstance(container_logs, Iterable):
-                    for container in container_logs:
-                        if container in all_containers:
-                            self.fetch_container_logs(pod=pod, container_name=container, follow=follow_logs)
+                if isinstance(requested, Iterable):
+                    for container in requested:
+                        if container in actual:
+                            containers_to_log.append(container)
                         else:
                             self.log.error(
                                 "Container %s whose logs were requests not found in the pod %s",
                                 container,
-                                pod.metadata.name,
+                                pod_name,
                             )
                 else:
                     self.log.error(
-                        "Invalid type %s specified for container names input parameter", type(container_logs)
+                        "Invalid type %s specified for container names input parameter", type(requested)
                     )
         else:
-            self.log.error("Could not retrieve containers for the pod: %s", pod.metadata.name)
+            self.log.error("Could not retrieve containers for the pod: %s", pod_name)
+        return containers_to_log
+
+    def fetch_requested_container_logs(
+        self, pod: V1Pod, containers: Iterable[str] | str | Literal[True], follow_logs=False
+    ) -> None:
+        """
+        Follow the logs of containers in the specified pod and publish it to airflow logging.
+
+        Returns when all the containers exit.
+
+        :meta private:
+        """
+        all_containers = self.get_container_names(pod)
+        containers_to_log = self._reconcile_requested_log_containers(
+            requested=containers,
+            actual=all_containers,
+            pod_name=pod.metadata.name,
+        )
+        for c in containers_to_log:
+            self.fetch_container_logs(pod=pod, container_name=c, follow=follow_logs)
 
     def await_container_completion(self, pod: V1Pod, container_name: str) -> None:
         """
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py
index 4ad631ac4e..ae49d7fe7d 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -1371,9 +1371,7 @@ class TestKubernetesPodOperator:
         pod = self.run_pod(k)
 
         # check that the base container is not included in the logs
-        mock_fetch_log.assert_called_once_with(
-            pod=pod, container_logs=["some_init_container"], follow_logs=True
-        )
+        mock_fetch_log.assert_called_once_with(pod=pod, containers=["some_init_container"], follow_logs=True)
         # check that KPO waits for the base container to complete before proceeding to extract XCom
         mock_await_container_completion.assert_called_once_with(pod=pod, container_name="base")
         # check that we wait for the xcom sidecar to start before extracting XCom
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index 11dce0e17b..e6f071559b 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -421,7 +421,7 @@ class TestPodManager:
         )
 
         self.pod_manager.fetch_requested_container_logs(
-            pod=mock_pod, container_logs=container_logs, follow_logs=follow
+            pod=mock_pod, containers=container_logs, follow_logs=follow
         )
         calls = {tuple(x[1].values()) for x in container_is_running.call_args_list}
         pod = self.pod_manager.read_pod.return_value