hterik commented on code in PR #54115:
URL: https://github.com/apache/airflow/pull/54115#discussion_r2256641887


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -210,22 +210,115 @@ def process_status(
         if POD_REVOKED_KEY in pod.metadata.labels.keys():
             return
 
+        # Collect failure details for failed pods
+        failure_details = None
+        if status == "Failed":
+            try:
+                pod_status = getattr(pod.status, "phase", None)
+                pod_reason = getattr(pod.status, "reason", None)
+                pod_message = getattr(pod.status, "message", None)
+
+                # Container status analysis - check both init and main containers
+                container_info = {}
+
+                # Check init containers first (they run before main containers)
+                init_container_statuses = getattr(pod.status, "init_container_statuses", None)
+                if init_container_statuses:
+                    for cs in init_container_statuses:
+                        state_obj = cs.state
+                        if state_obj.terminated:
+                            terminated_reason = getattr(state_obj.terminated, "reason", None)
+                            exit_code = getattr(state_obj.terminated, "exit_code", 0)
+                            # Only treat as failure if exit code != 0 AND reason is not "Completed"
+                            if exit_code != 0 and terminated_reason != "Completed":
+                                # Init container failed
+                                container_info = {
+                                    "state": "terminated",
+                                    "reason": terminated_reason,
+                                    "message": getattr(state_obj.terminated, "message", None),
+                                    "exit_code": exit_code,
+                                    "container_type": "init",
+                                    "container_name": getattr(cs, "name", "unknown"),
+                                }
+                                break
+                        elif state_obj.waiting:
+                            container_info = {
+                                "state": "waiting",
+                                "reason": getattr(state_obj.waiting, "reason", None),
+                                "message": getattr(state_obj.waiting, "message", None),
+                                "container_type": "init",
+                                "container_name": getattr(cs, "name", "unknown"),
+                            }
+                            # Continue to look for terminated state in other init containers
+
+                # If no init container failure found, check main containers
+                if not container_info:
+                    container_statuses = getattr(pod.status, "container_statuses", None)
+                    if container_statuses:
+                        for cs in container_statuses:
+                            state_obj = cs.state
+                            # Prioritize terminated state for final failure details
+                            if state_obj.terminated:
+                                terminated_reason = getattr(state_obj.terminated, "reason", None)
+                                exit_code = getattr(state_obj.terminated, "exit_code", 0)
+                                # Only treat as failure if exit code != 0 AND reason is not "Completed"
+                                if exit_code != 0 and terminated_reason != "Completed":
+                                    container_info = {
+                                        "state": "terminated",
+                                        "reason": terminated_reason,
+                                        "message": getattr(state_obj.terminated, "message", None),
+                                        "exit_code": exit_code,
+                                        "container_type": "main",
+                                        "container_name": getattr(cs, "name", "unknown"),
+                                    }
+                                    break
+                            elif state_obj.waiting:
+                                container_info = {
+                                    "state": "waiting",
+                                    "reason": getattr(state_obj.waiting, "reason", None),
+                                    "message": getattr(state_obj.waiting, "message", None),
+                                    "container_type": "main",
+                                    "container_name": getattr(cs, "name", "unknown"),
+                                }
+                                # Continue to look for terminated state in other containers
+
+                failure_details = {
+                    "pod_status": pod_status,
+                    "pod_reason": pod_reason,
+                    "pod_message": pod_message,
+                    **container_info,
+                }
+            except Exception as e:
+                self.log.warning(
+                    "Failed to collect pod failure details for %s/%s: %s", namespace, pod_name, e
+                )

Review Comment:
   Suggest refactoring this into a function:
   `def collect_failures(pod: V1Pod) -> FailureDetails`, with `FailureDetails`
   being a `TypedDict`, as suggested in the comment on
   `kubernetes_executor_types.py`.
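   
   A minimal sketch of how that function could look, keeping the selection
   logic from the diff: the first failed terminated container wins, otherwise
   the last waiting one is reported, and init containers take precedence over
   main ones. The `FailureDetails` keys mirror the dict built in the diff. The
   helper name `_container_failure` is hypothetical, and `pod` is typed as
   `Any` only so the snippet runs without the Kubernetes client installed (in
   the PR it would be `V1Pod`).

```python
from __future__ import annotations

from typing import Any, TypedDict


class FailureDetails(TypedDict, total=False):
    # Keys mirror the dict assembled in the diff; total=False because the
    # container fields are absent when no failing container is found.
    pod_status: str | None
    pod_reason: str | None
    pod_message: str | None
    state: str
    reason: str | None
    message: str | None
    exit_code: int
    container_type: str
    container_name: str


def _container_failure(statuses: Any, container_type: str) -> FailureDetails:
    """Hypothetical helper: first failed container, else the last waiting one."""
    info: FailureDetails = {}
    for cs in statuses or []:
        name = getattr(cs, "name", "unknown")
        terminated = getattr(cs.state, "terminated", None)
        waiting = getattr(cs.state, "waiting", None)
        if terminated:
            reason = getattr(terminated, "reason", None)
            exit_code = getattr(terminated, "exit_code", 0)
            # Same rule as the diff: non-zero exit code and reason != "Completed".
            if exit_code != 0 and reason != "Completed":
                return {
                    "state": "terminated",
                    "reason": reason,
                    "message": getattr(terminated, "message", None),
                    "exit_code": exit_code,
                    "container_type": container_type,
                    "container_name": name,
                }
        elif waiting:
            # Remember it, but keep looking for a terminated container.
            info = {
                "state": "waiting",
                "reason": getattr(waiting, "reason", None),
                "message": getattr(waiting, "message", None),
                "container_type": container_type,
                "container_name": name,
            }
    return info


def collect_failures(pod: Any) -> FailureDetails:
    """Collect pod-level and container-level failure details (pod: V1Pod in the PR)."""
    status = pod.status
    details: FailureDetails = {
        "pod_status": getattr(status, "phase", None),
        "pod_reason": getattr(status, "reason", None),
        "pod_message": getattr(status, "message", None),
    }
    # Init containers run before main containers, so they are checked first.
    container_info = _container_failure(
        getattr(status, "init_container_statuses", None), "init"
    ) or _container_failure(getattr(status, "container_statuses", None), "main")
    details.update(container_info)
    return details
```

   With something like this, the `try`/`except` in `process_status` would
   only need to wrap a single `collect_failures(pod)` call.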



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py:
##########
@@ -31,11 +31,15 @@
     # TaskInstance key, command, configuration, pod_template_file
     KubernetesJobType = tuple[TaskInstanceKey, CommandType, Any, str | None]
 
-    # key, pod state, pod_name, namespace, resource_version
-    KubernetesResultsType = tuple[TaskInstanceKey, TaskInstanceState | str | None, str, str, str]
-
-    # pod_name, namespace, pod state, annotations, resource_version
-    KubernetesWatchType = tuple[str, str, TaskInstanceState | str | None, dict[str, str], str]
+    # key, pod state, pod_name, namespace, resource_version, failure_details
+    KubernetesResultsType = tuple[
+        TaskInstanceKey, TaskInstanceState | str | None, str, str, str, dict[str, Any] | None
+    ]
+
+    # pod_name, namespace, pod state, annotations, resource_version, failure_details
+    KubernetesWatchType = tuple[
+        str, str, TaskInstanceState | str | None, dict[str, str], str, dict[str, Any] | None
+    ]

Review Comment:
   Prefer `typing.NamedTuple`. Tuples with this many entries are very difficult 
to reason about otherwise.
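   
   For illustration, a sketch of the two aliases as `typing.NamedTuple`
   classes. The class and field names are assumptions taken from the comments
   in the diff, and `Any` stands in for `TaskInstanceKey` and
   `TaskInstanceState | str | None` so the snippet runs on its own.

```python
from __future__ import annotations

from typing import Any, NamedTuple


class KubernetesResults(NamedTuple):
    """Hypothetical replacement for the KubernetesResultsType tuple alias."""

    key: Any  # TaskInstanceKey in the real module
    state: Any  # TaskInstanceState | str | None in the real module
    pod_name: str
    namespace: str
    resource_version: str
    failure_details: dict[str, Any] | None = None


class KubernetesWatch(NamedTuple):
    """Hypothetical replacement for the KubernetesWatchType tuple alias."""

    pod_name: str
    namespace: str
    state: Any  # TaskInstanceState | str | None in the real module
    annotations: dict[str, str]
    resource_version: str
    failure_details: dict[str, Any] | None = None


# Fields are addressed by name, and positional unpacking still works.
result = KubernetesResults("ti-key", "failed", "pod-1", "default", "42")
key, state, pod_name, namespace, resource_version, failure_details = result
```

   A `NamedTuple` is still a tuple, so code that unpacks or indexes the value
   positionally keeps working while new code can use `result.pod_name` and
   friends.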



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py:
##########
@@ -31,11 +31,15 @@
     # TaskInstance key, command, configuration, pod_template_file
     KubernetesJobType = tuple[TaskInstanceKey, CommandType, Any, str | None]
 
-    # key, pod state, pod_name, namespace, resource_version
-    KubernetesResultsType = tuple[TaskInstanceKey, TaskInstanceState | str | None, str, str, str]
-
-    # pod_name, namespace, pod state, annotations, resource_version
-    KubernetesWatchType = tuple[str, str, TaskInstanceState | str | None, dict[str, str], str]
+    # key, pod state, pod_name, namespace, resource_version, failure_details
+    KubernetesResultsType = tuple[
+        TaskInstanceKey, TaskInstanceState | str | None, str, str, str, dict[str, Any] | None

Review Comment:
   Prefer a `TypedDict` or `dataclass` for the failure details. Untyped `dict`s
   are very easy to get wrong due to typos etc.
   
   Maybe such a `FailureDetails` class even deserves its own custom `__str__`
   implementation, to avoid all the `.get` calls in `_change_state`?
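   
   A rough sketch of the `dataclass` variant with a custom `__str__`.
   Instances of a `TypedDict` are plain dicts at runtime, so a custom
   `__str__` would need the `dataclass` route. The field names and the output
   format are assumptions for illustration, not what `_change_state` prints
   today.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class FailureDetails:
    """Hypothetical typed container for the pod failure details."""

    pod_status: str | None = None
    pod_reason: str | None = None
    pod_message: str | None = None
    container_state: str | None = None
    container_reason: str | None = None
    container_message: str | None = None
    exit_code: int | None = None
    container_type: str | None = None
    container_name: str | None = None

    def __str__(self) -> str:
        # Render only the fields that were actually collected, in
        # declaration order, so callers do not need a chain of .get() calls.
        parts = [f"{name}={value}" for name, value in asdict(self).items() if value is not None]
        return ", ".join(parts) or "no failure details"


details = FailureDetails(pod_status="Failed", container_reason="OOMKilled", exit_code=137)
message = f"Pod failed: {details}"
```

   A misspelled field name then fails at construction time (or in the type
   checker) instead of silently producing a missing key.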



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.