Nataneljpwd commented on code in PR #61110:
URL: https://github.com/apache/airflow/pull/61110#discussion_r2756015241
##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py:
##########
@@ -983,21 +991,141 @@ def test_find_spark_job_picks_running_pod(
running_pod.metadata.labels = {"try_number": "1"}
running_pod.status.phase = "Running"
- # Pending pod should not be selected.
+ # Terminating pod should not be selected.
+ terminating_pod = mock.MagicMock()
+ terminating_pod.metadata.creation_timestamp = timezone.datetime(2025,
1, 1, tzinfo=timezone.utc)
+ terminating_pod.metadata.deletion_timestamp = timezone.datetime(2025,
1, 2, tzinfo=timezone.utc)
+ terminating_pod.metadata.name = "spark-driver-pending"
+ terminating_pod.metadata.labels = {"try_number": "1"}
+ terminating_pod.status.phase = "Running"
+
+ mock_get_kube_client.list_namespaced_pod.return_value.items = [
+ running_pod,
+ terminating_pod,
+ ]
+
+ returned_pod = op.find_spark_job(context)
+
+ assert returned_pod is running_pod
+
+ def test_find_spark_job_picks_pending_pod(
+ self,
+ mock_is_in_cluster,
+ mock_parent_execute,
+ mock_create_namespaced_crd,
+ mock_get_namespaced_custom_object_status,
+ mock_cleanup,
+ mock_create_job_name,
+ mock_get_kube_client,
+ mock_create_pod,
+ mock_await_pod_completion,
+ mock_fetch_requested_container_logs,
+ data_file,
+ ):
+ """
+ Verifies that find_spark_job picks a Running Spark driver pod over a
non-Running pod.
+ """
Review Comment:
Done
##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py:
##########
@@ -983,21 +991,141 @@ def test_find_spark_job_picks_running_pod(
running_pod.metadata.labels = {"try_number": "1"}
running_pod.status.phase = "Running"
- # Pending pod should not be selected.
+ # Terminating pod should not be selected.
+ terminating_pod = mock.MagicMock()
+ terminating_pod.metadata.creation_timestamp = timezone.datetime(2025,
1, 1, tzinfo=timezone.utc)
+ terminating_pod.metadata.deletion_timestamp = timezone.datetime(2025,
1, 2, tzinfo=timezone.utc)
+ terminating_pod.metadata.name = "spark-driver-pending"
+ terminating_pod.metadata.labels = {"try_number": "1"}
+ terminating_pod.status.phase = "Running"
+
+ mock_get_kube_client.list_namespaced_pod.return_value.items = [
+ running_pod,
+ terminating_pod,
+ ]
+
+ returned_pod = op.find_spark_job(context)
+
+ assert returned_pod is running_pod
+
+ def test_find_spark_job_picks_pending_pod(
Review Comment:
Here the test does check that we get a pending pod rather than a
non-terminating one, so I think it is better if this stays the same.
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py:
##########
@@ -279,6 +289,9 @@ def find_spark_job(self, context, exclude_checked: bool =
True):
self.log.info("`try_number` of pod: %s",
pod.metadata.labels.get("try_number", "unknown"))
return pod
+ def _get_field_selector(self) -> str:
+ return
f"status.phase!={PodPhase.FAILED},status.phase!={PodPhase.UNKNOWN}"
Review Comment:
Agreed, added.
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py:
##########
@@ -248,23 +248,33 @@ def find_spark_job(self, context, exclude_checked: bool =
True):
self._build_find_pod_label_selector(context,
exclude_checked=exclude_checked)
+ ",spark-role=driver"
)
- pod_list = self.client.list_namespaced_pod(self.namespace,
label_selector=label_selector).items
+ # since we did not specify a resource version, we make sure to get the
latest data
+ # we make sure we get only running or pending pods.
+ field_selector = self._get_field_selector()
+ pod_list = self.client.list_namespaced_pod(
+ self.namespace, label_selector=label_selector,
field_selector=field_selector
+ ).items
pod = None
if len(pod_list) > 1:
# When multiple pods match the same labels, select one
deterministically,
- # preferring a Running pod, then creation time, with name as a
tie-breaker.
+ # preferring Succeeded, then Running (while not in terminating)
then Pending pod
+ # as if another pod was created, it will be in either the
terminating status or a terminal phase,
+ # if it is in terminating, it will have a deletion_timestamp on
the pod.
+ # pending pods need to also be selected, as what if a driver pod
just failed and a new pod is
+ # created, we do not want the task to fail.
Review Comment:
Sounds good, that is much better than what I wrote. Changed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]