This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 68b3b7b468 Simplify KPO multi container log reconciliation logic (#35450)
68b3b7b468 is described below

commit 68b3b7b4683c8e06098dfa8820be18f253d55f47
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Tue Nov 7 10:22:24 2023 -0800

    Simplify KPO multi container log reconciliation logic (#35450)
    
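    Move the reconciliation of requested vs. actual container names out of
    `fetch_requested_container_logs` into a separate helper,
    `_reconcile_requested_log_containers`, and rename the `container_logs`
    parameter to `containers`. This makes the log-fetching logic easier to follow.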
---
 airflow/providers/cncf/kubernetes/operators/pod.py |  2 +-
 .../providers/cncf/kubernetes/utils/pod_manager.py | 69 ++++++++++++++--------
 .../cncf/kubernetes/operators/test_pod.py          |  4 +-
 .../cncf/kubernetes/utils/test_pod_manager.py      |  2 +-
 4 files changed, 46 insertions(+), 31 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index 5b58b1bedb..58e54a72d3 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -611,7 +611,7 @@ class KubernetesPodOperator(BaseOperator):
             if self.get_logs:
                 self.pod_manager.fetch_requested_container_logs(
                     pod=self.pod,
-                    container_logs=self.container_logs,
+                    containers=self.container_logs,
                     follow_logs=True,
                 )
             if not self.get_logs or (
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 2d5abec0e9..75ff82fc2f 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -381,6 +381,8 @@ class PodManager(LoggingMixin):
         Between when the pod starts and logs being available, there might be a 
delay due to CSR not approved
         and signed yet. In such situation, ApiException is thrown. This is why 
we are retrying on this
         specific exception.
+
+        :meta private:
         """
 
         @tenacity.retry(
@@ -476,53 +478,68 @@ class PodManager(LoggingMixin):
             else:  # follow requested, but container is done
                 break
 
-    def fetch_requested_container_logs(
-        self, pod: V1Pod, container_logs: Iterable[str] | str | Literal[True], 
follow_logs=False
-    ) -> None:
-        """
-        Follow the logs of containers in the specified pod and publish it to 
airflow logging.
-
-        Returns when all the containers exit.
-        """
-        all_containers = self.get_container_names(pod)
-        if all_containers:
-            if isinstance(container_logs, str):
+    def _reconcile_requested_log_containers(
+        self, requested: Iterable[str] | str | bool, actual: list[str], 
pod_name
+    ) -> list[str]:
+        """Return actual containers based on requested."""
+        containers_to_log = []
+        if actual:
+            if isinstance(requested, str):
                 # fetch logs only for requested container if only one 
container is provided
-                if container_logs in all_containers:
-                    self.fetch_container_logs(pod=pod, 
container_name=container_logs, follow=follow_logs)
+                if requested in actual:
+                    containers_to_log.append(requested)
                 else:
                     self.log.error(
                         "container %s whose logs were requested not found in 
the pod %s",
-                        container_logs,
-                        pod.metadata.name,
+                        requested,
+                        pod_name,
                     )
-            elif isinstance(container_logs, bool):
+            elif isinstance(requested, bool):
                 # if True is provided, get logs for all the containers
-                if container_logs is True:
-                    for container_name in all_containers:
-                        self.fetch_container_logs(pod=pod, 
container_name=container_name, follow=follow_logs)
+                if requested is True:
+                    containers_to_log.extend(actual)
                 else:
                     self.log.error(
                         "False is not a valid value for container_logs",
                     )
             else:
                 # if a sequence of containers are provided, iterate for every 
container in the pod
-                if isinstance(container_logs, Iterable):
-                    for container in container_logs:
-                        if container in all_containers:
-                            self.fetch_container_logs(pod=pod, 
container_name=container, follow=follow_logs)
+                if isinstance(requested, Iterable):
+                    for container in requested:
+                        if container in actual:
+                            containers_to_log.append(container)
                         else:
                             self.log.error(
                                 "Container %s whose logs were requests not 
found in the pod %s",
                                 container,
-                                pod.metadata.name,
+                                pod_name,
                             )
                 else:
                     self.log.error(
-                        "Invalid type %s specified for container names input 
parameter", type(container_logs)
+                        "Invalid type %s specified for container names input 
parameter", type(requested)
                     )
         else:
-            self.log.error("Could not retrieve containers for the pod: %s", 
pod.metadata.name)
+            self.log.error("Could not retrieve containers for the pod: %s", 
pod_name)
+        return containers_to_log
+
+    def fetch_requested_container_logs(
+        self, pod: V1Pod, containers: Iterable[str] | str | Literal[True], 
follow_logs=False
+    ) -> None:
+        """
+        Follow the logs of containers in the specified pod and publish it to 
airflow logging.
+
+        Returns when all the containers exit.
+
+        :meta private:
+        """
+        all_containers = self.get_container_names(pod)
+        containers_to_log = self._reconcile_requested_log_containers(
+            requested=containers,
+            actual=all_containers,
+            pod_name=pod.metadata.name,
+        )
+        for c in containers_to_log:
+            self.fetch_container_logs(pod=pod, container_name=c, 
follow=follow_logs)
 
     def await_container_completion(self, pod: V1Pod, container_name: str) -> 
None:
         """
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py
index 4ad631ac4e..ae49d7fe7d 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -1371,9 +1371,7 @@ class TestKubernetesPodOperator:
         pod = self.run_pod(k)
 
         # check that the base container is not included in the logs
-        mock_fetch_log.assert_called_once_with(
-            pod=pod, container_logs=["some_init_container"], follow_logs=True
-        )
+        mock_fetch_log.assert_called_once_with(pod=pod, 
containers=["some_init_container"], follow_logs=True)
         # check that KPO waits for the base container to complete before 
proceeding to extract XCom
         mock_await_container_completion.assert_called_once_with(pod=pod, 
container_name="base")
         # check that we wait for the xcom sidecar to start before extracting 
XCom
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index 11dce0e17b..e6f071559b 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -421,7 +421,7 @@ class TestPodManager:
         )
 
         self.pod_manager.fetch_requested_container_logs(
-            pod=mock_pod, container_logs=container_logs, follow_logs=follow
+            pod=mock_pod, containers=container_logs, follow_logs=follow
         )
         calls = {tuple(x[1].values()) for x in 
container_is_running.call_args_list}
         pod = self.pod_manager.read_pod.return_value

Reply via email to