[hotfix][runtime-tests] Immediatelly fail test when one of the futures fails
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/433e05c7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/433e05c7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/433e05c7 Branch: refs/heads/master Commit: 433e05c7a9cf3599e241384ad4a86ee8a4cc4325 Parents: 10d11d7 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Tue Jan 23 15:34:57 2018 +0100 Committer: Piotr Nowojski <piotr.nowoj...@gmail.com> Committed: Mon Feb 19 12:21:21 2018 +0100 ---------------------------------------------------------------------- .../java/org/apache/flink/util/FutureUtil.java | 37 ++++++++++++++++++++ .../partition/PipelinedSubpartitionTest.java | 7 ++-- .../consumer/LocalInputChannelTest.java | 6 ++-- 3 files changed, 41 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/433e05c7/flink-core/src/main/java/org/apache/flink/util/FutureUtil.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/FutureUtil.java b/flink-core/src/main/java/org/apache/flink/util/FutureUtil.java index b6bac88..8d196a5 100644 --- a/flink-core/src/main/java/org/apache/flink/util/FutureUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/FutureUtil.java @@ -20,8 +20,15 @@ package org.apache.flink.util; import org.apache.flink.annotation.Internal; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeoutException; /** * Simple utility class to work with Java's Futures. @@ -45,4 +52,34 @@ public class FutureUtil { return future.get(); } + + public static void waitForAll(long timeoutMillis, Future<?>...futures) throws Exception { + waitForAll(timeoutMillis, Arrays.asList(futures)); + } + + public static void waitForAll(long timeoutMillis, Collection<Future<?>> futures) throws Exception { + long startMillis = System.currentTimeMillis(); + Set<Future<?>> futuresSet = new HashSet<>(); + futuresSet.addAll(futures); + + while (System.currentTimeMillis() < startMillis + timeoutMillis) { + if (futuresSet.isEmpty()) { + return; + } + Iterator<Future<?>> futureIterator = futuresSet.iterator(); + while (futureIterator.hasNext()) { + Future<?> future = futureIterator.next(); + if (future.isDone()) { + future.get(); + futureIterator.remove(); + } + } + + Thread.sleep(10); + } + + if (!futuresSet.isEmpty()) { + throw new TimeoutException(String.format("Some of the futures have not finished [%s]", futuresSet)); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/433e05c7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index 7e369fa..5a70350 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -40,6 +40,7 @@ import java.util.concurrent.Future; import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE; import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer; +import static org.apache.flink.util.FutureUtil.waitForAll; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -308,15 +309,11 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { Future<Boolean> producerResult = executorService.submit( new TestSubpartitionProducer(subpartition, isSlowProducer, producerSource)); - Future<Boolean> consumerResult = executorService.submit(consumer); - // Wait for producer and consumer to finish - producerResult.get(); - consumerResult.get(); + waitForAll(60_000L, producerResult, consumerResult); } - /** * Tests cleanup of {@link PipelinedSubpartition#release()} with no read view attached. */ http://git-wip-us.apache.org/repos/asf/flink/blob/433e05c7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 2f68418..7fc6d51 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -59,6 +59,7 @@ import java.util.concurrent.Future; import scala.Tuple2; +import static org.apache.flink.util.FutureUtil.waitForAll; import static org.apache.flink.util.Preconditions.checkArgument; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -170,10 +171,7 @@ public class LocalInputChannelTest { partitionIds))); } - // Wait for all to finish - for (Future<?> result : results) { - result.get(); - } + waitForAll(60_000L, results); } finally { networkBuffers.destroyAllBufferPools();