dstandish commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055921963


##########
airflow/utils/log/file_task_handler.py:
##########
@@ -191,19 +191,29 @@ def _read(self, ti: TaskInstance, try_number: int, 
metadata: dict[str, Any] | No
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
         elif self._should_check_k8s(ti.queue):
-            pod_override = ti.executor_config.get("pod_override")
-            if pod_override and pod_override.metadata and 
pod_override.metadata.namespace:
-                namespace = pod_override.metadata.namespace
-            else:
-                namespace = conf.get("kubernetes_executor", "namespace")
             try:
                 from airflow.kubernetes.kube_client import get_kube_client
+                from airflow.kubernetes.pod_generator import PodGenerator
 
-                kube_client = get_kube_client()
+                client = get_kube_client()
 
                 log += f"*** Trying to get logs (last 100 lines) from worker 
pod {ti.hostname} ***\n\n"
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
+                selector = PodGenerator.build_selector_for_k8s_executor_pod(
+                    dag_id=ti.dag_id,
+                    task_id=ti.task_id,
+                    try_number=ti.try_number,
+                    map_index=ti.map_index,
+                    run_id=ti.run_id,
+                )
+                namespace = self._get_pod_namespace(ti)
+                pod_list = client.list_namespaced_pod(
+                    namespace=namespace,
+                    label_selector=selector,
+                ).items
+                if not pod_list:
+                    raise RuntimeError("Cannot find pod for ti %s", ti)

Review Comment:
   it is already in a try / except so that catches and does just that



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -191,19 +191,29 @@ def _read(self, ti: TaskInstance, try_number: int, 
metadata: dict[str, Any] | No
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
         elif self._should_check_k8s(ti.queue):
-            pod_override = ti.executor_config.get("pod_override")
-            if pod_override and pod_override.metadata and 
pod_override.metadata.namespace:
-                namespace = pod_override.metadata.namespace
-            else:
-                namespace = conf.get("kubernetes_executor", "namespace")
             try:
                 from airflow.kubernetes.kube_client import get_kube_client
+                from airflow.kubernetes.pod_generator import PodGenerator
 
-                kube_client = get_kube_client()
+                client = get_kube_client()
 
                 log += f"*** Trying to get logs (last 100 lines) from worker 
pod {ti.hostname} ***\n\n"
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
+                selector = PodGenerator.build_selector_for_k8s_executor_pod(
+                    dag_id=ti.dag_id,
+                    task_id=ti.task_id,
+                    try_number=ti.try_number,
+                    map_index=ti.map_index,
+                    run_id=ti.run_id,
+                )
+                namespace = self._get_pod_namespace(ti)
+                pod_list = client.list_namespaced_pod(
+                    namespace=namespace,
+                    label_selector=selector,
+                ).items
+                if not pod_list:
+                    raise RuntimeError("Cannot find pod for ti %s", ti)

Review Comment:
   so i could duplicate that code or just raise :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to