This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 98d041d9 [FLINK-38548] Rename only the name of child deployment for
blue/green deployments
98d041d9 is described below
commit 98d041d90dfbf06573bce407cc0fc492d828c6e3
Author: Arda Kuyumcu <[email protected]>
AuthorDate: Mon Nov 3 23:58:12 2025 -0800
[FLINK-38548] Rename only the name of child deployment for blue/green
deployments
---
.../operator/utils/bluegreen/BlueGreenUtils.java | 44 ++------
.../utils/bluegreen/BlueGreenUtilsTest.java | 118 +++++++++++++++++++++
2 files changed, 127 insertions(+), 35 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
index cd2d9ae2..98344a9b 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
@@ -59,27 +59,6 @@ public class BlueGreenUtils {
// ==================== Spec Operations ====================
- /**
- * Adjusts name references in a spec by replacing deployment names with
child deployment names.
- *
- * @param spec the spec to adjust
- * @param deploymentName the original deployment name
- * @param childDeploymentName the child deployment name to replace with
- * @param wrapperKey the JSON wrapper key
- * @param valueType the spec type
- * @return adjusted spec with name references updated
- */
- public static <T> T adjustNameReferences(
- T spec,
- String deploymentName,
- String childDeploymentName,
- String wrapperKey,
- Class<T> valueType) {
- String serializedSpec = SpecUtils.writeSpecAsJSON(spec, wrapperKey);
- String replacedSerializedSpec = serializedSpec.replace(deploymentName,
childDeploymentName);
- return SpecUtils.readSpecFromJSON(replacedSerializedSpec, wrapperKey,
valueType);
- }
-
/**
* Checks if the Blue/Green deployment spec has changed compared to the
last reconciled spec.
*
@@ -341,40 +320,35 @@ public class BlueGreenUtils {
ObjectMeta bgMeta) {
// Deployment
FlinkDeployment flinkDeployment = new FlinkDeployment();
- FlinkBlueGreenDeploymentSpec spec =
context.getBgDeployment().getSpec();
+ FlinkBlueGreenDeploymentSpec originalSpec =
context.getBgDeployment().getSpec();
String childDeploymentName =
bgMeta.getName() + "-" +
blueGreenDeploymentType.toString().toLowerCase();
- FlinkBlueGreenDeploymentSpec adjustedSpec =
- adjustNameReferences(
- spec,
- bgMeta.getName(),
- childDeploymentName,
+ // Create a deep copy of the spec to avoid mutating the original
+ FlinkBlueGreenDeploymentSpec spec =
+ SpecUtils.readSpecFromJSON(
+ SpecUtils.writeSpecAsJSON(originalSpec, "spec"),
"spec",
FlinkBlueGreenDeploymentSpec.class);
// The Blue/Green initialSavepointPath is only used for first-time
deployments
if (isFirstDeployment) {
String initialSavepointPath =
-
adjustedSpec.getTemplate().getSpec().getJob().getInitialSavepointPath();
+
spec.getTemplate().getSpec().getJob().getInitialSavepointPath();
if (initialSavepointPath != null &&
!initialSavepointPath.isEmpty()) {
LOG.info("Using initialSavepointPath: " +
initialSavepointPath);
- adjustedSpec
- .getTemplate()
- .getSpec()
- .getJob()
- .setInitialSavepointPath(initialSavepointPath);
+
spec.getTemplate().getSpec().getJob().setInitialSavepointPath(initialSavepointPath);
} else {
LOG.info("Clean startup with no checkpoint/savepoint
restoration");
}
} else if (lastCheckpoint != null) {
String location = lastCheckpoint.getLocation().replace("file:",
"");
LOG.info("Using Blue/Green savepoint/checkpoint: " + location);
-
adjustedSpec.getTemplate().getSpec().getJob().setInitialSavepointPath(location);
+
spec.getTemplate().getSpec().getJob().setInitialSavepointPath(location);
}
- flinkDeployment.setSpec(adjustedSpec.getTemplate().getSpec());
+ flinkDeployment.setSpec(spec.getTemplate().getSpec());
// Deployment metadata
ObjectMeta flinkDeploymentMeta =
getDependentObjectMeta(context.getBgDeployment());
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
new file mode 100644
index 00000000..859d19f5
--- /dev/null
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils.bluegreen;
+
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import
org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType;
+import org.apache.flink.kubernetes.operator.api.spec.ConfigObjectNode;
+import
org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import
org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec;
+import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
+import
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
+import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
+import
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
+
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for {@link BlueGreenUtils}. */
+public class BlueGreenUtilsTest {
+
+ private static final String TEST_NAMESPACE = "test-namespace";
+
+ @Test
+ public void testPrepareFlinkDeploymentWithoutNameReplacement() {
+ String parentDeploymentName = "my-app";
+ FlinkBlueGreenDeployment bgDeployment =
+ buildBlueGreenDeployment(parentDeploymentName, TEST_NAMESPACE);
+
+ // Add configuration that contains the deployment name in values
+ Map<String, String> flinkConfig =
+
bgDeployment.getSpec().getTemplate().getSpec().getFlinkConfiguration().asFlatMap();
+ flinkConfig.put(
+ "high-availability.storageDir",
+ "s3://" + parentDeploymentName + "/highavailability");
+ flinkConfig.put("metrics.scope.jm", parentDeploymentName + ".jm");
+
bgDeployment.getSpec().getTemplate().getSpec().setFlinkConfiguration(flinkConfig);
+
+ BlueGreenContext context = createContext(bgDeployment);
+
+ // Test: Prepare a BLUE deployment
+ FlinkDeployment blueDeployment =
+ BlueGreenUtils.prepareFlinkDeployment(
+ context,
+ BlueGreenDeploymentType.BLUE,
+ null,
+ true,
+ bgDeployment.getMetadata());
+
+ // Verify child deployment name is correctly set in metadata
+ String expectedChildName = parentDeploymentName + "-blue";
+ assertEquals(expectedChildName,
blueDeployment.getMetadata().getName());
+
+ // Verify configuration values that contain the parent name are NOT
replaced
+ Map<String, String> resultFlinkConfig =
+ blueDeployment.getSpec().getFlinkConfiguration().asFlatMap();
+ assertEquals(
+ "s3://" + parentDeploymentName + "/highavailability",
+ resultFlinkConfig.get("high-availability.storageDir"));
+ assertEquals(parentDeploymentName + ".jm",
resultFlinkConfig.get("metrics.scope.jm"));
+ }
+
+ private static FlinkBlueGreenDeployment buildBlueGreenDeployment(
+ String name, String namespace) {
+ var deployment = new FlinkBlueGreenDeployment();
+ deployment.setMetadata(
+ new ObjectMetaBuilder()
+ .withName(name)
+ .withNamespace(namespace)
+ .withUid(UUID.randomUUID().toString())
+ .build());
+
+ var flinkDeploymentSpec =
+ FlinkDeploymentSpec.builder()
+ .flinkConfiguration(new ConfigObjectNode())
+
.job(JobSpec.builder().upgradeMode(UpgradeMode.STATELESS).build())
+ .build();
+
+ var bgDeploymentSpec =
+ new FlinkBlueGreenDeploymentSpec(
+ new HashMap<>(),
+
FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build());
+
+ deployment.setSpec(bgDeploymentSpec);
+ return deployment;
+ }
+
+ private BlueGreenContext createContext(FlinkBlueGreenDeployment
bgDeployment) {
+ FlinkBlueGreenDeploymentStatus status = new
FlinkBlueGreenDeploymentStatus();
+
status.setLastReconciledSpec(SpecUtils.writeSpecAsJSON(bgDeployment.getSpec(),
"spec"));
+ bgDeployment.setStatus(status);
+
+ return new BlueGreenContext(bgDeployment, status, null, null, null);
+ }
+}