[ https://issues.apache.org/jira/browse/FLINK-4731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15552276#comment-15552276 ]

ASF GitHub Bot commented on FLINK-4731:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2584#discussion_r82213262
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---
    @@ -663,28 +826,42 @@ public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws
                public void restoreState(Integer state) throws Exception {
                        counter = state;
                }
    +   }
    +
    +   private static class PartitionedStateSource extends StateSourceBase implements ListCheckpointed<Integer> {
    +
    +           private static final long serialVersionUID = -359715965103593462L;
    +           private static final int NUM_PARTITIONS = 7;
    +
    +           private static int[] CHECK_CORRECT_SNAPSHOT;
    +           private static int[] CHECK_CORRECT_RESTORE;
     
                @Override
    -           public void run(SourceContext<Integer> ctx) throws Exception {
    -                   final Object lock = ctx.getCheckpointLock();
    +           public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
     
    -                   while (running) {
    -                           synchronized (lock) {
    -                                   counter++;
    +                   CHECK_CORRECT_SNAPSHOT[getRuntimeContext().getIndexOfThisSubtask()] = counter;
     
    -                                   ctx.collect(counter * getRuntimeContext().getIndexOfThisSubtask());
    -                           }
    +                   int div = counter / NUM_PARTITIONS;
    +                   int mod = counter % NUM_PARTITIONS;
     
    -                           Thread.sleep(2);
    -                           if(counter == 10) {
    -                                   workStartedLatch.countDown();
    +                   List<Integer> split = new ArrayList<>();
    +                   for (int i = 0; i < NUM_PARTITIONS; ++i) {
    +                           int partitionValue = div;
    +                           if (mod > 0) {
    +                                   --mod;
    +                                   ++partitionValue;
                                }
    +                           split.add(partitionValue);
                        }
    +                   return split;
                }
     
                @Override
    -           public void cancel() {
    -                   running = false;
    +           public void restoreState(List<Integer> state) throws Exception {
    +                   for(Integer v : state) {
    --- End diff --
    
    Missing whitespace after `for`
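
    For readers following the diff, the splitting logic in `snapshotState` can be tried in isolation. The sketch below copies the div/mod distribution from the diff into a standalone class. The class name `CounterSplitSketch` and the `restore` method are assumptions made for illustration: the body of `restoreState` is cut off in the diff above, so summing the received parts is only a guess at what it does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone class; not part of RescalingITCase.
class CounterSplitSketch {

    // Same value as NUM_PARTITIONS in the diff.
    static final int NUM_PARTITIONS = 7;

    /** Splits counter into NUM_PARTITIONS near-equal parts that sum to counter. */
    static List<Integer> split(int counter) {
        int div = counter / NUM_PARTITIONS;
        int mod = counter % NUM_PARTITIONS;

        List<Integer> parts = new ArrayList<>();
        for (int i = 0; i < NUM_PARTITIONS; ++i) {
            int partitionValue = div;
            // The first (counter % NUM_PARTITIONS) parts each take one extra unit.
            if (mod > 0) {
                --mod;
                ++partitionValue;
            }
            parts.add(partitionValue);
        }
        return parts;
    }

    /** Assumed restore step: sum whichever parts this subtask was handed. */
    static int restore(List<Integer> assignedParts) {
        int counter = 0;
        for (Integer v : assignedParts) {
            counter += v;
        }
        return counter;
    }

    public static void main(String[] args) {
        List<Integer> parts = split(23);
        System.out.println(parts);          // [4, 4, 3, 3, 3, 3, 3]
        System.out.println(restore(parts)); // 23
    }
}
```

    Because the parts always sum to the original counter, the total across all subtasks should stay the same however the list entries are redistributed after rescaling, provided restore really does sum them.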


> HeapKeyedStateBackend restoring broken for scale-in
> ---------------------------------------------------
>
>                 Key: FLINK-4731
>                 URL: https://issues.apache.org/jira/browse/FLINK-4731
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>
> Restoring the HeapKeyedStateBackend is broken when parallelism is reduced
> (scale-in): the restore method overwrites previously restored state.
> We should also add scale-in testing to the RescalingITCase.
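
The failure mode described in the issue can be illustrated with a toy example. This is not Flink's code: the class `ScaleInRestoreSketch`, both method names, and the use of plain maps as stand-ins for state handles are all hypothetical. It assumes that on scale-in one new subtask receives several handles from old subtasks, and that those handles hold disjoint keys.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; it does not mirror HeapKeyedStateBackend internals.
class ScaleInRestoreSketch {

    /** Buggy variant: each handle replaces the map, so only the last handle survives. */
    static Map<String, Integer> restoreOverwriting(List<Map<String, Integer>> handles) {
        Map<String, Integer> state = new HashMap<>();
        for (Map<String, Integer> handle : handles) {
            state = new HashMap<>(handle);
        }
        return state;
    }

    /** Merging variant: entries from every handle are kept. */
    static Map<String, Integer> restoreMerging(List<Map<String, Integer>> handles) {
        Map<String, Integer> state = new HashMap<>();
        for (Map<String, Integer> handle : handles) {
            state.putAll(handle);
        }
        return state;
    }

    public static void main(String[] args) {
        // Two old subtasks, each holding one key; after scale-in one subtask gets both.
        Map<String, Integer> fromOldSubtask0 = new HashMap<>();
        fromOldSubtask0.put("a", 1);
        Map<String, Integer> fromOldSubtask1 = new HashMap<>();
        fromOldSubtask1.put("b", 2);
        List<Map<String, Integer>> handles = Arrays.asList(fromOldSubtask0, fromOldSubtask1);

        System.out.println(restoreOverwriting(handles).size()); // 1
        System.out.println(restoreMerging(handles).size());     // 2
    }
}
```

With unchanged or increased parallelism each subtask would typically restore from a single handle, in which case both variants behave the same. That may be why a test without scale-in would not expose the problem.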



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
