HsiuChuanHsu commented on code in PR #54115:
URL: https://github.com/apache/airflow/pull/54115#discussion_r2265524340


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py:
##########
@@ -31,11 +31,15 @@
     # TaskInstance key, command, configuration, pod_template_file
     KubernetesJobType = tuple[TaskInstanceKey, CommandType, Any, str | None]
 
-    # key, pod state, pod_name, namespace, resource_version
-    KubernetesResultsType = tuple[TaskInstanceKey, TaskInstanceState | str | None, str, str, str]
-
-    # pod_name, namespace, pod state, annotations, resource_version
-    KubernetesWatchType = tuple[str, str, TaskInstanceState | str | None, dict[str, str], str]
+    # key, pod state, pod_name, namespace, resource_version, failure_details
+    KubernetesResultsType = tuple[
+        TaskInstanceKey, TaskInstanceState | str | None, str, str, str, dict[str, Any] | None
+    ]
+
+    # pod_name, namespace, pod state, annotations, resource_version, failure_details
+    KubernetesWatchType = tuple[
+        str, str, TaskInstanceState | str | None, dict[str, str], str, dict[str, Any] | None
+    ]

Review Comment:
   Thank you for the suggestion! 
   I agree that using `typing.NamedTuple` would improve readability for tuples 
with many entries. However, I'd prefer to address this in a separate PR. This 
refactoring would likely require updates to many other functions that consume 
these tuples. 
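
For reference, a minimal sketch of what the `typing.NamedTuple` form of the watch tuple might look like. The class name and field names below are hypothetical (taken from the comment above the type alias, not from the PR), and the pod state is simplified to `str | None` so the snippet runs without Airflow installed:

```python
from __future__ import annotations

from typing import Any, NamedTuple


class KubernetesWatch(NamedTuple):
    """Hypothetical named form of KubernetesWatchType (field names are assumptions)."""

    pod_name: str
    namespace: str
    state: str | None  # the real alias also allows TaskInstanceState
    annotations: dict[str, str]
    resource_version: str
    failure_details: dict[str, Any] | None = None


watch = KubernetesWatch("pod-1", "default", "Failed", {"dag_id": "example"}, "42")

# Fields are readable by name...
print(watch.state, watch.failure_details)

# ...while positional unpacking keeps working for existing consumers.
pod_name, namespace, state, annotations, resource_version, failure_details = watch
```

Because a `NamedTuple` still unpacks positionally, consumers could probably be migrated one at a time, but that is still better handled in its own PR.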



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -210,22 +210,115 @@ def process_status(
         if POD_REVOKED_KEY in pod.metadata.labels.keys():
             return
 
+        # Collect failure details for failed pods
+        failure_details = None
+        if status == "Failed":
+            try:
+                pod_status = getattr(pod.status, "phase", None)
+                pod_reason = getattr(pod.status, "reason", None)
+                pod_message = getattr(pod.status, "message", None)
+
+                # Container status analysis - check both init and main containers
+                container_info = {}
+
+                # Check init containers first (they run before main containers)
+                init_container_statuses = getattr(pod.status, "init_container_statuses", None)
+                if init_container_statuses:
+                    for cs in init_container_statuses:
+                        state_obj = cs.state
+                        if state_obj.terminated:
+                            terminated_reason = getattr(state_obj.terminated, "reason", None)
+                            exit_code = getattr(state_obj.terminated, "exit_code", 0)
+                            # Only treat as failure if exit code != 0 AND reason is not "Completed"
+                            if exit_code != 0 and terminated_reason != "Completed":
+                                # Init container failed
+                                container_info = {
+                                    "state": "terminated",
+                                    "reason": terminated_reason,
+                                    "message": getattr(state_obj.terminated, "message", None),
+                                    "exit_code": exit_code,
+                                    "container_type": "init",
+                                    "container_name": getattr(cs, "name", "unknown"),
+                                }
+                                break
+                        elif state_obj.waiting:
+                            container_info = {
+                                "state": "waiting",
+                                "reason": getattr(state_obj.waiting, "reason", None),
+                                "message": getattr(state_obj.waiting, "message", None),
+                                "container_type": "init",
+                                "container_name": getattr(cs, "name", "unknown"),
+                            }
+                            # Continue to look for terminated state in other init containers
+
+                # If no init container failure found, check main containers
+                if not container_info:
+                    container_statuses = getattr(pod.status, "container_statuses", None)
+                    if container_statuses:
+                        for cs in container_statuses:
+                            state_obj = cs.state
+                            # Prioritize terminated state for final failure details
+                            if state_obj.terminated:
+                                terminated_reason = getattr(state_obj.terminated, "reason", None)
+                                exit_code = getattr(state_obj.terminated, "exit_code", 0)
+                                # Only treat as failure if exit code != 0 AND reason is not "Completed"
+                                if exit_code != 0 and terminated_reason != "Completed":
+                                    container_info = {
+                                        "state": "terminated",
+                                        "reason": terminated_reason,
+                                        "message": getattr(state_obj.terminated, "message", None),
+                                        "exit_code": exit_code,
+                                        "container_type": "main",
+                                        "container_name": getattr(cs, "name", "unknown"),
+                                    }
+                                    break
+                            elif state_obj.waiting:
+                                container_info = {
+                                    "state": "waiting",
+                                    "reason": getattr(state_obj.waiting, "reason", None),
+                                    "message": getattr(state_obj.waiting, "message", None),
+                                    "container_type": "main",
+                                    "container_name": getattr(cs, "name", "unknown"),
+                                }
+                                # Continue to look for terminated state in other containers
+
+                failure_details = {
+                    "pod_status": pod_status,
+                    "pod_reason": pod_reason,
+                    "pod_message": pod_message,
+                    **container_info,
+                }
+            except Exception as e:
+                self.log.warning(
+                    "Failed to collect pod failure details for %s/%s: %s", namespace, pod_name, e
+                )

Review Comment:
   Great idea! Already done - I've extracted this into a
`collect_pod_failure_details` function that returns a `FailureDetails`
`TypedDict`. Much cleaner now, thanks!
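
To illustrate the shape of that extraction, here is a rough, self-contained sketch. It is not the code in the PR: the `_scan_containers` helper, the `FailureDetails` field list, and the function signature are assumptions reconstructed from the inline logic in the diff above, and `SimpleNamespace` objects stand in for the Kubernetes client's pod model.

```python
from __future__ import annotations

from types import SimpleNamespace
from typing import Any, TypedDict


class FailureDetails(TypedDict, total=False):
    """Assumed field set, mirroring the keys built inline in the diff."""

    pod_status: str | None
    pod_reason: str | None
    pod_message: str | None
    state: str
    reason: str | None
    message: str | None
    exit_code: int
    container_type: str
    container_name: str


def _scan_containers(statuses: Any, container_type: str) -> dict[str, Any]:
    """Hypothetical helper: return the first real termination, else the last waiting state."""
    info: dict[str, Any] = {}
    for cs in statuses or []:
        terminated = getattr(cs.state, "terminated", None)
        waiting = getattr(cs.state, "waiting", None)
        if terminated:
            exit_code = getattr(terminated, "exit_code", 0)
            reason = getattr(terminated, "reason", None)
            # A non-zero exit code with a reason other than "Completed" is a failure.
            if exit_code != 0 and reason != "Completed":
                return {
                    "state": "terminated",
                    "reason": reason,
                    "message": getattr(terminated, "message", None),
                    "exit_code": exit_code,
                    "container_type": container_type,
                    "container_name": getattr(cs, "name", "unknown"),
                }
        elif waiting:
            # Remember the waiting state, but keep looking for a termination.
            info = {
                "state": "waiting",
                "reason": getattr(waiting, "reason", None),
                "message": getattr(waiting, "message", None),
                "container_type": container_type,
                "container_name": getattr(cs, "name", "unknown"),
            }
    return info


def collect_pod_failure_details(pod: Any) -> FailureDetails:
    """Init containers are checked first; main containers only if none reported."""
    status = pod.status
    info = _scan_containers(
        getattr(status, "init_container_statuses", None), "init"
    ) or _scan_containers(getattr(status, "container_statuses", None), "main")
    return FailureDetails(
        pod_status=getattr(status, "phase", None),
        pod_reason=getattr(status, "reason", None),
        pod_message=getattr(status, "message", None),
        **info,
    )


# Stand-in pod whose main container was OOM-killed.
oom = SimpleNamespace(exit_code=137, reason="OOMKilled", message=None)
pod = SimpleNamespace(
    status=SimpleNamespace(
        phase="Failed",
        reason=None,
        message=None,
        init_container_statuses=None,
        container_statuses=[
            SimpleNamespace(name="base", state=SimpleNamespace(terminated=oom, waiting=None))
        ],
    )
)
details = collect_pod_failure_details(pod)
```

Sharing one helper between init and main containers removes the duplicated loop, and `process_status` is left with a single call wrapped in the existing `try`/`except`.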


