This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7c3e742d4287bcebef88710bf8a400e4df8ebf09
Author: sjwiesman <sjwies...@gmail.com>
AuthorDate: Tue Aug 17 09:14:16 2021 -0500

    [hotfix] support setting additional configuration on WritableSavepoint
---
 .../flink/state/api/BootstrapTransformation.java   | 19 +++++++++++++--
 .../apache/flink/state/api/WritableSavepoint.java  | 27 ++++++++++++++++++++--
 .../state/api/BootstrapTransformationTest.java     |  6 ++++-
 .../flink/state/api/SavepointDeepCopyTest.java     | 21 ++++++++++++-----
 4 files changed, 62 insertions(+), 11 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
index eefb469..7ac51a5 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
@@ -133,6 +133,7 @@ public class BootstrapTransformation<T> {
     /**
      * @param operatorID The operator id for the stream operator.
      * @param stateBackend The state backend for the job.
+     * @param config Additional configurations applied to the bootstrap stream tasks.
      * @param globalMaxParallelism Global max parallelism set for the savepoint.
      * @param savepointPath The path where the savepoint will be written.
      * @return The operator subtask states for this bootstrap transformation.
@@ -140,12 +141,13 @@ public class BootstrapTransformation<T> {
     DataSet<OperatorState> writeOperatorState(
             OperatorID operatorID,
             StateBackend stateBackend,
+            Configuration config,
             int globalMaxParallelism,
             Path savepointPath) {
         int localMaxParallelism = getMaxParallelism(globalMaxParallelism);
 
         return writeOperatorSubtaskStates(
-                        operatorID, stateBackend, savepointPath, 
localMaxParallelism)
+                        operatorID, stateBackend, config, savepointPath, 
localMaxParallelism)
                 .reduceGroup(new OperatorSubtaskStateReducer(operatorID, 
localMaxParallelism))
                 .name("reduce(OperatorSubtaskState)");
     }
@@ -156,6 +158,16 @@ public class BootstrapTransformation<T> {
             StateBackend stateBackend,
             Path savepointPath,
             int localMaxParallelism) {
+        return writeOperatorSubtaskStates(
+                operatorID, stateBackend, new Configuration(), savepointPath, 
localMaxParallelism);
+    }
+
+    private MapPartitionOperator<T, TaggedOperatorSubtaskState> 
writeOperatorSubtaskStates(
+            OperatorID operatorID,
+            StateBackend stateBackend,
+            Configuration additionalConfig,
+            Path savepointPath,
+            int localMaxParallelism) {
 
         DataSet<T> input = dataSet;
         if (originalKeySelector != null) {
@@ -169,7 +181,7 @@ public class BootstrapTransformation<T> {
 
         operator = dataSet.clean(operator);
 
-        final StreamConfig config = getConfig(operatorID, stateBackend, 
operator);
+        final StreamConfig config = getConfig(operatorID, stateBackend, 
additionalConfig, operator);
 
         BoundedOneInputStreamTaskRunner<T> operatorRunner =
                 new BoundedOneInputStreamTaskRunner<>(config, 
localMaxParallelism, timestamper);
@@ -192,12 +204,15 @@ public class BootstrapTransformation<T> {
     StreamConfig getConfig(
             OperatorID operatorID,
             StateBackend stateBackend,
+            Configuration additionalConfig,
             StreamOperator<TaggedOperatorSubtaskState> operator) {
         // Eagerly perform a deep copy of the configuration, otherwise it will result in undefined
         // behavior
         // when deploying with multiple bootstrap transformations.
         Configuration deepCopy =
                 new 
Configuration(dataSet.getExecutionEnvironment().getConfiguration());
+        deepCopy.addAll(additionalConfig);
+
         final StreamConfig config = new StreamConfig(deepCopy);
         config.setChainStart();
         config.setCheckpointingEnabled(true);
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
index 324e4c1..7d3e51f 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
@@ -19,6 +19,8 @@ package org.apache.flink.state.api;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.state.StateBackend;
@@ -53,11 +55,14 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> {
     /** The state backend to use when writing this savepoint. */
     protected final StateBackend stateBackend;
 
+    private final Configuration configuration;
+
     WritableSavepoint(SavepointMetadata metadata, StateBackend stateBackend) {
         Preconditions.checkNotNull(metadata, "The savepoint metadata must not 
be null");
         Preconditions.checkNotNull(stateBackend, "The state backend must not 
be null");
         this.metadata = metadata;
         this.stateBackend = stateBackend;
+        this.configuration = new Configuration();
     }
 
     /**
@@ -86,6 +91,21 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> {
     }
 
     /**
+     * Sets a configuration option that will be applied to the stream operators used to bootstrap
+     * new operator state when this savepoint is written.
+     *
+     * @param option the configuration option to set
+     * @param value the value to assign to {@code option}
+     * @param <T> type of the option's value
+     * @return The modified savepoint.
+     */
+    @SuppressWarnings("unchecked")
+    public <T> F withConfiguration(ConfigOption<T> option, T value) {
+        configuration.set(option, value);
+        return (F) this;
+    }
+
+    /**
      * Write out a new or updated savepoint.
      *
      * @param path The path to where the savepoint should be written.
@@ -96,7 +116,7 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> {
         List<BootstrapTransformationWithID<?>> newOperatorTransformations =
                 metadata.getNewOperators();
         DataSet<OperatorState> newOperatorStates =
-                writeOperatorStates(newOperatorTransformations, savepointPath);
+                writeOperatorStates(newOperatorTransformations, configuration, 
savepointPath);
 
         List<OperatorState> existingOperators = 
metadata.getExistingOperators();
 
@@ -125,7 +145,9 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> {
     }
 
     private DataSet<OperatorState> writeOperatorStates(
-            List<BootstrapTransformationWithID<?>> newOperatorStates, Path 
savepointWritePath) {
+            List<BootstrapTransformationWithID<?>> newOperatorStates,
+            Configuration config,
+            Path savepointWritePath) {
         return newOperatorStates.stream()
                 .map(
                         newOperatorState ->
@@ -134,6 +156,7 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> {
                                         .writeOperatorState(
                                                 
newOperatorState.getOperatorID(),
                                                 stateBackend,
+                                                config,
                                                 metadata.getMaxParallelism(),
                                                 savepointWritePath))
                 .reduce(DataSet::union)
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java
index 36c5276..4e25ef3 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.operators.Operator;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -156,7 +157,10 @@ public class BootstrapTransformationTest extends AbstractTestBase {
 
         StreamConfig config =
                 transformation.getConfig(
-                        OperatorIDGenerator.fromUid("uid"), new 
MemoryStateBackend(), null);
+                        OperatorIDGenerator.fromUid("uid"),
+                        new MemoryStateBackend(),
+                        new Configuration(),
+                        null);
         KeySelector selector =
                 config.getStatePartitioner(0, 
Thread.currentThread().getContextClassLoader());
 
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java
index fcd2888..80d7d09 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java
@@ -25,10 +25,13 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
 import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
 import org.apache.flink.test.util.AbstractTestBase;
@@ -49,6 +52,7 @@ import java.util.Collection;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.isIn;
 import static org.junit.Assert.assertThat;
@@ -57,9 +61,10 @@ import static org.junit.Assert.assertThat;
 @RunWith(value = Parameterized.class)
 public class SavepointDeepCopyTest extends AbstractTestBase {
 
+    private static final MemorySize FILE_STATE_SIZE_THRESHOLD = new 
MemorySize(1);
+
     private static final String TEXT = "The quick brown fox jumps over the 
lazy dog";
     private static final String RANDOM_VALUE = 
RandomStringUtils.randomAlphanumeric(120);
-    private static final int FILE_STATE_SIZE_THRESHOLD = 1;
 
     private final StateBackend backend;
 
@@ -70,10 +75,10 @@ public class SavepointDeepCopyTest extends AbstractTestBase {
     @Parameterized.Parameters(name = "State Backend: {0}")
     public static Collection<StateBackend> data() {
         return Arrays.asList(
-                new FsStateBackend(new Path("file:///tmp").toUri(), 
FILE_STATE_SIZE_THRESHOLD),
-                new RocksDBStateBackend(
-                        new FsStateBackend(
-                                new Path("file:///tmp").toUri(), 
FILE_STATE_SIZE_THRESHOLD)));
+                new FsStateBackend(new Path("file:///tmp").toUri()),
+                new RocksDBStateBackend(new FsStateBackend(new 
Path("file:///tmp").toUri())),
+                new HashMapStateBackend(),
+                new EmbeddedRocksDBStateBackend());
     }
 
     /** To bootstrapper a savepoint for testing. */
@@ -151,6 +156,7 @@ public class SavepointDeepCopyTest extends AbstractTestBase {
         // create a savepoint with BootstrapTransformations (one per operator)
         // write the created savepoint to a given path
         Savepoint.create(backend, 128)
+                .withConfiguration(FS_SMALL_FILE_THRESHOLD, 
FILE_STATE_SIZE_THRESHOLD)
                 .withOperator("Operator1", transformation)
                 .write(savepointPath1);
 
@@ -169,7 +175,10 @@ public class SavepointDeepCopyTest extends AbstractTestBase {
         File savepointUrl2 = createAndRegisterTempFile(new 
AbstractID().toHexString());
         String savepointPath2 = savepointUrl2.getPath();
 
-        ExistingSavepoint savepoint2 = Savepoint.load(env, savepointPath1, 
backend);
+        ExistingSavepoint savepoint2 =
+                Savepoint.load(env, savepointPath1, backend)
+                        .withConfiguration(FS_SMALL_FILE_THRESHOLD, 
FILE_STATE_SIZE_THRESHOLD);
+
         savepoint2.withOperator("Operator2", 
transformation).write(savepointPath2);
         env.execute("create savepoint2");
 

Reply via email to