This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 70bb74c0 [FLINK-28763] Make savepoint format configurable for upgrades/savepoint operations
70bb74c0 is described below

commit 70bb74c06280985065f0ce56236186a146d4436f
Author: Nicholas Jiang <programg...@163.com>
AuthorDate: Tue Aug 9 03:36:08 2022 -0700

    [FLINK-28763] Make savepoint format configurable for upgrades/savepoint operations
---
 .../shortcodes/generated/dynamic_section.html      |  6 ++
 .../kubernetes_operator_config_configuration.html  |  6 ++
 .../config/KubernetesOperatorConfigOptions.java    |  9 +++
 .../operator/service/AbstractFlinkService.java     | 14 ++--
 .../kubernetes/operator/TestingClusterClient.java  | 12 +++-
 .../operator/service/NativeFlinkServiceTest.java   | 75 ++++++++++++++++++++++
 6 files changed, 117 insertions(+), 5 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html 
b/docs/layouts/shortcodes/generated/dynamic_section.html
index 8d8fb620..4cfac307 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -44,6 +44,12 @@
             <td>Duration</td>
             <td>Interval at which periodic savepoints will be triggered. The 
triggering schedule is not guaranteed, savepoints will be triggered as part of 
the regular reconcile loop.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.savepoint.format.type</h5></td>
+            <td style="word-wrap: break-word;">CANONICAL</td>
+            <td><p>Enum</p></td>
+            <td>Type of the binary format in which a savepoint should be 
taken.<br /><br />Possible values:<ul><li>"CANONICAL": A canonical, common for 
all state backends format. It lets you switch state backends.</li><li>"NATIVE": 
A format specific for the chosen state backend, in its native binary format. 
Might be faster to take and restore from than the canonical one.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.savepoint.history.max.age</h5></td>
             <td style="word-wrap: break-word;">86400000 ms</td>
diff --git 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index f950054e..aaf1e936 100644
--- 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -140,6 +140,12 @@
             <td>Integer</td>
             <td>Max attempts of automatic reconcile retries on recoverable 
errors.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.savepoint.format.type</h5></td>
+            <td style="word-wrap: break-word;">CANONICAL</td>
+            <td><p>Enum</p></td>
+            <td>Type of the binary format in which a savepoint should be 
taken.<br /><br />Possible values:<ul><li>"CANONICAL": A canonical, common for 
all state backends format. It lets you switch state backends.</li><li>"NATIVE": 
A format specific for the chosen state backend, in its native binary format. 
Might be faster to take and restore from than the canonical one.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.savepoint.history.max.age</h5></td>
             <td style="word-wrap: break-word;">86400000 ms</td>
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 774882ac..11f84b7c 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -21,6 +21,7 @@ package org.apache.flink.kubernetes.operator.config;
 import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
 
 import io.javaoperatorsdk.operator.api.config.ConfigurationService;
 import io.javaoperatorsdk.operator.api.reconciler.Constants;
@@ -280,4 +281,12 @@ public class KubernetesOperatorConfigOptions {
                     .defaultValue(true)
                     .withDescription(
                             "Enables last-state fallback for savepoint upgrade 
mode. When the job is not running thus savepoint cannot be triggered but HA 
metadata is available for last state restore the operator can initiate the 
upgrade process when the flag is enabled.");
+
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<SavepointFormatType> 
OPERATOR_SAVEPOINT_FORMAT_TYPE =
+            ConfigOptions.key("kubernetes.operator.savepoint.format.type")
+                    .enumType(SavepointFormatType.class)
+                    .defaultValue(SavepointFormatType.DEFAULT)
+                    .withDescription(
+                            "Type of the binary format in which a savepoint 
should be taken.");
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 1a601446..bd316290 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -25,11 +25,11 @@ import 
org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import 
org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
 import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
@@ -291,7 +291,9 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                                                     conf.get(FLINK_VERSION)
                                                                     
.isNewerVersionThan(
                                                                             
FlinkVersion.v1_14)
-                                                            ? 
SavepointFormatType.DEFAULT
+                                                            ? conf.get(
+                                                                    
KubernetesOperatorConfigOptions
+                                                                            
.OPERATOR_SAVEPOINT_FORMAT_TYPE)
                                                             : null)
                                             .get(timeout, TimeUnit.SECONDS);
                             savepointOpt = Optional.of(savepoint);
@@ -384,7 +386,9 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                                                 conf.get(FLINK_VERSION)
                                                                 
.isNewerVersionThan(
                                                                         
FlinkVersion.v1_14)
-                                                        ? 
SavepointFormatType.DEFAULT
+                                                        ? conf.get(
+                                                                
KubernetesOperatorConfigOptions
+                                                                        
.OPERATOR_SAVEPOINT_FORMAT_TYPE)
                                                         : null)
                                         .get(timeout, TimeUnit.SECONDS);
                         savepointOpt = Optional.of(savepoint);
@@ -442,7 +446,9 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                                             false,
                                             conf.get(FLINK_VERSION)
                                                             
.isNewerVersionThan(FlinkVersion.v1_14)
-                                                    ? 
SavepointFormatType.DEFAULT
+                                                    ? conf.get(
+                                                            
KubernetesOperatorConfigOptions
+                                                                    
.OPERATOR_SAVEPOINT_FORMAT_TYPE)
                                                     : null,
                                             null))
                             .get(timeout, TimeUnit.SECONDS);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
