Merve40 opened a new issue, #47780:
URL: https://github.com/apache/airflow/issues/47780

   ### Apache Airflow Provider(s)
   
   cncf-kubernetes
   
   ### Versions of Apache Airflow Providers
   
   See [Cloud Composer 2 dependencies](https://cloud.google.com/composer/docs/composer-versions#images-composer-2) for composer-2.11.3-airflow-2.10.2
   
   Kubernetes provider version:
   apache-airflow-providers-cncf-kubernetes==10.1.0
   
   ### Apache Airflow version
   
   2.10.2
   
   ### Operating System
   
   Linux
   
   ### Deployment
   
   Google Cloud Composer
   
   ### Deployment details
   
   We access a Google Kubernetes Engine cluster from Cloud Composer; the two are in different VPC networks.
   
   ### What happened
   
   Starting `KubernetesJobOperator` in deferrable mode often causes a race condition in `execute` when it calls `self.get_or_create_pod`:
   
   ```python
   def execute(self, context: Context):
           ...
   
           if self.pod is None:
               self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
                   pod_request_obj=self.pod_request_obj,
                   context=context,
               )
   ```
   This method is implemented in the `KubernetesPodOperator`:
   
   ```python
   def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context: Context) -> k8s.V1Pod:
           if self.reattach_on_restart:
               pod = self.find_pod(pod_request_obj.metadata.namespace, context=context)
               if pod:
                   return pod
           self.log.debug("Starting pod:\n%s", yaml.safe_dump(pod_request_obj.to_dict()))
           self.pod_manager.create_pod(pod=pod_request_obj)
           return pod_request_obj
   ```
   
   The `find_pod` lookup comes back empty because, at the time of the call, the Job has not yet created a Pod. As a result, the operator creates a second Pod, which does not have the correct template spec.
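
The ordering problem can be illustrated with a toy model. This is only a sketch: `FakeCluster` and the pod names are hypothetical stand-ins, not the Kubernetes client or the provider's classes, and the "Job controller" is simulated by a plain list append.

```python
from dataclasses import dataclass, field


@dataclass
class FakeCluster:
    """Toy stand-in for a namespace; not the Kubernetes API."""

    pods: list = field(default_factory=list)

    def find_pod(self):
        # Lookup step: nothing is found while no Pod exists yet.
        return self.pods[0] if self.pods else None


def get_or_create_pod(cluster, pod_request):
    # Same shape as the logic quoted above: reuse a found Pod, else create one.
    pod = cluster.find_pod()
    if pod:
        return pod
    cluster.pods.append(pod_request)
    return pod_request


cluster = FakeCluster()
# The Job was just submitted, but its controller has not created a Pod yet,
# so the lookup finds nothing and the operator creates its own Pod.
returned = get_or_create_pod(cluster, "pod-from-operator")
# The Job controller catches up afterwards and creates the Job's real Pod.
cluster.pods.append("pod-from-job")

print(cluster.pods)  # two Pods now exist for a single Job
```

If the lookup ran after the Job's Pod existed, `get_or_create_pod` would return that Pod and skip the create step.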
   
   
   ### What you think should happen instead
   
   In my opinion, the operator should wait for the Job to create its Pod before looking it up. One simple workaround is to override the `get_or_create_pod` method in `KubernetesJobOperator`:
   
   ```python
   def get_or_create_pod(self, pod_request_obj: V1Pod, context: Context) -> V1Pod:
           time.sleep(self.startup_timeout_seconds)
           return super().get_or_create_pod(pod_request_obj, context)
   ```
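
A fixed `time.sleep` always waits the full timeout, even when the Pod shows up quickly. A polling variant might be preferable. The sketch below is an assumption, not provider code: `wait_for_pod` is a hypothetical helper, and its `find_pod` argument is any zero-argument callable standing in for a call such as `self.find_pod(namespace, context=context)`.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def wait_for_pod(
    find_pod: Callable[[], Optional[T]],
    timeout: float,
    poll_interval: float = 1.0,
) -> Optional[T]:
    """Poll find_pod until it returns a Pod or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        pod = find_pod()
        if pod is not None:
            return pod
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            # Timed out: let the caller decide whether to create a Pod or fail.
            return None
        time.sleep(min(poll_interval, remaining))


# Usage with a fake lookup that succeeds on the third attempt.
calls = iter([None, None, "job-pod"])
result = wait_for_pod(lambda: next(calls), timeout=5, poll_interval=0.01)
print(result)
```

In an override of `get_or_create_pod`, a helper like this could run before falling back to `super().get_or_create_pod(...)`, so the wait ends as soon as the Job's Pod is visible.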
   
   ### How to reproduce
   
   Create a `KubernetesJobOperator` in deferrable mode.
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

