jason810496 commented on code in PR #54115:
URL: https://github.com/apache/airflow/pull/54115#discussion_r2280751516
##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py:
##########
@@ -53,7 +53,7 @@
get_logs_task_metadata,
)
from airflow.providers.standard.operators.empty import EmptyOperator
-from airflow.utils import timezone
+from airflow.utils import timezone # type: ignore[attr-defined]
Review Comment:
The `timezone` module has been moved to a new location.
```suggestion
try:
from airflow.sdk import timezone
except ImportError:
from airflow.utils import timezone # type: ignore[attr-defined,no-redef]
```
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -254,28 +275,51 @@ def process_status(
and
container_status_state["waiting"]["message"] == "pull QPS exceeded"
):
continue
- self.log.error(
- "Event: %s has container %s with fatal reason
%s",
+ key = annotations_to_key(annotations=annotations)
+ task_key_str = (
+ f"{key.dag_id}.{key.task_id}.{key.try_number}"
if key else "unknown"
+ )
+ self.log.warning(
+ "Event: %s has container %s with fatal reason
%s, task: %s",
pod_name,
container_status["name"],
container_status_state["waiting"]["reason"],
+ task_key_str,
)
self.watcher_queue.put(
- (pod_name, namespace,
TaskInstanceState.FAILED, annotations, resource_version)
+ (
+ pod_name,
+ namespace,
+ TaskInstanceState.FAILED,
+ annotations,
+ resource_version,
+ failure_details,
+ )
)
break
else:
self.log.info("Event: %s Pending, annotations: %s",
pod_name, annotations_string)
else:
self.log.debug("Event: %s Pending, annotations: %s", pod_name,
annotations_string)
elif status == "Failed":
- self.log.error("Event: %s Failed, annotations: %s", pod_name,
annotations_string)
+ key = annotations_to_key(annotations=annotations)
+ task_key_str = f"{key.dag_id}.{key.task_id}.{key.try_number}" if
key else "unknown"
+ self.log.warning(
+ "Event: %s Failed, task: %s, annotations: %s", pod_name,
task_key_str, annotations_string
+ )
Review Comment:
Would it be better to call `collect_pod_failure_details` only within the
`status == "Failed"` block? For other cases, `failure_details` should remain
`None`.
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -302,6 +353,116 @@ def process_status(
)
+def collect_pod_failure_details(pod: k8s.V1Pod) -> FailureDetails | None:
+ """
+ Collect detailed failure information from a failed pod.
+
+ Analyzes both init containers and main containers to determine the root
cause
+ of pod failure, prioritizing terminated containers with non-zero exit
codes.
+
+ Args:
+ pod: The Kubernetes V1Pod object to analyze
+
+ Returns:
+ FailureDetails dict with failure information, or None if no failure
details found
+ """
+ if not pod.status or pod.status.phase != "Failed":
+ return None
+
+ try:
+ # Basic pod-level information
+ failure_details: FailureDetails = {
+ "pod_status": getattr(pod.status, "phase", None),
+ "pod_reason": getattr(pod.status, "reason", None),
+ "pod_message": getattr(pod.status, "message", None),
+ }
+
+ # Check init containers first (they run before main containers)
+ container_failure = _analyze_init_containers(pod.status)
+
+ # If no init container failure found, check main containers
+ if not container_failure:
+ container_failure = _analyze_main_containers(pod.status)
+
+ # Merge container failure details
+ if container_failure:
+ failure_details.update(container_failure)
+
+ return failure_details
+
+ except Exception:
+ # Log unexpected exception for debugging
+ import logging
+
+ logging.getLogger(__name__).exception(
+ "Unexpected error while collecting pod failure details for pod %s",
+ getattr(pod.metadata, "name", "unknown"),
+ )
Review Comment:
How about passing the logger from the `KubernetesJobWatcher.log` property as
a parameter instead of creating a new one?
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -210,22 +211,42 @@ def process_status(
if POD_REVOKED_KEY in pod.metadata.labels.keys():
return
+ # Collect failure details for failed pods using the new function
+ failure_details = None
+ if status == "Failed":
+ try:
+ failure_details = collect_pod_failure_details(pod)
+ except Exception as e:
+ self.log.warning(
+ "Failed to collect pod failure details for %s/%s: %s",
namespace, pod_name, e
+ )
+
annotations_string = annotations_for_logging_task_metadata(annotations)
if event["type"] == "DELETED" and not pod.metadata.deletion_timestamp:
# This will happen only when the task pods are adopted by another
executor.
# So, there is no change in the pod state.
# However, need to free the executor slot from the current
executor.
self.log.info("Event: pod %s adopted, annotations: %s", pod_name,
annotations_string)
- self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations,
resource_version))
+ self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations,
resource_version, None))
elif hasattr(pod.status, "reason") and pod.status.reason ==
"ProviderFailed":
# Most likely this happens due to Kubernetes setup (virtual
kubelet, virtual nodes, etc.)
- self.log.error(
- "Event: %s failed to start with reason ProviderFailed,
annotations: %s",
+ key = annotations_to_key(annotations=annotations)
+ task_key_str = f"{key.dag_id}.{key.task_id}.{key.try_number}" if
key else "unknown"
+ self.log.warning(
+ "Event: %s failed to start with reason ProviderFailed, task:
%s, annotations: %s",
pod_name,
+ task_key_str,
annotations_string,
)
self.watcher_queue.put(
- (pod_name, namespace, TaskInstanceState.FAILED, annotations,
resource_version)
+ (
+ pod_name,
+ namespace,
+ TaskInstanceState.FAILED,
+ annotations,
+ resource_version,
+ failure_details,
Review Comment:
```suggestion
None,
```
If I understand correctly, only the `status == "Failed"` case will
call `failure_details = collect_pod_failure_details(pod)`.
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -254,28 +275,51 @@ def process_status(
and
container_status_state["waiting"]["message"] == "pull QPS exceeded"
):
continue
- self.log.error(
- "Event: %s has container %s with fatal reason
%s",
+ key = annotations_to_key(annotations=annotations)
+ task_key_str = (
+ f"{key.dag_id}.{key.task_id}.{key.try_number}"
if key else "unknown"
+ )
+ self.log.warning(
+ "Event: %s has container %s with fatal reason
%s, task: %s",
pod_name,
container_status["name"],
container_status_state["waiting"]["reason"],
+ task_key_str,
)
self.watcher_queue.put(
- (pod_name, namespace,
TaskInstanceState.FAILED, annotations, resource_version)
+ (
+ pod_name,
+ namespace,
+ TaskInstanceState.FAILED,
+ annotations,
+ resource_version,
+ failure_details,
Review Comment:
```suggestion
None,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]