[FLINK-7213] Introduce state management by OperatorID in TaskManager
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b71154a7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b71154a7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b71154a7 Branch: refs/heads/master Commit: b71154a734ea9f4489dffe1be6761efbb90cff41 Parents: 3b0321a Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Mon Jun 26 18:07:59 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Tue Aug 15 14:56:54 2017 +0200 ---------------------------------------------------------------------- .../state/RocksDBAsyncSnapshotTest.java | 21 +- .../checkpoint/CheckpointCoordinator.java | 7 +- .../CheckpointCoordinatorGateway.java | 2 +- .../flink/runtime/checkpoint/OperatorState.java | 4 +- .../checkpoint/OperatorSubtaskState.java | 224 ++++++--- .../runtime/checkpoint/PendingCheckpoint.java | 54 +- .../RoundRobinOperatorStateRepartitioner.java | 4 + .../checkpoint/StateAssignmentOperation.java | 177 ++++--- .../runtime/checkpoint/TaskStateSnapshot.java | 139 ++++++ .../savepoint/SavepointV2Serializer.java | 20 +- .../deployment/TaskDeploymentDescriptor.java | 8 +- .../flink/runtime/execution/Environment.java | 9 +- .../flink/runtime/executiongraph/Execution.java | 8 +- .../runtime/executiongraph/ExecutionVertex.java | 8 +- .../runtime/jobgraph/tasks/StatefulTask.java | 8 +- .../flink/runtime/jobmaster/JobMaster.java | 5 +- .../checkpoint/AcknowledgeCheckpoint.java | 8 +- .../state/StateInitializationContextImpl.java | 11 +- .../flink/runtime/state/TaskStateHandles.java | 172 ------- .../rpc/RpcCheckpointResponder.java | 4 +- .../ActorGatewayCheckpointResponder.java | 4 +- .../taskmanager/CheckpointResponder.java | 4 +- .../runtime/taskmanager/RuntimeEnvironment.java | 4 +- .../apache/flink/runtime/taskmanager/Task.java | 8 +- .../CheckpointCoordinatorFailureTest.java | 49 +- .../checkpoint/CheckpointCoordinatorTest.java | 498 ++++++++++--------- .../checkpoint/CheckpointStateRestoreTest.java | 49 +- .../CompletedCheckpointStoreTest.java | 2 +- .../checkpoint/PendingCheckpointTest.java | 2 +- .../TaskDeploymentDescriptorTest.java | 4 +- .../ExecutionVertexLocalityTest.java | 10 +- .../jobmanager/JobManagerHARecoveryTest.java | 60 +-- .../messages/CheckpointMessagesTest.java | 23 +- .../operators/testutils/DummyEnvironment.java | 4 +- .../operators/testutils/MockEnvironment.java | 6 +- .../runtime/taskmanager/TaskAsyncCallTest.java | 6 +- .../flink/runtime/taskmanager/TaskStopTest.java | 26 +- .../runtime/util/JvmExitOnFatalErrorTest.java | 7 +- .../flink/streaming/api/graph/StreamConfig.java | 13 +- .../api/graph/StreamingJobGraphGenerator.java | 11 +- .../api/operators/AbstractStreamOperator.java | 19 +- .../streaming/api/operators/StreamOperator.java | 6 +- .../runtime/tasks/OperatorStateHandles.java | 19 - .../streaming/runtime/tasks/StreamTask.java | 196 +++----- .../AbstractUdfStreamOperatorLifecycleTest.java | 5 +- .../operators/async/AsyncWaitOperatorTest.java | 16 +- .../streaming/runtime/io/BarrierBufferTest.java | 4 +- .../runtime/io/BarrierTrackerTest.java | 4 +- .../runtime/operators/StreamTaskTimerTest.java | 2 + .../TestProcessingTimeServiceTest.java | 2 + .../runtime/tasks/BlockingCheckpointsTest.java | 2 + .../tasks/InterruptSensitiveRestoreTest.java | 55 +- .../runtime/tasks/OneInputStreamTaskTest.java | 34 +- .../SourceExternalCheckpointTriggerTest.java | 2 + .../runtime/tasks/SourceStreamTaskTest.java | 3 + .../runtime/tasks/StreamMockEnvironment.java | 4 +- .../StreamTaskCancellationBarrierTest.java | 3 + .../tasks/StreamTaskTerminationTest.java | 2 + .../streaming/runtime/tasks/StreamTaskTest.java | 79 ++- .../runtime/tasks/StreamTaskTestHarness.java | 2 + .../runtime/tasks/TwoInputStreamTaskTest.java | 5 + .../util/AbstractStreamOperatorTestHarness.java | 22 +- .../test/checkpointing/SavepointITCase.java | 2 +- 63 files changed, 1185 insertions(+), 986 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index d2edf0e..c752e53 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -32,8 +32,10 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.SubtaskState; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; @@ -74,6 +76,7 @@ import java.io.IOException; import java.lang.reflect.Field; import java.net.URI; import java.util.Arrays; +import java.util.Map; import java.util.UUID; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -81,7 +84,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; @@ -137,6 +140,7 @@ public class RocksDBAsyncSnapshotTest { streamConfig.setStateBackend(backend); streamConfig.setStreamOperator(new AsyncCheckpointOperator()); + streamConfig.setOperatorID(new OperatorID()); final OneShotLatch delayCheckpointLatch = new OneShotLatch(); final OneShotLatch ensureCheckpointLatch = new OneShotLatch(); @@ -152,7 +156,7 @@ public class RocksDBAsyncSnapshotTest { public void acknowledgeCheckpoint( long checkpointId, CheckpointMetrics checkpointMetrics, - SubtaskState checkpointStateHandles) { + TaskStateSnapshot checkpointStateHandles) { super.acknowledgeCheckpoint(checkpointId, checkpointMetrics); @@ -164,8 +168,16 @@ public class RocksDBAsyncSnapshotTest { throw new RuntimeException(e); } + boolean hasManagedKeyedState = false; + for (Map.Entry<OperatorID, OperatorSubtaskState> entry : checkpointStateHandles.getSubtaskStateMappings()) { + OperatorSubtaskState state = entry.getValue(); + if (state != null) { + hasManagedKeyedState |= state.getManagedKeyedState() != null; + } + } + // should be one k/v state - assertNotNull(checkpointStateHandles.getManagedKeyedState()); + assertTrue(hasManagedKeyedState); // we now know that the checkpoint went through ensureCheckpointLatch.trigger(); @@ -241,6 +253,7 @@ public class RocksDBAsyncSnapshotTest { streamConfig.setStateBackend(backend); streamConfig.setStreamOperator(new AsyncCheckpointOperator()); + streamConfig.setOperatorID(new OperatorID()); StreamMockEnvironment mockEnv = new StreamMockEnvironment( testHarness.jobConfig, http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 6f41867..0b64a73 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -40,7 +40,6 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.state.SharedStateRegistry; -import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; @@ -1016,7 +1015,7 @@ public class CheckpointCoordinator { * Restores the latest checkpointed state. * * @param tasks Map of job vertices to restore. State for these vertices is - * restored via {@link Execution#setInitialState(TaskStateHandles)}. + * restored via {@link Execution#setInitialState(TaskStateSnapshot)}. * @param errorIfNoCheckpoint Fail if no completed checkpoint is available to * restore from. * @param allowNonRestoredState Allow checkpoint state that cannot be mapped @@ -1102,7 +1101,7 @@ public class CheckpointCoordinator { * mapped to any job vertex in tasks. * @param tasks Map of job vertices to restore. State for these * vertices is restored via - * {@link Execution#setInitialState(TaskStateHandles)}. + * {@link Execution#setInitialState(TaskStateSnapshot)}. * @param userClassLoader The class loader to resolve serialized classes in * legacy savepoint versions. */ @@ -1256,7 +1255,7 @@ public class CheckpointCoordinator { final JobID jobId, final ExecutionAttemptID executionAttemptID, final long checkpointId, - final SubtaskState subtaskState) { + final TaskStateSnapshot subtaskState) { if (subtaskState != null) { executor.execute(new Runnable() { http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java index 43d66ee..22244f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java @@ -29,7 +29,7 @@ public interface CheckpointCoordinatorGateway extends RpcGateway { final ExecutionAttemptID executionAttemptID, final long checkpointId, final CheckpointMetrics checkpointMetrics, - final SubtaskState subtaskState); + final TaskStateSnapshot subtaskState); void declineCheckpoint( JobID jobID, http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java index b153028..145ff6a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java @@ -30,8 +30,8 @@ import java.util.Map; import java.util.Objects; /** - * Simple container class which contains the raw/managed/legacy operator state and key-group state handles for the sub - * tasks of an operator. + * Simple container class which contains the raw/managed/legacy operator state and key-group state handles from all sub + * tasks of an operator and therefore represents the complete state of a logical operator. */ public class OperatorState implements CompositeStateHandle { http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java index e2ae632..296b5ab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.state.CompositeStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; @@ -25,13 +26,35 @@ import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; /** - * Container for the state of one parallel subtask of an operator. This is part of the {@link OperatorState}. + * This class encapsulates the state for one parallel instance of an operator. The complete state of a (logical) + * operator (e.g. a flatmap operator) consists of the union of all {@link OperatorSubtaskState}s from all + * parallel tasks that physically execute parallelized, physical instances of the operator. + * + * <p>The full state of the logical operator is represented by {@link OperatorState} which consists of + * {@link OperatorSubtaskState}s. + * + * <p>Typically, we expect all collections in this class to be of size 0 or 1, because there is up to one state handle + * produced per state type (e.g. managed-keyed, raw-operator, ...). In particular, this holds when taking a snapshot. + * The purpose of having the state handles in collections is that this class is also reused in restoring state. + * Under normal circumstances, the expected size of each collection is still 0 or 1, except for scale-down. In + * scale-down, one operator subtask can become responsible for the state of multiple previous subtasks. The collections + * can then store all the state handles that are relevant to build up the new subtask state. + * + * <p>There is no collection for legacy state because it is not rescalable. */ public class OperatorSubtaskState implements CompositeStateHandle { @@ -46,27 +69,32 @@ public class OperatorSubtaskState implements CompositeStateHandle { * Can be removed when we remove the APIs for non-repartitionable operator state. */ @Deprecated + @Nullable private final StreamStateHandle legacyOperatorState; /** * Snapshot from the {@link org.apache.flink.runtime.state.OperatorStateBackend}. */ - private final OperatorStateHandle managedOperatorState; + @Nonnull + private final Collection<OperatorStateHandle> managedOperatorState; /** * Snapshot written using {@link org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream}. */ - private final OperatorStateHandle rawOperatorState; + @Nonnull + private final Collection<OperatorStateHandle> rawOperatorState; /** * Snapshot from {@link org.apache.flink.runtime.state.KeyedStateBackend}. */ - private final KeyedStateHandle managedKeyedState; + @Nonnull + private final Collection<KeyedStateHandle> managedKeyedState; /** * Snapshot written using {@link org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream}. */ - private final KeyedStateHandle rawKeyedState; + @Nonnull + private final Collection<KeyedStateHandle> rawKeyedState; /** * The state size. This is also part of the deserialized state handle. @@ -75,31 +103,79 @@ public class OperatorSubtaskState implements CompositeStateHandle { */ private final long stateSize; + @VisibleForTesting + public OperatorSubtaskState(StreamStateHandle legacyOperatorState) { + + this(legacyOperatorState, + Collections.<OperatorStateHandle>emptyList(), + Collections.<OperatorStateHandle>emptyList(), + Collections.<KeyedStateHandle>emptyList(), + Collections.<KeyedStateHandle>emptyList()); + } + + /** + * Empty state. + */ + public OperatorSubtaskState() { + this(null); + } + public OperatorSubtaskState( StreamStateHandle legacyOperatorState, - OperatorStateHandle managedOperatorState, - OperatorStateHandle rawOperatorState, - KeyedStateHandle managedKeyedState, - KeyedStateHandle rawKeyedState) { + Collection<OperatorStateHandle> managedOperatorState, + Collection<OperatorStateHandle> rawOperatorState, + Collection<KeyedStateHandle> managedKeyedState, + Collection<KeyedStateHandle> rawKeyedState) { this.legacyOperatorState = legacyOperatorState; - this.managedOperatorState = managedOperatorState; - this.rawOperatorState = rawOperatorState; - this.managedKeyedState = managedKeyedState; - this.rawKeyedState = rawKeyedState; + this.managedOperatorState = Preconditions.checkNotNull(managedOperatorState); + this.rawOperatorState = Preconditions.checkNotNull(rawOperatorState); + this.managedKeyedState = Preconditions.checkNotNull(managedKeyedState); + this.rawKeyedState = Preconditions.checkNotNull(rawKeyedState); try { long calculateStateSize = getSizeNullSafe(legacyOperatorState); - calculateStateSize += getSizeNullSafe(managedOperatorState); - calculateStateSize += getSizeNullSafe(rawOperatorState); - calculateStateSize += getSizeNullSafe(managedKeyedState); - calculateStateSize += getSizeNullSafe(rawKeyedState); + calculateStateSize += sumAllSizes(managedOperatorState); + calculateStateSize += sumAllSizes(rawOperatorState); + calculateStateSize += sumAllSizes(managedKeyedState); + calculateStateSize += sumAllSizes(rawKeyedState); stateSize = calculateStateSize; } catch (Exception e) { throw new RuntimeException("Failed to get state size.", e); } } + /** + * For convenience because the size of the collections is typically 0 or 1. Null values are translated into empty + * Collections (except for legacy state). + */ + public OperatorSubtaskState( + StreamStateHandle legacyOperatorState, + OperatorStateHandle managedOperatorState, + OperatorStateHandle rawOperatorState, + KeyedStateHandle managedKeyedState, + KeyedStateHandle rawKeyedState) { + + this(legacyOperatorState, + singletonOrEmptyOnNull(managedOperatorState), + singletonOrEmptyOnNull(rawOperatorState), + singletonOrEmptyOnNull(managedKeyedState), + singletonOrEmptyOnNull(rawKeyedState)); + } + + private static <T> Collection<T> singletonOrEmptyOnNull(T element) { + return element != null ? Collections.singletonList(element) : Collections.<T>emptyList(); + } + + private static long sumAllSizes(Collection<? extends StateObject> stateObject) throws Exception { + long size = 0L; + for (StateObject object : stateObject) { + size += getSizeNullSafe(object); + } + + return size; + } + private static long getSizeNullSafe(StateObject stateObject) throws Exception { return stateObject != null ? stateObject.getStateSize() : 0L; } @@ -111,36 +187,58 @@ public class OperatorSubtaskState implements CompositeStateHandle { * Can be removed when we remove the APIs for non-repartitionable operator state. */ @Deprecated + @Nullable public StreamStateHandle getLegacyOperatorState() { return legacyOperatorState; } - public OperatorStateHandle getManagedOperatorState() { + /** + * Returns a handle to the managed operator state. + */ + @Nonnull + public Collection<OperatorStateHandle> getManagedOperatorState() { return managedOperatorState; } - public OperatorStateHandle getRawOperatorState() { + /** + * Returns a handle to the raw operator state. + */ + @Nonnull + public Collection<OperatorStateHandle> getRawOperatorState() { return rawOperatorState; } - public KeyedStateHandle getManagedKeyedState() { + /** + * Returns a handle to the managed keyed state. + */ + @Nonnull + public Collection<KeyedStateHandle> getManagedKeyedState() { return managedKeyedState; } - public KeyedStateHandle getRawKeyedState() { + /** + * Returns a handle to the raw keyed state. + */ + @Nonnull + public Collection<KeyedStateHandle> getRawKeyedState() { return rawKeyedState; } @Override public void discardState() { try { - StateUtil.bestEffortDiscardAllStateObjects( - Arrays.asList( - legacyOperatorState, - managedOperatorState, - rawOperatorState, - managedKeyedState, - rawKeyedState)); + List<StateObject> toDispose = + new ArrayList<>(1 + + managedOperatorState.size() + + rawOperatorState.size() + + managedKeyedState.size() + + rawKeyedState.size()); + toDispose.add(legacyOperatorState); + toDispose.addAll(managedOperatorState); + toDispose.addAll(rawOperatorState); + toDispose.addAll(managedKeyedState); + toDispose.addAll(rawKeyedState); + StateUtil.bestEffortDiscardAllStateObjects(toDispose); } catch (Exception e) { LOG.warn("Error while discarding operator states.", e); } @@ -148,12 +246,17 @@ public class OperatorSubtaskState implements CompositeStateHandle { @Override public void registerSharedStates(SharedStateRegistry sharedStateRegistry) { - if (managedKeyedState != null) { - managedKeyedState.registerSharedStates(sharedStateRegistry); - } + registerSharedState(sharedStateRegistry, managedKeyedState); + registerSharedState(sharedStateRegistry, rawKeyedState); + } - if (rawKeyedState != null) { - rawKeyedState.registerSharedStates(sharedStateRegistry); + private static void registerSharedState( + SharedStateRegistry sharedStateRegistry, + Iterable<KeyedStateHandle> stateHandles) { + for (KeyedStateHandle stateHandle : stateHandles) { + if (stateHandle != null) { + stateHandle.registerSharedStates(sharedStateRegistry); + } } } @@ -175,44 +278,32 @@ public class OperatorSubtaskState implements CompositeStateHandle { OperatorSubtaskState that = (OperatorSubtaskState) o; - if (stateSize != that.stateSize) { + if (getStateSize() != that.getStateSize()) { return false; } - - if (legacyOperatorState != null ? - !legacyOperatorState.equals(that.legacyOperatorState) - : that.legacyOperatorState != null) { + if (getLegacyOperatorState() != null ? !getLegacyOperatorState().equals(that.getLegacyOperatorState()) : that.getLegacyOperatorState() != null) { return false; } - if (managedOperatorState != null ? - !managedOperatorState.equals(that.managedOperatorState) - : that.managedOperatorState != null) { + if (!getManagedOperatorState().equals(that.getManagedOperatorState())) { return false; } - if (rawOperatorState != null ? - !rawOperatorState.equals(that.rawOperatorState) - : that.rawOperatorState != null) { + if (!getRawOperatorState().equals(that.getRawOperatorState())) { return false; } - if (managedKeyedState != null ? - !managedKeyedState.equals(that.managedKeyedState) - : that.managedKeyedState != null) { + if (!getManagedKeyedState().equals(that.getManagedKeyedState())) { return false; } - return rawKeyedState != null ? - rawKeyedState.equals(that.rawKeyedState) - : that.rawKeyedState == null; - + return getRawKeyedState().equals(that.getRawKeyedState()); } @Override public int hashCode() { - int result = legacyOperatorState != null ? legacyOperatorState.hashCode() : 0; - result = 31 * result + (managedOperatorState != null ? managedOperatorState.hashCode() : 0); - result = 31 * result + (rawOperatorState != null ? rawOperatorState.hashCode() : 0); - result = 31 * result + (managedKeyedState != null ? managedKeyedState.hashCode() : 0); - result = 31 * result + (rawKeyedState != null ? rawKeyedState.hashCode() : 0); - result = 31 * result + (int) (stateSize ^ (stateSize >>> 32)); + int result = getLegacyOperatorState() != null ? getLegacyOperatorState().hashCode() : 0; + result = 31 * result + getManagedOperatorState().hashCode(); + result = 31 * result + getRawOperatorState().hashCode(); + result = 31 * result + getManagedKeyedState().hashCode(); + result = 31 * result + getRawKeyedState().hashCode(); + result = 31 * result + (int) (getStateSize() ^ (getStateSize() >>> 32)); return result; } @@ -227,4 +318,21 @@ public class OperatorSubtaskState implements CompositeStateHandle { ", stateSize=" + stateSize + '}'; } + + public boolean hasState() { + return legacyOperatorState != null + || hasState(managedOperatorState) + || hasState(rawOperatorState) + || hasState(managedKeyedState) + || hasState(rawKeyedState); + } + + private boolean hasState(Iterable<? extends StateObject> states) { + for (StateObject state : states) { + if (state != null) { + return true; + } + } + return false; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 3472fc2..16231dd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -25,19 +25,18 @@ import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; + import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -353,13 +352,13 @@ public class PendingCheckpoint { * Acknowledges the task with the given execution attempt id and the given subtask state. * * @param executionAttemptId of the acknowledged task - * @param subtaskState of the acknowledged task + * @param operatorSubtaskStates of the acknowledged task * @param metrics Checkpoint metrics for the stats * @return TaskAcknowledgeResult of the operation */ public TaskAcknowledgeResult acknowledgeTask( ExecutionAttemptID executionAttemptId, - SubtaskState subtaskState, + TaskStateSnapshot operatorSubtaskStates, CheckpointMetrics metrics) { synchronized (lock) { @@ -383,21 +382,19 @@ public class PendingCheckpoint { int subtaskIndex = vertex.getParallelSubtaskIndex(); long ackTimestamp = System.currentTimeMillis(); - long stateSize = 0; - if (subtaskState != null) { - stateSize = subtaskState.getStateSize(); - - @SuppressWarnings("deprecation") - ChainedStateHandle<StreamStateHandle> nonPartitionedState = - subtaskState.getLegacyOperatorState(); - ChainedStateHandle<OperatorStateHandle> partitioneableState = - subtaskState.getManagedOperatorState(); - ChainedStateHandle<OperatorStateHandle> rawOperatorState = - subtaskState.getRawOperatorState(); - - // break task state apart into separate operator states - for (int x = 0; x < operatorIDs.size(); x++) { - OperatorID operatorID = operatorIDs.get(x); + long stateSize = 0L; + + if (operatorSubtaskStates != null) { + for (OperatorID operatorID : operatorIDs) { + + OperatorSubtaskState operatorSubtaskState = + operatorSubtaskStates.getSubtaskStateByOperatorID(operatorID); + + // if no real operatorSubtaskState was reported, we insert an empty state + if (operatorSubtaskState == null) { + operatorSubtaskState = new OperatorSubtaskState(); + } + OperatorState operatorState = operatorStates.get(operatorID); if (operatorState == null) { @@ -408,23 +405,8 @@ public class PendingCheckpoint { operatorStates.put(operatorID, operatorState); } - KeyedStateHandle managedKeyedState = null; - KeyedStateHandle rawKeyedState = null; - - // only the head operator retains the keyed state - if (x == operatorIDs.size() - 1) { - managedKeyedState = subtaskState.getManagedKeyedState(); - rawKeyedState = subtaskState.getRawKeyedState(); - } - - OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState( - nonPartitionedState != null ? nonPartitionedState.get(x) : null, - partitioneableState != null ? partitioneableState.get(x) : null, - rawOperatorState != null ? rawOperatorState.get(x) : null, - managedKeyedState, - rawKeyedState); - operatorState.putState(subtaskIndex, operatorSubtaskState); + stateSize += operatorSubtaskState.getStateSize(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java index 046096f..4513ef8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java @@ -89,6 +89,10 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart for (OperatorStateHandle psh : previousParallelSubtaskStates) { + if (psh == null) { + continue; + } + for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> e : psh.getStateNameToPartitionOffsets().entrySet()) { OperatorStateHandle.StateMetaInfo metaInfo = e.getValue(); http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java index 5712ea1..b69285e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java @@ -23,15 +23,14 @@ import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -185,7 +184,8 @@ public class StateAssignmentOperation { subNonPartitionableState); // PartitionedState - reAssignSubPartitionableState(newManagedOperatorStates, + reAssignSubPartitionableState( + newManagedOperatorStates, newRawOperatorStates, subTaskIndex, operatorIndex, @@ -193,36 +193,57 @@ public class StateAssignmentOperation { subRawOperatorState); // KeyedState - if (operatorIndex == operatorIDs.size() - 1) { - subKeyedState = reAssignSubKeyedStates(operatorState, + if (isHeadOperator(operatorIndex, operatorIDs)) { + subKeyedState = reAssignSubKeyedStates( + operatorState, keyGroupPartitions, subTaskIndex, newParallelism, oldParallelism); - } } - // check if a stateless task if (!allElementsAreNull(subNonPartitionableState) || !allElementsAreNull(subManagedOperatorState) || !allElementsAreNull(subRawOperatorState) || subKeyedState != null) { - TaskStateHandles taskStateHandles = new TaskStateHandles( + TaskStateSnapshot taskState = new TaskStateSnapshot(); - new ChainedStateHandle<>(subNonPartitionableState), - subManagedOperatorState, - subRawOperatorState, - subKeyedState != null ? subKeyedState.f0 : null, - subKeyedState != null ? subKeyedState.f1 : null); + for (int i = 0; i < operatorIDs.size(); ++i) { + + OperatorID operatorID = operatorIDs.get(i); + + Collection<KeyedStateHandle> rawKeyed = Collections.emptyList(); + Collection<KeyedStateHandle> managedKeyed = Collections.emptyList(); + + // keyed state case + if (subKeyedState != null) { + managedKeyed = subKeyedState.f0; + rawKeyed = subKeyedState.f1; + } + + OperatorSubtaskState operatorSubtaskState = + new OperatorSubtaskState( + subNonPartitionableState.get(i), + subManagedOperatorState.get(i), + subRawOperatorState.get(i), + managedKeyed, + rawKeyed + ); + + taskState.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState); + } - currentExecutionAttempt.setInitialState(taskStateHandles); + currentExecutionAttempt.setInitialState(taskState); } } } + private static boolean isHeadOperator(int opIdx, List<OperatorID> operatorIDs) { + return opIdx == operatorIDs.size() - 1; + } public void checkParallelismPreconditions(List<OperatorState> operatorStates, ExecutionJobVertex executionJobVertex) { @@ -239,18 +260,18 @@ public class StateAssignmentOperation { List<Collection<OperatorStateHandle>> subManagedOperatorState, List<Collection<OperatorStateHandle>> subRawOperatorState) { - if (newMangedOperatorStates.get(operatorIndex) != null) { - subManagedOperatorState.add(newMangedOperatorStates.get(operatorIndex).get(subTaskIndex)); + if (newMangedOperatorStates.get(operatorIndex) != null && !newMangedOperatorStates.get(operatorIndex).isEmpty()) { + Collection<OperatorStateHandle> operatorStateHandles = newMangedOperatorStates.get(operatorIndex).get(subTaskIndex); + subManagedOperatorState.add(operatorStateHandles != null ? operatorStateHandles : Collections.<OperatorStateHandle>emptyList()); } else { - subManagedOperatorState.add(null); + subManagedOperatorState.add(Collections.<OperatorStateHandle>emptyList()); } - if (newRawOperatorStates.get(operatorIndex) != null) { - subRawOperatorState.add(newRawOperatorStates.get(operatorIndex).get(subTaskIndex)); + if (newRawOperatorStates.get(operatorIndex) != null && !newRawOperatorStates.get(operatorIndex).isEmpty()) { + Collection<OperatorStateHandle> operatorStateHandles = newRawOperatorStates.get(operatorIndex).get(subTaskIndex); + subRawOperatorState.add(operatorStateHandles != null ? operatorStateHandles : Collections.<OperatorStateHandle>emptyList()); } else { - subRawOperatorState.add(null); + subRawOperatorState.add(Collections.<OperatorStateHandle>emptyList()); } - - } private Tuple2<Collection<KeyedStateHandle>, Collection<KeyedStateHandle>> reAssignSubKeyedStates( @@ -265,24 +286,22 @@ public class StateAssignmentOperation { if (newParallelism == oldParallelism) { if (operatorState.getState(subTaskIndex) != null) { - KeyedStateHandle oldSubManagedKeyedState = operatorState.getState(subTaskIndex).getManagedKeyedState(); - KeyedStateHandle oldSubRawKeyedState = operatorState.getState(subTaskIndex).getRawKeyedState(); - subManagedKeyedState = oldSubManagedKeyedState != null ? Collections.singletonList( - oldSubManagedKeyedState) : null; - subRawKeyedState = oldSubRawKeyedState != null ? Collections.singletonList( - oldSubRawKeyedState) : null; + subManagedKeyedState = operatorState.getState(subTaskIndex).getManagedKeyedState(); + subRawKeyedState = operatorState.getState(subTaskIndex).getRawKeyedState(); } else { - subManagedKeyedState = null; - subRawKeyedState = null; + subManagedKeyedState = Collections.emptyList(); + subRawKeyedState = Collections.emptyList(); } } else { subManagedKeyedState = getManagedKeyedStateHandles(operatorState, keyGroupPartitions.get(subTaskIndex)); subRawKeyedState = getRawKeyedStateHandles(operatorState, keyGroupPartitions.get(subTaskIndex)); } - if (subManagedKeyedState == null && subRawKeyedState == null) { + + if (subManagedKeyedState.isEmpty() && subRawKeyedState.isEmpty()) { return null; + } else { + return new Tuple2<>(subManagedKeyedState, subRawKeyedState); } - return new Tuple2<>(subManagedKeyedState, subRawKeyedState); } @@ -318,7 +337,7 @@ public class StateAssignmentOperation { List<List<Collection<OperatorStateHandle>>> newManagedOperatorStates, List<List<Collection<OperatorStateHandle>>> newRawOperatorStates) { - //collect the old partitionalbe state + //collect the old partitionable state List<List<OperatorStateHandle>> oldManagedOperatorStates = new ArrayList<>(); List<List<OperatorStateHandle>> oldRawOperatorStates = new ArrayList<>(); @@ -351,19 +370,16 @@ public class StateAssignmentOperation { for (int i = 0; i < operatorState.getParallelism(); i++) { OperatorSubtaskState operatorSubtaskState = operatorState.getState(i); if (operatorSubtaskState != null) { - if (operatorSubtaskState.getManagedOperatorState() != null) { - if (managedOperatorState == null) { - managedOperatorState = new ArrayList<>(); - } - managedOperatorState.add(operatorSubtaskState.getManagedOperatorState()); + + if (managedOperatorState == null) { + managedOperatorState = new ArrayList<>(); } + managedOperatorState.addAll(operatorSubtaskState.getManagedOperatorState()); - if (operatorSubtaskState.getRawOperatorState() != null) { - if (rawOperatorState == null) { - rawOperatorState = new ArrayList<>(); - } - rawOperatorState.add(operatorSubtaskState.getRawOperatorState()); + if (rawOperatorState == null) { + rawOperatorState = new ArrayList<>(); } + rawOperatorState.addAll(operatorSubtaskState.getRawOperatorState()); } } @@ -382,21 +398,19 @@ public class StateAssignmentOperation { * @return all managedKeyedStateHandles which have intersection with given KeyGroupRange */ public static List<KeyedStateHandle> getManagedKeyedStateHandles( - OperatorState operatorState, - KeyGroupRange subtaskKeyGroupRange) { + OperatorState operatorState, + KeyGroupRange subtaskKeyGroupRange) { - List<KeyedStateHandle> subtaskKeyedStateHandles = null; + List<KeyedStateHandle> subtaskKeyedStateHandles = new ArrayList<>(); for (int i = 0; i < operatorState.getParallelism(); i++) { - if (operatorState.getState(i) != null && operatorState.getState(i).getManagedKeyedState() != null) { - KeyedStateHandle intersectedKeyedStateHandle = operatorState.getState(i).getManagedKeyedState().getIntersection(subtaskKeyGroupRange); + if (operatorState.getState(i) != null) { - if (intersectedKeyedStateHandle != null) { - if (subtaskKeyedStateHandles == null) { - subtaskKeyedStateHandles = new ArrayList<>(); - } - subtaskKeyedStateHandles.add(intersectedKeyedStateHandle); - } + Collection<KeyedStateHandle> keyedStateHandles = operatorState.getState(i).getManagedKeyedState(); + extractIntersectingState( + keyedStateHandles, + subtaskKeyGroupRange, + subtaskKeyedStateHandles); } } @@ -415,22 +429,40 @@ public class StateAssignmentOperation { OperatorState operatorState, KeyGroupRange subtaskKeyGroupRange) { - List<KeyedStateHandle> subtaskKeyedStateHandles = null; + List<KeyedStateHandle> extractedKeyedStateHandles = new ArrayList<>(); for (int i = 0; i < operatorState.getParallelism(); i++) { - if (operatorState.getState(i) != null && operatorState.getState(i).getRawKeyedState() != null) { - KeyedStateHandle intersectedKeyedStateHandle = operatorState.getState(i).getRawKeyedState().getIntersection(subtaskKeyGroupRange); + if (operatorState.getState(i) != null) { + Collection<KeyedStateHandle> rawKeyedState = operatorState.getState(i).getRawKeyedState(); + extractIntersectingState( + rawKeyedState, + subtaskKeyGroupRange, + extractedKeyedStateHandles); + } + } + + return extractedKeyedStateHandles; + } + + /** + * Extracts certain key group ranges from the given state handles and adds them to the collector. + */ + private static void extractIntersectingState( + Collection<KeyedStateHandle> originalSubtaskStateHandles, + KeyGroupRange rangeToExtract, + List<KeyedStateHandle> extractedStateCollector) { + + for (KeyedStateHandle keyedStateHandle : originalSubtaskStateHandles) { + + if (keyedStateHandle != null) { + + KeyedStateHandle intersectedKeyedStateHandle = keyedStateHandle.getIntersection(rangeToExtract); if (intersectedKeyedStateHandle != null) { - if (subtaskKeyedStateHandles == null) { - subtaskKeyedStateHandles = new ArrayList<>(); - } - subtaskKeyedStateHandles.add(intersectedKeyedStateHandle); + extractedStateCollector.add(intersectedKeyedStateHandle); } } } - - return subtaskKeyedStateHandles; } /** @@ -554,7 +586,7 @@ public class StateAssignmentOperation { int newParallelism) { if (chainOpParallelStates == null) { - return null; + return Collections.emptyList(); } //We only redistribute if the parallelism of the operator changed from previous executions @@ -567,20 +599,23 @@ public class StateAssignmentOperation { List<Collection<OperatorStateHandle>> repackStream = new ArrayList<>(newParallelism); for (OperatorStateHandle operatorStateHandle : chainOpParallelStates) { - Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsets = + if (operatorStateHandle != null) { + Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsets = operatorStateHandle.getStateNameToPartitionOffsets(); - for (OperatorStateHandle.StateMetaInfo metaInfo : partitionOffsets.values()) { - // if we find any broadcast state, we cannot take the shortcut and need to go through repartitioning - if (OperatorStateHandle.Mode.BROADCAST.equals(metaInfo.getDistributionMode())) { - return opStateRepartitioner.repartitionState( + for (OperatorStateHandle.StateMetaInfo metaInfo : partitionOffsets.values()) { + + // if we find any broadcast state, we cannot take the shortcut and need to go through repartitioning + if (OperatorStateHandle.Mode.BROADCAST.equals(metaInfo.getDistributionMode())) { + return opStateRepartitioner.repartitionState( chainOpParallelStates, newParallelism); + } } - } - repackStream.add(Collections.singletonList(operatorStateHandle)); + repackStream.add(Collections.singletonList(operatorStateHandle)); + } } return repackStream; } http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java new file mode 100644 index 0000000..c416f3f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.CompositeStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.util.Preconditions; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * This class encapsulates state handles to the snapshots of all operator instances executed within one task. A task + * can run multiple operator instances as a result of operator chaining, and all operator instances from the chain can + * register their state under their operator id. Each operator instance is a physical execution responsible for + * processing a partition of the data that goes through a logical operator. This partitioning happens to parallelize + * execution of logical operators, e.g. distributing a map function. + * + * <p>One instance of this class contains the information that one task will send to acknowledge a checkpoint request by + * the checkpoint coordinator. Tasks run operator instances in parallel, so the union of all + * {@link TaskStateSnapshot} that are collected by the checkpoint coordinator from all tasks represent the whole + * state of a job at the time of the checkpoint. + * + * <p>This class should be called TaskState once the old class with this name that we keep for backwards + * compatibility goes away. + */ +public class TaskStateSnapshot implements CompositeStateHandle { + + private static final long serialVersionUID = 1L; + + /** Mapping from an operator id to the state of one subtask of this operator */ + private final Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID; + + public TaskStateSnapshot() { + this(10); + } + + public TaskStateSnapshot(int size) { + this(new HashMap<OperatorID, OperatorSubtaskState>(size)); + } + + public TaskStateSnapshot(Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID) { + this.subtaskStatesByOperatorID = Preconditions.checkNotNull(subtaskStatesByOperatorID); + } + + /** + * Returns the subtask state for the given operator id (or null if not contained). + */ + public OperatorSubtaskState getSubtaskStateByOperatorID(OperatorID operatorID) { + return subtaskStatesByOperatorID.get(operatorID); + } + + /** + * Maps the given operator id to the given subtask state. Returns the subtask state of a previous mapping, if such + * a mapping existed or null otherwise. + */ + public OperatorSubtaskState putSubtaskStateByOperatorID(OperatorID operatorID, OperatorSubtaskState state) { + return subtaskStatesByOperatorID.put(operatorID, Preconditions.checkNotNull(state)); + } + + /** + * Returns the set of all mappings from operator id to the corresponding subtask state. + */ + public Set<Map.Entry<OperatorID, OperatorSubtaskState>> getSubtaskStateMappings() { + return subtaskStatesByOperatorID.entrySet(); + } + + @Override + public void discardState() throws Exception { + StateUtil.bestEffortDiscardAllStateObjects(subtaskStatesByOperatorID.values()); + } + + @Override + public long getStateSize() { + long size = 0L; + + for (OperatorSubtaskState subtaskState : subtaskStatesByOperatorID.values()) { + if (subtaskState != null) { + size += subtaskState.getStateSize(); + } + } + + return size; + } + + @Override + public void registerSharedStates(SharedStateRegistry stateRegistry) { + for (OperatorSubtaskState operatorSubtaskState : subtaskStatesByOperatorID.values()) { + if (operatorSubtaskState != null) { + operatorSubtaskState.registerSharedStates(stateRegistry); + } + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TaskStateSnapshot that = (TaskStateSnapshot) o; + + return subtaskStatesByOperatorID.equals(that.subtaskStatesByOperatorID); + } + + @Override + public int hashCode() { + return subtaskStatesByOperatorID.hashCode(); + } + + @Override + public String toString() { + return "TaskOperatorSubtaskStates{" + + "subtaskStatesByOperatorID=" + subtaskStatesByOperatorID + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java index 4cbbfcf..15628a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java @@ -240,6 +240,18 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { // task state (de)serialization methods // ------------------------------------------------------------------------ + private static <T> T extractSingleton(Collection<T> collection) { + if (collection == null || collection.isEmpty()) { + return null; + } + + if (collection.size() == 1) { + return collection.iterator().next(); + } else { + throw new IllegalStateException("Expected singleton collection, but found size: " + collection.size()); + } + } + private static void serializeSubtaskState(OperatorSubtaskState subtaskState, DataOutputStream dos) throws IOException { dos.writeLong(-1); @@ -252,7 +264,7 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { serializeStreamStateHandle(nonPartitionableState, dos); } - OperatorStateHandle operatorStateBackend = subtaskState.getManagedOperatorState(); + OperatorStateHandle operatorStateBackend = extractSingleton(subtaskState.getManagedOperatorState()); len = operatorStateBackend != null ? 1 : 0; dos.writeInt(len); @@ -260,7 +272,7 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { serializeOperatorStateHandle(operatorStateBackend, dos); } - OperatorStateHandle operatorStateFromStream = subtaskState.getRawOperatorState(); + OperatorStateHandle operatorStateFromStream = extractSingleton(subtaskState.getRawOperatorState()); len = operatorStateFromStream != null ? 1 : 0; dos.writeInt(len); @@ -268,10 +280,10 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { serializeOperatorStateHandle(operatorStateFromStream, dos); } - KeyedStateHandle keyedStateBackend = subtaskState.getManagedKeyedState(); + KeyedStateHandle keyedStateBackend = extractSingleton(subtaskState.getManagedKeyedState()); serializeKeyedStateHandle(keyedStateBackend, dos); - KeyedStateHandle keyedStateStream = subtaskState.getRawKeyedState(); + KeyedStateHandle keyedStateStream = extractSingleton(subtaskState.getRawKeyedState()); serializeKeyedStateHandle(keyedStateStream, dos); } http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java index 0578b78..1fa5eb5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java @@ -18,11 +18,11 @@ package org.apache.flink.runtime.deployment; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.JobInformation; import org.apache.flink.runtime.executiongraph.TaskInformation; -import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; @@ -64,7 +64,7 @@ public final class TaskDeploymentDescriptor implements Serializable { private final int targetSlotNumber; /** State handles for the sub task. */ - private final TaskStateHandles taskStateHandles; + private final TaskStateSnapshot taskStateHandles; public TaskDeploymentDescriptor( SerializedValue<JobInformation> serializedJobInformation, @@ -74,7 +74,7 @@ public final class TaskDeploymentDescriptor implements Serializable { int subtaskIndex, int attemptNumber, int targetSlotNumber, - TaskStateHandles taskStateHandles, + TaskStateSnapshot taskStateHandles, Collection<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors, Collection<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors) { @@ -153,7 +153,7 @@ public final class TaskDeploymentDescriptor implements Serializable { return inputGates; } - public TaskStateHandles getTaskStateHandles() { + public TaskStateSnapshot getTaskStateHandles() { return taskStateHandles; } http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java index 9e9f7c4..203ee85 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.execution; -import java.util.Map; -import java.util.concurrent.Future; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; @@ -28,7 +26,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; -import org.apache.flink.runtime.checkpoint.SubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; @@ -41,6 +39,9 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import java.util.Map; +import java.util.concurrent.Future; + /** * The Environment gives the code executed in a task access to the task's properties * (such as name, parallelism), the configurations, the data stream readers and writers, @@ -175,7 +176,7 @@ public interface Environment { * @param checkpointMetrics metrics for this checkpoint * @param subtaskState All state handles for the checkpointed state */ - void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, SubtaskState subtaskState); + void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState); /** * Declines a checkpoint. This tells the checkpoint coordinator that this task will http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index bd5bc7f..2074820 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; @@ -41,7 +42,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.StackTraceSampleResponse; -import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.ExceptionUtils; @@ -133,7 +133,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution private volatile Throwable failureCause; // once assigned, never changes /** The handle to the state that the task gets on restore */ - private volatile TaskStateHandles taskState; + private volatile TaskStateSnapshot taskState; // ------------------------ Accumulators & Metrics ------------------------ @@ -253,7 +253,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution return state.isTerminal(); } - public TaskStateHandles getTaskStateHandles() { + public TaskStateSnapshot getTaskStateSnapshot() { return taskState; } @@ -263,7 +263,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution * * @param checkpointStateHandles all checkpointed operator state */ - public void setInitialState(TaskStateHandles checkpointStateHandles) { + public void setInitialState(TaskStateSnapshot checkpointStateHandles) { checkState(state == CREATED, "Can only assign operator state when execution attempt is in CREATED"); this.taskState = checkpointStateHandles; } http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 0ff71e7..9aac133 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -22,7 +22,9 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.Archiveable; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor; @@ -38,11 +40,9 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobEdge; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; -import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.EvictingBoundedList; import org.apache.flink.util.ExceptionUtils; @@ -457,7 +457,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi */ public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnState() { TaskManagerLocation priorLocation; - if (currentExecution.getTaskStateHandles() != null && (priorLocation = getLatestPriorLocation()) != null) { + if (currentExecution.getTaskStateSnapshot() != null && (priorLocation = getLatestPriorLocation()) != null) { return Collections.singleton(priorLocation); } else { @@ -719,7 +719,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi TaskDeploymentDescriptor createDeploymentDescriptor( ExecutionAttemptID executionId, SimpleSlot targetSlot, - TaskStateHandles taskStateHandles, + TaskStateSnapshot taskStateHandles, int attemptNumber) throws ExecutionGraphException { // Produced intermediate results http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java index 0930011..00db01f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobgraph.tasks; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.state.TaskStateHandles; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; /** * This interface must be implemented by any invokable that has recoverable state and participates @@ -35,7 +35,7 @@ public interface StatefulTask { * * @param taskStateHandles All state handle for the task. */ - void setInitialState(TaskStateHandles taskStateHandles) throws Exception; + void setInitialState(TaskStateSnapshot taskStateHandles) throws Exception; /** * This method is called to trigger a checkpoint, asynchronously by the checkpoint @@ -43,8 +43,8 @@ public interface StatefulTask { * * <p>This method is called for tasks that start the checkpoints by injecting the initial barriers, * i.e., the source tasks. In contrast, checkpoints on downstream operators, which are the result of - * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointMetrics)} - * method. + * receiving checkpoint barriers, invoke the + * {@link #triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointOptions, CheckpointMetrics)} method. * * @param checkpointMetaData Meta data for about this checkpoint * @param checkpointOptions Options for performing this checkpoint http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 31036f6..25df19b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -31,7 +31,7 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; -import org.apache.flink.runtime.checkpoint.SubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -96,6 +96,7 @@ import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -586,7 +587,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway { final ExecutionAttemptID executionAttemptID, final long checkpointId, final CheckpointMetrics checkpointMetrics, - final SubtaskState checkpointState) { + final TaskStateSnapshot checkpointState) { final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); final AcknowledgeCheckpoint ackMessage = http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java index 9721c2c..65e3019 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.messages.checkpoint; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; -import org.apache.flink.runtime.checkpoint.SubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; /** @@ -36,7 +36,7 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements private static final long serialVersionUID = -7606214777192401493L; - private final SubtaskState subtaskState; + private final TaskStateSnapshot subtaskState; private final CheckpointMetrics checkpointMetrics; @@ -47,7 +47,7 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements ExecutionAttemptID taskExecutionId, long checkpointId, CheckpointMetrics checkpointMetrics, - SubtaskState subtaskState) { + TaskStateSnapshot subtaskState) { super(job, taskExecutionId, checkpointId); @@ -64,7 +64,7 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements // properties // ------------------------------------------------------------------------ - public SubtaskState getSubtaskState() { + public TaskStateSnapshot getSubtaskState() { return subtaskState; } http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java index d82af72..031d7c7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.state; -import org.apache.commons.io.IOUtils; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.java.tuple.Tuple2; @@ -26,6 +25,8 @@ import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.util.Preconditions; +import org.apache.commons.io.IOUtils; + import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -139,6 +140,7 @@ public class StateInitializationContextImpl implements StateInitializationContex } private static Collection<KeyGroupsStateHandle> transform(Collection<KeyedStateHandle> keyedStateHandles) { + if (keyedStateHandles == null) { return null; } @@ -146,13 +148,14 @@ public class StateInitializationContextImpl implements StateInitializationContex List<KeyGroupsStateHandle> keyGroupsStateHandles = new ArrayList<>(); for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { - if (! (keyedStateHandle instanceof KeyGroupsStateHandle)) { + + if (keyedStateHandle instanceof KeyGroupsStateHandle) { + keyGroupsStateHandles.add((KeyGroupsStateHandle) keyedStateHandle); + } else if (keyedStateHandle != null) { throw new IllegalStateException("Unexpected state handle type, " + "expected: " + KeyGroupsStateHandle.class + ", but found: " + keyedStateHandle.getClass() + "."); } - - keyGroupsStateHandles.add((KeyGroupsStateHandle) keyedStateHandle); } return keyGroupsStateHandles; http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java deleted file mode 100644 index 2fde548..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state; - -import org.apache.flink.runtime.checkpoint.SubtaskState; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -/** - * This class encapsulates all state handles for a task. - */ -public class TaskStateHandles implements Serializable { - - public static final TaskStateHandles EMPTY = new TaskStateHandles(); - - private static final long serialVersionUID = 267686583583579359L; - - /** - * State handle with the (non-partitionable) legacy operator state - * - * @deprecated Non-repartitionable operator state that has been deprecated. - * Can be removed when we remove the APIs for non-repartitionable operator state. - */ - @Deprecated - private final ChainedStateHandle<StreamStateHandle> legacyOperatorState; - - /** Collection of handles which represent the managed keyed state of the head operator */ - private final Collection<KeyedStateHandle> managedKeyedState; - - /** Collection of handles which represent the raw/streamed keyed state of the head operator */ - private final Collection<KeyedStateHandle> rawKeyedState; - - /** Outer list represents the operator chain, each collection holds handles for managed state of a single operator */ - private final List<Collection<OperatorStateHandle>> managedOperatorState; - - /** Outer list represents the operator chain, each collection holds handles for raw/streamed state of a single operator */ - private final List<Collection<OperatorStateHandle>> rawOperatorState; - - public TaskStateHandles() { - this(null, null, null, null, null); - } - - public TaskStateHandles(SubtaskState checkpointStateHandles) { - this(checkpointStateHandles.getLegacyOperatorState(), - transform(checkpointStateHandles.getManagedOperatorState()), - transform(checkpointStateHandles.getRawOperatorState()), - transform(checkpointStateHandles.getManagedKeyedState()), - transform(checkpointStateHandles.getRawKeyedState())); - } - - public TaskStateHandles( - ChainedStateHandle<StreamStateHandle> legacyOperatorState, - List<Collection<OperatorStateHandle>> managedOperatorState, - List<Collection<OperatorStateHandle>> rawOperatorState, - Collection<KeyedStateHandle> managedKeyedState, - Collection<KeyedStateHandle> rawKeyedState) { - - this.legacyOperatorState = legacyOperatorState; - this.managedKeyedState = managedKeyedState; - this.rawKeyedState = rawKeyedState; - this.managedOperatorState = managedOperatorState; - this.rawOperatorState = rawOperatorState; - } - - /** - * @deprecated Non-repartitionable operator state that has been deprecated. - * Can be removed when we remove the APIs for non-repartitionable operator state. - */ - @Deprecated - public ChainedStateHandle<StreamStateHandle> getLegacyOperatorState() { - return legacyOperatorState; - } - - public Collection<KeyedStateHandle> getManagedKeyedState() { - return managedKeyedState; - } - - public Collection<KeyedStateHandle> getRawKeyedState() { - return rawKeyedState; - } - - public List<Collection<OperatorStateHandle>> getRawOperatorState() { - return rawOperatorState; - } - - public List<Collection<OperatorStateHandle>> getManagedOperatorState() { - return managedOperatorState; - } - - private static List<Collection<OperatorStateHandle>> transform(ChainedStateHandle<OperatorStateHandle> in) { - if (null == in) { - return Collections.emptyList(); - } - List<Collection<OperatorStateHandle>> out = new ArrayList<>(in.getLength()); - for (int i = 0; i < in.getLength(); ++i) { - OperatorStateHandle osh = in.get(i); - out.add(osh != null ? Collections.singletonList(osh) : null); - } - return out; - } - - private static <T> List<T> transform(T in) { - return in == null ? Collections.<T>emptyList() : Collections.singletonList(in); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - TaskStateHandles that = (TaskStateHandles) o; - - if (legacyOperatorState != null ? - !legacyOperatorState.equals(that.legacyOperatorState) - : that.legacyOperatorState != null) { - return false; - } - if (managedKeyedState != null ? - !managedKeyedState.equals(that.managedKeyedState) - : that.managedKeyedState != null) { - return false; - } - if (rawKeyedState != null ? - !rawKeyedState.equals(that.rawKeyedState) - : that.rawKeyedState != null) { - return false; - } - - if (rawOperatorState != null ? - !rawOperatorState.equals(that.rawOperatorState) - : that.rawOperatorState != null) { - return false; - } - return managedOperatorState != null ? - managedOperatorState.equals(that.managedOperatorState) - : that.managedOperatorState == null; - } - - @Override - public int hashCode() { - int result = legacyOperatorState != null ? legacyOperatorState.hashCode() : 0; - result = 31 * result + (managedKeyedState != null ? managedKeyedState.hashCode() : 0); - result = 31 * result + (rawKeyedState != null ? rawKeyedState.hashCode() : 0); - result = 31 * result + (managedOperatorState != null ? managedOperatorState.hashCode() : 0); - result = 31 * result + (rawOperatorState != null ? rawOperatorState.hashCode() : 0); - return result; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java index bf60161..aba8bda 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.taskexecutor.rpc; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; -import org.apache.flink.runtime.checkpoint.SubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.util.Preconditions; @@ -40,7 +40,7 @@ public class RpcCheckpointResponder implements CheckpointResponder { ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, - SubtaskState subtaskState) { + TaskStateSnapshot subtaskState) { checkpointCoordinatorGateway.acknowledgeCheckpoint( jobID, http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java index ad0df71..e9f600d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.taskmanager; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; -import org.apache.flink.runtime.checkpoint.SubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; @@ -44,7 +44,7 @@ public class ActorGatewayCheckpointResponder implements CheckpointResponder { ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, - SubtaskState checkpointStateHandles) { + TaskStateSnapshot checkpointStateHandles) { AcknowledgeCheckpoint message = new AcknowledgeCheckpoint( jobID, executionAttemptID, checkpointId, checkpointMetrics, http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java index cc66a3f..b3584a6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.taskmanager; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; -import org.apache.flink.runtime.checkpoint.SubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; /** @@ -47,7 +47,7 @@ public interface CheckpointResponder { ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, - SubtaskState subtaskState); + TaskStateSnapshot subtaskState); /** * Declines the given checkpoint. http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java index 788a590..92b5886 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java @@ -26,7 +26,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; -import org.apache.flink.runtime.checkpoint.SubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -245,7 +245,7 @@ public class RuntimeEnvironment implements Environment { public void acknowledgeCheckpoint( long checkpointId, CheckpointMetrics checkpointMetrics, - SubtaskState checkpointStateHandles) { + TaskStateSnapshot checkpointStateHandles) { checkpointResponder.acknowledgeCheckpoint( jobId, executionId, checkpointId, checkpointMetrics, http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 596d365..04cb990 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException; import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException; import org.apache.flink.runtime.clusterframework.types.AllocationID; @@ -67,16 +68,17 @@ import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.WrappingRuntimeException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import javax.annotation.Nullable; + import java.io.IOException; import java.net.URL; import java.util.Collection; @@ -250,7 +252,7 @@ public class Task implements Runnable, TaskActions { * The handles to the states that the task was initialized with. Will be set * to null after the initialization, to be memory friendly. */ - private volatile TaskStateHandles taskStateHandles; + private volatile TaskStateSnapshot taskStateHandles; /** Initialized from the Flink configuration. May also be set at the ExecutionConfig */ private long taskCancellationInterval; @@ -272,7 +274,7 @@ public class Task implements Runnable, TaskActions { Collection<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors, Collection<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors, int targetSlotNumber, - TaskStateHandles taskStateHandles, + TaskStateSnapshot taskStateHandles, MemoryManager memManager, IOManager ioManager, NetworkEnvironment networkEnvironment,