This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a31d0feae1ff7a9adc6dff1462f408fdb2e30037 Author: Stephan Ewen <se...@apache.org> AuthorDate: Mon Mar 29 21:59:10 2021 +0200 [FLINK-21935][state backends] Remove async parameter for HashMapStateBackend. Checkpoints are always asynchronous now. --- .../tests/DataStreamAllroundTestJobFactory.java | 13 +-- .../StickyAllocationAndLocalRecoveryTestJob.java | 5 +- .../examples/statemachine/StateMachineExample.java | 4 +- .../runtime/state/hashmap/HashMapStateBackend.java | 92 +--------------------- .../runtime/state/HashMapStateBackendTest.java | 11 +-- .../HeapKeyedStateBackendAsyncByDefaultTest.java | 7 -- 6 files changed, 9 insertions(+), 123 deletions(-) diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java index f2521cf..83e2cde 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java @@ -212,12 +212,6 @@ public class DataStreamAllroundTestJobFactory { .withDescription( "Activate or deactivate incremental snapshots if RocksDBStateBackend is selected."); - private static final ConfigOption<Boolean> STATE_BACKEND_FILE_ASYNC = - ConfigOptions.key("state_backend.file.async") - .defaultValue(true) - .withDescription( - "Activate or deactivate asynchronous snapshots if FileStateBackend is selected."); - private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_KEYSPACE = ConfigOptions.key("sequence_generator_source.keyspace").defaultValue(200); @@ -356,12 +350,7 @@ public class DataStreamAllroundTestJobFactory { final String stateBackend = pt.get(STATE_BACKEND.key(), 
STATE_BACKEND.defaultValue()); if ("hashmap".equalsIgnoreCase(stateBackend)) { - boolean asyncCheckpoints = - pt.getBoolean( - STATE_BACKEND_FILE_ASYNC.key(), - STATE_BACKEND_FILE_ASYNC.defaultValue()); - - env.setStateBackend(new HashMapStateBackend(asyncCheckpoints)); + env.setStateBackend(new HashMapStateBackend()); } else if ("rocks".equalsIgnoreCase(stateBackend)) { boolean incrementalCheckpoints = pt.getBoolean( diff --git a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java index 656ad69..7337da2 100644 --- a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java +++ b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java @@ -70,8 +70,6 @@ import java.util.Set; * default <code>file</code>. * <li>killJvmOnFail: flag that determines whether or not an artificial failure induced by the * test kills the JVM or not. - * <li>asyncCheckpoints: flag for async checkpoints with file state backend, default <code>true - * </code>. * <li>incrementalCheckpoints: flag for incremental checkpoint with rocks state backend, default * <code>false</code>. * <li>delay: sleep delay to throttle down the production of the source, default 0. 
@@ -110,8 +108,7 @@ public class StickyAllocationAndLocalRecoveryTestJob { String stateBackend = pt.get("stateBackend", "hashmap"); if ("hashmap".equals(stateBackend)) { - boolean asyncCheckpoints = pt.getBoolean("asyncCheckpoints", true); - env.setStateBackend(new HashMapStateBackend(asyncCheckpoints)); + env.setStateBackend(new HashMapStateBackend()); } else if ("rocks".equals(stateBackend)) { boolean incrementalCheckpoints = pt.getBoolean("incrementalCheckpoints", false); env.setStateBackend(new EmbeddedRocksDBStateBackend(incrementalCheckpoints)); diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java index 1bb76ad..93d3326 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java @@ -62,7 +62,6 @@ public class StateMachineExample { System.out.println("Options for both the above setups: "); System.out.println("\t[--backend <hashmap|rocks>]"); System.out.println("\t[--checkpoint-dir <filepath>]"); - System.out.println("\t[--async-checkpoints <true|false>]"); System.out.println("\t[--incremental-checkpoints <true|false>]"); System.out.println("\t[--output <filepath> OR null for stdout]"); System.out.println(); @@ -79,8 +78,7 @@ public class StateMachineExample { final String stateBackend = params.get("backend", "memory"); if ("hashmap".equals(stateBackend)) { final String checkpointDir = params.get("checkpoint-dir"); - boolean asyncCheckpoints = params.getBoolean("async-checkpoints", false); - env.setStateBackend(new HashMapStateBackend(asyncCheckpoints)); + env.setStateBackend(new HashMapStateBackend()); 
env.getCheckpointConfig().setCheckpointStorage(checkpointDir); } else if ("rocks".equals(stateBackend)) { final String checkpointDir = params.get("checkpoint-dir"); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/hashmap/HashMapStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/hashmap/HashMapStateBackend.java index 6e764d3..0d39387 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/hashmap/HashMapStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/hashmap/HashMapStateBackend.java @@ -21,11 +21,9 @@ package org.apache.flink.runtime.state.hashmap; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.fs.CloseableRegistry; -import org.apache.flink.core.fs.FileSystem; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; @@ -43,16 +41,12 @@ import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; -import org.apache.flink.util.TernaryBoolean; import javax.annotation.Nonnull; import java.io.IOException; -import java.net.URI; import java.util.Collection; -import static org.apache.flink.util.Preconditions.checkNotNull; - /** * This state backend holds the working state in the memory (JVM heap) of the TaskManagers and * checkpoints based on the configured {@link org.apache.flink.runtime.state.CheckpointStorage}. 
@@ -80,81 +74,12 @@ public class HashMapStateBackend extends AbstractStateBackend implements Configu private static final long serialVersionUID = 1L; - // ------------------------------------------------------------------------ - - /** - * Switch to chose between synchronous and asynchronous snapshots. A value of 'undefined' means - * not yet configured, in which case the default will be used. - */ - private final TernaryBoolean asynchronousSnapshots; - // ----------------------------------------------------------------------- - /** - * Creates a new state backend that stores its checkpoint data in the file system and location - * defined by the given URI. - * - * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or - * 'S3://') must be accessible via {@link FileSystem#get(URI)}. - * - * <p>For a state backend targeting HDFS, this means that the URI must either specify the - * authority (host and port), or that the Hadoop configuration that describes that information - * must be in the classpath. - */ - public HashMapStateBackend() { - this(TernaryBoolean.UNDEFINED); - } - - /** - * Creates a new state backend that stores its checkpoint data in the file system and location - * defined by the given URI. - * - * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or - * 'S3://') must be accessible via {@link FileSystem#get(URI)}. - * - * <p>For a state backend targeting HDFS, this means that the URI must either specify the - * authority (host and port), or that the Hadoop configuration that describes that information - * must be in the classpath. - * - * @param asynchronousSnapshots Flag to switch between synchronous and asynchronous snapshot - * mode. 
- */ - public HashMapStateBackend(boolean asynchronousSnapshots) { - this(TernaryBoolean.fromBoolean(asynchronousSnapshots)); - } - - /** - * Creates a new state backend that stores its checkpoint data in the file system and location - * defined by the given URI. - * - * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or - * 'S3://') must be accessible via {@link FileSystem#get(URI)}. - * - * <p>For a state backend targeting HDFS, this means that the URI must either specify the - * authority (host and port), or that the Hadoop configuration that describes that information - * must be in the classpath. - * - * @param asynchronousSnapshots Flag to switch between synchronous and asynchronous snapshot - * mode. If UNDEFINED, the value configured in the runtime configuration will be used. - */ - public HashMapStateBackend(TernaryBoolean asynchronousSnapshots) { - checkNotNull(asynchronousSnapshots, "asynchronousSnapshots"); - - this.asynchronousSnapshots = asynchronousSnapshots; - } - - private HashMapStateBackend(HashMapStateBackend original, ReadableConfig config) { - // if asynchronous snapshots were configured, use that setting, - // else check the configuration - this.asynchronousSnapshots = - original.asynchronousSnapshots.resolveUndefined( - config.get(CheckpointingOptions.ASYNC_SNAPSHOTS)); - } - @Override public HashMapStateBackend configure(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException { - return new HashMapStateBackend(this, config); + return this; } @Override @@ -189,7 +114,7 @@ public class HashMapStateBackend extends AbstractStateBackend implements Configu getCompressionDecorator(env.getExecutionConfig()), localRecoveryConfig, priorityQueueSetFactory, - isUsingAsynchronousSnapshots(), + true, cancelStreamRegistry) .build(); } @@ -205,20 +130,9 @@ public class HashMapStateBackend extends AbstractStateBackend implements Configu return new DefaultOperatorStateBackendBuilder( 
env.getUserCodeClassLoader().asClassLoader(), env.getExecutionConfig(), - isUsingAsynchronousSnapshots(), + true, stateHandles, cancelStreamRegistry) .build(); } - - /** - * Gets whether the key/value data structures are asynchronously snapshotted. - * - * <p>If not explicitly configured, this is the default value of {@link - * CheckpointingOptions#ASYNC_SNAPSHOTS}. - */ - public boolean isUsingAsynchronousSnapshots() { - return asynchronousSnapshots.getOrDefault( - CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue()); - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HashMapStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HashMapStateBackendTest.java index baa2a78..76a1957 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HashMapStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HashMapStateBackendTest.java @@ -48,12 +48,10 @@ public class HashMapStateBackendTest extends StateBackendTestBase<HashMapStateBa return Arrays.asList( new Object[][] { { - true, (SupplierWithException<CheckpointStorage, IOException>) JobManagerCheckpointStorage::new }, { - false, (SupplierWithException<CheckpointStorage, IOException>) () -> { String checkpointPath = @@ -64,15 +62,12 @@ public class HashMapStateBackendTest extends StateBackendTestBase<HashMapStateBa }); } - @Parameterized.Parameter(value = 0) - public boolean useAsyncMode; - - @Parameterized.Parameter(value = 1) + @Parameterized.Parameter public SupplierWithException<CheckpointStorage, IOException> storageSupplier; @Override protected HashMapStateBackend getStateBackend() { - return new HashMapStateBackend(useAsyncMode); + return new HashMapStateBackend(); } @Override @@ -82,7 +77,7 @@ public class HashMapStateBackendTest extends StateBackendTestBase<HashMapStateBa @Override protected boolean supportsAsynchronousSnapshots() { - return useAsyncMode; + return true; } @Override diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java index fafb20b..85a6202 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.state; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.junit.Rule; @@ -40,12 +39,6 @@ public class HeapKeyedStateBackendAsyncByDefaultTest { } @Test - public void testHashMapStateBackendDefaultToAsync() { - HashMapStateBackend backend = new HashMapStateBackend(); - assertTrue(backend.isUsingAsynchronousSnapshots()); - } - - @Test public void testFsStateBackendDefaultsToAsync() throws Exception { FsStateBackend backend = new FsStateBackend(tmpFolder.newFolder().toURI()); assertTrue(backend.isUsingAsynchronousSnapshots());