jordi-crespo opened a new issue, #42144: URL: https://github.com/apache/airflow/issues/42144
### Apache Airflow Provider(s)

cncf-kubernetes

### Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes==8.3.4

### Apache Airflow version

2.9.2

### Operating System

Docker, creating the image from python:3.10-slim-buster

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### What happened

When using `KubernetesJobOperator` in deferrable mode, the job executes correctly, but logs from the associated pod are not retrieved. The issue arises in the [`execute_complete`](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/job.py#L226) method, where the `pod_name` and `pod_namespace` passed from the trigger are `None`, which causes the following code to fail:

```
if self.get_logs:
    pod_name = event["pod_name"]
    pod_namespace = event["pod_namespace"]
    self.pod = self.hook.get_pod(pod_name, pod_namespace)
    if not self.pod:
        raise PodNotFoundException("Could not find pod after resuming from deferral")
    self._write_logs(self.pod)
```

First, I create a `V1Pod` object that I pass to the `KubernetesJobOperator` via the `full_pod_spec` parameter. The same approach works fine with the `KubernetesPodOperator`. When the `KubernetesJobOperator` creates the job, it builds a `V1Pod` from my [`full_pod_spec`](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/job.py#L307). Later, in this [line of code](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/job.py#L169), it creates the `self.pod` object. Because I am running the operator in deferrable mode, `self.execute_deferrable()` is executed. That method uses the `KubernetesJobTrigger` class and [passes the `pod_name` and `pod_namespace`](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/job.py#L212) to it.
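To make the failure concrete: the trigger only includes pod details in its event when its `get_logs` flag is truthy, and `execute_complete` then indexes the event without checking for `None`. Below is a minimal sketch of both sides of that contract; the function names are illustrative, not part of the provider's API:

```python
def build_event_payload(pod_name, pod_namespace, get_logs):
    """Mirror of the trigger-side conditional: pod details are only
    included in the event when get_logs is truthy."""
    return {
        "pod_name": pod_name if get_logs else None,
        "pod_namespace": pod_namespace if get_logs else None,
    }


def pod_ref_from_event(event):
    """Operator-side guard: fail loudly instead of passing None
    into hook.get_pod()."""
    pod_name = event.get("pod_name")
    pod_namespace = event.get("pod_namespace")
    if not pod_name or not pod_namespace:
        raise ValueError(
            f"Trigger event is missing pod details: "
            f"pod_name={pod_name!r}, pod_namespace={pod_namespace!r}"
        )
    return pod_name, pod_namespace
```

With `get_logs` truthy the payload round-trips; if the flag is falsy on the trigger side when the event is built, both fields are `None`, and a guard like this would surface that immediately instead of failing later inside `get_pod`.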
Because I don't get an error, I assume that the `self.pod` object exists, since the operator can access the following pod attributes:

```
pod_name=self.pod.metadata.name,  # type: ignore[union-attr]
pod_namespace=self.pod.metadata.namespace,  # type: ignore[union-attr]
```

[`KubernetesJobTrigger`](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/triggers/job.py#L32) yields a [`TriggerEvent`](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/triggers/job.py#L122) object, passing the `pod_name` and `pod_namespace` like this:

```
yield TriggerEvent(
    {
        "pod_name": pod.metadata.name if self.get_logs else None,
        "pod_namespace": pod.metadata.namespace if self.get_logs else None,
    }
)
```

This `pod` object comes from this [piece of code](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/triggers/job.py#L105):

```
if self.get_logs or self.do_xcom_push:
    pod = await self.hook.get_pod(name=self.pod_name, namespace=self.pod_namespace)
```

Next, the [`execute_complete`](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/job.py#L226) method runs and reads the `pod_name` and `pod_namespace` from the event. But I found that these values are `None`, so I believe the `TriggerEvent` is yielding `None` values. To resolve this, I had to manually extract the pod name from the job itself, by subclassing `KubernetesJobOperator` and writing a new `execute_complete` method.
```
pod_list_response = self.hook.get_namespaced_pod_list(
    label_selector=f"job-name={job_name}", namespace=job_namespace
)
pod_list = json.loads(pod_list_response.read())
pods = pod_list.get("items", [])
if not pods:
    raise PodNotFoundException(f"No pods found for job: {job_name} in namespace: {job_namespace}")
pod = pods[0]
pod_name = pod["metadata"]["name"]
pod_namespace = pod["metadata"]["namespace"]
```

After retrieving the pod details, I encountered another issue with the client used to fetch logs. By default, [`self.client`](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/job.py#L470) in `KubernetesJobOperator` points to the `BatchV1Api`:

```
@cached_property
def client(self) -> BatchV1Api:
    return self.hook.batch_v1_client
```

This matters in the following piece of code:

```
if self.get_logs:
    pod_name = event["pod_name"]
    pod_namespace = event["pod_namespace"]
    self.pod = self.hook.get_pod(pod_name, pod_namespace)
    if not self.pod:
        raise PodNotFoundException("Could not find pod after resuming from deferral")
    self._write_logs(self.pod)
```

[`self._write_logs`](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/pod.py#L832) is defined in `KubernetesPodOperator`, which builds its `self.client` using the `CoreV1Api`. I resolved this by switching the client before calling `_write_logs`:

```
self.client = self.hook.core_v1_client
self._write_logs(self.pod)
```

### What you think should happen instead

The `KubernetesJobTrigger` should send the `pod_name` and `pod_namespace`, and they should not be `None` values.
Also, an error is currently raised if the job is in a failed state before the logs can be printed:

```
def execute_complete(self, context: Context, event: dict, **kwargs):
    ti = context["ti"]
    ti.xcom_push(key="job", value=event["job"])
    if event["status"] == "error":
        raise AirflowException(event["message"])

    if self.get_logs:
        pod_name = event["pod_name"]
        pod_namespace = event["pod_namespace"]
        self.pod = self.hook.get_pod(pod_name, pod_namespace)
        if not self.pod:
            raise PodNotFoundException("Could not find pod after resuming from deferral")
        self._write_logs(self.pod)
```

But even if the job fails, we would still want to see the logs:

```
def execute_complete(self, context: Context, event: dict, **kwargs):
    ti = context["ti"]
    ti.xcom_push(key="job", value=event["job"])

    if self.get_logs:
        pod_name = event["pod_name"]
        pod_namespace = event["pod_namespace"]
        self.pod = self.hook.get_pod(pod_name, pod_namespace)
        if not self.pod:
            raise PodNotFoundException("Could not find pod after resuming from deferral")
        self._write_logs(self.pod)

    if event["status"] == "error":
        raise AirflowException(event["message"])
```

### How to reproduce

Write a DAG with the following operator:

```
from kubernetes.client import V1Pod, V1ObjectMeta, V1PodSpec, V1Container
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

# Define the pod
pod = V1Pod(
    metadata=V1ObjectMeta(name="example-pod"),
    spec=V1PodSpec(
        containers=[
            V1Container(
                name="example-container",
                image="busybox",
                command=["/bin/sh", "-c", "echo Hello, Kubernetes! && sleep 60"],
            )
        ],
        restart_policy="Never",
    ),
)

# Define the KubernetesJobOperator
job = KubernetesJobOperator(
    task_id="example_k8s_job",
    full_pod_spec=pod,
    get_logs=True,
    in_cluster=True,
    deferrable=True,  # needed to go through the trigger code path described above
)
```

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!
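As an addendum, the reordering proposed above ("write logs first, then raise on job failure") can be reduced to a small sketch; `finish` and the `write_logs` callback are illustrative names, not the provider's API:

```python
def finish(event, write_logs):
    """Attempt log retrieval first, then surface a job failure,
    so failed jobs still get their pod logs written."""
    if event.get("pod_name") and event.get("pod_namespace"):
        write_logs(event["pod_name"], event["pod_namespace"])
    # Raising only after the logs are written preserves the error
    # signal without losing the pod output.
    if event["status"] == "error":
        raise RuntimeError(event["message"])
```

Even when the event reports an error, the logs callback runs before the exception propagates.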
### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)