This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 43473c7  [SPARK-54931] Support `Pod Disruption Budget` for Spark cluster workers
43473c7 is described below

commit 43473c7ff12c98be35a7292b00bc9dcc31665f0d
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Wed Jan 7 20:02:27 2026 +0900

    [SPARK-54931] Support `Pod Disruption Budget` for Spark cluster workers
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `Pod Disruption Budget` for Spark cluster workers.
    
    ### Why are the changes needed?
    
    Kubernetes offers [Pod Disruption Budgets](https://kubernetes.io/docs/concepts/workloads/pods/disruptions/#pod-disruption-budgets) to help users run highly available applications even when clusters experience frequent voluntary disruptions.
    
    We should provide better stability for `SparkCluster` workers.
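    
    As a rough illustration (a sketch, not the exact code in this PR), a worker-scoped budget built with fabric8's `PodDisruptionBudgetBuilder` could look like the snippet below. The label keys are placeholders standing in for the operator's own Spark cluster/role labels.
    
    ```java
    import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
    import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudgetBuilder;
    
    public class WorkerPdbSketch {
      // Hypothetical helper: builds a budget that keeps at least one worker pod
      // available during voluntary disruptions (node drains, upgrades, ...).
      static PodDisruptionBudget workerPdb(String clusterName, String namespace) {
        return new PodDisruptionBudgetBuilder()
            .withNewMetadata()
            .withName(clusterName + "-worker-pdb")
            .withNamespace(namespace)
            .endMetadata()
            .withNewSpec()
            .withNewMinAvailable(1)
            .withNewSelector()
            .addToMatchLabels("spark-cluster-name", clusterName) // placeholder label key
            .addToMatchLabels("spark-role", "worker")            // placeholder label key
            .endSelector()
            .endSpec()
            .build();
      }
    }
    ```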
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. When a `SparkCluster` is configured with `minWorkers` >= 1, the operator now creates a `PodDisruptionBudget` for its worker pods, keeping at least one worker available during voluntary disruptions.
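    
    For example (a hedged sketch, not part of this PR), the created budget can be read back with the fabric8 client; the cluster and namespace names below are assumptions:
    
    ```java
    import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
    import io.fabric8.kubernetes.client.KubernetesClient;
    import io.fabric8.kubernetes.client.KubernetesClientBuilder;
    
    public class InspectWorkerPdb {
      public static void main(String[] args) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
          // "prod-cluster" and "spark" are example names, not values from this PR.
          PodDisruptionBudget pdb =
              client.policy().v1().podDisruptionBudget()
                  .inNamespace("spark")
                  .withName("prod-cluster-worker-pdb")
                  .get();
          if (pdb != null) {
            System.out.println("minAvailable = " + pdb.getSpec().getMinAvailable().getIntVal());
          }
        }
      }
    }
    ```
    
    Equivalently, `kubectl get poddisruptionbudgets -n <namespace>` lists the new resource.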
    
    ### How was this patch tested?
    
    Pass the CIs with newly added test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes (`Gemini 3 Pro` on `Antigravity`)
    
    Closes #448 from dongjoon-hyun/SPARK-54931.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../templates/operator-rbac.yaml                   |  6 ++++
 .../k8s/operator/context/SparkClusterContext.java  | 10 ++++++
 .../SparkClusterResourceSpecFactory.java           |  3 ++
 .../reconciler/reconcilesteps/ClusterInitStep.java | 10 ++++++
 .../k8s/operator/SparkClusterResourceSpec.java     | 38 ++++++++++++++++++++++
 .../k8s/operator/SparkClusterResourceSpecTest.java | 38 ++++++++++++++++++++++
 6 files changed, 105 insertions(+)

diff --git a/build-tools/helm/spark-kubernetes-operator/templates/operator-rbac.yaml b/build-tools/helm/spark-kubernetes-operator/templates/operator-rbac.yaml
index 1768159..e0ca9cb 100644
--- a/build-tools/helm/spark-kubernetes-operator/templates/operator-rbac.yaml
+++ b/build-tools/helm/spark-kubernetes-operator/templates/operator-rbac.yaml
@@ -52,6 +52,12 @@ rules:
       - ingresses
     verbs:
       - '*'
+  - apiGroups:
+      - "policy"
+    resources:
+      - poddisruptionbudgets
+    verbs:
+      - '*'
 {{- end }}
 
 {{/*
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/context/SparkClusterContext.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/context/SparkClusterContext.java
index 836139c..892fed5 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/context/SparkClusterContext.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/context/SparkClusterContext.java
@@ -24,6 +24,7 @@ import java.util.Optional;
 import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.api.model.apps.StatefulSet;
 import io.fabric8.kubernetes.api.model.autoscaling.v2.HorizontalPodAutoscaler;
+import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import lombok.RequiredArgsConstructor;
@@ -111,6 +112,15 @@ public class SparkClusterContext extends BaseContext<SparkCluster> {
     return getSecondaryResourceSpec().getHorizontalPodAutoscaler();
   }
 
+  /**
+   * Returns the specification for the PodDisruptionBudget, if present.
+   *
+   * @return An Optional containing the PodDisruptionBudget object.
+   */
+  public Optional<PodDisruptionBudget> getPodDisruptionBudgetSpec() {
+    return getSecondaryResourceSpec().getPodDisruptionBudget();
+  }
+
   /**
    * Returns the Kubernetes client from the JOSDK context.
    *
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterResourceSpecFactory.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterResourceSpecFactory.java
index b63ecef..9a394fd 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterResourceSpecFactory.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterResourceSpecFactory.java
@@ -54,6 +54,9 @@ public final class SparkClusterResourceSpecFactory {
     if (spec.getHorizontalPodAutoscaler().isPresent()) {
       decorator.decorate(spec.getHorizontalPodAutoscaler().get());
     }
+    if (spec.getPodDisruptionBudget().isPresent()) {
+      decorator.decorate(spec.getPodDisruptionBudget().get());
+    }
     return spec;
   }
 }
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/ClusterInitStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/ClusterInitStep.java
index 2159b05..443760c 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/ClusterInitStep.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/ClusterInitStep.java
@@ -85,6 +85,16 @@ public class ClusterInitStep extends ClusterReconcileStep {
             .resource(horizontalPodAutoscaler.get())
             .create();
       }
+      var podDisruptionBudget = context.getPodDisruptionBudgetSpec();
+      if (podDisruptionBudget.isPresent()) {
+        context
+            .getClient()
+            .policy()
+            .v1()
+            .podDisruptionBudget()
+            .resource(podDisruptionBudget.get())
+            .create();
+      }
 
       ClusterStatus updatedStatus =
           context
diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
index d0ce97a..012c1e0 100644
--- a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
+++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
@@ -38,6 +38,8 @@ import io.fabric8.kubernetes.api.model.autoscaling.v2.HorizontalPodAutoscalerBui
 import io.fabric8.kubernetes.api.model.autoscaling.v2.HorizontalPodAutoscalerSpec;
 import io.fabric8.kubernetes.api.model.autoscaling.v2.HorizontalPodAutoscalerSpecBuilder;
 import io.fabric8.kubernetes.api.model.autoscaling.v2.MetricSpecBuilder;
+import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
+import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudgetBuilder;
 import lombok.Getter;
 
 import org.apache.spark.SparkConf;
@@ -53,6 +55,7 @@ public class SparkClusterResourceSpec {
   @Getter private final StatefulSet masterStatefulSet;
   @Getter private final StatefulSet workerStatefulSet;
  @Getter private final Optional<HorizontalPodAutoscaler> horizontalPodAutoscaler;
+  @Getter private final Optional<PodDisruptionBudget> podDisruptionBudget;
 
   /**
    * Constructs a new SparkClusterResourceSpec.
@@ -110,6 +113,7 @@ public class SparkClusterResourceSpec {
             workerSpec.getStatefulSetMetadata(),
             workerSpec.getStatefulSetSpec());
    horizontalPodAutoscaler = buildHorizontalPodAutoscaler(clusterName, namespace, spec);
+    podDisruptionBudget = buildPodDisruptionBudget(clusterName, namespace, spec);
   }
 
   /**
@@ -418,4 +422,38 @@ public class SparkClusterResourceSpec {
             .endSpec()
             .build());
   }
+
+  /**
+   * Builds a PodDisruptionBudget for the Spark workers.
+   *
+   * @param clusterName The name of the cluster.
+   * @param namespace The namespace of the cluster.
+   * @param spec The ClusterSpec for the cluster.
+   * @return An Optional containing a PodDisruptionBudget object, or empty if minWorkers < 1.
+   */
+  private static Optional<PodDisruptionBudget> buildPodDisruptionBudget(
+      String clusterName, String namespace, ClusterSpec spec) {
+    if (spec.getClusterTolerations().getInstanceConfig().getMinWorkers() < 1) {
+      return Optional.empty();
+    }
+    return Optional.of(
+        new PodDisruptionBudgetBuilder()
+            .withNewMetadata()
+            .withName(clusterName + "-worker-pdb")
+            .withNamespace(namespace)
+            .addToLabels(LABEL_SPARK_VERSION_NAME, spec.getRuntimeVersions().getSparkVersion())
+            .endMetadata()
+            .withNewSpec()
+            .withNewMinAvailable(1)
+            .withNewSelector()
+            .addToMatchLabels(
+                Map.of(
+                    LABEL_SPARK_CLUSTER_NAME,
+                    clusterName,
+                    LABEL_SPARK_ROLE_NAME,
+                    LABEL_SPARK_ROLE_WORKER_VALUE))
+            .endSelector()
+            .endSpec()
+            .build());
+  }
 }
diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
index 9717bf5..675365c 100644
--- a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
+++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
@@ -345,4 +345,42 @@ class SparkClusterResourceSpecTest {
     assertEquals(1, hpa.getSpec().getMetrics().size());
     assertEquals("worker", 
hpa.getSpec().getMetrics().get(0).getContainerResource().getContainer());
   }
+
+  @Test
+  void testPodDisruptionBudgetCreation() {
+    var instanceConfig = new WorkerInstanceConfig();
+    instanceConfig.setMinWorkers(1);
+    var clusterTolerations = new ClusterTolerations();
+    clusterTolerations.setInstanceConfig(instanceConfig);
+    when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
+
+    SparkClusterResourceSpec spec = new SparkClusterResourceSpec(cluster, new SparkConf());
+    assertTrue(spec.getPodDisruptionBudget().isPresent());
+    var pdb = spec.getPodDisruptionBudget().get();
+    assertEquals("policy/v1", pdb.getApiVersion());
+    assertEquals("PodDisruptionBudget", pdb.getKind());
+    assertEquals("my-namespace", pdb.getMetadata().getNamespace());
+    assertEquals("cluster-name-worker-pdb", pdb.getMetadata().getName());
+    assertEquals(VERSION, pdb.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
+    assertEquals(1, pdb.getSpec().getMinAvailable().getIntVal());
+    assertEquals(
+        Map.of(
+            LABEL_SPARK_CLUSTER_NAME,
+            "cluster-name",
+            LABEL_SPARK_ROLE_NAME,
+            LABEL_SPARK_ROLE_WORKER_VALUE),
+        pdb.getSpec().getSelector().getMatchLabels());
+  }
+
+  @Test
+  void testPodDisruptionBudgetAbsence() {
+    var instanceConfig = new WorkerInstanceConfig();
+    instanceConfig.setMinWorkers(0);
+    var clusterTolerations = new ClusterTolerations();
+    clusterTolerations.setInstanceConfig(instanceConfig);
+    when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
+
+    SparkClusterResourceSpec spec = new SparkClusterResourceSpec(cluster, new SparkConf());
+    assertTrue(spec.getPodDisruptionBudget().isEmpty());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
