jordi-crespo opened a new issue, #42144:
URL: https://github.com/apache/airflow/issues/42144

   ### Apache Airflow Provider(s)
   
   cncf-kubernetes
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-cncf-kubernetes==8.3.4
   
   ### Apache Airflow version
   
   2.9.2
   
   ### Operating System
   
   Docker, creating the image from python:3.10-slim-buster
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   When using `KubernetesJobOperator` in deferrable mode, the job executes 
   correctly, but logs from the associated pod are not retrieved. 
   
   The issue arises in the 
   [`execute_complete`](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/job.py#L226)
   method, where the `pod_name` and `pod_namespace` passed from the trigger are `None`, 
   which causes the following code to fail:
   
    ```
    if self.get_logs:
        pod_name = event["pod_name"]
        pod_namespace = event["pod_namespace"]
        self.pod = self.hook.get_pod(pod_name, pod_namespace)
        if not self.pod:
            raise PodNotFoundException("Could not find pod after resuming from deferral")
        self._write_logs(self.pod)
    ```
   
   First, I create a `V1Pod` object that I pass to the `KubernetesJobOperator` 
   using the `full_pod_spec` parameter. This also works fine with the 
   `KubernetesPodOperator`. When the `KubernetesJobOperator` creates the job, it builds 
   a `V1Pod` from my 
   [`full_pod_spec`](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/job.py#L307).
   Later, in this [line of 
   code](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/job.py#L169),
   it creates the `self.pod` object.
   
   Because I am executing the operator in deferrable mode, 
   `self.execute_deferrable()` is executed. This method uses the 
   `KubernetesJobTrigger` class and [passes the `pod_name` and 
   `pod_namespace`](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/job.py#L212).
   Because I don't get an error here, I assume that the `self.pod` object exists, since 
   the code can access the following pod attributes:
   
   ````
   pod_name=self.pod.metadata.name,  # type: ignore[union-attr]
   pod_namespace=self.pod.metadata.namespace,  # type: ignore[union-attr]
   ````
   
   
[`KubernetesJobTrigger`](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/triggers/job.py#L32),
 yields  
[`TriggerEvent`](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/triggers/job.py#L122)
 object, passing the `pod_name`and `pod_namespace` like this:
   
    ````
    yield TriggerEvent(
        {
            "pod_name": pod.metadata.name if self.get_logs else None,
            "pod_namespace": pod.metadata.namespace if self.get_logs else None,
        }
    )
    ````
   
   This pod object comes from this [piece of 
code](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/triggers/job.py#L105):
   
    ````
    if self.get_logs or self.do_xcom_push:
        pod = await self.hook.get_pod(name=self.pod_name, namespace=self.pod_namespace)
    ````
   Next, the 
   [`execute_complete`](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/job.py#L226)
   method is executed, and it gets the `pod_name` and `pod_namespace` from the event.
   
   But I found that these values are `None`. I believe that the 
   `TriggerEvent` is yielding `None` values.
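
   To illustrate the failure mode, here is a minimal, self-contained sketch (plain 
   Python, no Airflow imports; `resolve_pod_ref` is a hypothetical helper name) of 
   the check that `execute_complete` effectively needs on the event it receives:

```python
# Hypothetical helper mirroring how execute_complete consumes the trigger event.
# The trigger is expected to put the pod coordinates into the event, but in the
# behaviour described above both keys arrive as None.
def resolve_pod_ref(event: dict) -> tuple:
    """Return (pod_name, pod_namespace) from a trigger event, or fail loudly."""
    pod_name = event.get("pod_name")
    pod_namespace = event.get("pod_namespace")
    if pod_name is None or pod_namespace is None:
        # get_pod(None, None) can never find the pod, so log retrieval fails here.
        raise ValueError("pod_name/pod_namespace missing from trigger event")
    return pod_name, pod_namespace
```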
   
   To resolve this, I had to manually extract the pod name from the job itself, 
   by subclassing `KubernetesJobOperator` and overriding the 
   `execute_complete` method.
   
   
    ```
    pod_list_response = self.hook.get_namespaced_pod_list(
        label_selector=f"job-name={job_name}", namespace=job_namespace
    )

    pod_list = json.loads(pod_list_response.read())
    pods = pod_list.get("items", [])
    if not pods:
        raise PodNotFoundException(f"No pods found for job: {job_name} in namespace: {job_namespace}")

    pod = pods[0]
    pod_name = pod["metadata"]["name"]
    pod_namespace = pod["metadata"]["namespace"]
    ```
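
   The lookup above can also be isolated into a small pure function that operates 
   on the already-parsed pod-list dict, which makes the selection logic easy to 
   test on its own (a sketch; `pod_ref_from_job_pods` is a name I made up):

```python
def pod_ref_from_job_pods(pod_list: dict, job_name: str, job_namespace: str) -> tuple:
    """Pick the first pod created for a job from a parsed pod-list response."""
    pods = pod_list.get("items", [])
    if not pods:
        raise RuntimeError(
            f"No pods found for job: {job_name} in namespace: {job_namespace}"
        )
    metadata = pods[0]["metadata"]
    return metadata["name"], metadata["namespace"]
```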
   
   After retrieving the pod details, I encountered another issue with the 
client used to fetch logs. By default, 
[`self.client`](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/job.py#L470)
 in `KubernetesJobOperator` points to the `BatchV1Api`:
   
   ```
   @cached_property
   def client(self) -> BatchV1Api:
       return self.hook.batch_v1_client
   ```
   
   This is happening in the following piece of code:
   
    ````
    if self.get_logs:
        pod_name = event["pod_name"]
        pod_namespace = event["pod_namespace"]
        self.pod = self.hook.get_pod(pod_name, pod_namespace)
        if not self.pod:
            raise PodNotFoundException("Could not find pod after resuming from deferral")
        self._write_logs(self.pod)
    ````
   
   
[`self._write_logs(self.pod)`](https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/pod.py#L832)
 is created in the `kubernetesPodOperator` that is creating the `self.client 
using   use `CoreV1Api`.   I resolved this by switching the client before 
calling the `_write_logs` function:
   
   ```
   self.client = self.hook.core_v1_client
   self._write_logs(self.pod)
   
   ```
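
   This reassignment works even though `client` is declared as a `cached_property`: 
   the first access stores the computed client in the instance `__dict__`, and a 
   plain assignment simply overwrites that cached entry. A minimal sketch of the 
   mechanism, with strings standing in for the real API clients:

```python
from functools import cached_property

class Operator:
    @cached_property
    def client(self):
        # Stand-in for self.hook.batch_v1_client
        return "BatchV1Api"

op = Operator()
print(op.client)         # first access caches "BatchV1Api" in op.__dict__
op.client = "CoreV1Api"  # plain assignment overwrites the cached entry
print(op.client)         # now "CoreV1Api"
```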
   
   ### What you think should happen instead
   
   The `KubernetesJobTrigger` should send the `pod_name` and `pod_namespace`, 
   and they should not be `None` values.
   
   Also, an error is currently raised if the job is in a failed state before the 
   logs can be printed:
   
    ````
    def execute_complete(self, context: Context, event: dict, **kwargs):
        ti = context["ti"]
        ti.xcom_push(key="job", value=event["job"])
        if event["status"] == "error":
            raise AirflowException(event["message"])

        if self.get_logs:
            pod_name = event["pod_name"]
            pod_namespace = event["pod_namespace"]
            self.pod = self.hook.get_pod(pod_name, pod_namespace)
            if not self.pod:
                raise PodNotFoundException("Could not find pod after resuming from deferral")
            self._write_logs(self.pod)
    ````
   
   But even if the job fails, we would still want to see the logs:
   
   
    ````
    def execute_complete(self, context: Context, event: dict, **kwargs):
        ti = context["ti"]
        ti.xcom_push(key="job", value=event["job"])

        if self.get_logs:
            pod_name = event["pod_name"]
            pod_namespace = event["pod_namespace"]
            self.pod = self.hook.get_pod(pod_name, pod_namespace)
            if not self.pod:
                raise PodNotFoundException("Could not find pod after resuming from deferral")
            self._write_logs(self.pod)

        if event["status"] == "error":
            raise AirflowException(event["message"])
    ````
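
   Another way to get both behaviours is to attempt the log write in a 
   `try`/`finally` and surface the job failure afterwards. A self-contained sketch 
   of that ordering (plain Python; `fetch_logs` and `sink` are stand-ins for the 
   hook call and the log writer):

```python
def complete(event: dict, fetch_logs, sink: list) -> None:
    """Write pod logs first, then surface a job failure, in that order."""
    try:
        if event.get("pod_name") and event.get("pod_namespace"):
            sink.extend(fetch_logs(event["pod_name"], event["pod_namespace"]))
    finally:
        # Runs after the log attempt, so a failed job still shows its logs.
        if event["status"] == "error":
            raise RuntimeError(event["message"])
```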
   
   
   
   ### How to reproduce
   
   Write a DAG with the following operator:
   
   
   
    ````
    from kubernetes.client import V1Pod, V1ObjectMeta, V1PodSpec, V1Container
    from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

    # Define the pod
    pod = V1Pod(
        metadata=V1ObjectMeta(name="example-pod"),
        spec=V1PodSpec(
            containers=[V1Container(
                name="example-container",
                image="busybox",
                command=["/bin/sh", "-c", "echo Hello, Kubernetes! && sleep 60"]
            )],
            restart_policy="Never"
        )
    )

    # Define the KubernetesJobOperator (the issue only occurs in deferrable mode)
    job = KubernetesJobOperator(
        task_id="example_k8s_job",
        full_pod_spec=pod,
        get_logs=True,
        in_cluster=True,
        wait_until_job_complete=True,
        deferrable=True,
    )
    ````
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

