[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15584893#comment-15584893 ]
ASF GitHub Bot commented on FLINK-4844: --------------------------------------- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83807199 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -804,199 +792,15 @@ public boolean restoreLatestCheckpointedState( LOG.info("Restoring from latest valid checkpoint: {}.", latest); - for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry: latest.getTaskStates().entrySet()) { - TaskState taskState = taskGroupStateEntry.getValue(); - ExecutionJobVertex executionJobVertex = tasks.get(taskGroupStateEntry.getKey()); - - if (executionJobVertex != null) { - // check that the number of key groups have not changed - if (taskState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) { - throw new IllegalStateException("The maximum parallelism (" + - taskState.getMaxParallelism() + ") with which the latest " + - "checkpoint of the execution job vertex " + executionJobVertex + - " has been taken and the current maximum parallelism (" + - executionJobVertex.getMaxParallelism() + ") changed. This " + - "is currently not supported."); - } - - - int oldParallelism = taskState.getParallelism(); - int newParallelism = executionJobVertex.getParallelism(); - boolean parallelismChanged = oldParallelism != newParallelism; - boolean hasNonPartitionedState = taskState.hasNonPartitionedState(); - - if (hasNonPartitionedState && parallelismChanged) { - throw new IllegalStateException("Cannot restore the latest checkpoint because " + - "the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " + - "state and its parallelism changed. 
The operator" + executionJobVertex.getJobVertexId() + - " has parallelism " + newParallelism + " whereas the corresponding" + - "state object has a parallelism of " + oldParallelism); - } - - List<KeyGroupRange> keyGroupPartitions = createKeyGroupPartitions( - executionJobVertex.getMaxParallelism(), - newParallelism); - - // operator chain index -> list of the stored partitionables states from all parallel instances - @SuppressWarnings("unchecked") - List<OperatorStateHandle>[] chainParallelStates = - new List[taskState.getChainLength()]; - - for (int i = 0; i < oldParallelism; ++i) { - - ChainedStateHandle<OperatorStateHandle> partitionableState = - taskState.getPartitionableState(i); - - if (partitionableState != null) { - for (int j = 0; j < partitionableState.getLength(); ++j) { - OperatorStateHandle opParalleState = partitionableState.get(j); - if (opParalleState != null) { - List<OperatorStateHandle> opParallelStates = - chainParallelStates[j]; - if (opParallelStates == null) { - opParallelStates = new ArrayList<>(); - chainParallelStates[j] = opParallelStates; - } - opParallelStates.add(opParalleState); - } - } - } - } - - // operator chain index -> lists with collected states (one collection for each parallel subtasks) - @SuppressWarnings("unchecked") - List<Collection<OperatorStateHandle>>[] redistributedParallelStates = - new List[taskState.getChainLength()]; - - //TODO here we can employ different redistribution strategies for state, e.g. union state. For now we only offer round robin as the default. 
- OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE; - - for (int i = 0; i < chainParallelStates.length; ++i) { - List<OperatorStateHandle> chainOpParallelStates = chainParallelStates[i]; - if (chainOpParallelStates != null) { - //We only redistribute if the parallelism of the operator changed from previous executions - if (parallelismChanged) { - redistributedParallelStates[i] = repartitioner.repartitionState( - chainOpParallelStates, - newParallelism); - } else { - List<Collection<OperatorStateHandle>> repacking = new ArrayList<>(newParallelism); - for (OperatorStateHandle operatorStateHandle : chainOpParallelStates) { - repacking.add(Collections.singletonList(operatorStateHandle)); - } - redistributedParallelStates[i] = repacking; - } - } - } - - int counter = 0; - - for (int i = 0; i < newParallelism; ++i) { - - // non-partitioned state - ChainedStateHandle<StreamStateHandle> state = null; - - if (hasNonPartitionedState) { - SubtaskState subtaskState = taskState.getState(i); - - if (subtaskState != null) { - // count the number of executions for which we set a state - ++counter; - state = subtaskState.getChainedStateHandle(); - } - } - - // partitionable state - @SuppressWarnings("unchecked") - Collection<OperatorStateHandle>[] ia = new Collection[taskState.getChainLength()]; - List<Collection<OperatorStateHandle>> subTaskPartitionableState = Arrays.asList(ia); - - for (int j = 0; j < redistributedParallelStates.length; ++j) { - List<Collection<OperatorStateHandle>> redistributedParallelState = - redistributedParallelStates[j]; - - if (redistributedParallelState != null) { - subTaskPartitionableState.set(j, redistributedParallelState.get(i)); - } - } + StateAssignmentOperation stateAssignmentOperation = --- End diff -- I was not sure whether this class will have state or not. I might still want to break assignStates() down into smaller methods, which might lead me to introducing some state to make. 
Last but not least, having things more fine-grained and in a class allows better unit testing and mocking, while mocking static methods is impossible. This is one of the places where we might want something like this in the future. Not sure about the allOrNothingState, but that could be a separate change. > Partitionable Raw Keyed/Operator State > -------------------------------------- > > Key: FLINK-4844 > URL: https://issues.apache.org/jira/browse/FLINK-4844 > Project: Flink > Issue Type: New Feature > Reporter: Stefan Richter > Assignee: Stefan Richter > > Partitionable operator and keyed state are currently only available by using > backends. However, the serialization code for many operators is built around > reading/writing their state to a stream for checkpointing. We want to provide > partitionable states also through streams, so that migrating existing > operators becomes easier. -- This message was sent by Atlassian JIRA (v6.3.4#6332)