Repository: flink
Updated Branches:
  refs/heads/master d139e6340 -> 2906698b4


[FLINK-8053] [checkpoints] Default to asynchronous snapshots for FsStateBackend 
and MemoryStateBackend.

This closes #5005.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2906698b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2906698b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2906698b

Branch: refs/heads/master
Commit: 2906698b4a87f21c6fd099cf8a028f68fc311b1f
Parents: d139e63
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Mon Nov 13 14:31:45 2017 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Tue Nov 14 11:52:32 2017 +0100

----------------------------------------------------------------------
 docs/ops/state/large_state_tuning.md            |  6 +-
 docs/ops/state/state_backends.md                |  5 +-
 .../state/filesystem/FsStateBackend.java        |  4 +-
 .../state/memory/MemoryStateBackend.java        |  2 +-
 ...HeapKeyedStateBackendAsyncByDefaultTest.java | 86 ++++++++++++++++++++
 5 files changed, 93 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2906698b/docs/ops/state/large_state_tuning.md
----------------------------------------------------------------------
diff --git a/docs/ops/state/large_state_tuning.md b/docs/ops/state/large_state_tuning.md
index dd3e404..85ffd99 100644
--- a/docs/ops/state/large_state_tuning.md
+++ b/docs/ops/state/large_state_tuning.md
@@ -113,11 +113,9 @@ To get state to be snapshotted asynchronously, applications have to do two thing
      interfaces like `ValueState`, `ListState`, `ReducingState`, ...
 
   2. Use a state backend that supports asynchronous snapshots. In Flink 1.2, only the RocksDB state backend uses
-     fully asynchronous snapshots.
-
-The above two points imply that (in Flink 1.2) large state should generally be kept as keyed state, not as operator state.
-This is subject to change with the planned introduction of *managed operator state*.
+     fully asynchronous snapshots. Starting from Flink 1.3, heap-based state backends also support asynchronous snapshots.
 
+The above two points imply that large state should generally be kept as keyed state, not as operator state.
 
 ## Tuning RocksDB
 

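The documentation change above rests on what an asynchronous snapshot buys the pipeline. As a rough mental model only, a snapshot can be split into two phases. A short synchronous phase captures a point-in-time view of the state. An asynchronous phase then writes that view out on another thread while record processing continues. This is not Flink's implementation: the heap-based backends avoid a full copy by using a copy-on-write state table. The class `AsyncSnapshotSketch` and its methods are hypothetical, and the "write" is just a string rendering of the captured map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Hypothetical two-phase snapshot model (not Flink code): the synchronous phase
 * copies the live state, the asynchronous phase serializes the copy in the background.
 */
public class AsyncSnapshotSketch {

    // Live state, mutated only by the "processing" (caller) thread.
    private final Map<String, Integer> liveState = new HashMap<>();
    private final ExecutorService snapshotExecutor = Executors.newSingleThreadExecutor();

    public void update(String key, int value) {
        liveState.put(key, value);
    }

    public Future<String> snapshot() {
        // Synchronous phase: runs on the caller's thread and only takes a copy.
        final Map<String, Integer> copy = new HashMap<>(liveState);
        // Asynchronous phase: reads only the private copy, so later updates are not seen.
        // TreeMap gives a deterministic key order for the rendered "snapshot".
        return snapshotExecutor.submit(() -> new TreeMap<>(copy).toString());
    }

    public void shutdown() {
        snapshotExecutor.shutdown();
    }

    public static void main(String[] args) throws Exception {
        AsyncSnapshotSketch backend = new AsyncSnapshotSketch();
        backend.update("a", 1);
        backend.update("b", 2);
        Future<String> pending = backend.snapshot();
        // Processing continues while the snapshot is being written.
        backend.update("a", 100);
        System.out.println(pending.get()); // prints {a=1, b=2}
        backend.shutdown();
    }
}
```

The update to `a` after `snapshot()` returns does not leak into the snapshot. This is the property that lets processing continue without blocking on checkpoint I/O.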
http://git-wip-us.apache.org/repos/asf/flink/blob/2906698b/docs/ops/state/state_backends.md
----------------------------------------------------------------------
diff --git a/docs/ops/state/state_backends.md b/docs/ops/state/state_backends.md
index 422df3e..b32ad9f 100644
--- a/docs/ops/state/state_backends.md
+++ b/docs/ops/state/state_backends.md
@@ -81,11 +81,10 @@ The *FsStateBackend* is configured with a file system URL (type, address, path),
 
 The FsStateBackend holds in-flight data in the TaskManager's memory. Upon checkpointing, it writes state snapshots into files in the configured file system and directory. Minimal metadata is stored in the JobManager's memory (or, in high-availability mode, in the metadata checkpoint).
 
-The FsStateBackend can be configured to use asynchronous snapshots. While we strongly encourage the use of asynchronous snapshots to avoid blocking pipelines, please note that this is a new feature and currently not enabled
-by default. To enable this feature, users can instantiate a `FsStateBackend` with the corresponding boolean flag in the constructor set to `true`, e.g.:
+The FsStateBackend uses *asynchronous snapshots by default* to avoid blocking the processing pipeline while writing state checkpoints. To disable this feature, users can instantiate a `FsStateBackend` with the corresponding boolean flag in the constructor set to `false`, e.g.:
 
 {% highlight java %}
-    new FsStateBackend(path, true);
+    new FsStateBackend(path, false);
 {% endhighlight %}
 
 The FsStateBackend is encouraged for:

http://git-wip-us.apache.org/repos/asf/flink/blob/2906698b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index ddfa85c..952988f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -163,7 +163,7 @@ public class FsStateBackend extends AbstractStateBackend {
         * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
         */
        public FsStateBackend(URI checkpointDataUri) throws IOException {
-               this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, false);
+               this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, true);
        }
 
        /**
@@ -208,7 +208,7 @@ public class FsStateBackend extends AbstractStateBackend {
         */
        public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException {
 
-               this(checkpointDataUri, fileStateSizeThreshold, false);
+               this(checkpointDataUri, fileStateSizeThreshold, true);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2906698b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 7ed1dea..b8ebedf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -65,7 +65,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
         * @param maxStateSize The maximal size of the serialized state
         */
        public MemoryStateBackend(int maxStateSize) {
-               this(maxStateSize, false);
+               this(maxStateSize, true);
        }
 
        /**

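Both Java changes flip a literal in a convenience constructor that delegates to the full constructor. Every short-form constructor funnels into the one that takes the explicit flag, so changing the delegated value changes the default for all callers who omit it. Callers who pass the flag explicitly are unaffected. The standalone sketch below shows this telescoping-constructor pattern with a hypothetical `ExampleHeapBackend` class. It is not a Flink class, and its default size is an illustrative value only.

```java
/**
 * Hypothetical stand-in for a heap-based state backend (not a Flink class),
 * showing how convenience constructors inherit a default flag by delegation.
 */
public class ExampleHeapBackend {

    // Illustrative default; not taken from the commit.
    private static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;

    private final int maxStateSize;
    private final boolean asynchronousSnapshots;

    /** No-arg form: delegates to the size-only form, so it inherits its default. */
    public ExampleHeapBackend() {
        this(DEFAULT_MAX_STATE_SIZE);
    }

    /** Size-only form: the single place where the default flag lives (now true). */
    public ExampleHeapBackend(int maxStateSize) {
        this(maxStateSize, true);
    }

    /** Full form: callers who want synchronous snapshots pass false explicitly. */
    public ExampleHeapBackend(int maxStateSize, boolean asynchronousSnapshots) {
        this.maxStateSize = maxStateSize;
        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    public boolean isUsingAsynchronousSnapshots() {
        return asynchronousSnapshots;
    }

    public int getMaxStateSize() {
        return maxStateSize;
    }

    public static void main(String[] args) {
        System.out.println(new ExampleHeapBackend().isUsingAsynchronousSnapshots());           // true
        System.out.println(new ExampleHeapBackend(1024).isUsingAsynchronousSnapshots());       // true
        System.out.println(new ExampleHeapBackend(1024, false).isUsingAsynchronousSnapshots()); // false
    }
}
```

This is also why the new test constructs each backend through every defaulted constructor rather than only one. Each short form is a separate delegation path that could carry its own literal.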
http://git-wip-us.apache.org/repos/asf/flink/blob/2906698b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
new file mode 100644
index 0000000..ac4cbeb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.IOUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * This tests that all heap-based {@link StateBackend}s create {@link KeyedStateBackend}s that use asynchronous
+ * snapshots by default.
+ */
+public class HeapKeyedStateBackendAsyncByDefaultTest {
+
+       @Test
+       public void testFsStateBackendDefaultsToAsync() throws Exception {
+               TemporaryFolder temporaryFolder = new TemporaryFolder();
+               temporaryFolder.create();
+
+               File folder = temporaryFolder.newFolder();
+
+               try {
+                       // This backend has two constructors that use a default value for async snapshots.
+                       FsStateBackend fsStateBackend = new FsStateBackend(folder.toURI());
+                       validateSupportForAsyncSnapshots(fsStateBackend);
+
+                       fsStateBackend = new FsStateBackend(folder.toURI(), 1024);
+                       validateSupportForAsyncSnapshots(fsStateBackend);
+               } finally {
+                       folder.delete();
+                       temporaryFolder.delete();
+               }
+       }
+
+       @Test
+       public void testMemoryStateBackendDefaultsToAsync() throws Exception {
+               MemoryStateBackend memoryStateBackend = new MemoryStateBackend();
+               validateSupportForAsyncSnapshots(memoryStateBackend);
+       }
+
+       private void validateSupportForAsyncSnapshots(AbstractStateBackend backend) throws IOException {
+
+               AbstractKeyedStateBackend<Integer> keyedStateBackend = backend.createKeyedStateBackend(
+                       new DummyEnvironment("Test", 1, 0),
+                       new JobID(),
+                       "testOperator",
+                       IntSerializer.INSTANCE,
+                       1,
+                       new KeyGroupRange(0, 0),
+                       null
+               );
+
+               try {
+                       Assert.assertTrue(keyedStateBackend.supportsAsynchronousSnapshots());
+               } finally {
+                       IOUtils.closeQuietly(keyedStateBackend);
+                       keyedStateBackend.dispose();
+               }
+       }
+}
