[FLINK-2648] [tests] Fix flaky CombineTaskTest and improve cancelling in GroupReduceCombineDriver


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/361947d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/361947d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/361947d6

Branch: refs/heads/master
Commit: 361947d6c5490f0c6d04ffec709a353995aad373
Parents: 4b6eae5
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 9 18:59:43 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 9 19:10:51 2015 +0200

----------------------------------------------------------------------
 .../runtime/operators/GroupReduceCombineDriver.java      |  4 +++-
 .../apache/flink/runtime/operators/CombineTaskTest.java  | 11 +++++++++--
 2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/361947d6/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index 028ed95..7115a4d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -184,7 +184,9 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
                }
 
                // sort, combine, and send the final batch
-               sortAndCombine();
+               if (running) {
+                       sortAndCombine();
+               }
        }
 
        private void sortAndCombine() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/361947d6/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
index 932e746..b6ce2d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
@@ -53,10 +53,12 @@ public class CombineTaskTest
        
        private final ArrayList<Tuple2<Integer, Integer>> outList = new 
ArrayList<Tuple2<Integer, Integer>>();
 
+       @SuppressWarnings("unchecked")
        private final TypeSerializer<Tuple2<Integer, Integer>> serializer = new 
TupleSerializer<Tuple2<Integer, Integer>>(
                        (Class<Tuple2<Integer, Integer>>) (Class<?>) 
Tuple2.class,
                        new TypeSerializer<?>[] { IntSerializer.INSTANCE, 
IntSerializer.INSTANCE });
        
+       
        private final TypeComparator<Tuple2<Integer, Integer>> comparator = new 
TupleComparator<Tuple2<Integer, Integer>>(
                        new int[]{0},
                        new TypeComparator<?>[] { new IntComparator(true) },
@@ -179,9 +181,14 @@ public class CombineTaskTest
                        testTask.cancel();
                        
                        // make sure it reacts to the canceling in some time
-                       taskRunner.join(5000);
+                       long deadline = System.currentTimeMillis() + 10000;
+                       do {
+                               taskRunner.interrupt();
+                               taskRunner.join(5000);
+                       }
+                       while (taskRunner.isAlive() && 
System.currentTimeMillis() < deadline);
                        
-                       assertFalse("Task did not cancel properly within in 5 
seconds.", taskRunner.isAlive());
+                       assertFalse("Task did not cancel properly within in 10 
seconds.", taskRunner.isAlive());
                }
                catch (Exception e) {
                        e.printStackTrace();

Reply via email to