This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7c3e742d4287bcebef88710bf8a400e4df8ebf09 Author: sjwiesman <sjwies...@gmail.com> AuthorDate: Tue Aug 17 09:14:16 2021 -0500 [hotfix] support setting additional configuration on WritableSavepoint --- .../flink/state/api/BootstrapTransformation.java | 19 +++++++++++++-- .../apache/flink/state/api/WritableSavepoint.java | 27 ++++++++++++++++++++-- .../state/api/BootstrapTransformationTest.java | 6 ++++- .../flink/state/api/SavepointDeepCopyTest.java | 21 ++++++++++++----- 4 files changed, 62 insertions(+), 11 deletions(-) diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java index eefb469..7ac51a5 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java @@ -133,6 +133,7 @@ public class BootstrapTransformation<T> { /** * @param operatorID The operator id for the stream operator. * @param stateBackend The state backend for the job. + * @param config Additional configurations applied to the bootstrap stream tasks. * @param globalMaxParallelism Global max parallelism set for the savepoint. * @param savepointPath The path where the savepoint will be written. * @return The operator subtask states for this bootstrap transformation. 
@@ -140,12 +141,13 @@ public class BootstrapTransformation<T> { DataSet<OperatorState> writeOperatorState( OperatorID operatorID, StateBackend stateBackend, + Configuration config, int globalMaxParallelism, Path savepointPath) { int localMaxParallelism = getMaxParallelism(globalMaxParallelism); return writeOperatorSubtaskStates( - operatorID, stateBackend, savepointPath, localMaxParallelism) + operatorID, stateBackend, config, savepointPath, localMaxParallelism) .reduceGroup(new OperatorSubtaskStateReducer(operatorID, localMaxParallelism)) .name("reduce(OperatorSubtaskState)"); } @@ -156,6 +158,16 @@ public class BootstrapTransformation<T> { StateBackend stateBackend, Path savepointPath, int localMaxParallelism) { + return writeOperatorSubtaskStates( + operatorID, stateBackend, new Configuration(), savepointPath, localMaxParallelism); + } + + private MapPartitionOperator<T, TaggedOperatorSubtaskState> writeOperatorSubtaskStates( + OperatorID operatorID, + StateBackend stateBackend, + Configuration additionalConfig, + Path savepointPath, + int localMaxParallelism) { DataSet<T> input = dataSet; if (originalKeySelector != null) { @@ -169,7 +181,7 @@ public class BootstrapTransformation<T> { operator = dataSet.clean(operator); - final StreamConfig config = getConfig(operatorID, stateBackend, operator); + final StreamConfig config = getConfig(operatorID, stateBackend, additionalConfig, operator); BoundedOneInputStreamTaskRunner<T> operatorRunner = new BoundedOneInputStreamTaskRunner<>(config, localMaxParallelism, timestamper); @@ -192,12 +204,15 @@ public class BootstrapTransformation<T> { StreamConfig getConfig( OperatorID operatorID, StateBackend stateBackend, + Configuration additionalConfig, StreamOperator<TaggedOperatorSubtaskState> operator) { // Eagerly perform a deep copy of the configuration, otherwise it will result in undefined // behavior // when deploying with multiple bootstrap transformations. 
Configuration deepCopy = new Configuration(dataSet.getExecutionEnvironment().getConfiguration()); + deepCopy.addAll(additionalConfig); + final StreamConfig config = new StreamConfig(deepCopy); config.setChainStart(); config.setCheckpointingEnabled(true); diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java index 324e4c1..7d3e51f 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java @@ -19,6 +19,8 @@ package org.apache.flink.state.api; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.java.DataSet; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.OperatorState; import org.apache.flink.runtime.state.StateBackend; @@ -53,11 +55,14 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> { /** The state backend to use when writing this savepoint. */ protected final StateBackend stateBackend; + private final Configuration configuration; + WritableSavepoint(SavepointMetadata metadata, StateBackend stateBackend) { Preconditions.checkNotNull(metadata, "The savepoint metadata must not be null"); Preconditions.checkNotNull(stateBackend, "The state backend must not be null"); this.metadata = metadata; this.stateBackend = stateBackend; + this.configuration = new Configuration(); } /** @@ -86,6 +91,21 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> { } /** + * Sets a configuration that will be applied to the stream operators used to bootstrap a new + * savepoint. 
+ * + * @param option the configuration option to set + * @param value value to be stored + * @param <T> type of the value to be stored + * @return The modified savepoint. + */ + @SuppressWarnings("unchecked") + public <T> F withConfiguration(ConfigOption<T> option, T value) { + configuration.set(option, value); + return (F) this; + } + + /** * Write out a new or updated savepoint. * * @param path The path to where the savepoint should be written. @@ -96,7 +116,7 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> { List<BootstrapTransformationWithID<?>> newOperatorTransformations = metadata.getNewOperators(); DataSet<OperatorState> newOperatorStates = - writeOperatorStates(newOperatorTransformations, savepointPath); + writeOperatorStates(newOperatorTransformations, configuration, savepointPath); List<OperatorState> existingOperators = metadata.getExistingOperators(); @@ -125,7 +145,9 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> { } private DataSet<OperatorState> writeOperatorStates( - List<BootstrapTransformationWithID<?>> newOperatorStates, Path savepointWritePath) { + List<BootstrapTransformationWithID<?>> newOperatorStates, + Configuration config, + Path savepointWritePath) { return newOperatorStates.stream() .map( newOperatorState -> @@ -134,6 +156,7 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> { .writeOperatorState( newOperatorState.getOperatorID(), stateBackend, + config, metadata.getMaxParallelism(), savepointWritePath)) .reduce(DataSet::union) diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java index 36c5276..4e25ef3 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java +++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.Operator; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -156,7 +157,10 @@ public class BootstrapTransformationTest extends AbstractTestBase { StreamConfig config = transformation.getConfig( - OperatorIDGenerator.fromUid("uid"), new MemoryStateBackend(), null); + OperatorIDGenerator.fromUid("uid"), + new MemoryStateBackend(), + new Configuration(), + null); KeySelector selector = config.getStatePartitioner(0, Thread.currentThread().getContextClassLoader()); diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java index fcd2888..80d7d09 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java @@ -25,10 +25,13 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateBackend; 
import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction; import org.apache.flink.state.api.functions.KeyedStateReaderFunction; import org.apache.flink.test.util.AbstractTestBase; @@ -49,6 +52,7 @@ import java.util.Collection; import java.util.Set; import java.util.stream.Collectors; +import static org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.isIn; import static org.junit.Assert.assertThat; @@ -57,9 +61,10 @@ import static org.junit.Assert.assertThat; @RunWith(value = Parameterized.class) public class SavepointDeepCopyTest extends AbstractTestBase { + private static final MemorySize FILE_STATE_SIZE_THRESHOLD = new MemorySize(1); + private static final String TEXT = "The quick brown fox jumps over the lazy dog"; private static final String RANDOM_VALUE = RandomStringUtils.randomAlphanumeric(120); - private static final int FILE_STATE_SIZE_THRESHOLD = 1; private final StateBackend backend; @@ -70,10 +75,10 @@ public class SavepointDeepCopyTest extends AbstractTestBase { @Parameterized.Parameters(name = "State Backend: {0}") public static Collection<StateBackend> data() { return Arrays.asList( - new FsStateBackend(new Path("file:///tmp").toUri(), FILE_STATE_SIZE_THRESHOLD), - new RocksDBStateBackend( - new FsStateBackend( - new Path("file:///tmp").toUri(), FILE_STATE_SIZE_THRESHOLD))); + new FsStateBackend(new Path("file:///tmp").toUri()), + new RocksDBStateBackend(new FsStateBackend(new Path("file:///tmp").toUri())), + new HashMapStateBackend(), + new EmbeddedRocksDBStateBackend()); } /** To bootstrap a savepoint for testing. 
*/ @@ -151,6 +156,7 @@ public class SavepointDeepCopyTest extends AbstractTestBase { // create a savepoint with BootstrapTransformations (one per operator) // write the created savepoint to a given path Savepoint.create(backend, 128) + .withConfiguration(FS_SMALL_FILE_THRESHOLD, FILE_STATE_SIZE_THRESHOLD) .withOperator("Operator1", transformation) .write(savepointPath1); @@ -169,7 +175,10 @@ public class SavepointDeepCopyTest extends AbstractTestBase { File savepointUrl2 = createAndRegisterTempFile(new AbstractID().toHexString()); String savepointPath2 = savepointUrl2.getPath(); - ExistingSavepoint savepoint2 = Savepoint.load(env, savepointPath1, backend); + ExistingSavepoint savepoint2 = + Savepoint.load(env, savepointPath1, backend) + .withConfiguration(FS_SMALL_FILE_THRESHOLD, FILE_STATE_SIZE_THRESHOLD); + savepoint2.withOperator("Operator2", transformation).write(savepointPath2); env.execute("create savepoint2");