http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java index dea3452..d62b13e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java @@ -22,7 +22,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.utils.ZKPaths; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.util.InstantiationUtil; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -40,9 +40,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * State handles backed by ZooKeeper. * - * <p>Added state is persisted via {@link StateHandle}s, which in turn are written to - * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper - * small. ZooKeeper is build for data in the KB range whereas state can grow to multiple MBs. + * <p>Added state is persisted via {@link RetrievableStateHandle RetrievableStateHandles}, + * which in turn are written to ZooKeeper. This level of indirection is necessary to keep the + * amount of data in ZooKeeper small. ZooKeeper is build for data in the KB range whereas + * state can grow to multiple MBs. * * <p>State modifications require some care, because it is possible that certain failures bring * the state handle backend and ZooKeeper out of sync. @@ -72,7 +73,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { /** Curator ZooKeeper client */ private final CuratorFramework client; - private final StateStorageHelper<T> storage; + private final RetrievableStateStorageHelper<T> storage; /** * Creates a {@link ZooKeeperStateHandleStore}. @@ -84,7 +85,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { */ public ZooKeeperStateHandleStore( CuratorFramework client, - StateStorageHelper storage) throws IOException { + RetrievableStateStorageHelper<T> storage) throws IOException { this.client = checkNotNull(client, "Curator client"); this.storage = checkNotNull(storage, "State storage"); @@ -94,9 +95,9 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { * Creates a state handle and stores it in ZooKeeper with create mode {@link * CreateMode#PERSISTENT}. * - * @see #add(String, Serializable, CreateMode) + * @see #add(String, T, CreateMode) */ - public StateHandle<T> add(String pathInZooKeeper, T state) throws Exception { + public RetrievableStateHandle<T> add(String pathInZooKeeper, T state) throws Exception { return add(pathInZooKeeper, state, CreateMode.PERSISTENT); } @@ -111,39 +112,39 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { * start with a '/') * @param state State to be added * @param createMode The create mode for the new path in ZooKeeper - * @return Created {@link StateHandle} + * + * @return The Created {@link RetrievableStateHandle}. * @throws Exception If a ZooKeeper or state handle operation fails */ - public StateHandle<T> add( + public RetrievableStateHandle<T> add( String pathInZooKeeper, T state, CreateMode createMode) throws Exception { checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); checkNotNull(state, "State"); - StateHandle<T> stateHandle = storage.store(state); + RetrievableStateHandle<T> storeHandle = storage.store(state); boolean success = false; try { // Serialize the state handle. This writes the state to the backend. - byte[] serializedStateHandle = InstantiationUtil.serializeObject(stateHandle); + byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle); // Write state handle (not the actual state) to ZooKeeper. This is expected to be // smaller than the state itself. This level of indirection makes sure that data in // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but // the state can be larger. - client.create().withMode(createMode).forPath(pathInZooKeeper, serializedStateHandle); + client.create().withMode(createMode).forPath(pathInZooKeeper, serializedStoreHandle); success = true; - - return stateHandle; + return storeHandle; } finally { if (!success) { // Cleanup the state handle if it was not written to ZooKeeper. - if (stateHandle != null) { - stateHandle.discardState(); + if (storeHandle != null) { + storeHandle.discardState(); } } } @@ -161,31 +162,29 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); checkNotNull(state, "State"); - StateHandle<T> oldStateHandle = get(pathInZooKeeper); + RetrievableStateHandle<T> oldStateHandle = get(pathInZooKeeper); - StateHandle<T> stateHandle = storage.store(state); + RetrievableStateHandle<T> newStateHandle = storage.store(state); boolean success = false; try { // Serialize the new state handle. This writes the state to the backend. - byte[] serializedStateHandle = InstantiationUtil.serializeObject(stateHandle); + byte[] serializedStateHandle = InstantiationUtil.serializeObject(newStateHandle); // Replace state handle in ZooKeeper. client.setData() .withVersion(expectedVersion) .forPath(pathInZooKeeper, serializedStateHandle); - success = true; - } - finally { - if (success) { + } finally { + if(success) { oldStateHandle.discardState(); - } - else { - stateHandle.discardState(); + } else { + newStateHandle.discardState(); } } + } /** @@ -216,13 +215,11 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { * @throws Exception If a ZooKeeper or state handle operation fails */ @SuppressWarnings("unchecked") - public StateHandle<T> get(String pathInZooKeeper) throws Exception { + public RetrievableStateHandle<T> get(String pathInZooKeeper) throws Exception { checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); byte[] data = client.getData().forPath(pathInZooKeeper); - - return (StateHandle<T>) InstantiationUtil - .deserializeObject(data, ClassLoader.getSystemClassLoader()); + return InstantiationUtil.deserializeObject(data); } /** @@ -234,8 +231,8 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { * @throws Exception If a ZooKeeper or state handle operation fails */ @SuppressWarnings("unchecked") - public List<Tuple2<StateHandle<T>, String>> getAll() throws Exception { - final List<Tuple2<StateHandle<T>, String>> stateHandles = new ArrayList<>(); + public List<Tuple2<RetrievableStateHandle<T>, String>> getAll() throws Exception { + final List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<>(); boolean success = false; @@ -254,7 +251,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { path = "/" + path; try { - final StateHandle<T> stateHandle = get(path); + final RetrievableStateHandle<T> stateHandle = get(path); stateHandles.add(new Tuple2<>(stateHandle, path)); } catch (KeeperException.NoNodeException ignored) { // Concurrent deletion, retry @@ -272,6 +269,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { return stateHandles; } + /** * Gets all available state handles from ZooKeeper sorted by name (ascending). * @@ -281,8 +279,8 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { * @throws Exception If a ZooKeeper or state handle operation fails */ @SuppressWarnings("unchecked") - public List<Tuple2<StateHandle<T>, String>> getAllSortedByName() throws Exception { - final List<Tuple2<StateHandle<T>, String>> stateHandles = new ArrayList<>(); + public List<Tuple2<RetrievableStateHandle<T>, String>> getAllSortedByName() throws Exception { + final List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<>(); boolean success = false; @@ -303,7 +301,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { path = "/" + path; try { - final StateHandle<T> stateHandle = get(path); + final RetrievableStateHandle<T> stateHandle = get(path); stateHandles.add(new Tuple2<>(stateHandle, path)); } catch (KeeperException.NoNodeException ignored) { // Concurrent deletion, retry @@ -364,7 +362,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { public void removeAndDiscardState(String pathInZooKeeper) throws Exception { checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); - StateHandle<T> stateHandle = get(pathInZooKeeper); + RetrievableStateHandle<T> stateHandle = get(pathInZooKeeper); // Delete the state handle from ZooKeeper first client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper); @@ -381,7 +379,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { * @throws Exception If a ZooKeeper or state handle operation fails */ public void removeAndDiscardAllState() throws Exception { - final List<Tuple2<StateHandle<T>, String>> allStateHandles = getAll(); + final List<Tuple2<RetrievableStateHandle<T>, String>> allStateHandles = getAll(); ZKPaths.deleteChildren( client.getZookeeperClient().getZooKeeper(), @@ -389,7 +387,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { false); // Discard the state handles only after they have been successfully deleted from ZooKeeper. - for (Tuple2<StateHandle<T>, String> stateHandleAndPath : allStateHandles) { + for (Tuple2<RetrievableStateHandle<T>, String> stateHandleAndPath : allStateHandles) { stateHandleAndPath.f0.discardState(); } }
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java index 6692ef0..a534b40 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java @@ -21,22 +21,22 @@ package org.apache.flink.runtime.zookeeper.filesystem; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle; +import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.runtime.state.RetrievableStreamStateHandle; +import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.util.FileUtils; import org.apache.flink.util.Preconditions; -import org.apache.flink.runtime.zookeeper.StateStorageHelper; import java.io.IOException; import java.io.ObjectOutputStream; import java.io.Serializable; /** - * {@link StateStorageHelper} implementation which stores the state in the given filesystem path. + * {@link RetrievableStateStorageHelper} implementation which stores the state in the given filesystem path. * - * @param <T> + * @param <T> The type of the data that can be stored by this storage helper. */ -public class FileSystemStateStorageHelper<T extends Serializable> implements StateStorageHelper<T> { +public class FileSystemStateStorageHelper<T extends Serializable> implements RetrievableStateStorageHelper<T> { private final Path rootPath; @@ -56,7 +56,7 @@ public class FileSystemStateStorageHelper<T extends Serializable> implements Sta } @Override - public StateHandle<T> store(T state) throws Exception { + public RetrievableStateHandle<T> store(T state) throws Exception { Exception latestException = null; for (int attempt = 0; attempt < 10; attempt++) { @@ -73,8 +73,7 @@ public class FileSystemStateStorageHelper<T extends Serializable> implements Sta try(ObjectOutputStream os = new ObjectOutputStream(outStream)) { os.writeObject(state); } - - return new FileSerializableStateHandle<>(filePath); + return new RetrievableStreamStateHandle<T>(filePath); } throw new Exception("Could not open output stream for state backend", latestException); http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 356f1a9..407fa01 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -738,29 +738,18 @@ class JobManager( sender() ! TriggerSavepointFailure(jobId, new IllegalArgumentException("Unknown job.")) } - case DisposeSavepoint(savepointPath, blobKeys) => + case DisposeSavepoint(savepointPath) => val senderRef = sender() future { try { log.info(s"Disposing savepoint at '$savepointPath'.") - if (blobKeys.isDefined) { - // We don't need a real ID here for the library cache manager - val jid = new JobID() + val savepoint = savepointStore.loadSavepoint(savepointPath) - try { - libraryCacheManager.registerJob(jid, blobKeys.get, java.util.Collections.emptyList()) - val classLoader = libraryCacheManager.getClassLoader(jid) + log.debug(s"$savepoint") - // Discard with user code loader - savepointStore.disposeSavepoint(savepointPath, classLoader) - } finally { - libraryCacheManager.unregisterJob(jid) - } - } else { - // Discard with system class loader - savepointStore.disposeSavepoint(savepointPath, getClass.getClassLoader) - } + // Dispose the savepoint + savepointStore.disposeSavepoint(savepointPath) senderRef ! DisposeSavepointSuccess } catch { http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala index 40c4dcf..5e2b547 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala @@ -490,13 +490,9 @@ object JobManagerMessages { * Disposes a savepoint. * * @param savepointPath The path of the savepoint to dispose. - * @param blobKeys BLOB keys if a user program JAR was uploaded for disposal. - * This is required when we dispose state which contains - * custom state instances (e.g. reducing state, rocksDB state). */ case class DisposeSavepoint( - savepointPath: String, - blobKeys: Option[java.util.List[BlobKey]] = None) + savepointPath: String) extends RequiresLeaderSessionID /** Response after a successful savepoint dispose. */ http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 1816fc9..5416292 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -28,14 +28,19 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; -import org.apache.flink.runtime.state.LocalStateHandle; -import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; + import org.apache.flink.runtime.util.SerializableObject; -import org.apache.flink.util.SerializedValue; import org.junit.Test; import org.mockito.Mockito; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -55,8 +60,11 @@ public class CheckpointStateRestoreTest { @Test public void testSetState() { try { - final SerializedValue<StateHandle<?>> serializedState = new SerializedValue<StateHandle<?>>( - new LocalStateHandle<SerializableObject>(new SerializableObject())); + + final ChainedStateHandle<StreamStateHandle> serializedState = CheckpointCoordinatorTest.generateChainedStateHandle(new SerializableObject()); + KeyGroupRange keyGroupRange = KeyGroupRange.of(0,0); + List<SerializableObject> testStates = Collections.singletonList(new SerializableObject()); + final List<KeyGroupsStateHandle> serializedKeyGroupStates = CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, testStates); final JobID jid = new JobID(); final JobVertexID statefulId = new JobVertexID(); @@ -106,9 +114,9 @@ public class CheckpointStateRestoreTest { PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next(); final long checkpointId = pending.getCheckpointId(); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, serializedState, 0)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, serializedState, 0)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, serializedState, 0)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, serializedState, serializedKeyGroupStates)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, serializedState, serializedKeyGroupStates)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, serializedState, serializedKeyGroupStates)); coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId)); coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId)); @@ -119,11 +127,11 @@ public class CheckpointStateRestoreTest { coord.restoreLatestCheckpointedState(map, true, false); // verify that each stateful vertex got the state - verify(statefulExec1, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any()); - verify(statefulExec2, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any()); - verify(statefulExec3, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any()); - verify(statelessExec1, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any()); - verify(statelessExec2, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any()); + verify(statefulExec1, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<List<KeyGroupsStateHandle>>any()); + verify(statefulExec2, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<List<KeyGroupsStateHandle>>any()); + verify(statefulExec3, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<List<KeyGroupsStateHandle>>any()); + verify(statelessExec1, times(0)).setInitialState(Mockito.<ChainedStateHandle<StreamStateHandle>>any(), Mockito.<List<KeyGroupsStateHandle>>any()); + verify(statelessExec2, times(0)).setInitialState(Mockito.<ChainedStateHandle<StreamStateHandle>>any(), Mockito.<List<KeyGroupsStateHandle>>any()); } catch (Exception e) { e.printStackTrace(); @@ -134,8 +142,10 @@ public class CheckpointStateRestoreTest { @Test public void testStateOnlyPartiallyAvailable() { try { - final SerializedValue<StateHandle<?>> serializedState = new SerializedValue<StateHandle<?>>( - new LocalStateHandle<SerializableObject>(new SerializableObject())); + final ChainedStateHandle<StreamStateHandle> serializedState = CheckpointCoordinatorTest.generateChainedStateHandle(new SerializableObject()); + KeyGroupRange keyGroupRange = KeyGroupRange.of(0,0); + List<SerializableObject> testStates = Collections.singletonList(new SerializableObject()); + final List<KeyGroupsStateHandle> serializedKeyGroupStates = CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, testStates); final JobID jid = new JobID(); final JobVertexID statefulId = new JobVertexID(); @@ -186,9 +196,9 @@ public class CheckpointStateRestoreTest { final long checkpointId = pending.getCheckpointId(); // the difference to the test "testSetState" is that one stateful subtask does not report state - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, serializedState, 0)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, serializedState, serializedKeyGroupStates)); coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, serializedState, 0)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, serializedState, serializedKeyGroupStates)); coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId)); coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId)); http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java index 634e177..6182ffd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java @@ -21,8 +21,9 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.messages.CheckpointMessagesTest; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.util.SerializedValue; +import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; + import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -107,8 +108,6 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { // The ZooKeeper implementation discards asynchronously expected[i - 1].awaitDiscard(); assertTrue(expected[i - 1].isDiscarded()); - assertEquals(userClassLoader, expected[i - 1].getDiscardClassLoader()); - assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints()); } } @@ -183,7 +182,6 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { // The ZooKeeper implementation discards asynchronously checkpoint.awaitDiscard(); assertTrue(checkpoint.isDiscarded()); - assertEquals(userClassLoader, checkpoint.getDiscardClassLoader()); } } @@ -199,14 +197,14 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { JobVertexID jvid = new JobVertexID(); Map<JobVertexID, TaskState> taskGroupStates = new HashMap<>(); - TaskState taskState = new TaskState(jvid, numberOfStates); + TaskState taskState = new TaskState(jvid, numberOfStates, numberOfStates); taskGroupStates.put(jvid, taskState); for (int i = 0; i < numberOfStates; i++) { - SerializedValue<StateHandle<?>> stateHandle = new SerializedValue<StateHandle<?>>( + ChainedStateHandle<StreamStateHandle> stateHandle = CheckpointCoordinatorTest.generateChainedStateHandle( new CheckpointMessagesTest.MyHandle()); - taskState.putState(i, new SubtaskState(stateHandle, 0, 0)); + taskState.putState(i, new SubtaskState(stateHandle, 0)); } return new TestCompletedCheckpoint(new JobID(), id, 0, taskGroupStates); @@ -230,8 +228,6 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { // Latch for test variants which discard asynchronously private transient final CountDownLatch discardLatch = new CountDownLatch(1); - private transient ClassLoader discardClassLoader; - public TestCompletedCheckpoint( JobID jobId, long checkpointId, @@ -242,11 +238,10 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { } @Override - public void discard(ClassLoader userClassLoader) throws Exception { - super.discard(userClassLoader); + public void discardState() throws Exception { + super.discardState(); if (!isDiscarded) { - this.discardClassLoader = userClassLoader; this.isDiscarded = true; if (discardLatch != null) { @@ -265,10 +260,6 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { } } - public ClassLoader getDiscardClassLoader() { - return discardClassLoader; - } - @Override public boolean equals(Object o) { if (this == o) return true; http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java index 90a6836..9b04244 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java @@ -45,14 +45,14 @@ public class CompletedCheckpointTest { // Verify discard call is forwarded to state CompletedCheckpoint checkpoint = new CompletedCheckpoint(new JobID(), 0, 0, 1, taskStates, true); - checkpoint.discard(ClassLoader.getSystemClassLoader()); - verify(state, times(1)).discard(Matchers.any(ClassLoader.class)); + checkpoint.discardState(); + verify(state, times(1)).discardState(); Mockito.reset(state); // Verify discard call is not forwarded to state checkpoint = new CompletedCheckpoint(new JobID(), 0, 0, 1, taskStates, false); - checkpoint.discard(ClassLoader.getSystemClassLoader()); - verify(state, times(0)).discard(Matchers.any(ClassLoader.class)); + checkpoint.discardState(); + verify(state, times(0)).discardState(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index d235e61..fd4e02d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -66,7 +66,7 @@ public class PendingCheckpointTest { setTaskState(pending, state); pending.abortDeclined(); - verify(state, times(1)).discard(Matchers.any(ClassLoader.class)); + verify(state, times(1)).discardState(); // Abort error Mockito.reset(state); @@ -75,7 +75,7 @@ public class PendingCheckpointTest { setTaskState(pending, state); pending.abortError(new Exception("Expected Test Exception")); - verify(state, times(1)).discard(Matchers.any(ClassLoader.class)); + verify(state, times(1)).discardState(); // Abort expired Mockito.reset(state); @@ -84,7 +84,7 @@ public class PendingCheckpointTest { setTaskState(pending, state); pending.abortExpired(); - verify(state, times(1)).discard(Matchers.any(ClassLoader.class)); + verify(state, times(1)).discardState(); // Abort subsumed Mockito.reset(state); @@ -93,7 +93,7 @@ public class PendingCheckpointTest { setTaskState(pending, state); pending.abortSubsumed(); - verify(state, times(1)).discard(Matchers.any(ClassLoader.class)); + verify(state, times(1)).discardState(); } /** @@ -106,21 +106,20 @@ public class PendingCheckpointTest { PendingCheckpoint pending = createPendingCheckpoint(); PendingCheckpointTest.setTaskState(pending, state); - pending.acknowledgeTask(ATTEMPT_ID, null, 0, null); + pending.acknowledgeTask(ATTEMPT_ID, null, null); CompletedCheckpoint checkpoint = pending.finalizeCheckpoint(); // Does discard state - checkpoint.discard(ClassLoader.getSystemClassLoader()); - verify(state, times(1)).discard(Matchers.any(ClassLoader.class)); + checkpoint.discardState(); + verify(state, times(1)).discardState(); } // ------------------------------------------------------------------------ private static PendingCheckpoint createPendingCheckpoint() { - ClassLoader classLoader = ClassLoader.getSystemClassLoader(); Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS); - return new PendingCheckpoint(new JobID(), 0, 1, ackTasks, classLoader); + return new PendingCheckpoint(new JobID(), 0, 1, ackTasks); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java index 6ae6e1c..7258545 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java @@ -71,7 +71,7 @@ public class PendingSavepointTest { PendingCheckpointTest.setTaskState(pending, state); pending.abortDeclined(); - verify(state, times(1)).discard(Matchers.any(ClassLoader.class)); + verify(state, times(1)).discardState(); // Abort error Mockito.reset(state); @@ -81,7 +81,7 @@ public class PendingSavepointTest { Future<String> future = pending.getCompletionFuture(); pending.abortError(new Exception("Expected Test Exception")); - verify(state, times(1)).discard(Matchers.any(ClassLoader.class)); + verify(state, times(1)).discardState(); assertTrue(future.failed().isCompleted()); // Abort expired @@ -92,7 +92,7 @@ public class PendingSavepointTest { future = pending.getCompletionFuture(); pending.abortExpired(); - verify(state, times(1)).discard(Matchers.any(ClassLoader.class)); + verify(state, times(1)).discardState(); assertTrue(future.failed().isCompleted()); // Abort subsumed @@ -117,13 +117,13 @@ public class PendingSavepointTest { Future<String> future = pending.getCompletionFuture(); - pending.acknowledgeTask(ATTEMPT_ID, null, 0, null); + pending.acknowledgeTask(ATTEMPT_ID, null, null); CompletedCheckpoint checkpoint = pending.finalizeCheckpoint(); // Does _NOT_ discard state - checkpoint.discard(ClassLoader.getSystemClassLoader()); - verify(state, times(0)).discard(Matchers.any(ClassLoader.class)); + checkpoint.discardState(); + verify(state, times(0)).discardState(); // Future is completed String path = Await.result(future, Duration.Zero()); @@ -133,9 +133,8 @@ public class PendingSavepointTest { // ------------------------------------------------------------------------ private static PendingSavepoint createPendingSavepoint() { - ClassLoader classLoader = ClassLoader.getSystemClassLoader(); Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS); - return new PendingSavepoint(new JobID(), 0, 1, ackTasks, classLoader, new HeapSavepointStore()); + return new PendingSavepoint(new JobID(), 0, 1, ackTasks, new HeapSavepointStore()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index 380ba2c..f273797 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -19,9 +19,8 @@ package org.apache.flink.runtime.checkpoint; import org.apache.curator.framework.CuratorFramework; -import org.apache.flink.runtime.state.LocalStateHandle; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.zookeeper.StateStorageHelper; +import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; import org.junit.AfterClass; import org.junit.Before; @@ -29,6 +28,8 @@ import org.junit.Test; import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; +import java.io.IOException; +import java.io.Serializable; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -61,10 +62,10 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint int maxNumberOfCheckpointsToRetain, ClassLoader userLoader) throws Exception { return new ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, userLoader, - ZooKeeper.createClient(), CheckpointsPath, new StateStorageHelper<CompletedCheckpoint>() { + ZooKeeper.createClient(), CheckpointsPath, new RetrievableStateStorageHelper<CompletedCheckpoint>() { @Override - public StateHandle<CompletedCheckpoint> store(CompletedCheckpoint state) throws Exception { - return new LocalStateHandle<>(state); + public RetrievableStateHandle<CompletedCheckpoint> store(CompletedCheckpoint state) throws Exception { + return new HeapRetrievableStateHandle<CompletedCheckpoint>(state); } }); } @@ -160,4 +161,35 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint CompletedCheckpoint recovered = store.getLatestCheckpoint(); assertEquals(checkpoint, recovered); } + + static class HeapRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> { + + private static final long serialVersionUID = -268548467968932L; + + public HeapRetrievableStateHandle(T state) { + this.state = state; + } + + private T state; + + @Override + public T retrieveState() throws Exception { + return state; + } + + @Override + public void discardState() throws Exception { + state = null; + } + + @Override + public long getStateSize() throws Exception { + return 0; + } + + @Override + public void close() throws IOException { + + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java index 6b8c651..3e2de80 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java @@ -58,7 +58,7 @@ public class FsSavepointStoreTest { assertEquals(0, tmp.getRoot().listFiles().length); // Store - SavepointV0 stored = new SavepointV0(1929292, SavepointV0Test.createTaskStates(4, 24)); + SavepointV1 stored = new SavepointV1(1929292, SavepointV1Test.createTaskStates(4, 24)); String path = store.storeSavepoint(stored); assertEquals(1, tmp.getRoot().listFiles().length); @@ -67,7 +67,7 @@ public class FsSavepointStoreTest { assertEquals(stored, loaded); // Dispose - store.disposeSavepoint(path, ClassLoader.getSystemClassLoader()); + store.disposeSavepoint(path); assertEquals(0, tmp.getRoot().listFiles().length); } @@ -122,7 +122,7 @@ public class FsSavepointStoreTest { assertEquals(1, tmp.getRoot().listFiles().length); // Savepoint v0 - Savepoint savepoint = new SavepointV0(checkpointId, SavepointV0Test.createTaskStates(4, 32)); + Savepoint savepoint = new SavepointV1(checkpointId, SavepointV1Test.createTaskStates(4, 32)); String pathSavepoint = store.storeSavepoint(savepoint); assertEquals(2, tmp.getRoot().listFiles().length); @@ -208,7 +208,7 @@ public class FsSavepointStoreTest { } @Override - public void dispose(ClassLoader classLoader) { + public void dispose() { } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java index 6a85195..d703bd6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java @@ -65,7 +65,7 @@ public class SavepointLoaderTest { true); // Store savepoint - SavepointV0 savepoint = new SavepointV0(stored.getCheckpointID(), taskStates.values()); + SavepointV1 savepoint = new SavepointV1(stored.getCheckpointID(), taskStates.values()); SavepointStore store = new HeapSavepointStore(); String path = store.storeSavepoint(savepoint); @@ -84,8 +84,8 @@ public class SavepointLoaderTest { assertEquals(stored.getCheckpointID(), loaded.getCheckpointID()); // The loaded checkpoint should not discard state when its discarded - loaded.discard(ClassLoader.getSystemClassLoader()); - verify(state, times(0)).discard(any(ClassLoader.class)); + loaded.discardState(); + verify(state, times(0)).discardState(); // 2) Load and validate: parallelism mismatch when(vertex.getParallelism()).thenReturn(222); http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java deleted file mode 100644 index b656d90..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint.savepoint; - -import org.apache.commons.io.output.ByteArrayOutputStream; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.junit.Test; - -import java.io.ByteArrayInputStream; - -import static org.junit.Assert.assertEquals; - -public class SavepointV0SerializerTest { - - /** - * Test serialization of {@link SavepointV0} instance. - */ - @Test - public void testSerializeDeserializeV1() throws Exception { - SavepointV0 expected = new SavepointV0(123123, SavepointV0Test.createTaskStates(8, 32)); - - SavepointV0Serializer serializer = SavepointV0Serializer.INSTANCE; - - // Serialize - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializer.serialize(expected, new DataOutputViewStreamWrapper(baos)); - byte[] bytes = baos.toByteArray(); - - // Deserialize - ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - Savepoint actual = serializer.deserialize(new DataInputViewStreamWrapper(bais)); - - assertEquals(expected, actual); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java deleted file mode 100644 index 4d72c42..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint.savepoint; - -import org.apache.flink.runtime.checkpoint.SubtaskState; -import org.apache.flink.runtime.checkpoint.TaskState; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.messages.CheckpointMessagesTest; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.util.SerializedValue; -import org.junit.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -public class SavepointV0Test { - - /** - * Simple test of savepoint methods. - */ - @Test - public void testSavepointV0() throws Exception { - long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE); - int numTaskStates = 4; - int numSubtaskStates = 16; - - Collection<TaskState> expected = createTaskStates(numTaskStates, numSubtaskStates); - - SavepointV0 savepoint = new SavepointV0(checkpointId, expected); - - assertEquals(SavepointV0.VERSION, savepoint.getVersion()); - assertEquals(checkpointId, savepoint.getCheckpointId()); - assertEquals(expected, savepoint.getTaskStates()); - - assertFalse(savepoint.getTaskStates().isEmpty()); - savepoint.dispose(ClassLoader.getSystemClassLoader()); - assertTrue(savepoint.getTaskStates().isEmpty()); - } - - static Collection<TaskState> createTaskStates(int numTaskStates, int numSubtaskStates) throws IOException { - List<TaskState> taskStates = new ArrayList<>(numTaskStates); - - for (int i = 0; i < numTaskStates; i++) { - TaskState taskState = new TaskState(new JobVertexID(), numSubtaskStates); - for (int j = 0; j < numSubtaskStates; j++) { - SerializedValue<StateHandle<?>> stateHandle = new SerializedValue<StateHandle<?>>( - new CheckpointMessagesTest.MyHandle()); - - taskState.putState(i, new SubtaskState(stateHandle, 0, 0)); - } - - taskStates.add(taskState); - } - - return taskStates; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java new file mode 100644 index 0000000..bad836b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.savepoint; + +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.junit.Test; + +import java.io.ByteArrayInputStream; + +import static org.junit.Assert.assertEquals; + +public class SavepointV1SerializerTest { + + /** + * Test serialization of {@link SavepointV1} instance. + */ + @Test + public void testSerializeDeserializeV1() throws Exception { + SavepointV1 expected = new SavepointV1(123123, SavepointV1Test.createTaskStates(8, 32)); + + SavepointV1Serializer serializer = SavepointV1Serializer.INSTANCE; + + // Serialize + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + serializer.serialize(expected, new DataOutputViewStreamWrapper(baos)); + byte[] bytes = baos.toByteArray(); + + // Deserialize + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + Savepoint actual = serializer.deserialize(new DataInputViewStreamWrapper(bais)); + + assertEquals(expected, actual); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java new file mode 100644 index 0000000..ef10032 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.savepoint; + +import org.apache.flink.runtime.checkpoint.SubtaskState; +import org.apache.flink.runtime.checkpoint.TaskState; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.KeyGroupRangeOffsets; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class SavepointV1Test { + + /** + * Simple test of savepoint methods. + */ + @Test + public void testSavepointV1() throws Exception { + long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE); + int numTaskStates = 4; + int numSubtaskStates = 16; + + Collection<TaskState> expected = createTaskStates(numTaskStates, numSubtaskStates); + + SavepointV1 savepoint = new SavepointV1(checkpointId, expected); + + assertEquals(SavepointV1.VERSION, savepoint.getVersion()); + assertEquals(checkpointId, savepoint.getCheckpointId()); + assertEquals(expected, savepoint.getTaskStates()); + + assertFalse(savepoint.getTaskStates().isEmpty()); + savepoint.dispose(); + assertTrue(savepoint.getTaskStates().isEmpty()); + } + + static Collection<TaskState> createTaskStates(int numTaskStates, int numSubtaskStates) throws IOException { + List<TaskState> taskStates = new ArrayList<>(numTaskStates); + + for (int i = 0; i < numTaskStates; i++) { + TaskState taskState = new TaskState(new JobVertexID(), numSubtaskStates, numSubtaskStates); + for (int j = 0; j < numSubtaskStates; j++) { + StreamStateHandle stateHandle = new ByteStreamStateHandle("Hello".getBytes()); + taskState.putState(i, new SubtaskState( + new ChainedStateHandle<>(Collections.singletonList(stateHandle)), 0)); + } + + taskState.putKeyedState( + 0, + new KeyGroupsStateHandle( + new KeyGroupRangeOffsets(1,1, new long[] {42}), new ByteStreamStateHandle("Hello".getBytes()))); + + taskStates.add(taskState); + } + + return taskStates; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java index 12bbf82..c513e26 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java @@ -19,17 +19,18 @@ package org.apache.flink.runtime.checkpoint.stats; import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.Path; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.checkpoint.TaskState; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.util.SerializedValue; +import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.junit.Test; -import java.io.IOException; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; @@ -51,7 +52,7 @@ import static org.mockito.Mockito.when; public class SimpleCheckpointStatsTrackerTest { private static final Random RAND = new Random(); - + @Test public void testNoCompletedCheckpointYet() throws Exception { CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker( @@ -154,7 +155,7 @@ public class SimpleCheckpointStatsTrackerTest { private static void verifyJobStats( CheckpointStatsTracker tracker, int historySize, - CompletedCheckpoint[] checkpoints) { + CompletedCheckpoint[] checkpoints) throws Exception { assertTrue(tracker.getJobStats().isDefined()); JobCheckpointStats jobStats = tracker.getJobStats().get(); @@ -275,14 +276,15 @@ public class SimpleCheckpointStatsTrackerTest { } private static CompletedCheckpoint[] generateRandomCheckpoints( - int numCheckpoints) throws IOException { + int numCheckpoints) throws Exception { // Config JobID jobId = new JobID(); int minNumOperators = 4; int maxNumOperators = 32; - int minParallelism = 4; - int maxParallelism = 16; + int minOperatorParallelism = 4; + int maxOperatorParallelism = 16; + int maxParallelism = 32; // Use yuge numbers here in order to test that summing up state sizes // does not overflow. This was a bug in the initial version, because @@ -299,7 +301,7 @@ public class SimpleCheckpointStatsTrackerTest { for (int i = 0; i < numOperators; i++) { operatorIds[i] = new JobVertexID(); - operatorParallelism[i] = RAND.nextInt(maxParallelism - minParallelism + 1) + minParallelism; + operatorParallelism[i] = RAND.nextInt(maxOperatorParallelism - minOperatorParallelism + 1) + minOperatorParallelism; } // Generate checkpoints @@ -317,7 +319,7 @@ public class SimpleCheckpointStatsTrackerTest { JobVertexID operatorId = operatorIds[operatorIndex]; int parallelism = operatorParallelism[operatorIndex]; - TaskState taskState = new TaskState(operatorId, parallelism); + TaskState taskState = new TaskState(operatorId, parallelism, maxParallelism); taskGroupStates.put(operatorId, taskState); @@ -328,9 +330,11 @@ public class SimpleCheckpointStatsTrackerTest { completionDuration = duration; } + final long proxySize = minStateSize + ((long) (RAND.nextDouble() * (maxStateSize - minStateSize))); + StreamStateHandle proxy = new StateHandleProxy(new Path(), proxySize); + SubtaskState subtaskState = new SubtaskState( - new SerializedValue<StateHandle<?>>(null), - minStateSize + ((long) (RAND.nextDouble() * (maxStateSize - minStateSize))), + new ChainedStateHandle<>(Arrays.asList(proxy)), duration); taskState.putState(subtaskIndex, subtaskState); @@ -356,10 +360,32 @@ public class SimpleCheckpointStatsTrackerTest { ExecutionJobVertex v = mock(ExecutionJobVertex.class); when(v.getJobVertexId()).thenReturn(operatorId); when(v.getParallelism()).thenReturn(parallelism); - + jobVertices.add(v); } return jobVertices; } + + private static class StateHandleProxy extends FileStateHandle { + + private static final long serialVersionUID = 35356735683568L; + + public StateHandleProxy(Path filePath, long proxySize) { + super(filePath); + this.proxySize = proxySize; + } + + private long proxySize; + + @Override + public void discardState() throws Exception { + + } + + @Override + public long getStateSize() { + return proxySize; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 0e1c7c5..6b80c3d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -53,8 +53,11 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.state.LocalStateHandle; -import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.RetrievableStreamStateHandle; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.testingUtils.TestingJobManager; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; @@ -62,6 +65,7 @@ import org.apache.flink.runtime.testingUtils.TestingMessages; import org.apache.flink.runtime.testingUtils.TestingTaskManager; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.InstantiationUtil; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -141,38 +145,38 @@ public class JobManagerHARecoveryTest { instanceManager.addInstanceListener(scheduler); archive = system.actorOf(Props.create( - MemoryArchivist.class, - 10), "archive"); + MemoryArchivist.class, + 10), "archive"); Props jobManagerProps = Props.create( - TestingJobManager.class, - flinkConfiguration, - new ForkJoinPool(), - instanceManager, - scheduler, - new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000), - archive, - new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100), - timeout, - myLeaderElectionService, - mySubmittedJobGraphStore, - checkpointStateFactory, - new HeapSavepointStore(), - jobRecoveryTimeout, - Option.apply(null)); + TestingJobManager.class, + flinkConfiguration, + new ForkJoinPool(), + instanceManager, + scheduler, + new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000), + archive, + new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100), + timeout, + myLeaderElectionService, + mySubmittedJobGraphStore, + checkpointStateFactory, + new HeapSavepointStore(), + jobRecoveryTimeout, + Option.apply(null)); jobManager = system.actorOf(jobManagerProps, "jobmanager"); ActorGateway gateway = new AkkaActorGateway(jobManager, leaderSessionID); taskManager = TaskManager.startTaskManagerComponentsAndActor( - flinkConfiguration, - ResourceID.generate(), - system, - "localhost", - Option.apply("taskmanager"), - Option.apply((LeaderRetrievalService) myLeaderRetrievalService), - true, - TestingTaskManager.class); + flinkConfiguration, + ResourceID.generate(), + system, + "localhost", + Option.apply("taskmanager"), + Option.apply((LeaderRetrievalService) myLeaderRetrievalService), + true, + TestingTaskManager.class); ActorGateway tmGateway = new AkkaActorGateway(taskManager, leaderSessionID); @@ -199,12 +203,12 @@ public class JobManagerHARecoveryTest { BlockingStatefulInvokable.initializeStaticHelpers(slots); Future<Object> isLeader = gateway.ask( - TestingJobManagerMessages.getNotifyWhenLeader(), - deadline.timeLeft()); + TestingJobManagerMessages.getNotifyWhenLeader(), + deadline.timeLeft()); Future<Object> isConnectedToJobManager = tmGateway.ask( - new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager), - deadline.timeLeft()); + new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager), + deadline.timeLeft()); // tell jobManager that he's the leader myLeaderElectionService.isLeader(leaderSessionID); @@ -216,8 +220,8 @@ public class JobManagerHARecoveryTest { // submit blocking job Future<Object> jobSubmitted = gateway.ask( - new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED), - deadline.timeLeft()); + new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED), + deadline.timeLeft()); Await.ready(jobSubmitted, deadline.timeLeft()); @@ -298,7 +302,7 @@ public class JobManagerHARecoveryTest { public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { checkpoints.addLast(checkpoint); if (checkpoints.size() > 1) { - checkpoints.removeFirst().discard(ClassLoader.getSystemClassLoader()); + checkpoints.removeFirst().discardState(); } } @@ -342,10 +346,12 @@ public class JobManagerHARecoveryTest { } @Override - public void start() {} + public void start() { + } @Override - public void stop() {} + public void stop() { + } @Override public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader) throws Exception { @@ -408,7 +414,7 @@ public class JobManagerHARecoveryTest { @Override public void invoke() throws Exception { - while(blocking) { + while (blocking) { synchronized (lock) { lock.wait(); } @@ -424,7 +430,7 @@ public class JobManagerHARecoveryTest { } } - public static class BlockingStatefulInvokable extends BlockingInvokable implements StatefulTask<StateHandle<Long>> { + public static class BlockingStatefulInvokable extends BlockingInvokable implements StatefulTask { private static final int NUM_CHECKPOINTS_TO_COMPLETE = 5; @@ -435,18 +441,28 @@ public class JobManagerHARecoveryTest { private int completedCheckpoints = 0; @Override - public void setInitialState(StateHandle<Long> stateHandle) throws Exception { + public void setInitialState(ChainedStateHandle<StreamStateHandle> chainedState, List<KeyGroupsStateHandle> keyGroupsState) throws Exception { int subtaskIndex = getIndexInSubtaskGroup(); if (subtaskIndex < recoveredStates.length) { - recoveredStates[subtaskIndex] = stateHandle.getState(getUserCodeClassLoader()); + recoveredStates[subtaskIndex] = InstantiationUtil.deserializeObject(chainedState.get(0).openInputStream()); } } @Override public boolean triggerCheckpoint(long checkpointId, long timestamp) { - StateHandle<Long> state = new LocalStateHandle<>(checkpointId); - getEnvironment().acknowledgeCheckpoint(checkpointId, state); - return true; + try { + ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle( + InstantiationUtil.serializeObject(checkpointId)); + RetrievableStreamStateHandle<Long> state = new RetrievableStreamStateHandle<Long>(byteStreamStateHandle); + ChainedStateHandle<StreamStateHandle> chainedStateHandle = new ChainedStateHandle<StreamStateHandle>(Collections.singletonList(state)); + getEnvironment().acknowledgeCheckpoint( + checkpointId, + chainedStateHandle, + Collections.<KeyGroupsStateHandle>emptyList()); + return true; + } catch (Exception ex) { + throw new RuntimeException(ex); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java index 426dfba..6ef184d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java @@ -19,16 +19,17 @@ package org.apache.flink.runtime.jobmanager; import akka.actor.ActorRef; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener; -import org.apache.flink.runtime.state.LocalStateHandle; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.zookeeper.StateStorageHelper; +import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.runtime.state.RetrievableStreamStateHandle; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; +import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.Before; @@ -36,6 +37,7 @@ import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -57,10 +59,12 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger { private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1); - private final static StateStorageHelper<SubmittedJobGraph> localStateStorage = new StateStorageHelper<SubmittedJobGraph>() { + private final static RetrievableStateStorageHelper<SubmittedJobGraph> localStateStorage = new RetrievableStateStorageHelper<SubmittedJobGraph>() { @Override - public StateHandle<SubmittedJobGraph> store(SubmittedJobGraph state) throws Exception { - return new LocalStateHandle<>(state); + public RetrievableStateHandle<SubmittedJobGraph> store(SubmittedJobGraph state) throws IOException { + ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle( + InstantiationUtil.serializeObject(state)); + return new RetrievableStreamStateHandle<SubmittedJobGraph>(byteStreamStateHandle); } }; http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java index 73bf204..c6eb249 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java @@ -18,32 +18,38 @@ package org.apache.flink.runtime.messages; -import static org.junit.Assert.*; - import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.core.testutils.CommonTestUtils; -import org.apache.flink.util.SerializedValue; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.StreamStateHandle; import org.junit.Test; import java.io.IOException; import java.io.Serializable; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; public class CheckpointMessagesTest { - + @Test public void testTriggerAndConfirmCheckpoint() { try { NotifyCheckpointComplete cc = new NotifyCheckpointComplete(new JobID(), new ExecutionAttemptID(), 45287698767345L, 467L); testSerializabilityEqualsHashCode(cc); - + TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L); testSerializabilityEqualsHashCode(tc); - + } catch (Exception e) { e.printStackTrace(); @@ -55,35 +61,40 @@ public class CheckpointMessagesTest { public void testConfirmTaskCheckpointed() { try { AcknowledgeCheckpoint noState = new AcknowledgeCheckpoint( - new JobID(), new ExecutionAttemptID(), 569345L); + new JobID(), new ExecutionAttemptID(), 569345L); + + KeyGroupRange keyGroupRange = KeyGroupRange.of(42,42); AcknowledgeCheckpoint withState = new AcknowledgeCheckpoint( - new JobID(), new ExecutionAttemptID(), 87658976143L, - new SerializedValue<StateHandle<?>>(new MyHandle()), 0); - + new JobID(), + new ExecutionAttemptID(), + 87658976143L, + CheckpointCoordinatorTest.generateChainedStateHandle(new MyHandle()), + CheckpointCoordinatorTest.generateKeyGroupState( + keyGroupRange, Collections.singletonList(new MyHandle()))); + testSerializabilityEqualsHashCode(noState); testSerializabilityEqualsHashCode(withState); - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } } - + private static void testSerializabilityEqualsHashCode(Serializable o) throws IOException { Object copy = CommonTestUtils.createCopySerializable(o); + System.out.println(o.getClass() +" "+copy.getClass()); assertEquals(o, copy); assertEquals(o.hashCode(), copy.hashCode()); assertNotNull(o.toString()); assertNotNull(copy.toString()); } - - public static class MyHandle implements StateHandle<Serializable> { + + public static class MyHandle implements StreamStateHandle { private static final long serialVersionUID = 8128146204128728332L; - @Override - public Serializable getState(ClassLoader userCodeClassLoader) { + public Serializable get(ClassLoader userCodeClassLoader) { return null; } @@ -107,5 +118,10 @@ public class CheckpointMessagesTest { @Override public void close() throws IOException {} + + @Override + public FSDataInputStream openInputStream() throws Exception { + return null; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java index 87540bc..19317f9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java @@ -36,10 +36,13 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.Future; @@ -149,10 +152,15 @@ public class DummyEnvironment implements Environment { } @Override - public void acknowledgeCheckpoint(long checkpointId) {} + public void acknowledgeCheckpoint(long checkpointId) { + + } @Override - public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {} + public void acknowledgeCheckpoint(long checkpointId, + ChainedStateHandle<StreamStateHandle> chainedStateHandle, + List<KeyGroupsStateHandle> keyGroupStateHandles) { + } @Override public void failExternally(Throwable cause) { http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 7b966c3..2c76399 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -45,7 +45,10 @@ import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; + import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.types.Record; import org.apache.flink.util.MutableObjectIterator; @@ -96,9 +99,13 @@ public class MockEnvironment implements Environment { private final int bufferSize; public MockEnvironment(String taskName, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) { + this(taskName, memorySize, inputSplitProvider, bufferSize, new Configuration()); + } + + public MockEnvironment(String taskName, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize, Configuration taskConfiguration) { this.taskInfo = new TaskInfo(taskName, 0, 1, 0); this.jobConfiguration = new Configuration(); - this.taskConfiguration = new Configuration(); + this.taskConfiguration = taskConfiguration; this.inputs = new LinkedList<InputGate>(); this.outputs = new LinkedList<ResultPartitionWriter>(); @@ -298,7 +305,9 @@ public class MockEnvironment implements Environment { } @Override - public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) { + public void acknowledgeCheckpoint(long checkpointId, + ChainedStateHandle<StreamStateHandle> chainedStateHandle, + List<KeyGroupsStateHandle> keyGroupStateHandles) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java index ad3339a..40e1852 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java @@ -85,5 +85,15 @@ public class AbstractCloseableHandleTest { private static final class CloseableHandle extends AbstractCloseableHandle { private static final long serialVersionUID = 1L; + + @Override + public void discardState() throws Exception { + + } + + @Override + public long getStateSize() throws Exception { + return 0; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java index 0f1c0f7..04fa089 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java @@ -24,7 +24,8 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle; + +import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; @@ -86,7 +87,12 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> { // no file operations should be possible right now try { - backend.checkpointStateSerializable("exception train rolling in", 2L, System.currentTimeMillis()); + FsStateBackend.FsCheckpointStateOutputStream out = backend.createCheckpointStateOutputStream( + 2L, + System.currentTimeMillis()); + + out.write(1); + out.closeAndGetHandle(); fail("should fail with an exception"); } catch (IllegalStateException e) { // supreme! @@ -114,43 +120,6 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> { } @Test - public void testSerializableState() { - File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); - try { - FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir))); - backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test-op", IntSerializer.INSTANCE); - - File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); - - String state1 = "dummy state"; - String state2 = "row row row your boat"; - Integer state3 = 42; - - StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L, System.currentTimeMillis()); - StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis()); - StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis()); - - assertEquals(state1, handle1.getState(getClass().getClassLoader())); - handle1.discardState(); - - assertEquals(state2, handle2.getState(getClass().getClassLoader())); - handle2.discardState(); - - assertEquals(state3, handle3.getState(getClass().getClassLoader())); - handle3.discardState(); - - assertTrue(isDirectoryEmpty(checkpointDir)); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { - deleteDirectorySilently(tempDir); - } - } - - @Test public void testStateOutputStream() { File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); try { @@ -185,16 +154,16 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> { stream2.write(state2); stream3.write(state3); - FileStreamStateHandle handle1 = (FileStreamStateHandle) stream1.closeAndGetHandle(); + FileStateHandle handle1 = (FileStateHandle) stream1.closeAndGetHandle(); ByteStreamStateHandle handle2 = (ByteStreamStateHandle) stream2.closeAndGetHandle(); ByteStreamStateHandle handle3 = (ByteStreamStateHandle) stream3.closeAndGetHandle(); // use with try-with-resources - FileStreamStateHandle handle4; + StreamStateHandle handle4; try (AbstractStateBackend.CheckpointStateOutputStream stream4 = backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) { stream4.write(state4); - handle4 = (FileStreamStateHandle) stream4.closeAndGetHandle(); + handle4 = stream4.closeAndGetHandle(); } // close before accessing handle @@ -209,18 +178,18 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> { // uh-huh } - validateBytesInStream(handle1.getState(getClass().getClassLoader()), state1); + validateBytesInStream(handle1.openInputStream(), state1); handle1.discardState(); assertFalse(isDirectoryEmpty(checkpointDir)); ensureLocalFileDeleted(handle1.getFilePath()); - validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2); + validateBytesInStream(handle2.openInputStream(), state2); handle2.discardState(); - validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3); + validateBytesInStream(handle3.openInputStream(), state3); handle3.discardState(); - validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4); + validateBytesInStream(handle4.openInputStream(), state4); handle4.discardState(); assertTrue(isDirectoryEmpty(checkpointDir)); }