[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582781#comment-15582781 ]
ASF GitHub Bot commented on FLINK-4844: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83669355 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -804,199 +792,15 @@ public boolean restoreLatestCheckpointedState( LOG.info("Restoring from latest valid checkpoint: {}.", latest); - for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry: latest.getTaskStates().entrySet()) { - TaskState taskState = taskGroupStateEntry.getValue(); - ExecutionJobVertex executionJobVertex = tasks.get(taskGroupStateEntry.getKey()); - - if (executionJobVertex != null) { - // check that the number of key groups have not changed - if (taskState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) { - throw new IllegalStateException("The maximum parallelism (" + - taskState.getMaxParallelism() + ") with which the latest " + - "checkpoint of the execution job vertex " + executionJobVertex + - " has been taken and the current maximum parallelism (" + - executionJobVertex.getMaxParallelism() + ") changed. This " + - "is currently not supported."); - } - - - int oldParallelism = taskState.getParallelism(); - int newParallelism = executionJobVertex.getParallelism(); - boolean parallelismChanged = oldParallelism != newParallelism; - boolean hasNonPartitionedState = taskState.hasNonPartitionedState(); - - if (hasNonPartitionedState && parallelismChanged) { - throw new IllegalStateException("Cannot restore the latest checkpoint because " + - "the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " + - "state and its parallelism changed. 
The operator" + executionJobVertex.getJobVertexId() + - " has parallelism " + newParallelism + " whereas the corresponding" + - "state object has a parallelism of " + oldParallelism); - } - - List<KeyGroupRange> keyGroupPartitions = createKeyGroupPartitions( - executionJobVertex.getMaxParallelism(), - newParallelism); - - // operator chain index -> list of the stored partitionables states from all parallel instances - @SuppressWarnings("unchecked") - List<OperatorStateHandle>[] chainParallelStates = - new List[taskState.getChainLength()]; - - for (int i = 0; i < oldParallelism; ++i) { - - ChainedStateHandle<OperatorStateHandle> partitionableState = - taskState.getPartitionableState(i); - - if (partitionableState != null) { - for (int j = 0; j < partitionableState.getLength(); ++j) { - OperatorStateHandle opParalleState = partitionableState.get(j); - if (opParalleState != null) { - List<OperatorStateHandle> opParallelStates = - chainParallelStates[j]; - if (opParallelStates == null) { - opParallelStates = new ArrayList<>(); - chainParallelStates[j] = opParallelStates; - } - opParallelStates.add(opParalleState); - } - } - } - } - - // operator chain index -> lists with collected states (one collection for each parallel subtasks) - @SuppressWarnings("unchecked") - List<Collection<OperatorStateHandle>>[] redistributedParallelStates = - new List[taskState.getChainLength()]; - - //TODO here we can employ different redistribution strategies for state, e.g. union state. For now we only offer round robin as the default. 
- OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE; - - for (int i = 0; i < chainParallelStates.length; ++i) { - List<OperatorStateHandle> chainOpParallelStates = chainParallelStates[i]; - if (chainOpParallelStates != null) { - //We only redistribute if the parallelism of the operator changed from previous executions - if (parallelismChanged) { - redistributedParallelStates[i] = repartitioner.repartitionState( - chainOpParallelStates, - newParallelism); - } else { - List<Collection<OperatorStateHandle>> repacking = new ArrayList<>(newParallelism); - for (OperatorStateHandle operatorStateHandle : chainOpParallelStates) { - repacking.add(Collections.singletonList(operatorStateHandle)); - } - redistributedParallelStates[i] = repacking; - } - } - } - - int counter = 0; - - for (int i = 0; i < newParallelism; ++i) { - - // non-partitioned state - ChainedStateHandle<StreamStateHandle> state = null; - - if (hasNonPartitionedState) { - SubtaskState subtaskState = taskState.getState(i); - - if (subtaskState != null) { - // count the number of executions for which we set a state - ++counter; - state = subtaskState.getChainedStateHandle(); - } - } - - // partitionable state - @SuppressWarnings("unchecked") - Collection<OperatorStateHandle>[] ia = new Collection[taskState.getChainLength()]; - List<Collection<OperatorStateHandle>> subTaskPartitionableState = Arrays.asList(ia); - - for (int j = 0; j < redistributedParallelStates.length; ++j) { - List<Collection<OperatorStateHandle>> redistributedParallelState = - redistributedParallelStates[j]; - - if (redistributedParallelState != null) { - subTaskPartitionableState.set(j, redistributedParallelState.get(i)); - } - } + StateAssignmentOperation stateAssignmentOperation = --- End diff -- Is this a typical pattern? Why not just something like ``` StateAssignmentUtil.assignStates(tasks, latest, allOrNothingState) ``` Also, I'm not sure `allOrNothingState` is useful anymore. 
It's not invoked with `true` except in tests. > Partitionable Raw Keyed/Operator State > -------------------------------------- > > Key: FLINK-4844 > URL: https://issues.apache.org/jira/browse/FLINK-4844 > Project: Flink > Issue Type: New Feature > Reporter: Stefan Richter > Assignee: Stefan Richter > > Partitionable operator and keyed state are currently only available by using > backends. However, the serialization code for many operators is built around > reading/writing their state to a stream for checkpointing. We want to provide > partitionable states also through streams, so that migrating existing > operators becomes easier. -- This message was sent by Atlassian JIRA (v6.3.4#6332)