[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582781#comment-15582781 ]

ASF GitHub Bot commented on FLINK-4844:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2648#discussion_r83669355
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -804,199 +792,15 @@ public boolean restoreLatestCheckpointedState(
     
                        LOG.info("Restoring from latest valid checkpoint: {}.", latest);
     
    -                   for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry: latest.getTaskStates().entrySet()) {
    -                           TaskState taskState = taskGroupStateEntry.getValue();
    -                           ExecutionJobVertex executionJobVertex = tasks.get(taskGroupStateEntry.getKey());
    -
    -                           if (executionJobVertex != null) {
    -                                   // check that the number of key groups have not changed
    -                                   if (taskState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {
    -                                           throw new IllegalStateException("The maximum parallelism (" +
    -                                                   taskState.getMaxParallelism() + ") with which the latest " +
    -                                                   "checkpoint of the execution job vertex " + executionJobVertex +
    -                                                   " has been taken and the current maximum parallelism (" +
    -                                                   executionJobVertex.getMaxParallelism() + ") changed. This " +
    -                                                   "is currently not supported.");
    -                                   }
    -                                   }
    -
    -
    -                                   int oldParallelism = taskState.getParallelism();
    -                                   int newParallelism = executionJobVertex.getParallelism();
    -                                   boolean parallelismChanged = oldParallelism != newParallelism;
    -                                   boolean hasNonPartitionedState = taskState.hasNonPartitionedState();
    -
    -                                   if (hasNonPartitionedState && parallelismChanged) {
    -                                           throw new IllegalStateException("Cannot restore the latest checkpoint because " +
    -                                                   "the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " +
    -                                                   "state and its parallelism changed. The operator" + executionJobVertex.getJobVertexId() +
    -                                                   " has parallelism " + newParallelism + " whereas the corresponding" +
    -                                                   "state object has a parallelism of " + oldParallelism);
    -                                   }
    -
    -                                   List<KeyGroupRange> keyGroupPartitions = createKeyGroupPartitions(
    -                                                   executionJobVertex.getMaxParallelism(),
    -                                                   newParallelism);
    -                                   
    -                                   // operator chain index -> list of the stored partitionables states from all parallel instances
    -                                   @SuppressWarnings("unchecked")
    -                                   List<OperatorStateHandle>[] chainParallelStates =
    -                                                   new List[taskState.getChainLength()];
    -
    -                                   for (int i = 0; i < oldParallelism; ++i) {
    -
    -                                           ChainedStateHandle<OperatorStateHandle> partitionableState =
    -                                                           taskState.getPartitionableState(i);
    -
    -                                           if (partitionableState != null) {
    -                                                   for (int j = 0; j < partitionableState.getLength(); ++j) {
    -                                                           OperatorStateHandle opParalleState = partitionableState.get(j);
    -                                                           if (opParalleState != null) {
    -                                                                   List<OperatorStateHandle> opParallelStates =
    -                                                                                   chainParallelStates[j];
    -                                                                   if (opParallelStates == null) {
    -                                                                           opParallelStates = new ArrayList<>();
    -                                                                           chainParallelStates[j] = opParallelStates;
    -                                                                   }
    -                                                                   opParallelStates.add(opParalleState);
    -                                                           }
    -                                                   }
    -                                           }
    -                                   }
    -
    -                                   // operator chain index -> lists with collected states (one collection for each parallel subtasks)
    -                                   @SuppressWarnings("unchecked")
    -                                   List<Collection<OperatorStateHandle>>[] redistributedParallelStates =
    -                                                   new List[taskState.getChainLength()];
    -
    -                                   //TODO here we can employ different redistribution strategies for state, e.g. union state. For now we only offer round robin as the default.
    -                                   OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
    -
    -                                   for (int i = 0; i < chainParallelStates.length; ++i) {
    -                                           List<OperatorStateHandle> chainOpParallelStates = chainParallelStates[i];
    -                                           if (chainOpParallelStates != null) {
    -                                                   //We only redistribute if the parallelism of the operator changed from previous executions
    -                                                   if (parallelismChanged) {
    -                                                           redistributedParallelStates[i] = repartitioner.repartitionState(
    -                                                                           chainOpParallelStates,
    -                                                                           newParallelism);
    -                                                   } else {
    -                                                           List<Collection<OperatorStateHandle>> repacking = new ArrayList<>(newParallelism);
    -                                                           for (OperatorStateHandle operatorStateHandle : chainOpParallelStates) {
    -                                                                   repacking.add(Collections.singletonList(operatorStateHandle));
    -                                                           }
    -                                                           redistributedParallelStates[i] = repacking;
    -                                                   }
    -                                           }
    -                                   }
    -
    -                                   int counter = 0;
    -
    -                                   for (int i = 0; i < newParallelism; ++i) {
    -
    -                                           // non-partitioned state
    -                                           ChainedStateHandle<StreamStateHandle> state = null;
    -
    -                                           if (hasNonPartitionedState) {
    -                                                   SubtaskState subtaskState = taskState.getState(i);
    -
    -                                                   if (subtaskState != null) {
    -                                                           // count the number of executions for which we set a state
    -                                                           ++counter;
    -                                                           state = subtaskState.getChainedStateHandle();
    -                                                   }
    -                                           }
    -
    -                                           // partitionable state
    -                                           @SuppressWarnings("unchecked")
    -                                           Collection<OperatorStateHandle>[] ia = new Collection[taskState.getChainLength()];
    -                                           List<Collection<OperatorStateHandle>> subTaskPartitionableState = Arrays.asList(ia);
    -
    -                                           for (int j = 0; j < redistributedParallelStates.length; ++j) {
    -                                                   List<Collection<OperatorStateHandle>> redistributedParallelState =
    -                                                                   redistributedParallelStates[j];
    -
    -                                                   if (redistributedParallelState != null) {
    -                                                           subTaskPartitionableState.set(j, redistributedParallelState.get(i));
    -                                                   }
    -                                           }
    +                   StateAssignmentOperation stateAssignmentOperation =
    --- End diff --
    
    Is this a typical pattern? Why not just something like
    ```
    StateAssignmentUtil.assignStates(tasks, latest, allOrNothingState)
    ```
    
    Also, I'm not sure `allOrNothingState` is useful anymore. It's not invoked with `true` except in tests.
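A minimal sketch of the calling shape the reviewer suggests: a stateless static entry point that receives the job's tasks, the latest checkpoint, and the flag, instead of constructing a `StateAssignmentOperation` object and then invoking it. `StateAssignmentUtil`, `Task`, and `Checkpoint` are simplified, hypothetical stand-ins, not Flink's classes. The flag's meaning here (reject checkpoint state for tasks no longer in the job) is an assumption chosen for illustration; the diff does not show how `allOrNothingState` is actually evaluated. Records require Java 16+.

```java
import java.util.HashMap;
import java.util.Map;

public class StateAssignmentSketch {

    /** Hypothetical stand-in for an execution job vertex: an id and its current parallelism. */
    record Task(String id, int parallelism) {}

    /** Hypothetical stand-in for a completed checkpoint: task id -> parallelism at snapshot time. */
    record Checkpoint(Map<String, Integer> snapshotParallelism) {}

    /** Hypothetical static utility in the style proposed in the review. */
    static final class StateAssignmentUtil {
        private StateAssignmentUtil() {}

        /**
         * Returns, per task, whether its state must be repartitioned because the
         * parallelism changed. ASSUMED semantics: when allOrNothingState is true,
         * state for a task that no longer exists is an error; otherwise it is skipped.
         */
        static Map<String, Boolean> assignStates(
                Map<String, Task> tasks, Checkpoint latest, boolean allOrNothingState) {
            Map<String, Boolean> needsRepartition = new HashMap<>();
            for (Map.Entry<String, Integer> entry : latest.snapshotParallelism().entrySet()) {
                Task task = tasks.get(entry.getKey());
                if (task == null) {
                    if (allOrNothingState) {
                        throw new IllegalStateException("No task for checkpointed state of " + entry.getKey());
                    }
                    continue; // lenient mode: ignore state of removed tasks
                }
                needsRepartition.put(task.id(), task.parallelism() != entry.getValue());
            }
            return needsRepartition;
        }
    }

    public static void main(String[] args) {
        Map<String, Task> tasks = Map.of("map", new Task("map", 4), "sink", new Task("sink", 2));
        Checkpoint latest = new Checkpoint(Map.of("map", 2, "sink", 2));
        // A single call; no intermediate operation object is created by the caller.
        System.out.println(StateAssignmentUtil.assignStates(tasks, latest, true));
    }
}
```

The trade-off is mostly about where intermediate data lives: an operation object can keep per-restore scratch state (such as the per-chain arrays built in the removed code) in fields shared across helper methods, while a static utility has to pass that data between helpers explicitly.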


> Partitionable Raw Keyed/Operator State
> --------------------------------------
>
>                 Key: FLINK-4844
>                 URL: https://issues.apache.org/jira/browse/FLINK-4844
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using state
> backends. However, the serialization code for many operators is built around
> reading/writing their state to a stream for checkpointing. We want to also provide
> partitionable state through streams, so that migrating existing operators
> becomes easier.
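One way to picture partitionable state exposed through streams, as a minimal sketch under stated assumptions: each parallel instance writes its state partitions into a single byte stream and records the offset at which each partition starts; on restore, the (stream, offset) pairs from all old instances are dealt round-robin to the new instances, in the spirit of the round-robin default named in the removed `RoundRobinOperatorStateRepartitioner` code. `StreamHandle`, `PartitionRef`, `snapshot`, `repartitionRoundRobin`, and `read` are hypothetical names, not Flink's API, and Flink's actual repartitioner may assign partitions differently. Records require Java 16+.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class StreamPartitionSketch {

    /** Hypothetical handle: the bytes one subtask wrote plus the start offset of each partition. */
    record StreamHandle(byte[] bytes, long[] partitionOffsets) {}

    /** Hypothetical reference to one partition inside some handle. */
    record PartitionRef(StreamHandle handle, int index) {}

    /** Writes each partition (here simply one int) to a stream, remembering where it begins. */
    static StreamHandle snapshot(List<Integer> partitions) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        long[] offsets = new long[partitions.size()];
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int i = 0; i < partitions.size(); i++) {
                offsets[i] = out.size(); // byte position where partition i starts
                out.writeInt(partitions.get(i));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new StreamHandle(bytes.toByteArray(), offsets);
    }

    /** Deals every partition of every old subtask round-robin to newParallelism subtasks. */
    static List<List<PartitionRef>> repartitionRoundRobin(List<StreamHandle> old, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        List<List<PartitionRef>> assignment = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            assignment.add(new ArrayList<>());
        }
        int next = 0;
        for (StreamHandle handle : old) {
            for (int p = 0; p < handle.partitionOffsets().length; p++) {
                assignment.get(next).add(new PartitionRef(handle, p));
                next = (next + 1) % newParallelism;
            }
        }
        return assignment;
    }

    /** Reads one partition back by seeking to its recorded offset. */
    static int read(PartitionRef ref) {
        int offset = (int) ref.handle().partitionOffsets()[ref.index()];
        return ByteBuffer.wrap(ref.handle().bytes(), offset, Integer.BYTES).getInt();
    }

    public static void main(String[] args) {
        // Two old subtasks, rescaled to three new subtasks.
        List<StreamHandle> old = List.of(snapshot(List.of(1, 2, 3)), snapshot(List.of(4, 5)));
        List<List<PartitionRef>> assignment = repartitionRoundRobin(old, 3);
        for (int i = 0; i < assignment.size(); i++) {
            List<Integer> values = new ArrayList<>();
            for (PartitionRef ref : assignment.get(i)) {
                values.add(read(ref));
            }
            System.out.println("subtask " + i + " -> " + values);
        }
    }
}
```

Running `main` prints `subtask 0 -> [1, 4]`, `subtask 1 -> [2, 5]`, and `subtask 2 -> [3]`. Because only offsets are redistributed, the operator keeps its existing stream-based serialization code; the restore side merely seeks to the partitions it was assigned.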



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)