jason810496 commented on code in PR #54115:
URL: https://github.com/apache/airflow/pull/54115#discussion_r2280751516


##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py:
##########
@@ -53,7 +53,7 @@
     get_logs_task_metadata,
 )
 from airflow.providers.standard.operators.empty import EmptyOperator
-from airflow.utils import timezone
+from airflow.utils import timezone  # type: ignore[attr-defined]

Review Comment:
   `timezone` has been moved to a new location.
   
   ```suggestion
   try:
       from airflow.sdk import timezone
   except ImportError:
       from airflow.utils import timezone  # type: ignore[attr-defined,no-redef]
   ```



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -254,28 +275,51 @@ def process_status(
                                 and 
container_status_state["waiting"]["message"] == "pull QPS exceeded"
                             ):
                                 continue
-                            self.log.error(
-                                "Event: %s has container %s with fatal reason 
%s",
+                            key = annotations_to_key(annotations=annotations)
+                            task_key_str = (
+                                f"{key.dag_id}.{key.task_id}.{key.try_number}" 
if key else "unknown"
+                            )
+                            self.log.warning(
+                                "Event: %s has container %s with fatal reason 
%s, task: %s",
                                 pod_name,
                                 container_status["name"],
                                 container_status_state["waiting"]["reason"],
+                                task_key_str,
                             )
                             self.watcher_queue.put(
-                                (pod_name, namespace, 
TaskInstanceState.FAILED, annotations, resource_version)
+                                (
+                                    pod_name,
+                                    namespace,
+                                    TaskInstanceState.FAILED,
+                                    annotations,
+                                    resource_version,
+                                    failure_details,
+                                )
                             )
                             break
                 else:
                     self.log.info("Event: %s Pending, annotations: %s", 
pod_name, annotations_string)
             else:
                 self.log.debug("Event: %s Pending, annotations: %s", pod_name, 
annotations_string)
         elif status == "Failed":
-            self.log.error("Event: %s Failed, annotations: %s", pod_name, 
annotations_string)
+            key = annotations_to_key(annotations=annotations)
+            task_key_str = f"{key.dag_id}.{key.task_id}.{key.try_number}" if 
key else "unknown"
+            self.log.warning(
+                "Event: %s Failed, task: %s, annotations: %s", pod_name, 
task_key_str, annotations_string
+            )

Review Comment:
   Would it be better to call `collect_pod_failure_details` only within the 
`status == "Failed"` block? For other cases, `failure_details` should remain 
`None`.



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -302,6 +353,116 @@ def process_status(
             )
 
 
+def collect_pod_failure_details(pod: k8s.V1Pod) -> FailureDetails | None:
+    """
+    Collect detailed failure information from a failed pod.
+
+    Analyzes both init containers and main containers to determine the root 
cause
+    of pod failure, prioritizing terminated containers with non-zero exit 
codes.
+
+    Args:
+        pod: The Kubernetes V1Pod object to analyze
+
+    Returns:
+        FailureDetails dict with failure information, or None if no failure 
details found
+    """
+    if not pod.status or pod.status.phase != "Failed":
+        return None
+
+    try:
+        # Basic pod-level information
+        failure_details: FailureDetails = {
+            "pod_status": getattr(pod.status, "phase", None),
+            "pod_reason": getattr(pod.status, "reason", None),
+            "pod_message": getattr(pod.status, "message", None),
+        }
+
+        # Check init containers first (they run before main containers)
+        container_failure = _analyze_init_containers(pod.status)
+
+        # If no init container failure found, check main containers
+        if not container_failure:
+            container_failure = _analyze_main_containers(pod.status)
+
+        # Merge container failure details
+        if container_failure:
+            failure_details.update(container_failure)
+
+        return failure_details
+
+    except Exception:
+        # Log unexpected exception for debugging
+        import logging
+
+        logging.getLogger(__name__).exception(
+            "Unexpected error while collecting pod failure details for pod %s",
+            getattr(pod.metadata, "name", "unknown"),
+        )

Review Comment:
   How about passing the logger from the `KubernetesJobWatcher.log` property as 
a parameter instead of creating a new one?



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -210,22 +211,42 @@ def process_status(
         if POD_REVOKED_KEY in pod.metadata.labels.keys():
             return
 
+        # Collect failure details for failed pods using the new function
+        failure_details = None
+        if status == "Failed":
+            try:
+                failure_details = collect_pod_failure_details(pod)
+            except Exception as e:
+                self.log.warning(
+                    "Failed to collect pod failure details for %s/%s: %s", 
namespace, pod_name, e
+                )
+
         annotations_string = annotations_for_logging_task_metadata(annotations)
         if event["type"] == "DELETED" and not pod.metadata.deletion_timestamp:
             # This will happen only when the task pods are adopted by another 
executor.
             # So, there is no change in the pod state.
             # However, need to free the executor slot from the current 
executor.
             self.log.info("Event: pod %s adopted, annotations: %s", pod_name, 
annotations_string)
-            self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations, 
resource_version))
+            self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations, 
resource_version, None))
         elif hasattr(pod.status, "reason") and pod.status.reason == 
"ProviderFailed":
             # Most likely this happens due to Kubernetes setup (virtual 
kubelet, virtual nodes, etc.)
-            self.log.error(
-                "Event: %s failed to start with reason ProviderFailed, 
annotations: %s",
+            key = annotations_to_key(annotations=annotations)
+            task_key_str = f"{key.dag_id}.{key.task_id}.{key.try_number}" if 
key else "unknown"
+            self.log.warning(
+                "Event: %s failed to start with reason ProviderFailed, task: 
%s, annotations: %s",
                 pod_name,
+                task_key_str,
                 annotations_string,
             )
             self.watcher_queue.put(
-                (pod_name, namespace, TaskInstanceState.FAILED, annotations, 
resource_version)
+                (
+                    pod_name,
+                    namespace,
+                    TaskInstanceState.FAILED,
+                    annotations,
+                    resource_version,
+                    failure_details,

Review Comment:
   ```suggestion
                       None,
   ```
   
   If I'm not mistaken, only the `status == "Failed"` case will 
call `failure_details = collect_pod_failure_details(pod)`.



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -254,28 +275,51 @@ def process_status(
                                 and 
container_status_state["waiting"]["message"] == "pull QPS exceeded"
                             ):
                                 continue
-                            self.log.error(
-                                "Event: %s has container %s with fatal reason 
%s",
+                            key = annotations_to_key(annotations=annotations)
+                            task_key_str = (
+                                f"{key.dag_id}.{key.task_id}.{key.try_number}" 
if key else "unknown"
+                            )
+                            self.log.warning(
+                                "Event: %s has container %s with fatal reason 
%s, task: %s",
                                 pod_name,
                                 container_status["name"],
                                 container_status_state["waiting"]["reason"],
+                                task_key_str,
                             )
                             self.watcher_queue.put(
-                                (pod_name, namespace, 
TaskInstanceState.FAILED, annotations, resource_version)
+                                (
+                                    pod_name,
+                                    namespace,
+                                    TaskInstanceState.FAILED,
+                                    annotations,
+                                    resource_version,
+                                    failure_details,

Review Comment:
   ```suggestion
                                       None,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to