Repository: flink Updated Branches: refs/heads/master 28ff5a3c9 -> 95e9004e3
[FLINK-4218] [checkpoints] Do not rely on FileSystem to determine state sizes. This prevents failures on eventually consistent S3, where the operations for keys (=entries in the parent directory/bucket) are not guaranteed to be immediately consistent (visible) after a blob was written. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95e9004e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95e9004e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95e9004e Branch: refs/heads/master Commit: 95e9004e36fffae755eab7aa3d5d0d5e8bfb7113 Parents: 6f237cf Author: Stephan Ewen <se...@apache.org> Authored: Fri Sep 23 15:16:27 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Sep 26 14:11:05 2016 +0200 ---------------------------------------------------------------------- .../runtime/checkpoint/CompletedCheckpoint.java | 2 +- .../flink/runtime/checkpoint/TaskState.java | 2 +- .../savepoint/SavepointV1Serializer.java | 6 ++-- .../flink/runtime/state/ChainedStateHandle.java | 2 +- .../runtime/state/KeyGroupsStateHandle.java | 2 +- .../state/RetrievableStreamStateHandle.java | 9 +++--- .../apache/flink/runtime/state/StateObject.java | 6 ++-- .../state/filesystem/FileStateHandle.java | 32 ++++++++++++-------- .../filesystem/FsCheckpointStreamFactory.java | 9 +++++- .../FileSystemStateStorageHelper.java | 15 +++------ ...ZooKeeperCompletedCheckpointStoreITCase.java | 2 +- .../stats/SimpleCheckpointStatsTrackerTest.java | 18 +++-------- .../state/AbstractCloseableHandleTest.java | 6 ++-- .../tasks/InterruptSensitiveRestoreTest.java | 3 +- 14 files changed, 58 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---------------------------------------------------------------------- diff 
--git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index e412006..7cb3916 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -108,7 +108,7 @@ public class CompletedCheckpoint implements StateObject { } @Override - public long getStateSize() throws Exception { + public long getStateSize() throws IOException { long result = 0L; for (TaskState taskState : taskStates.values()) { http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java index 9025090..657dd60 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java @@ -152,7 +152,7 @@ public class TaskState implements StateObject { @Override - public long getStateSize() throws Exception { + public long getStateSize() throws IOException { long result = 0L; for (int i = 0; i < parallelism; i++) { http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java index 8e05b81..f07f44f 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java @@ -197,6 +197,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { } else if (stateHandle instanceof FileStateHandle) { dos.writeByte(FILE_STREAM_STATE_HANDLE); FileStateHandle fileStateHandle = (FileStateHandle) stateHandle; + dos.writeLong(stateHandle.getStateSize()); dos.writeUTF(fileStateHandle.getFilePath().toString()); } else if (stateHandle instanceof ByteStreamStateHandle) { @@ -218,12 +219,13 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { if (NULL_HANDLE == type) { return null; } else if (FILE_STREAM_STATE_HANDLE == type) { + long size = dis.readLong(); String pathString = dis.readUTF(); - return new FileStateHandle(new Path(pathString)); + return new FileStateHandle(new Path(pathString), size); } else if (BYTE_STREAM_STATE_HANDLE == type) { int numBytes = dis.readInt(); byte[] data = new byte[numBytes]; - dis.read(data); + dis.readFully(data); return new ByteStreamStateHandle(data); } else { throw new IOException("Unknown implementation of StreamStateHandle, code: " + type); http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java index 9b308a3..74057ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java @@ -85,7 +85,7 @@ public class ChainedStateHandle<T extends StateObject> implements StateObject { } @Override - public long 
getStateSize() throws Exception { + public long getStateSize() throws IOException { long sumStateSize = 0; if (operatorStateHandles != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java index 0a36f92..7f87e86 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java @@ -118,7 +118,7 @@ public class KeyGroupsStateHandle implements StateObject { } @Override - public long getStateSize() throws Exception { + public long getStateSize() throws IOException { return stateHandle.getStateSize(); } http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java index c6fd02c..9ecc4c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java @@ -20,8 +20,6 @@ package org.apache.flink.runtime.state; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.state.RetrievableStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import 
org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; @@ -41,6 +39,7 @@ public class RetrievableStreamStateHandle<T extends Serializable> implements StreamStateHandle, RetrievableStateHandle<T>, Closeable { private static final long serialVersionUID = 314567453677355L; + /** wrapped inner stream state handle from which we deserialize on retrieval */ private final StreamStateHandle wrappedStreamStateHandle; @@ -48,9 +47,9 @@ public class RetrievableStreamStateHandle<T extends Serializable> implements this.wrappedStreamStateHandle = Preconditions.checkNotNull(streamStateHandle); } - public RetrievableStreamStateHandle(Path filePath) { + public RetrievableStreamStateHandle(Path filePath, long stateSize) { Preconditions.checkNotNull(filePath); - this.wrappedStreamStateHandle = new FileStateHandle(filePath); + this.wrappedStreamStateHandle = new FileStateHandle(filePath, stateSize); } @Override @@ -71,7 +70,7 @@ public class RetrievableStreamStateHandle<T extends Serializable> implements } @Override - public long getStateSize() throws Exception { + public long getStateSize() throws IOException { return wrappedStreamStateHandle.getStateSize(); } http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java index 47103c1..4c65318 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.state; +import java.io.IOException; + /** * Base of all types that represent checkpointed state. Specializations are for * example {@link StateHandle StateHandles} (directly resolve to state). 
@@ -47,7 +49,7 @@ public interface StateObject extends java.io.Closeable, java.io.Serializable { * <p>If the the size is not known, return {@code 0}. * * @return Size of the state in bytes. - * @throws Exception If the operation fails during size retrieval. + * @throws IOException If the operation fails during size retrieval. */ - long getStateSize() throws Exception; + long getStateSize() throws IOException; } http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java index 5ae751b..f361263 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java @@ -26,7 +26,9 @@ import org.apache.flink.runtime.state.StreamStateHandle; import java.io.IOException; -import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * {@link StreamStateHandle} for state that was written to a file stream. 
The written data is @@ -36,14 +38,13 @@ public class FileStateHandle extends AbstractCloseableHandle implements StreamSt private static final long serialVersionUID = 350284443258002355L; - /** - * The path to the file in the filesystem, fully describing the file system - */ + /** The path to the file in the filesystem, fully describing the file system */ private final Path filePath; - /** - * Cached file system handle - */ + /** The size of the state in the file */ + private final long stateSize; + + /** Cached file system handle */ private transient FileSystem fs; /** @@ -51,8 +52,10 @@ public class FileStateHandle extends AbstractCloseableHandle implements StreamSt * * @param filePath The path to the file that stores the state. */ - public FileStateHandle(Path filePath) { - this.filePath = requireNonNull(filePath); + public FileStateHandle(Path filePath, long stateSize) { + checkArgument(stateSize >= -1); + this.filePath = checkNotNull(filePath); + this.stateSize = stateSize; } /** @@ -86,8 +89,7 @@ public class FileStateHandle extends AbstractCloseableHandle implements StreamSt // fail (and be ignored) when some files still exist try { getFileSystem().delete(filePath.getParent(), false); - } catch (IOException ignored) { - } + } catch (IOException ignored) {} } /** @@ -98,7 +100,7 @@ public class FileStateHandle extends AbstractCloseableHandle implements StreamSt */ @Override public long getStateSize() throws IOException { - return getFileSystem().getFileStatus(filePath).getLen(); + return stateSize; } /** @@ -114,6 +116,7 @@ public class FileStateHandle extends AbstractCloseableHandle implements StreamSt return fs; } + // ------------------------------------------------------------------------ @Override public boolean equals(Object o) { @@ -133,4 +136,9 @@ public class FileStateHandle extends AbstractCloseableHandle implements StreamSt public int hashCode() { return filePath.hashCode(); } + + @Override + public String toString() { + return String.format("File 
State: %s [%d bytes]", filePath, stateSize); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java index c027558..e4f7eba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java @@ -301,9 +301,16 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory { } else { flush(); + + long size = -1; + // make a best effort attempt to figure out the size + try { + size = outStream.getPos(); + } catch (Exception ignored) {} + outStream.close(); closed = true; - return new FileStateHandle(statePath); + return new FileStateHandle(statePath, size); } } else { http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java index a534b40..7658afb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java @@ -25,10 +25,10 @@ import org.apache.flink.runtime.state.RetrievableStateHandle; import 
org.apache.flink.runtime.state.RetrievableStreamStateHandle; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.util.FileUtils; +import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; import java.io.IOException; -import java.io.ObjectOutputStream; import java.io.Serializable; /** @@ -61,19 +61,14 @@ public class FileSystemStateStorageHelper<T extends Serializable> implements Ret for (int attempt = 0; attempt < 10; attempt++) { Path filePath = getNewFilePath(); - FSDataOutputStream outStream; - try { - outStream = fs.create(filePath, false); + + try (FSDataOutputStream outStream = fs.create(filePath, false)) { + InstantiationUtil.serializeObject(outStream, state); + return new RetrievableStreamStateHandle<T>(filePath, outStream.getPos()); } catch (Exception e) { latestException = e; - continue; - } - - try(ObjectOutputStream os = new ObjectOutputStream(outStream)) { - os.writeObject(state); } - return new RetrievableStreamStateHandle<T>(filePath); } throw new Exception("Could not open output stream for state backend", latestException); http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index f273797..6a8d072 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -183,7 +183,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint } @Override - public long 
getStateSize() throws Exception { + public long getStateSize() throws IOException { return 0; } http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java index c513e26..504143b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java @@ -334,7 +334,7 @@ public class SimpleCheckpointStatsTrackerTest { StreamStateHandle proxy = new StateHandleProxy(new Path(), proxySize); SubtaskState subtaskState = new SubtaskState( - new ChainedStateHandle<>(Arrays.asList(proxy)), + new ChainedStateHandle<>(Collections.singletonList(proxy)), duration); taskState.putState(subtaskIndex, subtaskState); @@ -371,21 +371,11 @@ public class SimpleCheckpointStatsTrackerTest { private static final long serialVersionUID = 35356735683568L; - public StateHandleProxy(Path filePath, long proxySize) { - super(filePath); - this.proxySize = proxySize; - } - - private long proxySize; - - @Override - public void discardState() throws Exception { - + public StateHandleProxy(Path filePath, long size) { + super(filePath, size); } @Override - public long getStateSize() { - return proxySize; - } + public void discardState() {} } } http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java index 40e1852..e613105 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java @@ -87,12 +87,10 @@ public class AbstractCloseableHandleTest { private static final long serialVersionUID = 1L; @Override - public void discardState() throws Exception { - - } + public void discardState() {} @Override - public long getStateSize() throws Exception { + public long getStateSize() { return 0; } } http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index 47f1bd5..9f52e9c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -198,6 +198,7 @@ public class InterruptSensitiveRestoreTest { // an interrupt on a waiting object leads to an infinite loop try { synchronized (this) { + //noinspection WaitNotInLoop wait(); } } @@ -216,7 +217,7 @@ public class InterruptSensitiveRestoreTest { public void discardState() throws Exception {} @Override - public long getStateSize() throws Exception { + public long getStateSize() throws IOException { return 0; } }