hterik commented on code in PR #54115:
URL: https://github.com/apache/airflow/pull/54115#discussion_r2266327211


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py:
##########
@@ -31,11 +31,15 @@
     # TaskInstance key, command, configuration, pod_template_file
     KubernetesJobType = tuple[TaskInstanceKey, CommandType, Any, str | None]
 
-    # key, pod state, pod_name, namespace, resource_version
-    KubernetesResultsType = tuple[TaskInstanceKey, TaskInstanceState | str | 
None, str, str, str]
-
-    # pod_name, namespace, pod state, annotations, resource_version
-    KubernetesWatchType = tuple[str, str, TaskInstanceState | str | None, 
dict[str, str], str]
+    # key, pod state, pod_name, namespace, resource_version, failure_details
+    KubernetesResultsType = tuple[
+        TaskInstanceKey, TaskInstanceState | str | None, str, str, str, 
dict[str, Any] | None
+    ]
+
+    # pod_name, namespace, pod state, annotations, resource_version, 
failure_details
+    KubernetesWatchType = tuple[
+        str, str, TaskInstanceState | str | None, dict[str, str], str, 
dict[str, Any] | None
+    ]

Review Comment:
   Sounds good :+1: 



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -302,6 +353,139 @@ def process_status(
             )
 
 
+def collect_pod_failure_details(pod: k8s.V1Pod) -> FailureDetails | None:
+    """
+    Collect detailed failure information from a failed pod.
+
+    Analyzes both init containers and main containers to determine the root 
cause
+    of pod failure, prioritizing terminated containers with non-zero exit 
codes.
+
+    Args:
+        pod: The Kubernetes V1Pod object to analyze
+
+    Returns:
+        FailureDetails dict with failure information, or None if no failure 
details found
+    """
+    if not pod.status or pod.status.phase != "Failed":
+        return None
+
+    try:
+        # Basic pod-level information
+        failure_details: FailureDetails = {
+            "pod_status": getattr(pod.status, "phase", None),
+            "pod_reason": getattr(pod.status, "reason", None),
+            "pod_message": getattr(pod.status, "message", None),
+        }
+
+        # Check init containers first (they run before main containers)
+        container_failure = _analyze_init_containers(pod.status)
+
+        # If no init container failure found, check main containers
+        if not container_failure:
+            container_failure = _analyze_main_containers(pod.status)
+
+        # Merge container failure details
+        if container_failure:
+            failure_details.update(container_failure)
+
+        return failure_details
+
+    except Exception:
+        # Return basic pod info if container analysis fails
+        return {
+            "pod_status": getattr(pod.status, "phase", None),
+            "pod_reason": getattr(pod.status, "reason", None),
+            "pod_message": getattr(pod.status, "message", None),
+        }
+
+
+def _analyze_init_containers(pod_status: k8s.V1PodStatus) -> FailureDetails | 
None:
+    """Analyze init container statuses for failure details."""
+    init_container_statuses = getattr(pod_status, "init_container_statuses", 
None)
+    if not init_container_statuses:
+        return None
+
+    waiting_info: FailureDetails | None = None
+
+    for cs in init_container_statuses:
+        state_obj = cs.state
+        if state_obj.terminated:
+            terminated_reason = getattr(state_obj.terminated, "reason", None)
+            exit_code = getattr(state_obj.terminated, "exit_code", 0)
+
+            # Only treat as failure if exit code != 0 AND reason is not 
"Completed"
+            if exit_code != 0 and terminated_reason != "Completed":
+                return cast(
+                    "FailureDetails",
+                    {
+                        "container_state": "terminated",
+                        "container_reason": terminated_reason,
+                        "container_message": getattr(state_obj.terminated, 
"message", None),
+                        "exit_code": exit_code,
+                        "container_type": "init",
+                        "container_name": getattr(cs, "name", "unknown"),
+                    },
+                )
+        elif state_obj.waiting:
+            # Record waiting state but continue looking for terminated 
containers
+            waiting_info = cast(
+                "FailureDetails",
+                {
+                    "container_state": "waiting",
+                    "container_reason": getattr(state_obj.waiting, "reason", 
None),
+                    "container_message": getattr(state_obj.waiting, "message", 
None),
+                    "container_type": "init",
+                    "container_name": getattr(cs, "name", "unknown"),
+                },
+            )
+
+    # If we only found waiting containers, return the last one
+    return waiting_info
+
+
+def _analyze_main_containers(pod_status: k8s.V1PodStatus) -> FailureDetails | 
None:
+    """Analyze main container statuses for failure details."""
+    container_statuses = getattr(pod_status, "container_statuses", None)
+    if not container_statuses:
+        return None
+
+    waiting_info: FailureDetails | None = None
+
+    for cs in container_statuses:
+        state_obj = cs.state
+        if state_obj.terminated:
+            terminated_reason = getattr(state_obj.terminated, "reason", None)
+            exit_code = getattr(state_obj.terminated, "exit_code", 0)
+
+            # Only treat as failure if exit code != 0 AND reason is not 
"Completed"
+            if exit_code != 0 and terminated_reason != "Completed":
+                return cast(
+                    "FailureDetails",
+                    {
+                        "container_state": "terminated",
+                        "container_reason": terminated_reason,
+                        "container_message": getattr(state_obj.terminated, 
"message", None),
+                        "exit_code": exit_code,
+                        "container_type": "main",
+                        "container_name": getattr(cs, "name", "unknown"),
+                    },
+                )
+        elif state_obj.waiting:
+            # Record waiting state but continue looking for terminated 
containers
+            waiting_info = cast(
+                "FailureDetails",
+                {
+                    "container_state": "waiting",
+                    "container_reason": getattr(state_obj.waiting, "reason", 
None),
+                    "container_message": getattr(state_obj.waiting, "message", 
None),
+                    "container_type": "main",
+                    "container_name": getattr(cs, "name", "unknown"),
+                },
+            )

Review Comment:
   I believe this part is exactly the same as in `_analyze_init_containers`. 
Does it make sense to reuse? 



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -302,6 +353,139 @@ def process_status(
             )
 
 
+def collect_pod_failure_details(pod: k8s.V1Pod) -> FailureDetails | None:
+    """
+    Collect detailed failure information from a failed pod.
+
+    Analyzes both init containers and main containers to determine the root 
cause
+    of pod failure, prioritizing terminated containers with non-zero exit 
codes.
+
+    Args:
+        pod: The Kubernetes V1Pod object to analyze
+
+    Returns:
+        FailureDetails dict with failure information, or None if no failure 
details found
+    """
+    if not pod.status or pod.status.phase != "Failed":
+        return None
+
+    try:
+        # Basic pod-level information
+        failure_details: FailureDetails = {
+            "pod_status": getattr(pod.status, "phase", None),
+            "pod_reason": getattr(pod.status, "reason", None),
+            "pod_message": getattr(pod.status, "message", None),
+        }
+
+        # Check init containers first (they run before main containers)
+        container_failure = _analyze_init_containers(pod.status)
+
+        # If no init container failure found, check main containers
+        if not container_failure:
+            container_failure = _analyze_main_containers(pod.status)
+
+        # Merge container failure details
+        if container_failure:
+            failure_details.update(container_failure)
+
+        return failure_details
+
+    except Exception:
+        # Return basic pod info if container analysis fails
+        return {

Review Comment:
   Please log this exception before returning, using `logger.exception`. It's 
an unexpected scenario and if it happens, then administrators should have the 
info required to debug it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to