Repository: flink
Updated Branches:
  refs/heads/master 28ff5a3c9 -> 95e9004e3


[FLINK-4218] [checkpoints] Do not rely on FileSystem to determine state sizes

This prevents failures on eventually consistent S3, where metadata operations on
keys (i.e., entries in the parent directory/bucket) are not guaranteed to be
immediately consistent (visible) after a blob has been written.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95e9004e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95e9004e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95e9004e

Branch: refs/heads/master
Commit: 95e9004e36fffae755eab7aa3d5d0d5e8bfb7113
Parents: 6f237cf
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Sep 23 15:16:27 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 26 14:11:05 2016 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/CompletedCheckpoint.java |  2 +-
 .../flink/runtime/checkpoint/TaskState.java     |  2 +-
 .../savepoint/SavepointV1Serializer.java        |  6 ++--
 .../flink/runtime/state/ChainedStateHandle.java |  2 +-
 .../runtime/state/KeyGroupsStateHandle.java     |  2 +-
 .../state/RetrievableStreamStateHandle.java     |  9 +++---
 .../apache/flink/runtime/state/StateObject.java |  6 ++--
 .../state/filesystem/FileStateHandle.java       | 32 ++++++++++++--------
 .../filesystem/FsCheckpointStreamFactory.java   |  9 +++++-
 .../FileSystemStateStorageHelper.java           | 15 +++------
 ...ZooKeeperCompletedCheckpointStoreITCase.java |  2 +-
 .../stats/SimpleCheckpointStatsTrackerTest.java | 18 +++--------
 .../state/AbstractCloseableHandleTest.java      |  6 ++--
 .../tasks/InterruptSensitiveRestoreTest.java    |  3 +-
 14 files changed, 58 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index e412006..7cb3916 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -108,7 +108,7 @@ public class CompletedCheckpoint implements StateObject {
        }
 
        @Override
