sunank200 commented on code in PR #67896:
URL: https://github.com/apache/airflow/pull/67896#discussion_r3401847502


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py:
##########
@@ -199,6 +199,13 @@ def from_obj(obj) -> dict | k8s.V1Pod | None:
         if not k8s_object and not k8s_legacy_object:
             return None
 
+        if not k8s_object and k8s_legacy_object:
+            raise AirflowConfigException(
+                "Using KubernetesExecutor dict inside an executor_config 
object"
+                "is now deprecated. Please switch to pod_override"
+                "kubernetes.client.models.V1Pod object."
+            )

Review Comment:
   ```suggestion
               raise AirflowConfigException(
                   "Using KubernetesExecutor dict inside an executor_config 
object "
                   "is now deprecated. Please switch to pod_override "
                   "kubernetes.client.models.V1Pod object."
               )
   ```



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py:
##########
@@ -199,6 +199,13 @@ def from_obj(obj) -> dict | k8s.V1Pod | None:
         if not k8s_object and not k8s_legacy_object:
             return None
 
+        if not k8s_object and k8s_legacy_object:
+            raise AirflowConfigException(
+                "Using KubernetesExecutor dict inside an executor_config 
object"
+                "is now deprecated. Please switch to pod_override"
+                "kubernetes.client.models.V1Pod object."
+            )

Review Comment:
   Nit: the message says "deprecated" but the function raises - nothing is 
deprecated, it's already broken. Maybe "no longer supported" reads more 
accurately?



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -227,6 +227,7 @@ def execute_async(
         else:
             pod_template_file = None
         self.event_buffer[key] = (TaskInstanceState.QUEUED, 
self.scheduler_job_id)
+        self.running.add(key)

Review Comment:
   The `self.running.add(key)` sits between `event_buffer` assignment and 
`task_queue.put()`. If `task_queue.put()` ever raises (full bounded queue or 
error), the key would be in `self.running` without a corresponding queued job. 
Can this occur? In that case we can consider moving `self.running.add(key)` 
after `task_queue.put()` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to