Varka808 commented on code in PR #27523:
URL: https://github.com/apache/flink/pull/27523#discussion_r2938351206


##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java:
##########
@@ -586,6 +586,85 @@ public class KubernetesConfigOptions {
                             "The node label whose value is the same as the 
node name. "
                                     + "Currently, this will only be used to 
set the node affinity of TM pods to avoid being scheduled on blocked nodes.");
 
+    /**
+     * The user-specified PersistentVolumeClaims (PVCs) that will be mounted 
into Flink containers.
+     *
+     * <p>The value should be in the form of {@code pvc-name:/mount/path} 
separated by commas.
+     * Multiple PVCs can be specified by separating them with commas.
+     *
+     * <p>Example: {@code 
checkpoint-pvc:/opt/flink/checkpoints,data-pvc:/opt/flink/data}
+     *
+     * <p>Prerequisites:
+     *
+     * <ul>
+     *   <li>The PVCs must exist in the same namespace as the Flink cluster 
before deployment
+     *   <li>The PVCs must have appropriate access modes:
+     *       <ul>
+     *         <li>ReadWriteOnce (RWO): For single pod access
+     *         <li>ReadWriteMany (RWX): For multiple pods (recommended for HA 
setups)
+     *         <li>ReadOnlyMany (ROX): For read-only access from multiple pods
+     *       </ul>
+     * </ul>
+     *
+     * <p>Common use cases:
+     *
+     * <ul>
+     *   <li>Checkpoint storage: Mount a shared PVC for storing checkpoints
+     *   <li>Savepoint storage: Mount a PVC for savepoint data
+     *   <li>Shared data: Mount read-only PVCs containing reference data
+     *   <li>Job artifacts: Mount PVCs containing job JARs or dependencies
+     * </ul>
+     */
+    public static final ConfigOption<Map<String, String>> 
KUBERNETES_PERSISTENT_VOLUME_CLAIMS =
+            key("kubernetes.persistent-volume-claims")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The user-specified %s that will 
be mounted into Flink containers. "
+                                                    + "The value should be in 
the form of %s. "
+                                                    + "Multiple PVCs can be 
specified, for example: %s. "
+                                                    + "The PVCs must exist in 
the same namespace as the Flink cluster before deployment. "
+                                                    + "For HA setups with 
multiple JobManagers or TaskManagers accessing the same storage, "
+                                                    + "use PVCs with 
ReadWriteMany (RWX) or ReadOnlyMany (ROX) access modes.",
+                                            link(
+                                                    
"https://kubernetes.io/docs/concepts/storage/persistent-volumes/";,
+                                                    "PersistentVolumeClaims 
(PVCs)"),
+                                            code("pvc-name:/mount/path"),
+                                            code(
+                                                    
"checkpoint-pvc:/opt/flink/checkpoints,data-pvc:/opt/flink/data"))
+                                    .build());
+
+    /**
+     * Whether to mount PersistentVolumeClaims (PVCs) as read-only.
+     *
+     * <p>When set to true, all PVCs configured via {@link 
#KUBERNETES_PERSISTENT_VOLUME_CLAIMS}
+     * will be mounted as read-only. This is useful when the PVC contains 
shared data that should
+     * not be modified by Flink, such as reference datasets or pre-trained 
models.
+     *
+     * <p>Note: This setting applies globally to all PVCs configured via {@link
+     * #KUBERNETES_PERSISTENT_VOLUME_CLAIMS}. If you need different access 
modes for different PVCs,
+     * consider using pod templates instead.
+     *
+     * <p>Default: false (read-write mode)

Review Comment:
   Can't the `Default: false` be directly known from the `defaultValue(false)` 
below? :)



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java:
##########
@@ -586,6 +586,85 @@ public class KubernetesConfigOptions {
                             "The node label whose value is the same as the 
node name. "
                                     + "Currently, this will only be used to 
set the node affinity of TM pods to avoid being scheduled on blocked nodes.");
 
+    /**
+     * The user-specified PersistentVolumeClaims (PVCs) that will be mounted 
into Flink containers.
+     *
+     * <p>The value should be in the form of {@code pvc-name:/mount/path} 
separated by commas.
+     * Multiple PVCs can be specified by separating them with commas.
+     *
+     * <p>Example: {@code 
checkpoint-pvc:/opt/flink/checkpoints,data-pvc:/opt/flink/data}
+     *
+     * <p>Prerequisites:
+     *
+     * <ul>
+     *   <li>The PVCs must exist in the same namespace as the Flink cluster 
before deployment
+     *   <li>The PVCs must have appropriate access modes:
+     *       <ul>
+     *         <li>ReadWriteOnce (RWO): For single pod access
+     *         <li>ReadWriteMany (RWX): For multiple pods (recommended for HA 
setups)
+     *         <li>ReadOnlyMany (ROX): For read-only access from multiple pods
+     *       </ul>
+     * </ul>
+     *
+     * <p>Common use cases:
+     *
+     * <ul>
+     *   <li>Checkpoint storage: Mount a shared PVC for storing checkpoints
+     *   <li>Savepoint storage: Mount a PVC for savepoint data
+     *   <li>Shared data: Mount read-only PVCs containing reference data
+     *   <li>Job artifacts: Mount PVCs containing job JARs or dependencies
+     * </ul>
+     */

