[hotfix] [tests] Increase robustness of Fast Time Window Operator Tests

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/877c267b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/877c267b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/877c267b

Branch: refs/heads/master
Commit: 877c267b8f7d7f63b07598e1536c7b42567c8a8b
Parents: ed96cb5
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 10 11:19:43 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Oct 10 12:17:04 2016 +0200

----------------------------------------------------------------------
 ...ulatingAlignedProcessingTimeWindowOperatorTest.java |  8 ++++++++
 ...egatingAlignedProcessingTimeWindowOperatorTest.java | 13 +++++++++++++
 2 files changed, 21 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/877c267b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 2f687f6..c82392a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -264,6 +264,8 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        // get and verify the result
                        out.waitForNElements(numElements, 60_000);
 
+                       timerService.quiesceAndAwaitPending();
+
                        synchronized (lock) {
                                op.close();
                        }
@@ -322,6 +324,8 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                Thread.sleep(1);
                        }
 
+                       timerService.quiesceAndAwaitPending();
+
                        synchronized (lock) {
                                op.close();
                        }
@@ -407,6 +411,8 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        Collections.sort(result);
                        assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);
 
+                       timerService.quiesceAndAwaitPending();
+
                        synchronized (lock) {
                                op.close();
                        }
@@ -463,6 +469,8 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        Collections.sort(result);
                        assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
 
+                       timerService.quiesceAndAwaitPending();
+
                        synchronized (lock) {
                                op.close();
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/877c267b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index cd82a9c..12a842f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -277,6 +277,8 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        List<Tuple2<Integer, Integer>> result = 
out.getElements();
                        assertEquals(numElements, result.size());
 
+                       timerService.quiesceAndAwaitPending();
+
                        synchronized (lock) {
                                op.close();
                        }
@@ -352,6 +354,8 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
 
                        List<Tuple2<Integer, Integer>> result = 
out.getElements();
 
+                       timerService.quiesceAndAwaitPending();
+
                        synchronized (lock) {
                                op.close();
                        }
@@ -414,6 +418,8 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                Thread.sleep(1);
                        }
 
+                       timerService.quiesceAndAwaitPending();
+
                        synchronized (lock) {
                                op.close();
                        }
@@ -508,6 +514,8 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                        new Tuple2<>(2, 2)
                        ), result);
 
+                       timerService.quiesceAndAwaitPending();
+
                        synchronized (lock) {
                                op.close();
                        }
@@ -571,6 +579,11 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                assertTrue(e.getMessage().contains("Artificial 
Test Exception"));
                        }
 
+                       timerService.quiesceAndAwaitPending();
+                       synchronized (lock) {
+                               op.close();
+                       }
+
                        shutdownTimerServiceAndWait(timerService);
                        op.dispose();
 

Reply via email to