Repository: flink Updated Branches: refs/heads/master b198c33a7 -> 42a4df2d4
[hotfix] Fix shaky EventTimeAllWindowCheckpointingITCase In very rare cases, it could happen that a checkpoint would be performed after the ValidatingSink signaled that it had seen all expected elements. If this happened, the job would be restarted with the already complete state and we would never finish since no more elements would arrive. This adds a check in open() of ValidatingSink that signals success if we already have the final state. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42a4df2d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42a4df2d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42a4df2d Branch: refs/heads/master Commit: 42a4df2d475910a9092a5f7251d4897d00b4fba9 Parents: b198c33 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Wed Oct 21 15:06:42 2015 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Thu Oct 22 10:56:41 2015 +0200 ---------------------------------------------------------------------- .../EventTimeAllWindowCheckpointingITCase.java | 25 ++++++++++++++------ .../EventTimeWindowCheckpointingITCase.java | 5 ---- 2 files changed, 18 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/42a4df2d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index 2733349..84022f0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -432,17 +432,12 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { synchronized (ctx.getCheckpointLock()) { int next = numElementsEmitted++; for (long i = 0; i < numKeys; i++) { - ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next); + ctx.collectWithTimestamp(new Tuple2<>(i, new IntType(next)), next); } ctx.emitWatermark(new Watermark(next)); } } else { - // exit at some point so that we don't deadlock - if (numElementsEmitted > numElementsToEmit * 5) { -// running = false; - System.err.println("Succ Checkpoints: " + numSuccessfulCheckpoints + " numElemEmitted: " + numElementsEmitted + "num elements to emit: " + numElementsToEmit); - } // if our work is done, delay a bit to prevent busy waiting Thread.sleep(1); } @@ -491,6 +486,22 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { public void open(Configuration parameters) throws Exception { // this sink can only work with DOP 1 assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); + + // it can happen that a checkpoint happens when the complete success state is + // already set. In that case we restart with the final state and would never + // finish because no more elements arrive. 
+ if (windowCounts.size() == numKeys) { + boolean seenAll = true; + for (Integer windowCount: windowCounts.values()) { + if (windowCount != numWindowsExpected) { + seenAll = false; + break; + } + } + if (seenAll) { + throw new SuccessException(); + } + } } @Override @@ -498,7 +509,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { boolean seenAll = true; if (windowCounts.size() == numKeys) { for (Integer windowCount: windowCounts.values()) { - if (windowCount < numWindowsExpected) { + if (windowCount != numWindowsExpected) { seenAll = false; break; } http://git-wip-us.apache.org/repos/asf/flink/blob/42a4df2d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index 4d1d2c3..6cf04f5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -439,11 +439,6 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { } } else { - // exit at some point so that we don't deadlock - if (numElementsEmitted > numElementsToEmit * 5) { -// running = false; - System.err.println("Succ Checkpoints: " + numSuccessfulCheckpoints + " numElemEmitted: " + numElementsEmitted + "num elements to emit: " + numElementsToEmit); - } // if our work is done, delay a bit to prevent busy waiting Thread.sleep(1);