hterik commented on code in PR #54115:
URL: https://github.com/apache/airflow/pull/54115#discussion_r2253234596


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -417,6 +417,46 @@ def _change_state(
         if TYPE_CHECKING:
             assert self.kube_scheduler
 
+        if state == TaskInstanceState.FAILED:
+            try:
+                if self.kube_client:

Review Comment:
   In what scenario would `self.kube_client` not be set here?



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -417,6 +417,46 @@ def _change_state(
         if TYPE_CHECKING:
             assert self.kube_scheduler
 
+        if state == TaskInstanceState.FAILED:
+            try:
+                if self.kube_client:
+                    pod = self.kube_client.read_namespaced_pod(name=pod_name, namespace=namespace)
+                    pod_status = getattr(pod.status, "phase", None)
+                    pod_reason = getattr(pod.status, "reason", None)
+                    pod_message = getattr(pod.status, "message", None)
+
+                    # If containerStatuses has detailed reasons, print them as well.
+                    container_statuses = getattr(pod.status, "container_statuses", None)
+                    container_state = None
+                    container_reason = None
+                    container_message = None
+                    if container_statuses:
+                        for cs in container_statuses:
+                            state_obj = cs.state
+                            if state_obj.terminated:
+                                container_state = "terminated"
+                                container_reason = getattr(state_obj.terminated, "reason", None)
+                                container_message = getattr(state_obj.terminated, "message", None)
+                                break
+                            if state_obj.waiting:
+                                container_state = "waiting"
+                                container_reason = getattr(state_obj.waiting, "reason", None)
+                                container_message = getattr(state_obj.waiting, "message", None)
+                                break
+                    self.log.error(

Review Comment:
   Should this really be an error from the scheduler's point of view? If a DAG 
failed, the scheduler has still behaved as expected. Maybe info or warning?
   
   (I would also love to see these errors surfaced to the log in the UI, but as 
far as I know the architecture doesn't support that yet; only scheduler 
administrators can access them now. This is still a good improvement as is :) )



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -417,6 +417,46 @@ def _change_state(
         if TYPE_CHECKING:
             assert self.kube_scheduler
 
+        if state == TaskInstanceState.FAILED:
+            try:
+                if self.kube_client:
+                    pod = self.kube_client.read_namespaced_pod(name=pod_name, namespace=namespace)

Review Comment:
   Would it be better to do all this inside 
`KubernetesJobWatcher.process_status`? That's where the pod status is being 
read today. To me that would be more consistent: `_change_state` and 
`process_status` would not have different views of what is and is not a 
failure, and it would reduce the number of API calls to k8s.
   
   I'm not 100% sure, but I also believe that `process_status` runs on a 
separate thread and communicates with the other thread through the 
`result_queue`. If k8s API calls are made on the thread that consumes the 
results, they may introduce unexpected latencies there.
   
   See for example https://github.com/apache/airflow/pull/39924 where similar 
debug improvements were done in `process_status`.
   
   Would be good if someone more familiar with the architecture of this class 
could chime in here.
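
   A minimal sketch of the pattern suggested above, not the executor's actual 
code: the watcher side, which already holds the pod object, extracts the 
failure details and sends them through the queue, so the consuming side needs 
no extra Kubernetes API call. All names (`summarize_failure`, `watch_events`, 
`drain`, `make_pod`) and the pod-like `SimpleNamespace` objects are 
hypothetical, and a plain thread with `queue.Queue` stands in for whatever 
concurrency mechanism the real watcher uses.

```python
import queue
import threading
from types import SimpleNamespace


def summarize_failure(pod):
    """Collect failure details from a pod-like object the watcher already has."""
    status = pod.status
    return {
        "phase": getattr(status, "phase", None),
        "reason": getattr(status, "reason", None),
        "message": getattr(status, "message", None),
    }


def watch_events(events, result_queue):
    """Hypothetical watcher loop: forward failures together with their details."""
    for pod in events:
        if pod.status.phase == "Failed":
            result_queue.put((pod.metadata.name, "failed", summarize_failure(pod)))
    result_queue.put(None)  # sentinel: no more events


def drain(result_queue):
    """Hypothetical consumer: reads results without calling the k8s API."""
    results = []
    while True:
        item = result_queue.get()
        if item is None:
            return results
        results.append(item)


def make_pod(name, phase, reason=None, message=None):
    """Build a fake pod; real code would receive pod objects from the watch."""
    return SimpleNamespace(
        metadata=SimpleNamespace(name=name),
        status=SimpleNamespace(phase=phase, reason=reason, message=message),
    )


events = [
    make_pod("task-a", "Succeeded"),
    make_pod("task-b", "Failed", reason="Evicted", message="node was low on memory"),
]
result_queue = queue.Queue()
worker = threading.Thread(target=watch_events, args=(events, result_queue))
worker.start()
results = drain(result_queue)
worker.join()
```

   With this shape the consumer only formats and logs what it receives, so a 
slow or failing API server cannot stall it.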
   



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -417,6 +417,46 @@ def _change_state(
         if TYPE_CHECKING:
             assert self.kube_scheduler
 
+        if state == TaskInstanceState.FAILED:
+            try:
+                if self.kube_client:
+                    pod = self.kube_client.read_namespaced_pod(name=pod_name, namespace=namespace)
+                    pod_status = getattr(pod.status, "phase", None)
+                    pod_reason = getattr(pod.status, "reason", None)
+                    pod_message = getattr(pod.status, "message", None)
+
+                    # If containerStatuses has detailed reasons, print them as well.
+                    container_statuses = getattr(pod.status, "container_statuses", None)
+                    container_state = None
+                    container_reason = None
+                    container_message = None
+                    if container_statuses:
+                        for cs in container_statuses:
+                            state_obj = cs.state
+                            if state_obj.terminated:
+                                container_state = "terminated"
+                                container_reason = getattr(state_obj.terminated, "reason", None)
+                                container_message = getattr(state_obj.terminated, "message", None)
+                                break
+                            if state_obj.waiting:
+                                container_state = "waiting"
+                                container_reason = getattr(state_obj.waiting, "reason", None)
+                                container_message = getattr(state_obj.waiting, "message", None)
+                                break
+                    self.log.error(
+                        "Pod %s in namespace %s failed. Pod phase: %s, reason: %s, message: %s, container_state: %s, container_reason: %s, container_message: %s",

Review Comment:
   Please include `key` here, or at least the `run_id` component of the `key`. 
That makes it a lot easier to search the logs, because each entry can be found 
on its own without having to correlate it with whatever lines were logged just 
before it.
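
   A minimal sketch of what such a log entry could look like. `TaskKey` is a 
hypothetical stand-in for the key the executor receives, and the logger name, 
`log_pod_failure`, and `ListHandler` are likewise assumptions made for 
illustration only.

```python
import logging
from typing import NamedTuple


class TaskKey(NamedTuple):
    """Hypothetical stand-in for the task instance key passed to the executor."""

    dag_id: str
    task_id: str
    run_id: str
    try_number: int


log = logging.getLogger("executor_sketch")


def log_pod_failure(key, pod_name, namespace, phase, reason):
    """Log one self-contained line that can be found by searching for run_id."""
    log.warning(
        "Pod %s in namespace %s failed for task %s.%s (run_id=%s, try=%s). "
        "Pod phase: %s, reason: %s",
        pod_name,
        namespace,
        key.dag_id,
        key.task_id,
        key.run_id,
        key.try_number,
        phase,
        reason,
    )


class ListHandler(logging.Handler):
    """Collect formatted messages in memory so the example can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


handler = ListHandler()
log.addHandler(handler)
log.setLevel(logging.WARNING)

key = TaskKey("example_dag", "example_task", "manual__2025-01-01", 1)
log_pod_failure(key, "example-pod", "airflow", "Failed", "Evicted")
```

   Every entry then carries enough context to be matched to a task run by a 
single text search.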



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -417,6 +417,46 @@ def _change_state(
         if TYPE_CHECKING:
             assert self.kube_scheduler
 
+        if state == TaskInstanceState.FAILED:
+            try:
+                if self.kube_client:
+                    pod = self.kube_client.read_namespaced_pod(name=pod_name, namespace=namespace)
+                    pod_status = getattr(pod.status, "phase", None)
+                    pod_reason = getattr(pod.status, "reason", None)
+                    pod_message = getattr(pod.status, "message", None)
+
+                    # If containerStatuses has detailed reasons, print them as well.
+                    container_statuses = getattr(pod.status, "container_statuses", None)
+                    container_state = None
+                    container_reason = None
+                    container_message = None
+                    if container_statuses:
+                        for cs in container_statuses:
+                            state_obj = cs.state
+                            if state_obj.terminated:
+                                container_state = "terminated"
+                                container_reason = getattr(state_obj.terminated, "reason", None)

Review Comment:
   When `reason == "Completed"`, the container ran to completion as expected, 
so it should perhaps not be logged as an error. The `state` would most likely 
not be `FAILED` in that case, though, unless there are multiple 
`container_statuses`; since there is a for-loop over them, it may appear that 
that scenario is supported.
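
   A minimal sketch of one way to handle that case, assuming pod-like objects 
with the same attribute names as in the diff. `first_failed_container` and the 
sample data are hypothetical. Terminated containers whose reason is 
`Completed` are skipped, so a sidecar that exited cleanly does not hide the 
container that actually failed.

```python
from types import SimpleNamespace


def first_failed_container(container_statuses):
    """Return (name, state, reason, message) for the first container that
    did not complete normally, or None if every container looks healthy."""
    for cs in container_statuses or []:
        terminated = getattr(cs.state, "terminated", None)
        if terminated is not None:
            if getattr(terminated, "reason", None) == "Completed":
                continue  # ran to completion as expected; keep looking
            return (
                cs.name,
                "terminated",
                getattr(terminated, "reason", None),
                getattr(terminated, "message", None),
            )
        waiting = getattr(cs.state, "waiting", None)
        if waiting is not None:
            return (
                cs.name,
                "waiting",
                getattr(waiting, "reason", None),
                getattr(waiting, "message", None),
            )
    return None


def fake_status(name, terminated=None, waiting=None):
    """Build a fake container status; real code would get these from the pod."""
    return SimpleNamespace(
        name=name,
        state=SimpleNamespace(terminated=terminated, waiting=waiting),
    )


statuses = [
    fake_status("sidecar", terminated=SimpleNamespace(reason="Completed", message=None)),
    fake_status("base", terminated=SimpleNamespace(reason="OOMKilled", message="out of memory")),
]
failed = first_failed_container(statuses)
```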



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -417,6 +417,46 @@ def _change_state(
         if TYPE_CHECKING:
             assert self.kube_scheduler
 
+        if state == TaskInstanceState.FAILED:
+            try:
+                if self.kube_client:
+                    pod = self.kube_client.read_namespaced_pod(name=pod_name, namespace=namespace)
+                    pod_status = getattr(pod.status, "phase", None)
+                    pod_reason = getattr(pod.status, "reason", None)
+                    pod_message = getattr(pod.status, "message", None)
+
+                    # If containerStatuses has detailed reasons, print them as well.
+                    container_statuses = getattr(pod.status, "container_statuses", None)
+                    container_state = None
+                    container_reason = None
+                    container_message = None
+                    if container_statuses:
+                        for cs in container_statuses:
+                            state_obj = cs.state
+                            if state_obj.terminated:
+                                container_state = "terminated"
+                                container_reason = getattr(state_obj.terminated, "reason", None)
+                                container_message = getattr(state_obj.terminated, "message", None)
+                                break
+                            if state_obj.waiting:
+                                container_state = "waiting"
+                                container_reason = getattr(state_obj.waiting, "reason", None)
+                                container_message = getattr(state_obj.waiting, "message", None)
+                                break
+                    self.log.error(
+                        "Pod %s in namespace %s failed. Pod phase: %s, reason: %s, message: %s, container_state: %s, container_reason: %s, container_message: %s",
+                        pod_name,
+                        namespace,
+                        pod_status,
+                        pod_reason,
+                        pod_message,
+                        container_state,
+                        container_reason,
+                        container_message,
+                    )
+            except Exception as e:
+                self.log.warning("Failed to fetch pod failure reason for %s/%s: %s", namespace, pod_name, e)

Review Comment:
   Same here: please include `key` in the log entry.



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -417,6 +417,46 @@ def _change_state(
         if TYPE_CHECKING:
             assert self.kube_scheduler
 
+        if state == TaskInstanceState.FAILED:
+            try:
+                if self.kube_client:
+                    pod = self.kube_client.read_namespaced_pod(name=pod_name, namespace=namespace)
+                    pod_status = getattr(pod.status, "phase", None)
+                    pod_reason = getattr(pod.status, "reason", None)
+                    pod_message = getattr(pod.status, "message", None)
+
+                    # If containerStatuses has detailed reasons, print them as well.
+                    container_statuses = getattr(pod.status, "container_statuses", None)

Review Comment:
   Note that there is also a `pod.status.init_container_statuses` attribute; it 
may be good to include errors from init containers as well.
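
   A minimal sketch of how both lists could be scanned together, again using 
pod-like stand-in objects. `all_container_statuses`, `describe_problems`, and 
the sample data are hypothetical. Init containers are visited first, since a 
failing init container is usually why the regular containers never start.

```python
from types import SimpleNamespace


def all_container_statuses(status):
    """Yield (kind, container_status) for init containers, then regular ones."""
    for kind, attr in (("init", "init_container_statuses"), ("main", "container_statuses")):
        for cs in getattr(status, attr, None) or []:
            yield kind, cs


def describe_problems(status):
    """Return (kind, name, state, reason) for each terminated or waiting container."""
    problems = []
    for kind, cs in all_container_statuses(status):
        for state_name in ("terminated", "waiting"):
            detail = getattr(cs.state, state_name, None)
            if detail is not None:
                problems.append((kind, cs.name, state_name, getattr(detail, "reason", None)))
                break
    return problems


def fake_status(name, terminated=None, waiting=None):
    """Build a fake container status; real code would get these from the pod."""
    return SimpleNamespace(
        name=name,
        state=SimpleNamespace(terminated=terminated, waiting=waiting),
    )


pod_status = SimpleNamespace(
    init_container_statuses=[
        fake_status("fetch-config", terminated=SimpleNamespace(reason="Error")),
    ],
    container_statuses=[
        fake_status("base", waiting=SimpleNamespace(reason="PodInitializing")),
    ],
)
problems = describe_problems(pod_status)
```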



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