index d96427e4..413a6943 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
@@ -57,6 +57,8 @@ public class TestingClusterClient<T> extends 
RestClusterClient<T> {
             stopWithSavepointFunction =
                     (ignore1, ignore2, savepointPath) ->
                             CompletableFuture.completedFuture(savepointPath);
+    private TriFunction<JobID, SavepointFormatType, String, 
CompletableFuture<String>>
+            stopWithSavepointFormat;
     private TriFunction<
                     MessageHeaders<?, ?, ?>,
                     MessageParameters,
@@ -97,6 +99,12 @@ public class TestingClusterClient<T> extends 
RestClusterClient<T> {
         this.stopWithSavepointFunction = stopWithSavepointFunction;
     }
 
+    public void setStopWithSavepointFormat(
+            TriFunction<JobID, SavepointFormatType, String, 
CompletableFuture<String>>
+                    stopWithSavepointFormat) {
+        this.stopWithSavepointFormat = stopWithSavepointFormat;
+    }
+
     public void setRequestProcessor(
             TriFunction<
                             MessageHeaders<?, ?, ?>,
@@ -184,7 +192,9 @@ public class TestingClusterClient<T> extends 
RestClusterClient<T> {
             boolean advanceToEndOfTime,
             @Nullable String savepointDirectory,
             SavepointFormatType formatType) {
-        return stopWithSavepointFunction.apply(jobId, advanceToEndOfTime, 
savepointDirectory);
+        return stopWithSavepointFormat == null
+                ? stopWithSavepointFunction.apply(jobId, advanceToEndOfTime, 
savepointDirectory)
+                : stopWithSavepointFormat.apply(jobId, formatType, 
savepointDirectory);
     }
 
     @Override
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index 8e151f04..c2d13029 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -20,13 +20,16 @@ package org.apache.flink.kubernetes.operator.service;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingClusterClient;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
@@ -337,6 +340,78 @@ public class NativeFlinkServiceTest {
                 AbstractFlinkService.getEffectiveStatus(allFinished));
     }
 
+    @Test
+    public void testNativeSavepointFormat() throws Exception {
+        final TestingClusterClient<String> testingClusterClient =
+                new TestingClusterClient<>(configuration, 
TestUtils.TEST_DEPLOYMENT_NAME);
+        final String savepointPath = "file:///path/of/svp";
+        final CompletableFuture<Tuple4<JobID, String, Boolean, 
SavepointFormatType>>
+                triggerSavepointFuture = new CompletableFuture<>();
+        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointPath);
+        testingClusterClient.setRequestProcessor(
+                (headers, parameters, requestBody) -> {
+                    triggerSavepointFuture.complete(
+                            new Tuple4<>(
+                                    ((SavepointTriggerMessageParameters) 
parameters)
+                                            .jobID.getValue(),
+                                    ((SavepointTriggerRequestBody) requestBody)
+                                            .getTargetDirectory()
+                                            .get(),
+                                    ((SavepointTriggerRequestBody) 
requestBody).isCancelJob(),
+                                    ((SavepointTriggerRequestBody) 
requestBody).getFormatType()));
+                    return CompletableFuture.completedFuture(new 
TriggerResponse(new TriggerId()));
+                });
+        final CompletableFuture<Tuple3<JobID, SavepointFormatType, String>>
+                stopWithSavepointFuture = new CompletableFuture<>();
+        testingClusterClient.setStopWithSavepointFormat(
+                (id, formatType, savepointDir) -> {
+                    stopWithSavepointFuture.complete(new Tuple3<>(id, 
formatType, savepointDir));
+                    return CompletableFuture.completedFuture(savepointPath);
+                });
+
+        final FlinkService flinkService = 
createFlinkService(testingClusterClient);
+
+        final JobID jobID = JobID.generate();
+        final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        deployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), 
savepointPath);
+        
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+        JobStatus jobStatus = deployment.getStatus().getJobStatus();
+        jobStatus.setJobId(jobID.toHexString());
+        
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new 
Configuration());
+
+        jobStatus.setJobId(jobID.toString());
+        deployment.getStatus().setJobStatus(jobStatus);
+        flinkService.triggerSavepoint(
+                deployment.getStatus().getJobStatus().getJobId(),
+                SavepointTriggerType.MANUAL,
+                deployment.getStatus().getJobStatus().getSavepointInfo(),
+                new Configuration(configuration)
+                        .set(
+                                
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE,
+                                SavepointFormatType.NATIVE));
+        assertTrue(triggerSavepointFuture.isDone());
+        assertEquals(jobID, triggerSavepointFuture.get().f0);
+        assertEquals(savepointPath, triggerSavepointFuture.get().f1);
+        assertFalse(triggerSavepointFuture.get().f2);
+        assertEquals(SavepointFormatType.NATIVE, 
triggerSavepointFuture.get().f3);
+
+        flinkService.cancelJob(
+                deployment,
+                UpgradeMode.SAVEPOINT,
+                new Configuration(configManager.getObserveConfig(deployment))
+                        .set(
+                                
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE,
+                                SavepointFormatType.NATIVE));
+        assertTrue(stopWithSavepointFuture.isDone());
+        assertEquals(jobID, stopWithSavepointFuture.get().f0);
+        assertEquals(SavepointFormatType.NATIVE, 
stopWithSavepointFuture.get().f1);
+        assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
+    }
+
     private JobDetails getJobDetails(
             org.apache.flink.api.common.JobStatus status,
             Tuple2<ExecutionState, Integer>... tasksPerState) {

Reply via email to