SameerMesiah97 commented on code in PR #61110:
URL: https://github.com/apache/airflow/pull/61110#discussion_r2755938748
##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py:
##########
@@ -983,21 +991,141 @@ def test_find_spark_job_picks_running_pod(
running_pod.metadata.labels = {"try_number": "1"}
running_pod.status.phase = "Running"
- # Pending pod should not be selected.
+ # Terminating pod should not be selected.
+ terminating_pod = mock.MagicMock()
+ terminating_pod.metadata.creation_timestamp = timezone.datetime(2025, 1, 1, tzinfo=timezone.utc)
+ terminating_pod.metadata.deletion_timestamp = timezone.datetime(2025, 1, 2, tzinfo=timezone.utc)
+ terminating_pod.metadata.name = "spark-driver-pending"
+ terminating_pod.metadata.labels = {"try_number": "1"}
+ terminating_pod.status.phase = "Running"
+
+ mock_get_kube_client.list_namespaced_pod.return_value.items = [
+ running_pod,
+ terminating_pod,
+ ]
+
+ returned_pod = op.find_spark_job(context)
+
+ assert returned_pod is running_pod
+
+ def test_find_spark_job_picks_pending_pod(
+ self,
+ mock_is_in_cluster,
+ mock_parent_execute,
+ mock_create_namespaced_crd,
+ mock_get_namespaced_custom_object_status,
+ mock_cleanup,
+ mock_create_job_name,
+ mock_get_kube_client,
+ mock_create_pod,
+ mock_await_pod_completion,
+ mock_fetch_requested_container_logs,
+ data_file,
+ ):
+ """
+ Verifies that find_spark_job picks a Running Spark driver pod over a
non-Running pod.
+ """
+
+ task_name = "test_find_spark_job_prefers_running_pod"
+ job_spec = yaml.safe_load(data_file("spark/application_template.yaml").read_text())
+
+ mock_create_job_name.return_value = task_name
+ op = SparkKubernetesOperator(
+ template_spec=job_spec,
+ kubernetes_conn_id="kubernetes_default_kube_config",
+ task_id=task_name,
+ get_logs=True,
+ reattach_on_restart=True,
+ )
+ context = create_context(op)
+
+ # Pending pod should be selected.
pending_pod = mock.MagicMock()
pending_pod.metadata.creation_timestamp = timezone.datetime(2025, 1, 1, tzinfo=timezone.utc)
- pending_pod.metadata.name = "spark-driver-pending"
+ pending_pod.metadata.name = "spark-driver-running"
pending_pod.metadata.labels = {"try_number": "1"}
pending_pod.status.phase = "Pending"
+ # Terminating pod should not be selected.
+ terminating_pod = mock.MagicMock()
+ terminating_pod.metadata.creation_timestamp = timezone.datetime(2025, 1, 1, tzinfo=timezone.utc)
+ terminating_pod.metadata.deletion_timestamp = timezone.datetime(2025, 1, 2, tzinfo=timezone.utc)
+ terminating_pod.metadata.name = "spark-driver-pending"
+ terminating_pod.metadata.labels = {"try_number": "1"}
+ terminating_pod.status.phase = "Running"
+
mock_get_kube_client.list_namespaced_pod.return_value.items = [
- running_pod,
pending_pod,
+ terminating_pod,
]
returned_pod = op.find_spark_job(context)
- assert returned_pod is running_pod
+ assert returned_pod is pending_pod
+
+ def test_find_spark_job_picks_succeeded(
+ self,
+ mock_is_in_cluster,
+ mock_parent_execute,
+ mock_create_namespaced_crd,
+ mock_get_namespaced_custom_object_status,
+ mock_cleanup,
+ mock_create_job_name,
+ mock_get_kube_client,
+ mock_create_pod,
+ mock_await_pod_completion,
+ mock_fetch_requested_container_logs,
+ data_file,
+ ):
+ """
+ Verifies that find_spark_job picks a Running Spark driver pod over a
non-Running pod.
+ """
+
+ task_name = "test_find_spark_job_prefers_running_pod"
+ job_spec = yaml.safe_load(data_file("spark/application_template.yaml").read_text())
+
+ mock_create_job_name.return_value = task_name
+ op = SparkKubernetesOperator(
+ template_spec=job_spec,
+ kubernetes_conn_id="kubernetes_default_kube_config",
+ task_id=task_name,
+ get_logs=True,
+ reattach_on_restart=True,
+ )
+ context = create_context(op)
+
+ # Succeeded pod should be selected.
+ succeeded_pod = mock.MagicMock()
+ succeeded_pod.metadata.creation_timestamp = timezone.datetime(2025, 1, 1, tzinfo=timezone.utc)
+ succeeded_pod.metadata.name = "spark-driver-running"
+ succeeded_pod.metadata.labels = {"try_number": "1"}
Review Comment:
I think you should set a `deletion_timestamp` here as well, because I believe that under your current implementation this test will still pass even if the pod phase is not 'Succeeded': `deletion_timestamp` is the primary criterion for pod selection. Or maybe you could leave it unset. With this test, we want to assert that `find_spark_job` picks the 'Succeeded' pod even when there are multiple valid candidates, i.e. non-terminating pods.
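
To make the suggestion concrete, here is a minimal sketch of the second option (leaving `deletion_timestamp` unset on the competing pod), reusing the variable names from the test body above. The competing pod's name and 'Pending' phase are placeholders chosen for illustration, and the expected preference for the 'Succeeded' pod reflects the intent stated above rather than the PR's actual code:

```python
# Hypothetical continuation of the test body, not the PR's code: keep both
# pods non-terminating so that the 'Succeeded' phase is what has to decide
# the selection.
succeeded_pod.status.phase = "Succeeded"

other_pod = mock.MagicMock()
other_pod.metadata.creation_timestamp = timezone.datetime(2025, 1, 1, tzinfo=timezone.utc)
other_pod.metadata.deletion_timestamp = None  # explicitly not terminating
other_pod.metadata.name = "spark-driver-other"  # placeholder name
other_pod.metadata.labels = {"try_number": "1"}
other_pod.status.phase = "Pending"  # placeholder non-Succeeded phase

mock_get_kube_client.list_namespaced_pod.return_value.items = [
    other_pod,
    succeeded_pod,
]

returned_pod = op.find_spark_job(context)

# With no terminating pod in the list, this assertion only passes if the
# 'Succeeded' phase is actually preferred, which is what the test should prove.
assert returned_pod is succeeded_pod
```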
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]