This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 43473c7 [SPARK-54931] Support `Pod Disruption Budget` for Spark cluster workers
43473c7 is described below
commit 43473c7ff12c98be35a7292b00bc9dcc31665f0d
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Wed Jan 7 20:02:27 2026 +0900
[SPARK-54931] Support `Pod Disruption Budget` for Spark cluster workers
### What changes were proposed in this pull request?
This PR aims to support `Pod Disruption Budget` for Spark cluster workers.
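For illustration, a hedged sketch of the `SparkCluster` worker settings that would enable the new budget; the field path mirrors the Java accessors touched in this patch (`getClusterTolerations().getInstanceConfig().getMinWorkers()`), and the exact CRD layout shown here is an assumption:
```yaml
# Illustrative only: field names are inferred from the Java accessors in this patch.
spec:
  clusterTolerations:
    instanceConfig:
      minWorkers: 1   # a PodDisruptionBudget is built only when minWorkers >= 1
```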
### Why are the changes needed?
K8s offers features to help users run highly available applications via [Pod disruption budgets](https://kubernetes.io/docs/concepts/workloads/pods/disruptions/#pod-disruption-budgets), even when clusters introduce frequent voluntary disruptions. We should provide better stability for `SparkCluster`.
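As a rough sketch, the budget generated for a hypothetical cluster named `prod` in namespace `default` would look like the following; the label keys stand in for the operator's label constants (`LABEL_SPARK_CLUSTER_NAME`, `LABEL_SPARK_ROLE_NAME`), whose literal values are not shown in this patch:
```yaml
# Approximate shape of the generated resource; cluster name, namespace,
# and label keys below are placeholders.
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: prod-worker-pdb      # "<clusterName>-worker-pdb"
  namespace: default
spec:
  minAvailable: 1
  selector:
    matchLabels:
      <cluster-name-label-key>: prod          # LABEL_SPARK_CLUSTER_NAME
      <role-label-key>: <worker-role-value>   # LABEL_SPARK_ROLE_NAME / LABEL_SPARK_ROLE_WORKER_VALUE
```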
### Does this PR introduce _any_ user-facing change?
Yes, better worker management via `Pod Disruption Budget`.
### How was this patch tested?
Pass the CIs with newly added test cases.
### Was this patch authored or co-authored using generative AI tooling?
Yes (`Gemini 3 Pro` on `Antigravity`)
Closes #448 from dongjoon-hyun/SPARK-54931.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../templates/operator-rbac.yaml | 6 ++++
.../k8s/operator/context/SparkClusterContext.java | 10 ++++++
.../SparkClusterResourceSpecFactory.java | 3 ++
.../reconciler/reconcilesteps/ClusterInitStep.java | 10 ++++++
.../k8s/operator/SparkClusterResourceSpec.java | 38 ++++++++++++++++++++++
.../k8s/operator/SparkClusterResourceSpecTest.java | 38 ++++++++++++++++++++++
6 files changed, 105 insertions(+)
diff --git a/build-tools/helm/spark-kubernetes-operator/templates/operator-rbac.yaml b/build-tools/helm/spark-kubernetes-operator/templates/operator-rbac.yaml
index 1768159..e0ca9cb 100644
--- a/build-tools/helm/spark-kubernetes-operator/templates/operator-rbac.yaml
+++ b/build-tools/helm/spark-kubernetes-operator/templates/operator-rbac.yaml
@@ -52,6 +52,12 @@ rules:
- ingresses
verbs:
- '*'
+ - apiGroups:
+ - "policy"
+ resources:
+ - poddisruptionbudgets
+ verbs:
+ - '*'
{{- end }}
{{/*
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/context/SparkClusterContext.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/context/SparkClusterContext.java
index 836139c..892fed5 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/context/SparkClusterContext.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/context/SparkClusterContext.java
@@ -24,6 +24,7 @@ import java.util.Optional;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.api.model.autoscaling.v2.HorizontalPodAutoscaler;
+import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import lombok.RequiredArgsConstructor;
@@ -111,6 +112,15 @@ public class SparkClusterContext extends BaseContext<SparkCluster> {
return getSecondaryResourceSpec().getHorizontalPodAutoscaler();
}
+ /**
+ * Returns the specification for the PodDisruptionBudget, if present.
+ *
+ * @return An Optional containing the PodDisruptionBudget object.
+ */
+ public Optional<PodDisruptionBudget> getPodDisruptionBudgetSpec() {
+ return getSecondaryResourceSpec().getPodDisruptionBudget();
+ }
+
/**
* Returns the Kubernetes client from the JOSDK context.
*
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterResourceSpecFactory.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterResourceSpecFactory.java
index b63ecef..9a394fd 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterResourceSpecFactory.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterResourceSpecFactory.java
@@ -54,6 +54,9 @@ public final class SparkClusterResourceSpecFactory {
if (spec.getHorizontalPodAutoscaler().isPresent()) {
decorator.decorate(spec.getHorizontalPodAutoscaler().get());
}
+ if (spec.getPodDisruptionBudget().isPresent()) {
+ decorator.decorate(spec.getPodDisruptionBudget().get());
+ }
return spec;
}
}
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/ClusterInitStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/ClusterInitStep.java
index 2159b05..443760c 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/ClusterInitStep.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/ClusterInitStep.java
@@ -85,6 +85,16 @@ public class ClusterInitStep extends ClusterReconcileStep {
.resource(horizontalPodAutoscaler.get())
.create();
}
+ var podDisruptionBudget = context.getPodDisruptionBudgetSpec();
+ if (podDisruptionBudget.isPresent()) {
+ context
+ .getClient()
+ .policy()
+ .v1()
+ .podDisruptionBudget()
+ .resource(podDisruptionBudget.get())
+ .create();
+ }
ClusterStatus updatedStatus =
context
diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
index d0ce97a..012c1e0 100644
--- a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
+++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
@@ -38,6 +38,8 @@ import io.fabric8.kubernetes.api.model.autoscaling.v2.HorizontalPodAutoscalerBui
import io.fabric8.kubernetes.api.model.autoscaling.v2.HorizontalPodAutoscalerSpec;
import io.fabric8.kubernetes.api.model.autoscaling.v2.HorizontalPodAutoscalerSpecBuilder;
import io.fabric8.kubernetes.api.model.autoscaling.v2.MetricSpecBuilder;
+import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
+import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudgetBuilder;
import lombok.Getter;
import org.apache.spark.SparkConf;
@@ -53,6 +55,7 @@ public class SparkClusterResourceSpec {
@Getter private final StatefulSet masterStatefulSet;
@Getter private final StatefulSet workerStatefulSet;
@Getter private final Optional<HorizontalPodAutoscaler> horizontalPodAutoscaler;
+ @Getter private final Optional<PodDisruptionBudget> podDisruptionBudget;
/**
* Constructs a new SparkClusterResourceSpec.
@@ -110,6 +113,7 @@ public class SparkClusterResourceSpec {
workerSpec.getStatefulSetMetadata(),
workerSpec.getStatefulSetSpec());
horizontalPodAutoscaler = buildHorizontalPodAutoscaler(clusterName, namespace, spec);
+ podDisruptionBudget = buildPodDisruptionBudget(clusterName, namespace, spec);
}
/**
@@ -418,4 +422,38 @@ public class SparkClusterResourceSpec {
.endSpec()
.build());
}
+
+ /**
+ * Builds a PodDisruptionBudget for the Spark workers.
+ *
+ * @param clusterName The name of the cluster.
+ * @param namespace The namespace of the cluster.
+ * @param spec The ClusterSpec for the cluster.
+ * @return An Optional containing a PodDisruptionBudget object, or empty if minWorkers < 1.
+ */
+ private static Optional<PodDisruptionBudget> buildPodDisruptionBudget(
+ String clusterName, String namespace, ClusterSpec spec) {
+ if (spec.getClusterTolerations().getInstanceConfig().getMinWorkers() < 1) {
+ return Optional.empty();
+ }
+ return Optional.of(
+ new PodDisruptionBudgetBuilder()
+ .withNewMetadata()
+ .withName(clusterName + "-worker-pdb")
+ .withNamespace(namespace)
+ .addToLabels(LABEL_SPARK_VERSION_NAME, spec.getRuntimeVersions().getSparkVersion())
+ .endMetadata()
+ .withNewSpec()
+ .withNewMinAvailable(1)
+ .withNewSelector()
+ .addToMatchLabels(
+ Map.of(
+ LABEL_SPARK_CLUSTER_NAME,
+ clusterName,
+ LABEL_SPARK_ROLE_NAME,
+ LABEL_SPARK_ROLE_WORKER_VALUE))
+ .endSelector()
+ .endSpec()
+ .build());
+ }
}
diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
index 9717bf5..675365c 100644
--- a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
+++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
@@ -345,4 +345,42 @@ class SparkClusterResourceSpecTest {
assertEquals(1, hpa.getSpec().getMetrics().size());
assertEquals("worker",
hpa.getSpec().getMetrics().get(0).getContainerResource().getContainer());
}
+
+ @Test
+ void testPodDisruptionBudgetCreation() {
+ var instanceConfig = new WorkerInstanceConfig();
+ instanceConfig.setMinWorkers(1);
+ var clusterTolerations = new ClusterTolerations();
+ clusterTolerations.setInstanceConfig(instanceConfig);
+ when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
+
+ SparkClusterResourceSpec spec = new SparkClusterResourceSpec(cluster, new SparkConf());
+ assertTrue(spec.getPodDisruptionBudget().isPresent());
+ var pdb = spec.getPodDisruptionBudget().get();
+ assertEquals("policy/v1", pdb.getApiVersion());
+ assertEquals("PodDisruptionBudget", pdb.getKind());
+ assertEquals("my-namespace", pdb.getMetadata().getNamespace());
+ assertEquals("cluster-name-worker-pdb", pdb.getMetadata().getName());
+ assertEquals(VERSION, pdb.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
+ assertEquals(1, pdb.getSpec().getMinAvailable().getIntVal());
+ assertEquals(
+ Map.of(
+ LABEL_SPARK_CLUSTER_NAME,
+ "cluster-name",
+ LABEL_SPARK_ROLE_NAME,
+ LABEL_SPARK_ROLE_WORKER_VALUE),
+ pdb.getSpec().getSelector().getMatchLabels());
+ }
+
+ @Test
+ void testPodDisruptionBudgetAbsence() {
+ var instanceConfig = new WorkerInstanceConfig();
+ instanceConfig.setMinWorkers(0);
+ var clusterTolerations = new ClusterTolerations();
+ clusterTolerations.setInstanceConfig(instanceConfig);
+ when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
+
+ SparkClusterResourceSpec spec = new SparkClusterResourceSpec(cluster, new SparkConf());
+ assertTrue(spec.getPodDisruptionBudget().isEmpty());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]