Repository: tez
Updated Branches:
  refs/heads/master 2c22e23a7 -> d2b9222fb
TEZ-3302. Add a version of processorContext.waitForAllInputsReady and waitForAnyInputReady with a timeout. Contributed by Tsuyoshi Ozawa. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d2b9222f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d2b9222f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d2b9222f Branch: refs/heads/master Commit: d2b9222fb589ce85124b1e381ba199621e91b263 Parents: 2c22e23 Author: Siddharth Seth <ss...@apache.org> Authored: Mon Jun 27 12:57:47 2016 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Mon Jun 27 12:57:47 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../tez/runtime/api/ProcessorContext.java | 43 +++++++++++++++++++- .../apache/tez/runtime/InputReadyTracker.java | 36 ++++++++++++++-- .../api/impl/TezProcessorContextImpl.java | 14 ++++++- .../tez/runtime/TestInputReadyTracker.java | 16 +++++--- 5 files changed, 98 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d2b9222f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 640d957..26ff72c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3302. Add a version of processorContext.waitForAllInputsReady and waitForAnyInputReady with a timeout. TEZ-3291. Optimize splits grouping when locality information is not available. TEZ-3305. TestAnalyzer fails on Hadoop 2.7. TEZ-3304. TestHistoryParser fails with Hadoop 2.7. @@ -66,6 +67,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3302. Add a version of processorContext.waitForAllInputsReady and waitForAnyInputReady with a timeout. TEZ-3305. TestAnalyzer fails on Hadoop 2.7. TEZ-3304. TestHistoryParser fails with Hadoop 2.7. TEZ-3296. 
Tez job can hang if two vertices at the same root distance have different task requirements http://git-wip-us.apache.org/repos/asf/tez/blob/d2b9222f/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java index 8b88289..acb2a57 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java @@ -63,7 +63,30 @@ public interface ProcessorContext extends TaskContext { * @throws InterruptedException */ public Input waitForAnyInputReady(Collection<Input> inputs) throws InterruptedException; - + + /** + * Blocking call which returns when any of the specified Inputs is ready for + * consumption. + * + * There can be multiple parallel invocations of this function - where each + * invocation blocks on the Inputs that it specifies. + * + * If multiple Inputs are ready, any one of them may be returned by this + * method - including an Input which may have been returned in a previous + * call. If invoking this method multiple times, it's recommended to remove + * previously completed Inputs from the invocation list. + * + * @param inputs + * the list of Inputs to monitor + * @param timeoutMillis + * timeout to return in milliseconds. If this value is negative, + * this function will wait forever until all inputs get ready + * or interrupted. + * @return the Input which is ready for consumption. return null when timeout occurs. + * @throws InterruptedException + */ + public Input waitForAnyInputReady(Collection<Input> inputs, long timeoutMillis) throws InterruptedException; + /** * Blocking call which returns only after all of the specified Inputs are * ready for consumption. 
@@ -76,4 +99,22 @@ public interface ProcessorContext extends TaskContext { * @throws InterruptedException */ public void waitForAllInputsReady(Collection<Input> inputs) throws InterruptedException; + + /** + * Blocking call which returns only after all of the specified Inputs are + * ready for consumption with timeout. + * + * There can be multiple parallel invocations of this function - where each + * invocation blocks on the Inputs that it specifies. + * + * @param inputs + * the list of Inputs to monitor + * @param timeoutMillis + * timeout to return in milliseconds. If this value is negative, + * this function will wait forever until all inputs get ready + * or interrupted. + * @return Return true if all inputs are ready. Otherwise, return false. + * @throws InterruptedException + */ + public boolean waitForAllInputsReady(Collection<Input> inputs, long timeoutMillis) throws InterruptedException; } http://git-wip-us.apache.org/repos/asf/tez/blob/d2b9222f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java index 93035ba..ba4fe1d 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -77,17 +79,36 @@ public class InputReadyTracker { } public Input waitForAnyInputReady(Collection<Input> inputs) throws InterruptedException { + return 
waitForAnyInputReady(inputs, -1); + } + + public Input waitForAnyInputReady(Collection<Input> inputs, long timeoutMillis) throws InterruptedException { Preconditions.checkArgument(inputs != null && inputs.size() > 0, "At least one input should be specified"); InputReadyMonitor inputReadyMonitor = new InputReadyMonitor(inputs, true); - return inputReadyMonitor.awaitCondition(); + try { + return inputReadyMonitor.awaitCondition(timeoutMillis); + } catch (TimeoutException e) { + return null; + } } public void waitForAllInputsReady(Collection<Input> inputs) throws InterruptedException { + waitForAllInputsReady(inputs, -1); + } + + public boolean waitForAllInputsReady(Collection<Input> inputs, long timeoutMillis) throws InterruptedException { Preconditions.checkArgument(inputs != null && inputs.size() > 0, "At least one input should be specified"); + boolean succeeded = true; InputReadyMonitor inputReadyMonitor = new InputReadyMonitor(inputs, false); - inputReadyMonitor.awaitCondition(); + + try { + inputReadyMonitor.awaitCondition(timeoutMillis); + } catch (TimeoutException e) { + succeeded = false; + } + return succeeded; } private class InputReadyMonitor { @@ -101,7 +122,7 @@ public class InputReadyTracker { this.selectOne = anyOne; } - public Input awaitCondition() throws InterruptedException { + public Input awaitCondition(long timeoutMillis) throws InterruptedException, TimeoutException { lock.lock(); try { while (pendingInputs.size() > 0) { @@ -117,7 +138,14 @@ public class InputReadyTracker { } } if (pendingInputs.size() > 0) { - condition.await(); + if (timeoutMillis >= 0) { + boolean succeeded = condition.await(timeoutMillis, TimeUnit.MILLISECONDS); + if (!succeeded) { + throw new TimeoutException("pending Inputs timeout"); + } + } else { // timeout < 0 + condition.await(); + } } } } finally { http://git-wip-us.apache.org/repos/asf/tez/blob/d2b9222f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java 
---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java index 607bbf1..d7c2d3e 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java @@ -120,12 +120,22 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce @Override public Input waitForAnyInputReady(Collection<Input> inputs) throws InterruptedException { - return inputReadyTracker.waitForAnyInputReady(inputs); + return waitForAnyInputReady(inputs, -1); + } + + @Override + public Input waitForAnyInputReady(Collection<Input> inputs, long timeoutMillis) throws InterruptedException { + return inputReadyTracker.waitForAnyInputReady(inputs, timeoutMillis); } @Override public void waitForAllInputsReady(Collection<Input> inputs) throws InterruptedException { - inputReadyTracker.waitForAllInputsReady(inputs); + waitForAllInputsReady(inputs, -1); + } + + @Override + public boolean waitForAllInputsReady(Collection<Input> inputs, long timeoutMillis) throws InterruptedException { + return inputReadyTracker.waitForAllInputsReady(inputs, timeoutMillis); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/d2b9222f/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java index 29c5023..1846354 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java +++ 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java @@ -19,6 +19,7 @@ package org.apache.tez.runtime; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; @@ -43,9 +44,9 @@ import com.google.common.collect.Sets; public class TestInputReadyTracker { - private static final long SLEEP_TIME = 500l; + private static final long SLEEP_TIME = 2000l; - @Test(timeout = 5000) + @Test(timeout = 20000) public void testWithoutGrouping1() throws InterruptedException { InputReadyTracker inputReadyTracker = new InputReadyTracker(); @@ -66,7 +67,8 @@ public class TestInputReadyTracker { startTime = System.nanoTime(); setDelayedInputReady(input2); - inputReadyTracker.waitForAllInputsReady(requestList); + assertFalse(inputReadyTracker.waitForAllInputsReady(requestList, 0)); + assertTrue(inputReadyTracker.waitForAllInputsReady(requestList, -1)); readyTime = System.nanoTime(); // Should have moved into ready state - only happens when the setReady function is invoked. // Ensure the method returned only after the specific Input was told it is ready @@ -75,7 +77,7 @@ public class TestInputReadyTracker { assertTrue(input1.isReady); } - @Test(timeout = 5000) + @Test(timeout = 20000) public void testWithoutGrouping2() throws InterruptedException { InputReadyTracker inputReadyTracker = new InputReadyTracker(); @@ -124,7 +126,9 @@ public class TestInputReadyTracker { requestList.add(input3); startTime = System.nanoTime(); setDelayedInputReady(input3); - readyInput = inputReadyTracker.waitForAnyInputReady(requestList); + readyInput = inputReadyTracker.waitForAnyInputReady(requestList, 0); + assertNull(readyInput); + readyInput = inputReadyTracker.waitForAnyInputReady(requestList, -1); assertEquals(input3, readyInput); readyTime = System.nanoTime(); // Should have moved into ready state - only happens when the setReady function is invoked. 
@@ -135,7 +139,7 @@ public class TestInputReadyTracker { assertTrue(input2.isReady); } - @Test(timeout = 5000) + @Test(timeout = 20000) public void testGrouped() throws InterruptedException { InputReadyTracker inputReadyTracker = new InputReadyTracker();