This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a31d0feae1ff7a9adc6dff1462f408fdb2e30037
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Mar 29 21:59:10 2021 +0200

    [FLINK-21935][state backends] Remove async parameter for 
HashMapStateBackend.
    
    Checkpoints are always asynchronous now.
---
 .../tests/DataStreamAllroundTestJobFactory.java    | 13 +--
 .../StickyAllocationAndLocalRecoveryTestJob.java   |  5 +-
 .../examples/statemachine/StateMachineExample.java |  4 +-
 .../runtime/state/hashmap/HashMapStateBackend.java | 92 +---------------------
 .../runtime/state/HashMapStateBackendTest.java     | 11 +--
 .../HeapKeyedStateBackendAsyncByDefaultTest.java   |  7 --
 6 files changed, 9 insertions(+), 123 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index f2521cf..83e2cde 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -212,12 +212,6 @@ public class DataStreamAllroundTestJobFactory {
                     .withDescription(
                             "Activate or deactivate incremental snapshots if 
RocksDBStateBackend is selected.");
 
-    private static final ConfigOption<Boolean> STATE_BACKEND_FILE_ASYNC =
-            ConfigOptions.key("state_backend.file.async")
-                    .defaultValue(true)
-                    .withDescription(
-                            "Activate or deactivate asynchronous snapshots if 
FileStateBackend is selected.");
-
     private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_KEYSPACE 
=
             
ConfigOptions.key("sequence_generator_source.keyspace").defaultValue(200);
 
@@ -356,12 +350,7 @@ public class DataStreamAllroundTestJobFactory {
         final String stateBackend = pt.get(STATE_BACKEND.key(), 
STATE_BACKEND.defaultValue());
 
         if ("hashmap".equalsIgnoreCase(stateBackend)) {
-            boolean asyncCheckpoints =
-                    pt.getBoolean(
-                            STATE_BACKEND_FILE_ASYNC.key(),
-                            STATE_BACKEND_FILE_ASYNC.defaultValue());
-
-            env.setStateBackend(new HashMapStateBackend(asyncCheckpoints));
+            env.setStateBackend(new HashMapStateBackend());
         } else if ("rocks".equalsIgnoreCase(stateBackend)) {
             boolean incrementalCheckpoints =
                     pt.getBoolean(
diff --git 
a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 
b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
index 656ad69..7337da2 100644
--- 
a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
+++ 
b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
@@ -70,8 +70,6 @@ import java.util.Set;
  *       default <code>file</code>.
  *   <li>killJvmOnFail: flag that determines whether or not an artificial 
failure induced by the
  *       test kills the JVM or not.
- *   <li>asyncCheckpoints: flag for async checkpoints with file state backend, 
default <code>true
- *       </code>.
  *   <li>incrementalCheckpoints: flag for incremental checkpoint with rocks 
state backend, default
  *       <code>false</code>.
  *   <li>delay: sleep delay to throttle down the production of the source, 
default 0.
@@ -110,8 +108,7 @@ public class StickyAllocationAndLocalRecoveryTestJob {
 
         String stateBackend = pt.get("stateBackend", "hashmap");
         if ("hashmap".equals(stateBackend)) {
-            boolean asyncCheckpoints = pt.getBoolean("asyncCheckpoints", true);
-            env.setStateBackend(new HashMapStateBackend(asyncCheckpoints));
+            env.setStateBackend(new HashMapStateBackend());
         } else if ("rocks".equals(stateBackend)) {
             boolean incrementalCheckpoints = 
pt.getBoolean("incrementalCheckpoints", false);
             env.setStateBackend(new 
EmbeddedRocksDBStateBackend(incrementalCheckpoints));
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
index 1bb76ad..93d3326 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
@@ -62,7 +62,6 @@ public class StateMachineExample {
         System.out.println("Options for both the above setups: ");
         System.out.println("\t[--backend <hashmap|rocks>]");
         System.out.println("\t[--checkpoint-dir <filepath>]");
-        System.out.println("\t[--async-checkpoints <true|false>]");
         System.out.println("\t[--incremental-checkpoints <true|false>]");
         System.out.println("\t[--output <filepath> OR null for stdout]");
         System.out.println();
@@ -79,8 +78,7 @@ public class StateMachineExample {
         final String stateBackend = params.get("backend", "memory");
         if ("hashmap".equals(stateBackend)) {
             final String checkpointDir = params.get("checkpoint-dir");
-            boolean asyncCheckpoints = params.getBoolean("async-checkpoints", 
false);
-            env.setStateBackend(new HashMapStateBackend(asyncCheckpoints));
+            env.setStateBackend(new HashMapStateBackend());
             env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
         } else if ("rocks".equals(stateBackend)) {
             final String checkpointDir = params.get("checkpoint-dir");
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/hashmap/HashMapStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/hashmap/HashMapStateBackend.java
index 6e764d3..0d39387 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/hashmap/HashMapStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/hashmap/HashMapStateBackend.java
@@ -21,11 +21,9 @@ package org.apache.flink.runtime.state.hashmap;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -43,16 +41,12 @@ import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
-import org.apache.flink.util.TernaryBoolean;
 
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.Collection;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * This state backend holds the working state in the memory (JVM heap) of the 
TaskManagers and
  * checkpoints based on the configured {@link 
org.apache.flink.runtime.state.CheckpointStorage}.
@@ -80,81 +74,12 @@ public class HashMapStateBackend extends 
AbstractStateBackend implements Configu
 
     private static final long serialVersionUID = 1L;
 
-    // ------------------------------------------------------------------------
-
-    /**
-     * Switch to chose between synchronous and asynchronous snapshots. A value 
of 'undefined' means
-     * not yet configured, in which case the default will be used.
-     */
-    private final TernaryBoolean asynchronousSnapshots;
-
     // -----------------------------------------------------------------------
 
-    /**
-     * Creates a new state backend that stores its checkpoint data in the file 
system and location
-     * defined by the given URI.
-     *
-     * <p>A file system for the file system scheme in the URI (e.g., 
'file://', 'hdfs://', or
-     * 'S3://') must be accessible via {@link FileSystem#get(URI)}.
-     *
-     * <p>For a state backend targeting HDFS, this means that the URI must 
either specify the
-     * authority (host and port), or that the Hadoop configuration that 
describes that information
-     * must be in the classpath.
-     */
-    public HashMapStateBackend() {
-        this(TernaryBoolean.UNDEFINED);
-    }
-
-    /**
-     * Creates a new state backend that stores its checkpoint data in the file 
system and location
-     * defined by the given URI.
-     *
-     * <p>A file system for the file system scheme in the URI (e.g., 
'file://', 'hdfs://', or
-     * 'S3://') must be accessible via {@link FileSystem#get(URI)}.
-     *
-     * <p>For a state backend targeting HDFS, this means that the URI must 
either specify the
-     * authority (host and port), or that the Hadoop configuration that 
describes that information
-     * must be in the classpath.
-     *
-     * @param asynchronousSnapshots Flag to switch between synchronous and 
asynchronous snapshot
-     *     mode.
-     */
-    public HashMapStateBackend(boolean asynchronousSnapshots) {
-        this(TernaryBoolean.fromBoolean(asynchronousSnapshots));
-    }
-
-    /**
-     * Creates a new state backend that stores its checkpoint data in the file 
system and location
-     * defined by the given URI.
-     *
-     * <p>A file system for the file system scheme in the URI (e.g., 
'file://', 'hdfs://', or
-     * 'S3://') must be accessible via {@link FileSystem#get(URI)}.
-     *
-     * <p>For a state backend targeting HDFS, this means that the URI must 
either specify the
-     * authority (host and port), or that the Hadoop configuration that 
describes that information
-     * must be in the classpath.
-     *
-     * @param asynchronousSnapshots Flag to switch between synchronous and 
asynchronous snapshot
-     *     mode. If UNDEFINED, the value configured in the runtime 
configuration will be used.
-     */
-    public HashMapStateBackend(TernaryBoolean asynchronousSnapshots) {
-        checkNotNull(asynchronousSnapshots, "asynchronousSnapshots");
-
-        this.asynchronousSnapshots = asynchronousSnapshots;
-    }
-
-    private HashMapStateBackend(HashMapStateBackend original, ReadableConfig 
config) {
-        // if asynchronous snapshots were configured, use that setting,
-        // else check the configuration
-        this.asynchronousSnapshots =
-                original.asynchronousSnapshots.resolveUndefined(
-                        config.get(CheckpointingOptions.ASYNC_SNAPSHOTS));
-    }
-
     @Override
     public HashMapStateBackend configure(ReadableConfig config, ClassLoader 
classLoader)
             throws IllegalConfigurationException {
-        return new HashMapStateBackend(this, config);
+        return this;
     }
 
     @Override
@@ -189,7 +114,7 @@ public class HashMapStateBackend extends 
AbstractStateBackend implements Configu
                         getCompressionDecorator(env.getExecutionConfig()),
                         localRecoveryConfig,
                         priorityQueueSetFactory,
-                        isUsingAsynchronousSnapshots(),
+                        true,
                         cancelStreamRegistry)
                 .build();
     }
@@ -205,20 +130,9 @@ public class HashMapStateBackend extends 
AbstractStateBackend implements Configu
         return new DefaultOperatorStateBackendBuilder(
                         env.getUserCodeClassLoader().asClassLoader(),
                         env.getExecutionConfig(),
-                        isUsingAsynchronousSnapshots(),
+                        true,
                         stateHandles,
                         cancelStreamRegistry)
                 .build();
     }
-
-    /**
-     * Gets whether the key/value data structures are asynchronously 
snapshotted.
-     *
-     * <p>If not explicitly configured, this is the default value of {@link
-     * CheckpointingOptions#ASYNC_SNAPSHOTS}.
-     */
-    public boolean isUsingAsynchronousSnapshots() {
-        return asynchronousSnapshots.getOrDefault(
-                CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue());
-    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HashMapStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HashMapStateBackendTest.java
index baa2a78..76a1957 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HashMapStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HashMapStateBackendTest.java
@@ -48,12 +48,10 @@ public class HashMapStateBackendTest extends 
StateBackendTestBase<HashMapStateBa
         return Arrays.asList(
                 new Object[][] {
                     {
-                        true,
                         (SupplierWithException<CheckpointStorage, IOException>)
                                 JobManagerCheckpointStorage::new
                     },
                     {
-                        false,
                         (SupplierWithException<CheckpointStorage, IOException>)
                                 () -> {
                                     String checkpointPath =
@@ -64,15 +62,12 @@ public class HashMapStateBackendTest extends 
StateBackendTestBase<HashMapStateBa
                 });
     }
 
-    @Parameterized.Parameter(value = 0)
-    public boolean useAsyncMode;
-
-    @Parameterized.Parameter(value = 1)
+    @Parameterized.Parameter
     public SupplierWithException<CheckpointStorage, IOException> 
storageSupplier;
 
     @Override
     protected HashMapStateBackend getStateBackend() {
-        return new HashMapStateBackend(useAsyncMode);
+        return new HashMapStateBackend();
     }
 
     @Override
@@ -82,7 +77,7 @@ public class HashMapStateBackendTest extends 
StateBackendTestBase<HashMapStateBa
 
     @Override
     protected boolean supportsAsynchronousSnapshots() {
-        return useAsyncMode;
+        return true;
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
index fafb20b..85a6202 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 
 import org.junit.Rule;
@@ -40,12 +39,6 @@ public class HeapKeyedStateBackendAsyncByDefaultTest {
     }
 
     @Test
-    public void testHashMapStateBackendDefaultToAsync() {
-        HashMapStateBackend backend = new HashMapStateBackend();
-        assertTrue(backend.isUsingAsynchronousSnapshots());
-    }
-
-    @Test
     public void testFsStateBackendDefaultsToAsync() throws Exception {
         FsStateBackend backend = new 
FsStateBackend(tmpFolder.newFolder().toURI());
         assertTrue(backend.isUsingAsynchronousSnapshots());

Reply via email to