hussein-awala commented on code in PR #36240:
URL: https://github.com/apache/airflow/pull/36240#discussion_r1428952107
##########
airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -471,6 +471,15 @@ def _change_state(
if TYPE_CHECKING:
assert self.kube_scheduler
+ if state == TaskInstanceState.ADOPTED:
+ # When the task pod is adopted by another scheduler,
+ # then remove the task from the current scheduler running queue.
Review Comment:
Better to use `executor` instead of `scheduler`:
```suggestion
# When the task pod is adopted by another executor,
# then remove the task from the current executor running queue.
```
##########
airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -220,7 +220,15 @@ def process_status(
pod = event["object"]
annotations_string = annotations_for_logging_task_metadata(annotations)
"""Process status response."""
- if status == "Pending":
+ if event["type"] == "DELETED" and not pod.metadata.deletion_timestamp:
Review Comment:
Nice one!
##########
airflow/utils/state.py:
##########
@@ -46,6 +46,9 @@ class TaskInstanceState(str, Enum):
REMOVED = "removed" # Task vanished from DAG before it ran
SCHEDULED = "scheduled" # Task should run and will be handed to executor
soon
+ # Set by executor
+ ADOPTED = "adopted"
+
Review Comment:
KubernetesExecutor is a part of the `cncf.kuberntes` provider, which could
be used with the old version of Airlfow (the current min airflow version for
providers is 2.6.0), which means that this state could be unavailable for the
users who use the latest version of k8s provider with an old version of Airflow.
Also, the TI will never have this state in the metadata, so maybe you can
find another better way to tell the executor to free a slot.
##########
airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -220,7 +220,15 @@ def process_status(
pod = event["object"]
annotations_string = annotations_for_logging_task_metadata(annotations)
"""Process status response."""
- if status == "Pending":
+ if event["type"] == "DELETED" and not pod.metadata.deletion_timestamp:
+ # This will happen only when the task pods are adopted by another
scheduler.
Review Comment:
```suggestion
# This will happen only when the task pods are adopted by
another executor.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]