dstandish commented on code in PR #28981:
URL: https://github.com/apache/airflow/pull/28981#discussion_r1083084084


##########
airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -264,15 +279,69 @@ def consume_logs(*, since_time: DateTime | None = None, 
follow: bool = True) ->
                 )
                 time.sleep(1)
 
-    def await_container_completion(self, pod: V1Pod, container_name: str) -> 
None:
+    def fetch_input_container_logs(
+        self, pod: V1Pod, log_containers: list[str] | bool, follow=False
+    ) -> list[PodLoggingStatus]:
+        """
+        Follow the logs of containers in the pod specified by input parameter 
and stream to airflow logging.
+        Returns when all the containers exit
+        """
+        pod_logging_statuses = []
+        all_container_names = self.get_container_names(pod)
+        if len(all_container_names) == 0:
+            self.log.error("Failed to retrieve container names from the pod, 
unable to collect logs")
+        else:
+            # if log_containers is list type, collect logs of the input 
container names
+            if type(log_containers) == list:
+                for container_name in log_containers:
+                    if container_name in all_container_names:
+                        status = self.fetch_container_logs(
+                            pod=pod, container_name=container_name, 
follow=follow
+                        )
+                        pod_logging_statuses.append(status)
+                    else:
+                        self.log.error(
+                            "container name '%s' specified in input parameter 
is not found in the pod",
+                            container_name,
+                        )
+            # if log_containers is bool value True, collect logs from all 
containers
+            # if log_containers is bool value False, collect logs from the 
base (first) container
+            elif type(log_containers) == bool:
+                if log_containers is True:
+                    for container_name in all_container_names:
+                        status = self.fetch_container_logs(
+                            pod=pod, container_name=container_name, 
follow=follow
+                        )
+                        pod_logging_statuses.append(status)
+                else:
+                    status = self.fetch_container_logs(
+                        pod=pod, container_name=all_container_names[0], 
follow=follow
+                    )
+                    pod_logging_statuses.append(status)

Review Comment:
   @mlnsharma what is the expected behavior here when there are multiple 
containers?  Will it follow just _one_ container until done and then jump to 
next container?  That would seem to be tad bit problematic because you won't 
get all container logs continuously.  What if you are following logs from a 
very uninteresting sidecar?
   
   If that's true, just to think of alternatives, we could imagine launching a 
"logs follower" (perhaps simply a callable) in a thread for each container, 
that would funnel messages back to main thread via a queue. The main thread 
would just consume the queue and emit messages from it as they arrive until all 
containers are done.
   
   And if only one container, no thread.  WDYT



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to