[hotfix][runtime-tests] Immediately fail test when one of the futures fails


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/433e05c7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/433e05c7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/433e05c7

Branch: refs/heads/master
Commit: 433e05c7a9cf3599e241384ad4a86ee8a4cc4325
Parents: 10d11d7
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Tue Jan 23 15:34:57 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:21 2018 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/util/FutureUtil.java  | 37 ++++++++++++++++++++
 .../partition/PipelinedSubpartitionTest.java    |  7 ++--
 .../consumer/LocalInputChannelTest.java         |  6 ++--
 3 files changed, 41 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/433e05c7/flink-core/src/main/java/org/apache/flink/util/FutureUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FutureUtil.java b/flink-core/src/main/java/org/apache/flink/util/FutureUtil.java
index b6bac88..8d196a5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FutureUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FutureUtil.java
@@ -20,8 +20,15 @@ package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Simple utility class to work with Java's Futures.
@@ -45,4 +52,34 @@ public class FutureUtil {
 
                return future.get();
        }
+
+       public static void waitForAll(long timeoutMillis, Future<?>...futures) 
throws Exception {
+               waitForAll(timeoutMillis, Arrays.asList(futures));
+       }
+
+       public static void waitForAll(long timeoutMillis, Collection<Future<?>> 
futures) throws Exception {
+               long startMillis = System.currentTimeMillis();
+               Set<Future<?>> futuresSet = new HashSet<>();
+               futuresSet.addAll(futures);
+
+               while (System.currentTimeMillis() < startMillis + 
timeoutMillis) {
+                       if (futuresSet.isEmpty()) {
+                               return;
+                       }
+                       Iterator<Future<?>> futureIterator = 
futuresSet.iterator();
+                       while (futureIterator.hasNext()) {
+                               Future<?> future = futureIterator.next();
+                               if (future.isDone()) {
+                                       future.get();
+                                       futureIterator.remove();
+                               }
+                       }
+
+                       Thread.sleep(10);
+               }
+
+               if (!futuresSet.isEmpty()) {
+                       throw new TimeoutException(String.format("Some of the 
futures have not finished [%s]", futuresSet));
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/433e05c7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 7e369fa..5a70350 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.Future;
 
 import static 
org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
 import static 
org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
+import static org.apache.flink.util.FutureUtil.waitForAll;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -308,15 +309,11 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
                Future<Boolean> producerResult = executorService.submit(
                        new TestSubpartitionProducer(subpartition, 
isSlowProducer, producerSource));
-
                Future<Boolean> consumerResult = 
executorService.submit(consumer);
 
-               // Wait for producer and consumer to finish
-               producerResult.get();
-               consumerResult.get();
+               waitForAll(60_000L, producerResult, consumerResult);
        }
 
-
        /**
         * Tests cleanup of {@link PipelinedSubpartition#release()} with no 
read view attached.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/433e05c7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 2f68418..7fc6d51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -59,6 +59,7 @@ import java.util.concurrent.Future;
 
 import scala.Tuple2;
 
+import static org.apache.flink.util.FutureUtil.waitForAll;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -170,10 +171,7 @@ public class LocalInputChannelTest {
                                                partitionIds)));
                        }
 
-                       // Wait for all to finish
-                       for (Future<?> result : results) {
-                               result.get();
-                       }
+                       waitForAll(60_000L, results);
                }
                finally {
                        networkBuffers.destroyAllBufferPools();

Reply via email to