This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 70bb74c0 [FLINK-28763] Make savepoint format configurable for upgrades/savepoint operations 70bb74c0 is described below commit 70bb74c06280985065f0ce56236186a146d4436f Author: Nicholas Jiang <programg...@163.com> AuthorDate: Tue Aug 9 03:36:08 2022 -0700 [FLINK-28763] Make savepoint format configurable for upgrades/savepoint operations --- .../shortcodes/generated/dynamic_section.html | 6 ++ .../kubernetes_operator_config_configuration.html | 6 ++ .../config/KubernetesOperatorConfigOptions.java | 9 +++ .../operator/service/AbstractFlinkService.java | 14 ++-- .../kubernetes/operator/TestingClusterClient.java | 12 +++- .../operator/service/NativeFlinkServiceTest.java | 75 ++++++++++++++++++++++ 6 files changed, 117 insertions(+), 5 deletions(-) diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html b/docs/layouts/shortcodes/generated/dynamic_section.html index 8d8fb620..4cfac307 100644 --- a/docs/layouts/shortcodes/generated/dynamic_section.html +++ b/docs/layouts/shortcodes/generated/dynamic_section.html @@ -44,6 +44,12 @@ <td>Duration</td> <td>Interval at which periodic savepoints will be triggered. The triggering schedule is not guaranteed, savepoints will be triggered as part of the regular reconcile loop.</td> </tr> + <tr> + <td><h5>kubernetes.operator.savepoint.format.type</h5></td> + <td style="word-wrap: break-word;">CANONICAL</td> + <td><p>Enum</p></td> + <td>Type of the binary format in which a savepoint should be taken.<br /><br />Possible values:<ul><li>"CANONICAL": A canonical, common for all state backends format. It lets you switch state backends.</li><li>"NATIVE": A format specific for the chosen state backend, in its native binary format. Might be faster to take and restore from than the canonical one.</li></ul></td> + </tr> <tr> <td><h5>kubernetes.operator.savepoint.history.max.age</h5></td> <td style="word-wrap: break-word;">86400000 ms</td> diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html index f950054e..aaf1e936 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html @@ -140,6 +140,12 @@ <td>Integer</td> <td>Max attempts of automatic reconcile retries on recoverable errors.</td> </tr> + <tr> + <td><h5>kubernetes.operator.savepoint.format.type</h5></td> + <td style="word-wrap: break-word;">CANONICAL</td> + <td><p>Enum</p></td> + <td>Type of the binary format in which a savepoint should be taken.<br /><br />Possible values:<ul><li>"CANONICAL": A canonical, common for all state backends format. It lets you switch state backends.</li><li>"NATIVE": A format specific for the chosen state backend, in its native binary format. Might be faster to take and restore from than the canonical one.</li></ul></td> + </tr> <tr> <td><h5>kubernetes.operator.savepoint.history.max.age</h5></td> <td style="word-wrap: break-word;">86400000 ms</td> diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java index 774882ac..11f84b7c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java @@ -21,6 +21,7 @@ package org.apache.flink.kubernetes.operator.config; import org.apache.flink.annotation.docs.Documentation; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.core.execution.SavepointFormatType; import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.reconciler.Constants; @@ -280,4 +281,12 @@ public class KubernetesOperatorConfigOptions { .defaultValue(true) .withDescription( "Enables last-state fallback for savepoint upgrade mode. When the job is not running thus savepoint cannot be triggered but HA metadata is available for last state restore the operator can initiate the upgrade process when the flag is enabled."); + + @Documentation.Section(SECTION_DYNAMIC) + public static final ConfigOption<SavepointFormatType> OPERATOR_SAVEPOINT_FORMAT_TYPE = + ConfigOptions.key("kubernetes.operator.savepoint.format.type") + .enumType(SavepointFormatType.class) + .defaultValue(SavepointFormatType.DEFAULT) + .withDescription( + "Type of the binary format in which a savepoint should be taken."); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 1a601446..bd316290 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -25,11 +25,11 @@ import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; -import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; import org.apache.flink.kubernetes.operator.artifact.ArtifactManager; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob; import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec; @@ -291,7 +291,9 @@ public abstract class AbstractFlinkService implements FlinkService { conf.get(FLINK_VERSION) .isNewerVersionThan( FlinkVersion.v1_14) - ? SavepointFormatType.DEFAULT + ? conf.get( + KubernetesOperatorConfigOptions + .OPERATOR_SAVEPOINT_FORMAT_TYPE) : null) .get(timeout, TimeUnit.SECONDS); savepointOpt = Optional.of(savepoint); @@ -384,7 +386,9 @@ public abstract class AbstractFlinkService implements FlinkService { conf.get(FLINK_VERSION) .isNewerVersionThan( FlinkVersion.v1_14) - ? SavepointFormatType.DEFAULT + ? conf.get( + KubernetesOperatorConfigOptions + .OPERATOR_SAVEPOINT_FORMAT_TYPE) : null) .get(timeout, TimeUnit.SECONDS); savepointOpt = Optional.of(savepoint); @@ -442,7 +446,9 @@ public abstract class AbstractFlinkService implements FlinkService { false, conf.get(FLINK_VERSION) .isNewerVersionThan(FlinkVersion.v1_14) - ? SavepointFormatType.DEFAULT + ? conf.get( + KubernetesOperatorConfigOptions + .OPERATOR_SAVEPOINT_FORMAT_TYPE) : null, null)) .get(timeout, TimeUnit.SECONDS); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java index d96427e4..413a6943 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java @@ -57,6 +57,8 @@ public class TestingClusterClient<T> extends RestClusterClient<T> { stopWithSavepointFunction = (ignore1, ignore2, savepointPath) -> CompletableFuture.completedFuture(savepointPath); + private TriFunction<JobID, SavepointFormatType, String, CompletableFuture<String>> + stopWithSavepointFormat; private TriFunction< MessageHeaders<?, ?, ?>, MessageParameters, @@ -97,6 +99,12 @@ public class TestingClusterClient<T> extends RestClusterClient<T> { this.stopWithSavepointFunction = stopWithSavepointFunction; } + public void setStopWithSavepointFormat( + TriFunction<JobID, SavepointFormatType, String, CompletableFuture<String>> + stopWithSavepointFormat) { + this.stopWithSavepointFormat = stopWithSavepointFormat; + } + public void setRequestProcessor( TriFunction< MessageHeaders<?, ?, ?>, @@ -184,7 +192,9 @@ public class TestingClusterClient<T> extends RestClusterClient<T> { boolean advanceToEndOfTime, @Nullable String savepointDirectory, SavepointFormatType formatType) { - return stopWithSavepointFunction.apply(jobId, advanceToEndOfTime, savepointDirectory); + return stopWithSavepointFormat == null + ? stopWithSavepointFunction.apply(jobId, advanceToEndOfTime, savepointDirectory) + : stopWithSavepointFormat.apply(jobId, formatType, savepointDirectory); } @Override diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java index 8e151f04..c2d13029 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java @@ -20,13 +20,16 @@ package org.apache.flink.kubernetes.operator.service; import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingClusterClient; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; @@ -337,6 +340,78 @@ public class NativeFlinkServiceTest { AbstractFlinkService.getEffectiveStatus(allFinished)); } + @Test + public void testNativeSavepointFormat() throws Exception { + final TestingClusterClient<String> testingClusterClient = + new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); + final String savepointPath = "file:///path/of/svp"; + final CompletableFuture<Tuple4<JobID, String, Boolean, SavepointFormatType>> + triggerSavepointFuture = new CompletableFuture<>(); + configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath); + testingClusterClient.setRequestProcessor( + (headers, parameters, requestBody) -> { + triggerSavepointFuture.complete( + new Tuple4<>( + ((SavepointTriggerMessageParameters) parameters) + .jobID.getValue(), + ((SavepointTriggerRequestBody) requestBody) + .getTargetDirectory() + .get(), + ((SavepointTriggerRequestBody) requestBody).isCancelJob(), + ((SavepointTriggerRequestBody) requestBody).getFormatType())); + return CompletableFuture.completedFuture(new TriggerResponse(new TriggerId())); + }); + final CompletableFuture<Tuple3<JobID, SavepointFormatType, String>> + stopWithSavepointFuture = new CompletableFuture<>(); + testingClusterClient.setStopWithSavepointFormat( + (id, formatType, savepointDir) -> { + stopWithSavepointFuture.complete(new Tuple3<>(id, formatType, savepointDir)); + return CompletableFuture.completedFuture(savepointPath); + }); + + final FlinkService flinkService = createFlinkService(testingClusterClient); + + final JobID jobID = JobID.generate(); + final FlinkDeployment deployment = TestUtils.buildApplicationCluster(); + deployment + .getSpec() + .getFlinkConfiguration() + .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), savepointPath); + deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); + JobStatus jobStatus = deployment.getStatus().getJobStatus(); + jobStatus.setJobId(jobID.toHexString()); + jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name()); + ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration()); + + jobStatus.setJobId(jobID.toString()); + deployment.getStatus().setJobStatus(jobStatus); + flinkService.triggerSavepoint( + deployment.getStatus().getJobStatus().getJobId(), + SavepointTriggerType.MANUAL, + deployment.getStatus().getJobStatus().getSavepointInfo(), + new Configuration(configuration) + .set( + KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE, + SavepointFormatType.NATIVE)); + assertTrue(triggerSavepointFuture.isDone()); + assertEquals(jobID, triggerSavepointFuture.get().f0); + assertEquals(savepointPath, triggerSavepointFuture.get().f1); + assertFalse(triggerSavepointFuture.get().f2); + assertEquals(SavepointFormatType.NATIVE, triggerSavepointFuture.get().f3); + + flinkService.cancelJob( + deployment, + UpgradeMode.SAVEPOINT, + new Configuration(configManager.getObserveConfig(deployment)) + .set( + KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE, + SavepointFormatType.NATIVE)); + assertTrue(stopWithSavepointFuture.isDone()); + assertEquals(jobID, stopWithSavepointFuture.get().f0); + assertEquals(SavepointFormatType.NATIVE, stopWithSavepointFuture.get().f1); + assertEquals(savepointPath, stopWithSavepointFuture.get().f2); + } + private JobDetails getJobDetails( org.apache.flink.api.common.JobStatus status, Tuple2<ExecutionState, Integer>... tasksPerState) {