-       public long getStateSize() throws Exception {
+       public long getStateSize() throws IOException {
                long result = 0L;
 
                for (TaskState taskState : taskStates.values()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index 9025090..657dd60 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -152,7 +152,7 @@ public class TaskState implements StateObject {
 
 
        @Override
-       public long getStateSize() throws Exception {
+       public long getStateSize() throws IOException {
                long result = 0L;
 
                for (int i = 0; i < parallelism; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index 8e05b81..f07f44f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -197,6 +197,7 @@ class SavepointV1Serializer implements 
SavepointSerializer<SavepointV1> {
                } else if (stateHandle instanceof FileStateHandle) {
                        dos.writeByte(FILE_STREAM_STATE_HANDLE);
                        FileStateHandle fileStateHandle = (FileStateHandle) 
stateHandle;
+                       dos.writeLong(stateHandle.getStateSize());
                        dos.writeUTF(fileStateHandle.getFilePath().toString());
 
                } else if (stateHandle instanceof ByteStreamStateHandle) {
@@ -218,12 +219,13 @@ class SavepointV1Serializer implements 
SavepointSerializer<SavepointV1> {
                if (NULL_HANDLE == type) {
                        return null;
                } else if (FILE_STREAM_STATE_HANDLE == type) {
+                       long size = dis.readLong();
                        String pathString = dis.readUTF();
-                       return new FileStateHandle(new Path(pathString));
+                       return new FileStateHandle(new Path(pathString), size);
                } else if (BYTE_STREAM_STATE_HANDLE == type) {
                        int numBytes = dis.readInt();
                        byte[] data = new byte[numBytes];
-                       dis.read(data);
+                       dis.readFully(data);
                        return new ByteStreamStateHandle(data);
                } else {
                        throw new IOException("Unknown implementation of 
StreamStateHandle, code: " + type);

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
index 9b308a3..74057ee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
@@ -85,7 +85,7 @@ public class ChainedStateHandle<T extends StateObject> 
implements StateObject {
        }
 
        @Override
-       public long getStateSize() throws Exception {
+       public long getStateSize() throws IOException {
                long sumStateSize = 0;
 
                if (operatorStateHandles != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 0a36f92..7f87e86 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -118,7 +118,7 @@ public class KeyGroupsStateHandle implements StateObject {
        }
 
        @Override
-       public long getStateSize() throws Exception {
+       public long getStateSize() throws IOException {
                return stateHandle.getStateSize();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
index c6fd02c..9ecc4c9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
@@ -20,8 +20,6 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
@@ -41,6 +39,7 @@ public class RetrievableStreamStateHandle<T extends 
Serializable> implements
                StreamStateHandle, RetrievableStateHandle<T>, Closeable {
 
        private static final long serialVersionUID = 314567453677355L;
+
        /** wrapped inner stream state handle from which we deserialize on 
retrieval */
        private final StreamStateHandle wrappedStreamStateHandle;
 
@@ -48,9 +47,9 @@ public class RetrievableStreamStateHandle<T extends 
Serializable> implements
                this.wrappedStreamStateHandle = 
Preconditions.checkNotNull(streamStateHandle);
        }
 
-       public RetrievableStreamStateHandle(Path filePath) {
+       public RetrievableStreamStateHandle(Path filePath, long stateSize) {
                Preconditions.checkNotNull(filePath);
-               this.wrappedStreamStateHandle = new FileStateHandle(filePath);
+               this.wrappedStreamStateHandle = new FileStateHandle(filePath, 
stateSize);
        }
 
        @Override
@@ -71,7 +70,7 @@ public class RetrievableStreamStateHandle<T extends 
Serializable> implements
        }
 
        @Override
-       public long getStateSize() throws Exception {
+       public long getStateSize() throws IOException {
                return wrappedStreamStateHandle.getStateSize();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
index 47103c1..4c65318 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.state;
 
+import java.io.IOException;
+
 /**
  * Base of all types that represent checkpointed state. Specializations are for
  * example {@link StateHandle StateHandles} (directly resolve to state).
@@ -47,7 +49,7 @@ public interface StateObject extends java.io.Closeable, 
java.io.Serializable {
         * <p>If the the size is not known, return {@code 0}.
         *
         * @return Size of the state in bytes.
-        * @throws Exception If the operation fails during size retrieval.
+        * @throws IOException If the operation fails during size retrieval.
         */
-       long getStateSize() throws Exception;
+       long getStateSize() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index 5ae751b..f361263 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -26,7 +26,9 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 
 import java.io.IOException;
 
-import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 
 /**
  * {@link StreamStateHandle} for state that was written to a file stream. The 
written data is
@@ -36,14 +38,13 @@ public class FileStateHandle extends 
AbstractCloseableHandle implements StreamSt
 
        private static final long serialVersionUID = 350284443258002355L;
 
-       /**
-        * The path to the file in the filesystem, fully describing the file 
system
-        */
+       /** The path to the file in the filesystem, fully describing the file 
system */
        private final Path filePath;
 
-       /**
-        * Cached file system handle
-        */
+       /** The size of the state in the file */
+       private final long stateSize;
+
+       /** Cached file system handle */
        private transient FileSystem fs;
 
        /**
@@ -51,8 +52,10 @@ public class FileStateHandle extends AbstractCloseableHandle 
implements StreamSt
         *
         * @param filePath The path to the file that stores the state.
         */
-       public FileStateHandle(Path filePath) {
-               this.filePath = requireNonNull(filePath);
+       public FileStateHandle(Path filePath, long stateSize) {
+               checkArgument(stateSize >= -1);
+               this.filePath = checkNotNull(filePath);
+               this.stateSize = stateSize;
        }
 
        /**
@@ -86,8 +89,7 @@ public class FileStateHandle extends AbstractCloseableHandle 
implements StreamSt
                // fail (and be ignored) when some files still exist
                try {
                        getFileSystem().delete(filePath.getParent(), false);
-               } catch (IOException ignored) {
-               }
+               } catch (IOException ignored) {}
        }
 
        /**
@@ -98,7 +100,7 @@ public class FileStateHandle extends AbstractCloseableHandle 
implements StreamSt
         */
        @Override
        public long getStateSize() throws IOException {
-               return getFileSystem().getFileStatus(filePath).getLen();
+               return stateSize;
        }
 
        /**
@@ -114,6 +116,7 @@ public class FileStateHandle extends 
AbstractCloseableHandle implements StreamSt
                return fs;
        }
 
+       // 
------------------------------------------------------------------------
 
        @Override
        public boolean equals(Object o) {
@@ -133,4 +136,9 @@ public class FileStateHandle extends 
AbstractCloseableHandle implements StreamSt
        public int hashCode() {
                return filePath.hashCode();
        }
+
+       @Override
+       public String toString() {
+               return String.format("File State: %s [%d bytes]", filePath, 
stateSize);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index c027558..e4f7eba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -301,9 +301,16 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
                                        }
                                        else {
                                                flush();
+
+                                               long size = -1;
+                                               // make a best effort attempt 
to figure out the size
+                                               try {
+                                                       size = 
outStream.getPos();
+                                               } catch (Exception ignored) {}
+                                               
                                                outStream.close();
                                                closed = true;
-                                               return new 
FileStateHandle(statePath);
+                                               return new 
FileStateHandle(statePath, size);
                                        }
                                }
                                else {

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
index a534b40..7658afb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
@@ -25,10 +25,10 @@ import 
org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 
 /**
@@ -61,19 +61,14 @@ public class FileSystemStateStorageHelper<T extends 
Serializable> implements Ret
 
                for (int attempt = 0; attempt < 10; attempt++) {
                        Path filePath = getNewFilePath();
-                       FSDataOutputStream outStream;
-                       try {
-                               outStream = fs.create(filePath, false);
+
+                       try (FSDataOutputStream outStream = fs.create(filePath, 
false)) {
+                               InstantiationUtil.serializeObject(outStream, 
state);
+                               return new 
RetrievableStreamStateHandle<T>(filePath, outStream.getPos());
                        }
                        catch (Exception e) {
                                latestException = e;
-                               continue;
-                       }
-
-                       try(ObjectOutputStream os = new 
ObjectOutputStream(outStream)) {
-                               os.writeObject(state);
                        }
-                       return new RetrievableStreamStateHandle<T>(filePath);
                }
 
                throw new Exception("Could not open output stream for state 
backend", latestException);

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index f273797..6a8d072 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -183,7 +183,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                }
 
                @Override
-               public long getStateSize() throws Exception {
+               public long getStateSize() throws IOException {
                        return 0;
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
index c513e26..504143b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
@@ -334,7 +334,7 @@ public class SimpleCheckpointStatsTrackerTest {
                                        StreamStateHandle proxy = new 
StateHandleProxy(new Path(), proxySize);
 
                                        SubtaskState subtaskState = new 
SubtaskState(
-                                               new 
ChainedStateHandle<>(Arrays.asList(proxy)),
+                                               new 
ChainedStateHandle<>(Collections.singletonList(proxy)),
                                                duration);
 
                                        taskState.putState(subtaskIndex, 
subtaskState);
@@ -371,21 +371,11 @@ public class SimpleCheckpointStatsTrackerTest {
 
                private static final long serialVersionUID = 35356735683568L;
 
-               public StateHandleProxy(Path filePath, long proxySize) {
-                       super(filePath);
-                       this.proxySize = proxySize;
-               }
-
-               private long proxySize;
-
-               @Override
-               public void discardState() throws Exception {
-
+               public StateHandleProxy(Path filePath, long size) {
+                       super(filePath, size);
                }
 
                @Override
-               public long getStateSize() {
-                       return proxySize;
-               }
+               public void discardState() {}
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
index 40e1852..e613105 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
@@ -87,12 +87,10 @@ public class AbstractCloseableHandleTest {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void discardState() throws Exception {
-
-               }
+               public void discardState() {}
 
                @Override
-               public long getStateSize() throws Exception {
+               public long getStateSize() {
                        return 0;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 47f1bd5..9f52e9c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -198,6 +198,7 @@ public class InterruptSensitiveRestoreTest {
                        // an interrupt on a waiting object leads to an 
infinite loop
                        try {
                                synchronized (this) {
+                                       //noinspection WaitNotInLoop
                                        wait();
                                }
                        }
@@ -216,7 +217,7 @@ public class InterruptSensitiveRestoreTest {
                public void discardState() throws Exception {}
 
                @Override
-               public long getStateSize() throws Exception {
+               public long getStateSize() throws IOException {
                        return 0;
                }
        }

Reply via email to