[hotfix] [tests] Increase robustness of Fast Time Window Operator Tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/877c267b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/877c267b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/877c267b

Branch: refs/heads/master
Commit: 877c267b8f7d7f63b07598e1536c7b42567c8a8b
Parents: ed96cb5
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 10 11:19:43 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Oct 10 12:17:04 2016 +0200

----------------------------------------------------------------------
 ...ulatingAlignedProcessingTimeWindowOperatorTest.java |  8 ++++++++
 ...egatingAlignedProcessingTimeWindowOperatorTest.java | 13 +++++++++++++
 2 files changed, 21 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/877c267b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 2f687f6..c82392a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -264,6 +264,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			// get and verify the result
 			out.waitForNElements(numElements, 60_000);
 
+			timerService.quiesceAndAwaitPending();
+
 			synchronized (lock) {
 				op.close();
 			}
@@ -322,6 +324,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 				Thread.sleep(1);
 			}
 
+			timerService.quiesceAndAwaitPending();
+
 			synchronized (lock) {
 				op.close();
 			}
@@ -407,6 +411,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			Collections.sort(result);
 			assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);
 
+			timerService.quiesceAndAwaitPending();
+
 			synchronized (lock) {
 				op.close();
 			}
@@ -463,6 +469,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			Collections.sort(result);
 			assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
 
+			timerService.quiesceAndAwaitPending();
+
 			synchronized (lock) {
 				op.close();
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/877c267b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index cd82a9c..12a842f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -277,6 +277,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			List<Tuple2<Integer, Integer>> result = out.getElements();
 			assertEquals(numElements, result.size());
 
+			timerService.quiesceAndAwaitPending();
+
 			synchronized (lock) {
 				op.close();
 			}
@@ -352,6 +354,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 			List<Tuple2<Integer, Integer>> result = out.getElements();
 
+			timerService.quiesceAndAwaitPending();
+
 			synchronized (lock) {
 				op.close();
 			}
@@ -414,6 +418,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 				Thread.sleep(1);
 			}
 
+			timerService.quiesceAndAwaitPending();
+
 			synchronized (lock) {
 				op.close();
 			}
@@ -508,6 +514,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 					new Tuple2<>(2, 2)
 			), result);
 
+			timerService.quiesceAndAwaitPending();
+
 			synchronized (lock) {
 				op.close();
 			}
@@ -571,6 +579,11 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 				assertTrue(e.getMessage().contains("Artificial Test Exception"));
 			}
 
+			timerService.quiesceAndAwaitPending();
+			synchronized (lock) {
+				op.close();
+			}
+
 			shutdownTimerServiceAndWait(timerService);
 			op.dispose();