This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 98d041d9 [FLINK-38548] Rename only the name of child deployment for 
blue/green deployments
98d041d9 is described below

commit 98d041d90dfbf06573bce407cc0fc492d828c6e3
Author: Arda Kuyumcu <[email protected]>
AuthorDate: Mon Nov 3 23:58:12 2025 -0800

    [FLINK-38548] Rename only the name of child deployment for blue/green 
deployments
---
 .../operator/utils/bluegreen/BlueGreenUtils.java   |  44 ++------
 .../utils/bluegreen/BlueGreenUtilsTest.java        | 118 +++++++++++++++++++++
 2 files changed, 127 insertions(+), 35 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
index cd2d9ae2..98344a9b 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
@@ -59,27 +59,6 @@ public class BlueGreenUtils {
 
     // ==================== Spec Operations ====================
 
-    /**
-     * Adjusts name references in a spec by replacing deployment names with 
child deployment names.
-     *
-     * @param spec the spec to adjust
-     * @param deploymentName the original deployment name
-     * @param childDeploymentName the child deployment name to replace with
-     * @param wrapperKey the JSON wrapper key
-     * @param valueType the spec type
-     * @return adjusted spec with name references updated
-     */
-    public static <T> T adjustNameReferences(
-            T spec,
-            String deploymentName,
-            String childDeploymentName,
-            String wrapperKey,
-            Class<T> valueType) {
-        String serializedSpec = SpecUtils.writeSpecAsJSON(spec, wrapperKey);
-        String replacedSerializedSpec = serializedSpec.replace(deploymentName, 
childDeploymentName);
-        return SpecUtils.readSpecFromJSON(replacedSerializedSpec, wrapperKey, 
valueType);
-    }
-
     /**
      * Checks if the Blue/Green deployment spec has changed compared to the 
last reconciled spec.
      *
@@ -341,40 +320,35 @@ public class BlueGreenUtils {
             ObjectMeta bgMeta) {
         // Deployment
         FlinkDeployment flinkDeployment = new FlinkDeployment();
-        FlinkBlueGreenDeploymentSpec spec = 
context.getBgDeployment().getSpec();
+        FlinkBlueGreenDeploymentSpec originalSpec = 
context.getBgDeployment().getSpec();
 
         String childDeploymentName =
                 bgMeta.getName() + "-" + 
blueGreenDeploymentType.toString().toLowerCase();
 
-        FlinkBlueGreenDeploymentSpec adjustedSpec =
-                adjustNameReferences(
-                        spec,
-                        bgMeta.getName(),
-                        childDeploymentName,
+        // Create a deep copy of the spec to avoid mutating the original
+        FlinkBlueGreenDeploymentSpec spec =
+                SpecUtils.readSpecFromJSON(
+                        SpecUtils.writeSpecAsJSON(originalSpec, "spec"),
                         "spec",
                         FlinkBlueGreenDeploymentSpec.class);
 
         // The Blue/Green initialSavepointPath is only used for first-time 
deployments
         if (isFirstDeployment) {
             String initialSavepointPath =
-                    
adjustedSpec.getTemplate().getSpec().getJob().getInitialSavepointPath();
+                    
spec.getTemplate().getSpec().getJob().getInitialSavepointPath();
             if (initialSavepointPath != null && 
!initialSavepointPath.isEmpty()) {
                 LOG.info("Using initialSavepointPath: " + 
initialSavepointPath);
-                adjustedSpec
-                        .getTemplate()
-                        .getSpec()
-                        .getJob()
-                        .setInitialSavepointPath(initialSavepointPath);
+                
spec.getTemplate().getSpec().getJob().setInitialSavepointPath(initialSavepointPath);
             } else {
                 LOG.info("Clean startup with no checkpoint/savepoint 
restoration");
             }
         } else if (lastCheckpoint != null) {
             String location = lastCheckpoint.getLocation().replace("file:", 
"");
             LOG.info("Using Blue/Green savepoint/checkpoint: " + location);
-            
adjustedSpec.getTemplate().getSpec().getJob().setInitialSavepointPath(location);
+            
spec.getTemplate().getSpec().getJob().setInitialSavepointPath(location);
         }
 
-        flinkDeployment.setSpec(adjustedSpec.getTemplate().getSpec());
+        flinkDeployment.setSpec(spec.getTemplate().getSpec());
 
         // Deployment metadata
         ObjectMeta flinkDeploymentMeta = 
getDependentObjectMeta(context.getBgDeployment());
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
new file mode 100644
index 00000000..859d19f5
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils.bluegreen;
+
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import 
org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType;
+import org.apache.flink.kubernetes.operator.api.spec.ConfigObjectNode;
+import 
org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import 
org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec;
+import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
+import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
+import 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
+
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for {@link BlueGreenUtils}. */
+public class BlueGreenUtilsTest {
+
+    private static final String TEST_NAMESPACE = "test-namespace";
+
+    @Test
+    public void testPrepareFlinkDeploymentWithoutNameReplacement() {
+        String parentDeploymentName = "my-app";
+        FlinkBlueGreenDeployment bgDeployment =
+                buildBlueGreenDeployment(parentDeploymentName, TEST_NAMESPACE);
+
+        // Add configuration that contains the deployment name in values
+        Map<String, String> flinkConfig =
+                
bgDeployment.getSpec().getTemplate().getSpec().getFlinkConfiguration().asFlatMap();
+        flinkConfig.put(
+                "high-availability.storageDir",
+                "s3://" + parentDeploymentName + "/highavailability");
+        flinkConfig.put("metrics.scope.jm", parentDeploymentName + ".jm");
+        
bgDeployment.getSpec().getTemplate().getSpec().setFlinkConfiguration(flinkConfig);
+
+        BlueGreenContext context = createContext(bgDeployment);
+
+        // Test: Prepare a BLUE deployment
+        FlinkDeployment blueDeployment =
+                BlueGreenUtils.prepareFlinkDeployment(
+                        context,
+                        BlueGreenDeploymentType.BLUE,
+                        null,
+                        true,
+                        bgDeployment.getMetadata());
+
+        // Verify child deployment name is correctly set in metadata
+        String expectedChildName = parentDeploymentName + "-blue";
+        assertEquals(expectedChildName, 
blueDeployment.getMetadata().getName());
+
+        // Verify configuration values that contain the parent name are NOT 
replaced
+        Map<String, String> resultFlinkConfig =
+                blueDeployment.getSpec().getFlinkConfiguration().asFlatMap();
+        assertEquals(
+                "s3://" + parentDeploymentName + "/highavailability",
+                resultFlinkConfig.get("high-availability.storageDir"));
+        assertEquals(parentDeploymentName + ".jm", 
resultFlinkConfig.get("metrics.scope.jm"));
+    }
+
+    private static FlinkBlueGreenDeployment buildBlueGreenDeployment(
+            String name, String namespace) {
+        var deployment = new FlinkBlueGreenDeployment();
+        deployment.setMetadata(
+                new ObjectMetaBuilder()
+                        .withName(name)
+                        .withNamespace(namespace)
+                        .withUid(UUID.randomUUID().toString())
+                        .build());
+
+        var flinkDeploymentSpec =
+                FlinkDeploymentSpec.builder()
+                        .flinkConfiguration(new ConfigObjectNode())
+                        
.job(JobSpec.builder().upgradeMode(UpgradeMode.STATELESS).build())
+                        .build();
+
+        var bgDeploymentSpec =
+                new FlinkBlueGreenDeploymentSpec(
+                        new HashMap<>(),
+                        
FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build());
+
+        deployment.setSpec(bgDeploymentSpec);
+        return deployment;
+    }
+
+    private BlueGreenContext createContext(FlinkBlueGreenDeployment 
bgDeployment) {
+        FlinkBlueGreenDeploymentStatus status = new 
FlinkBlueGreenDeploymentStatus();
+        
status.setLastReconciledSpec(SpecUtils.writeSpecAsJSON(bgDeployment.getSpec(), 
"spec"));
+        bgDeployment.setStatus(status);
+
+        return new BlueGreenContext(bgDeployment, status, null, null, null);
+    }
+}

Reply via email to