[ https://issues.apache.org/jira/browse/FLINK-4731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15552276#comment-15552276 ]
ASF GitHub Bot commented on FLINK-4731: --------------------------------------- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2584#discussion_r82213262 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java --- @@ -663,28 +826,42 @@ public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws public void restoreState(Integer state) throws Exception { counter = state; } + } + + private static class PartitionedStateSource extends StateSourceBase implements ListCheckpointed<Integer> { + + private static final long serialVersionUID = -359715965103593462L; + private static final int NUM_PARTITIONS = 7; + + private static int[] CHECK_CORRECT_SNAPSHOT; + private static int[] CHECK_CORRECT_RESTORE; @Override - public void run(SourceContext<Integer> ctx) throws Exception { - final Object lock = ctx.getCheckpointLock(); + public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { - while (running) { - synchronized (lock) { - counter++; + CHECK_CORRECT_SNAPSHOT[getRuntimeContext().getIndexOfThisSubtask()] = counter; - ctx.collect(counter * getRuntimeContext().getIndexOfThisSubtask()); - } + int div = counter / NUM_PARTITIONS; + int mod = counter % NUM_PARTITIONS; - Thread.sleep(2); - if(counter == 10) { - workStartedLatch.countDown(); + List<Integer> split = new ArrayList<>(); + for (int i = 0; i < NUM_PARTITIONS; ++i) { + int partitionValue = div; + if (mod > 0) { + --mod; + ++partitionValue; } + split.add(partitionValue); } + return split; } @Override - public void cancel() { - running = false; + public void restoreState(List<Integer> state) throws Exception { + for(Integer v : state) { --- End diff -- Missing whitespace after `for` > HeapKeyedStateBackend restoring broken for scale-in > --------------------------------------------------- > > Key: FLINK-4731 > URL: https://issues.apache.org/jira/browse/FLINK-4731 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing > Reporter: Stefan Richter > Assignee: Stefan Richter > > Restoring the HeapKeyedStateBackend is broken in case that parallelism is > reduced. The restore method is overwriting previously restored state. > We should also add scale-in testing to the RescalingITCase. -- This message was sent by Atlassian JIRA (v6.3.4#6332)