Review Comment:
   IMO, `common use cases` could be removed here. This java doc is only for 
devs.



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/PersistentVolumeClaimMountDecorator.java:
##########
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.decorators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import 
org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import 
io.fabric8.kubernetes.api.model.PersistentVolumeClaimVolumeSourceBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.Volume;
+import io.fabric8.kubernetes.api.model.VolumeBuilder;
+import io.fabric8.kubernetes.api.model.VolumeMount;
+import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Decorator for mounting Kubernetes PersistentVolumeClaims (PVCs) to 
JobManager and TaskManager
+ * pods.
+ *
+ * <p>This decorator allows users to attach pre-existing PVCs to Flink pods, 
enabling persistent
+ * storage for checkpoints, savepoints, or other data that needs to survive 
pod restarts.
+ *
+ * <h2>Configuration</h2>
+ *
+ * <p>Users can configure PVC mounting through the following configuration 
options:
+ *
+ * <ul>
+ *   <li>{@link KubernetesConfigOptions#KUBERNETES_PERSISTENT_VOLUME_CLAIMS} - 
Specifies PVCs and
+ *       their mount paths in the format {@code 
pvc-name:/mount/path,pvc-name2:/mount/path2}
+ *   <li>{@link 
KubernetesConfigOptions#KUBERNETES_PERSISTENT_VOLUME_CLAIM_READ_ONLY} - When 
set to
+ *       true, mounts all PVCs as read-only (default: false)
+ * </ul>
+ *
+ * <h2>Usage Example</h2>
+ *
+ * <pre>{@code
+ * # Mount a single PVC for checkpoint storage
+ * kubernetes.persistent-volume-claims: checkpoint-pvc:/opt/flink/checkpoints
+ *
+ * # Mount multiple PVCs
+ * kubernetes.persistent-volume-claims: 
checkpoint-pvc:/opt/flink/checkpoints,data-pvc:/opt/flink/data
+ *
+ * # Mount PVCs as read-only
+ * kubernetes.persistent-volume-claim-read-only: true
+ * }</pre>
+ *
+ * <h2>Validation</h2>
+ *
+ * <p>This decorator validates that:
+ *
+ * <ul>
+ *   <li>PVC names conform to DNS-1123 subdomain standard (lowercase 
alphanumeric, '-' and '.', max
+ *       253 chars)
+ *   <li>Mount paths are non-empty and absolute (start with '/')
+ *   <li>No duplicate mount paths are specified
+ *   <li>Generated volume names do not conflict with existing volumes
+ * </ul>
+ *
+ * <h2>Volume Name Generation</h2>
+ *
+ * <p>Volume names are generated from PVC names by:
+ *
+ * <ol>
+ *   <li>Replacing '.' with '-' (DNS-1123 label requirement)
+ *   <li>Adding '-pvc' suffix
+ *   <li>Truncating to 63 characters if necessary (with hash suffix for 
uniqueness)
+ * </ol>
+ *
+ * <h2>Important Notes</h2>
+ *
+ * <ul>
+ *   <li>The PVC must exist in the same namespace as the Flink cluster
+ *   <li>The PVC must have an appropriate access mode (ReadWriteOnce, 
ReadWriteMany, etc.)
+ *   <li>For HA setups with multiple JobManagers, use ReadWriteMany or 
ReadOnlyMany access modes
+ *   <li>If you need different read/write modes for different PVCs, use Pod 
Templates instead
+ * </ul>
+ *
+ * @see KubernetesConfigOptions#KUBERNETES_PERSISTENT_VOLUME_CLAIMS
+ * @see KubernetesConfigOptions#KUBERNETES_PERSISTENT_VOLUME_CLAIM_READ_ONLY

Review Comment:
   This part can be removed as this is only a `@Internal` class for dev.



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java:
##########
@@ -586,6 +586,85 @@ public class KubernetesConfigOptions {
                             "The node label whose value is the same as the 
node name. "
                                     + "Currently, this will only be used to 
set the node affinity of TM pods to avoid being scheduled on blocked nodes.");
 
+    /**
+     * The user-specified PersistentVolumeClaims (PVCs) that will be mounted 
into Flink containers.
+     *
+     * <p>The value should be in the form of {@code pvc-name:/mount/path} 
separated by commas.
+     * Multiple PVCs can be specified by separating them with commas.
+     *
+     * <p>Example: {@code 
checkpoint-pvc:/opt/flink/checkpoints,data-pvc:/opt/flink/data}
+     *
+     * <p>Prerequisites:
+     *
+     * <ul>
+     *   <li>The PVCs must exist in the same namespace as the Flink cluster 
before deployment
+     *   <li>The PVCs must have appropriate access modes:
+     *       <ul>
+     *         <li>ReadWriteOnce (RWO): For single pod access
+     *         <li>ReadWriteMany (RWX): For multiple pods (recommended for HA 
setups)
+     *         <li>ReadOnlyMany (ROX): For read-only access from multiple pods
+     *       </ul>
+     * </ul>
+     *
+     * <p>Common use cases:
+     *
+     * <ul>
+     *   <li>Checkpoint storage: Mount a shared PVC for storing checkpoints
+     *   <li>Savepoint storage: Mount a PVC for savepoint data
+     *   <li>Shared data: Mount read-only PVCs containing reference data
+     *   <li>Job artifacts: Mount PVCs containing job JARs or dependencies
+     * </ul>
+     */
+    public static final ConfigOption<Map<String, String>> 
KUBERNETES_PERSISTENT_VOLUME_CLAIMS =
+            key("kubernetes.persistent-volume-claims")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The user-specified %s that will 
be mounted into Flink containers. "
+                                                    + "The value should be in 
the form of %s. "
+                                                    + "Multiple PVCs can be 
specified, for example: %s. "
+                                                    + "The PVCs must exist in 
the same namespace as the Flink cluster before deployment. "
+                                                    + "For HA setups with 
multiple JobManagers or TaskManagers accessing the same storage, "
+                                                    + "use PVCs with 
ReadWriteMany (RWX) or ReadOnlyMany (ROX) access modes.",
+                                            link(
+                                                    
"https://kubernetes.io/docs/concepts/storage/persistent-volumes/";,
+                                                    "PersistentVolumeClaims 
(PVCs)"),
+                                            code("pvc-name:/mount/path"),
+                                            code(
+                                                    
"checkpoint-pvc:/opt/flink/checkpoints,data-pvc:/opt/flink/data"))
+                                    .build());
+
+    /**
+     * Whether to mount PersistentVolumeClaims (PVCs) as read-only.
+     *
+     * <p>When set to true, all PVCs configured via {@link 
#KUBERNETES_PERSISTENT_VOLUME_CLAIMS}
+     * will be mounted as read-only. This is useful when the PVC contains 
shared data that should
+     * not be modified by Flink, such as reference datasets or pre-trained 
models.
+     *
+     * <p>Note: This setting applies globally to all PVCs configured via {@link
+     * #KUBERNETES_PERSISTENT_VOLUME_CLAIMS}. If you need different access 
modes for different PVCs,
+     * consider using pod templates instead.
+     *
+     * <p>Default: false (read-write mode)
+     */
+    public static final ConfigOption<Boolean> 
KUBERNETES_PERSISTENT_VOLUME_CLAIM_READ_ONLY =
+            key("kubernetes.persistent-volume-claim-read-only")
+                    .booleanType()
+                    .defaultValue(false)
+                    
.withDeprecatedKeys("kubernetes.persistent-volume-claims.read-only")

Review Comment:
   I don't think we should use `withDeprecatedKeys` before we want to deprecate 
the original option key. BTW, Flink currently does not want a 
key(`kubernetes.persistent-volume-claims`) to be a prefix of another 
key(`kubernetes.persistent-volume-claims.read-only`).



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/PersistentVolumeClaimMountDecorator.java:
##########
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.decorators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import 
org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import 
io.fabric8.kubernetes.api.model.PersistentVolumeClaimVolumeSourceBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.Volume;
+import io.fabric8.kubernetes.api.model.VolumeBuilder;
+import io.fabric8.kubernetes.api.model.VolumeMount;
+import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Decorator for mounting Kubernetes PersistentVolumeClaims (PVCs) to 
JobManager and TaskManager
+ * pods.
+ *
+ * <p>This decorator allows users to attach pre-existing PVCs to Flink pods, 
enabling persistent
+ * storage for checkpoints, savepoints, or other data that needs to survive 
pod restarts.
+ *
+ * <h2>Configuration</h2>
+ *
+ * <p>Users can configure PVC mounting through the following configuration 
options:
+ *
+ * <ul>
+ *   <li>{@link KubernetesConfigOptions#KUBERNETES_PERSISTENT_VOLUME_CLAIMS} - 
Specifies PVCs and
+ *       their mount paths in the format {@code 
pvc-name:/mount/path,pvc-name2:/mount/path2}
+ *   <li>{@link 
KubernetesConfigOptions#KUBERNETES_PERSISTENT_VOLUME_CLAIM_READ_ONLY} - When 
set to
+ *       true, mounts all PVCs as read-only (default: false)
+ * </ul>
+ *
+ * <h2>Usage Example</h2>
+ *
+ * <pre>{@code
+ * # Mount a single PVC for checkpoint storage
+ * kubernetes.persistent-volume-claims: checkpoint-pvc:/opt/flink/checkpoints
+ *
+ * # Mount multiple PVCs
+ * kubernetes.persistent-volume-claims: 
checkpoint-pvc:/opt/flink/checkpoints,data-pvc:/opt/flink/data
+ *
+ * # Mount PVCs as read-only
+ * kubernetes.persistent-volume-claim-read-only: true
+ * }</pre>
+ *
+ * <h2>Validation</h2>
+ *
+ * <p>This decorator validates that:
+ *
+ * <ul>
+ *   <li>PVC names conform to DNS-1123 subdomain standard (lowercase 
alphanumeric, '-' and '.', max
+ *       253 chars)
+ *   <li>Mount paths are non-empty and absolute (start with '/')
+ *   <li>No duplicate mount paths are specified
+ *   <li>Generated volume names do not conflict with existing volumes
+ * </ul>
+ *
+ * <h2>Volume Name Generation</h2>
+ *
+ * <p>Volume names are generated from PVC names by:
+ *
+ * <ol>
+ *   <li>Replacing '.' with '-' (DNS-1123 label requirement)
+ *   <li>Adding '-pvc' suffix
+ *   <li>Truncating to 63 characters if necessary (with hash suffix for 
uniqueness)
+ * </ol>
+ *
+ * <h2>Important Notes</h2>
+ *
+ * <ul>
+ *   <li>The PVC must exist in the same namespace as the Flink cluster
+ *   <li>The PVC must have an appropriate access mode (ReadWriteOnce, 
ReadWriteMany, etc.)
+ *   <li>For HA setups with multiple JobManagers, use ReadWriteMany or 
ReadOnlyMany access modes
+ *   <li>If you need different read/write modes for different PVCs, use Pod 
Templates instead
+ * </ul>
+ *
+ * @see KubernetesConfigOptions#KUBERNETES_PERSISTENT_VOLUME_CLAIMS
+ * @see KubernetesConfigOptions#KUBERNETES_PERSISTENT_VOLUME_CLAIM_READ_ONLY
+ */
+@Internal
+public class PersistentVolumeClaimMountDecorator extends 
AbstractKubernetesStepDecorator {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(PersistentVolumeClaimMountDecorator.class);
+
+    /** Suffix appended to sanitized PVC names to generate volume names. */
+    @VisibleForTesting static final String VOLUME_NAME_SUFFIX = "-pvc";

Review Comment:
   These static constants can be directly made public and do not require the 
annotation "VisibleForTesting".



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/PersistentVolumeClaimMountDecorator.java:
##########
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.decorators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import 
org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import 
io.fabric8.kubernetes.api.model.PersistentVolumeClaimVolumeSourceBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.Volume;
+import io.fabric8.kubernetes.api.model.VolumeBuilder;
+import io.fabric8.kubernetes.api.model.VolumeMount;
+import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Decorator for mounting Kubernetes PersistentVolumeClaims (PVCs) to 
JobManager and TaskManager
+ * pods.
+ *
+ * <p>This decorator allows users to attach pre-existing PVCs to Flink pods, 
enabling persistent
+ * storage for checkpoints, savepoints, or other data that needs to survive 
pod restarts.
+ *
+ * <h2>Configuration</h2>
+ *
+ * <p>Users can configure PVC mounting through the following configuration 
options:
+ *
+ * <ul>
+ *   <li>{@link KubernetesConfigOptions#KUBERNETES_PERSISTENT_VOLUME_CLAIMS} - 
Specifies PVCs and
+ *       their mount paths in the format {@code 
pvc-name:/mount/path,pvc-name2:/mount/path2}
+ *   <li>{@link 
KubernetesConfigOptions#KUBERNETES_PERSISTENT_VOLUME_CLAIM_READ_ONLY} - When 
set to
+ *       true, mounts all PVCs as read-only (default: false)
+ * </ul>
+ *
+ * <h2>Usage Example</h2>
+ *
+ * <pre>{@code
+ * # Mount a single PVC for checkpoint storage
+ * kubernetes.persistent-volume-claims: checkpoint-pvc:/opt/flink/checkpoints
+ *
+ * # Mount multiple PVCs
+ * kubernetes.persistent-volume-claims: 
checkpoint-pvc:/opt/flink/checkpoints,data-pvc:/opt/flink/data
+ *
+ * # Mount PVCs as read-only
+ * kubernetes.persistent-volume-claim-read-only: true
+ * }</pre>
+ *
+ * <h2>Validation</h2>
+ *
+ * <p>This decorator validates that:
+ *
+ * <ul>
+ *   <li>PVC names conform to DNS-1123 subdomain standard (lowercase 
alphanumeric, '-' and '.', max
+ *       253 chars)
+ *   <li>Mount paths are non-empty and absolute (start with '/')
+ *   <li>No duplicate mount paths are specified
+ *   <li>Generated volume names do not conflict with existing volumes
+ * </ul>
+ *
+ * <h2>Volume Name Generation</h2>
+ *
+ * <p>Volume names are generated from PVC names by:
+ *
+ * <ol>
+ *   <li>Replacing '.' with '-' (DNS-1123 label requirement)
+ *   <li>Adding '-pvc' suffix
+ *   <li>Truncating to 63 characters if necessary (with hash suffix for 
uniqueness)
+ * </ol>
+ *
+ * <h2>Important Notes</h2>
+ *
+ * <ul>
+ *   <li>The PVC must exist in the same namespace as the Flink cluster
+ *   <li>The PVC must have an appropriate access mode (ReadWriteOnce, 
ReadWriteMany, etc.)
+ *   <li>For HA setups with multiple JobManagers, use ReadWriteMany or 
ReadOnlyMany access modes
+ *   <li>If you need different read/write modes for different PVCs, use Pod 
Templates instead
+ * </ul>
+ *
+ * @see KubernetesConfigOptions#KUBERNETES_PERSISTENT_VOLUME_CLAIMS
+ * @see KubernetesConfigOptions#KUBERNETES_PERSISTENT_VOLUME_CLAIM_READ_ONLY
+ */
+@Internal
+public class PersistentVolumeClaimMountDecorator extends 
AbstractKubernetesStepDecorator {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(PersistentVolumeClaimMountDecorator.class);
+
+    /** Suffix appended to sanitized PVC names to generate volume names. */
+    @VisibleForTesting static final String VOLUME_NAME_SUFFIX = "-pvc";
+
+    /**
+     * Maximum length for Kubernetes volume names (DNS-1123 label standard). 
Must be 63 characters
+     * or less.
+     */
+    @VisibleForTesting static final int MAX_VOLUME_NAME_LENGTH = 63;
+
+    /**
+     * Maximum length for Kubernetes resource names (DNS-1123 subdomain 
standard). Must be 253
+     * characters or less.
+     */
+    @VisibleForTesting static final int MAX_PVC_NAME_LENGTH = 253;
+
+    /** Length reserved for hash suffix when truncating volume names. */
+    private static final int HASH_SUFFIX_LENGTH = 6; // "-" + 5 hex chars
+
+    /**
+     * Pattern for validating DNS-1123 subdomain names. Used for PVC names. 
Must consist of lower
+     * case alphanumeric characters, '-' or '.', and must start and end with 
an alphanumeric
+     * character.
+     */
+    @VisibleForTesting
+    static final Pattern DNS_1123_SUBDOMAIN_PATTERN =
+            Pattern.compile("^[a-z0-9]([-a-z0-9.]*[a-z0-9])?$");
+
+    /**
+     * Pattern for validating DNS-1123 label names. Used for volume names. 
Must consist of lower
+     * case alphanumeric characters or '-', and must start and end with an 
alphanumeric character.
+     */
+    @VisibleForTesting
+    static final Pattern DNS_1123_LABEL_PATTERN =
+            Pattern.compile("^[a-z0-9]([-a-z0-9]*[a-z0-9])?$");
+
+    private final Map<String, String> pvcNamesToMountPaths;
+    private final boolean readOnly;
+
+    /**
+     * Creates a new PersistentVolumeClaimMountDecorator.
+     *
+     * @param kubernetesComponentConf the Kubernetes parameters containing PVC 
configuration
+     * @throws IllegalArgumentException if the PVC configuration is invalid
+     */
+    public PersistentVolumeClaimMountDecorator(
+            AbstractKubernetesParameters kubernetesComponentConf) {
+        checkNotNull(kubernetesComponentConf, "kubernetesComponentConf must 
not be null");
+        this.pvcNamesToMountPaths = 
kubernetesComponentConf.getPersistentVolumeClaimsToMountPaths();
+        this.readOnly = 
kubernetesComponentConf.isPersistentVolumeClaimReadOnly();
+
+        // Validate PVC configuration
+        validatePvcConfiguration(pvcNamesToMountPaths);
+    }
+
+    /**
+     * Validates the PVC configuration.
+     *
+     * @param pvcNamesToMountPaths the map of PVC names to mount paths
+     * @throws IllegalArgumentException if validation fails
+     */
+    @VisibleForTesting
+    static void validatePvcConfiguration(Map<String, String> 
pvcNamesToMountPaths) {
+        if (pvcNamesToMountPaths.isEmpty()) {
+            return;
+        }
+
+        Set<String> mountPaths = new HashSet<>();
+        Set<String> volumeNames = new HashSet<>();
+
+        for (Map.Entry<String, String> entry : 
pvcNamesToMountPaths.entrySet()) {
+            String pvcName = entry.getKey();
+            String mountPath = entry.getValue();
+
+            // Validate PVC name (DNS-1123 subdomain)
+            validatePvcName(pvcName);
+
+            // Validate mount path
+            validateMountPath(mountPath, mountPaths);
+            mountPaths.add(mountPath);
+
+            // Check for volume name conflicts (after sanitization)
+            String volumeName = getVolumeName(pvcName);
+            checkArgument(
+                    !volumeNames.contains(volumeName),
+                    "Volume name conflict detected: PVC '%s' generates volume 
name '%s' which conflicts with another PVC. "
+                            + "Please use more distinct PVC names.",
+                    pvcName,
+                    volumeName);
+            volumeNames.add(volumeName);
+        }
+    }
+
+    /**
+     * Validates that a PVC name conforms to DNS-1123 subdomain standard.
+     *
+     * <p>DNS-1123 subdomain allows:
+     *
+     * <ul>
+     *   <li>Lowercase alphanumeric characters, '-' and '.'
+     *   <li>Must start and end with an alphanumeric character
+     *   <li>Maximum length of 253 characters
+     * </ul>
+     *
+     * @param pvcName the PVC name to validate
+     * @throws IllegalArgumentException if the PVC name is invalid
+     */
+    @VisibleForTesting
+    static void validatePvcName(String pvcName) {
+        checkArgument(
+                pvcName != null && !pvcName.trim().isEmpty(),
+                "PVC name must not be null or empty.");
+
+        checkArgument(
+                pvcName.length() <= MAX_PVC_NAME_LENGTH,
+                "PVC name '%s' exceeds maximum length of %d characters.",
+                pvcName,
+                MAX_PVC_NAME_LENGTH);
+
+        checkArgument(
+                DNS_1123_SUBDOMAIN_PATTERN.matcher(pvcName).matches(),
+                "PVC name '%s' is invalid. Must conform to DNS-1123 subdomain 
standard: "
+                        + "lowercase alphanumeric characters, '-' or '.', must 
start and end with alphanumeric.",
+                pvcName);
+    }
+
+    /**
+     * Validates the mount path.
+     *
+     * @param mountPath the mount path to validate
+     * @param existingPaths existing mount paths to check for duplicates
+     * @throws IllegalArgumentException if the mount path is invalid
+     */
+    @VisibleForTesting
+    static void validateMountPath(String mountPath, Set<String> existingPaths) 
{
+        checkArgument(
+                mountPath != null && !mountPath.trim().isEmpty(),
+                "Mount path must not be null or empty.");
+
+        checkArgument(
+                mountPath.startsWith("/"),
+                "Mount path '%s' must be an absolute path starting with '/'.",
+                mountPath);
+
+        checkArgument(
+                !existingPaths.contains(mountPath),
+                "Duplicate mount path detected: '%s'. Each PVC must have a 
unique mount path.",
+                mountPath);
+    }
+
+    @Override
+    public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
+        if (pvcNamesToMountPaths.isEmpty()) {
+            return flinkPod;
+        }
+
+        // Check for conflicts with existing volumes (fail-fast for volume 
name conflicts)
+        checkVolumeConflicts(flinkPod);
+
+        final Pod podWithVolumes = 
decoratePod(flinkPod.getPodWithoutMainContainer());
+        final Container containerWithMounts = 
decorateMainContainer(flinkPod.getMainContainer());
+
+        return new FlinkPod.Builder(flinkPod)
+                .withPod(podWithVolumes)
+                .withMainContainer(containerWithMounts)
+                .build();
+    }
+
+    /**
+     * Checks for conflicts between PVC volumes and existing Pod 
volumes/mounts.
+     *
+     * <p>For volume name conflicts, this method throws an exception 
(fail-fast) because Kubernetes
+     * does not allow duplicate volume names in a Pod spec. For mount path 
conflicts, this method
+     * logs a warning since Kubernetes allows multiple volume mounts, though 
the behavior may be
+     * unexpected.
+     *
+     * @param flinkPod the FlinkPod to check
+     * @throws IllegalArgumentException if a volume name conflict is detected
+     */
+    private void checkVolumeConflicts(FlinkPod flinkPod) {
+        // Get existing volume names from pod spec
+        Set<String> existingVolumeNames = new HashSet<>();
+        if (flinkPod.getPodWithoutMainContainer().getSpec() != null
+                && 
flinkPod.getPodWithoutMainContainer().getSpec().getVolumes() != null) {
+            
flinkPod.getPodWithoutMainContainer().getSpec().getVolumes().stream()
+                    .map(Volume::getName)
+                    .forEach(existingVolumeNames::add);
+        }
+
+        // Get existing mount paths from container
+        Set<String> existingMountPaths = new HashSet<>();
+        if (flinkPod.getMainContainer().getVolumeMounts() != null) {
+            flinkPod.getMainContainer().getVolumeMounts().stream()
+                    .map(VolumeMount::getMountPath)
+                    .forEach(existingMountPaths::add);
+        }
+
+        // Check for conflicts
+        for (Map.Entry<String, String> entry : 
pvcNamesToMountPaths.entrySet()) {
+            String volumeName = getVolumeName(entry.getKey());
+            String mountPath = entry.getValue();
+
+            // Volume name conflict: fail-fast (K8s does not allow duplicate 
volume names)
+            checkArgument(
+                    !existingVolumeNames.contains(volumeName),
+                    "Volume name conflict: PVC '%s' generates volume name '%s' 
which already exists in the Pod spec. "
+                            + "This conflict may be caused by Pod Template 
configuration. "
+                            + "Please use a different PVC name or remove the 
conflicting volume from Pod Template.",
+                    entry.getKey(),
+                    volumeName);
+
+            // Mount path conflict: warn only (K8s allows but behavior may be 
unexpected)
+            if (existingMountPaths.contains(mountPath)) {
+                LOG.warn(
+                        "Mount path '{}' for PVC '{}' conflicts with an 
existing mount in the Pod spec. "
+                                + "The PVC mount will be added alongside the 
existing mount. "
+                                + "Consider using unique mount paths to avoid 
unexpected behavior.",
+                        mountPath,
+                        entry.getKey());
+            }
+        }
+    }
+
+    /**
+     * Decorates the main container with volume mounts for all configured PVCs.
+     *
+     * @param container the original main container
+     * @return a new container with PVC volume mounts added
+     */
+    private Container decorateMainContainer(Container container) {
+        final List<VolumeMount> volumeMounts = buildVolumeMounts();
+        return new 
ContainerBuilder(container).addAllToVolumeMounts(volumeMounts).build();
+    }
+
+    /**
+     * Decorates the pod specification with volumes for all configured PVCs.
+     *
+     * @param pod the original pod without main container
+     * @return a new pod with PVC volumes added
+     */
+    private Pod decoratePod(Pod pod) {
+        final List<Volume> volumes = buildVolumes();
+        return new 
PodBuilder(pod).editOrNewSpec().addAllToVolumes(volumes).endSpec().build();
+    }
+
+    /**
+     * Builds the list of volume mounts for all configured PVCs.
+     *
+     * @return list of volume mounts
+     */
+    @VisibleForTesting
+    List<VolumeMount> buildVolumeMounts() {
+        return pvcNamesToMountPaths.entrySet().stream()
+                .map(this::buildVolumeMount)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Builds a single volume mount for a PVC.
+     *
+     * @param pvcNameToMountPath entry containing PVC name and mount path
+     * @return the volume mount
+     */
+    private VolumeMount buildVolumeMount(Map.Entry<String, String> 
pvcNameToMountPath) {
+        return new VolumeMountBuilder()
+                .withName(getVolumeName(pvcNameToMountPath.getKey()))
+                .withMountPath(pvcNameToMountPath.getValue())
+                .withReadOnly(readOnly)
+                .build();
+    }
+
+    /**
+     * Builds the list of volumes for all configured PVCs.
+     *
+     * @return list of volumes
+     */
+    @VisibleForTesting
+    List<Volume> buildVolumes() {
+        return pvcNamesToMountPaths.keySet().stream()
+                .map(this::buildVolume)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Builds a single volume for a PVC.
+     *
+     * @param pvcName the name of the PVC
+     * @return the volume
+     */
+    private Volume buildVolume(String pvcName) {
+        return new VolumeBuilder()
+                .withName(getVolumeName(pvcName))
+                .withPersistentVolumeClaim(
+                        new PersistentVolumeClaimVolumeSourceBuilder()
+                                .withClaimName(pvcName)
+                                .withReadOnly(readOnly)
+                                .build())
+                .build();
+    }
+
+    /**
+     * Generates a valid Kubernetes volume name from the PVC name.
+     *
+     * <p>Volume names must conform to DNS-1123 label standard:
+     *
+     * <ul>
+     *   <li>Lowercase alphanumeric characters or '-'
+     *   <li>Must start and end with an alphanumeric character
+     *   <li>Maximum 63 characters
+     * </ul>
+     *
+     * <p>The conversion process:
+     *
+     * <ol>
+     *   <li>Replace '.' with '-' (dots are allowed in PVC names but not in 
volume names)
+     *   <li>Append '-pvc' suffix
+     *   <li>If length exceeds 63, truncate and append hash for uniqueness
+     * </ol>
+     *
+     * @param pvcName the name of the PVC
+     * @return a valid volume name
+     */
+    @VisibleForTesting
+    static String getVolumeName(String pvcName) {
+        // Sanitize: replace '.' with '-' to conform to DNS-1123 label
+        String sanitized = pvcName.replace('.', '-');
+
+        // Add suffix
+        String volumeName = sanitized + VOLUME_NAME_SUFFIX;
+
+        // Truncate if exceeds max length
+        if (volumeName.length() > MAX_VOLUME_NAME_LENGTH) {
+            volumeName = truncateWithHash(sanitized, pvcName);
+        }
+
+        // Final validation: ensure it doesn't end with '-'
+        while (volumeName.endsWith("-")) {
+            volumeName = volumeName.substring(0, volumeName.length() - 1);
+        }
+
+        return volumeName;
+    }
+
+    /**
+     * Truncates the volume name and appends a hash suffix for uniqueness.
+     *
+     * <p>Format: {truncated-name}-{hash}-pvc
+     *
+     * @param sanitized the sanitized PVC name (with '.' replaced by '-')
+     * @param original the original PVC name (used for hash calculation)
+     * @return truncated volume name with hash suffix
+     */
+    private static String truncateWithHash(String sanitized, String original) {

Review Comment:
   I would like to know if truncating like this is a standard practice, and are 
there any similar projects that do it this way? Or, for names that exceed the 
specified length, simply rise an error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to