Repository: tez
Updated Branches:
  refs/heads/master 2c22e23a7 -> d2b9222fb


TEZ-3302. Add a version of processorContext.waitForAllInputsReady and
waitForAnyInputReady with a timeout. Contributed by Tsuyoshi Ozawa.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d2b9222f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d2b9222f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d2b9222f

Branch: refs/heads/master
Commit: d2b9222fb589ce85124b1e381ba199621e91b263
Parents: 2c22e23
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Jun 27 12:57:47 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Jun 27 12:57:47 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../tez/runtime/api/ProcessorContext.java       | 43 +++++++++++++++++++-
 .../apache/tez/runtime/InputReadyTracker.java   | 36 ++++++++++++++--
 .../api/impl/TezProcessorContextImpl.java       | 14 ++++++-
 .../tez/runtime/TestInputReadyTracker.java      | 16 +++++---
 5 files changed, 98 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d2b9222f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 640d957..26ff72c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3302. Add a version of processorContext.waitForAllInputsReady and 
waitForAnyInputReady with a timeout.
   TEZ-3291. Optimize splits grouping when locality information is not 
available.
   TEZ-3305. TestAnalyzer fails on Hadoop 2.7.
   TEZ-3304. TestHistoryParser fails with Hadoop 2.7.
@@ -66,6 +67,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3302. Add a version of processorContext.waitForAllInputsReady and 
waitForAnyInputReady with a timeout.
   TEZ-3305. TestAnalyzer fails on Hadoop 2.7.
   TEZ-3304. TestHistoryParser fails with Hadoop 2.7.
   TEZ-3296. Tez job can hang if two vertices at the same root distance have 
different task requirements

http://git-wip-us.apache.org/repos/asf/tez/blob/d2b9222f/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java
----------------------------------------------------------------------
diff --git 
a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java 
b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java
index 8b88289..acb2a57 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java
@@ -63,7 +63,30 @@ public interface ProcessorContext extends TaskContext {
    * @throws InterruptedException
    */
   public Input waitForAnyInputReady(Collection<Input> inputs) throws 
InterruptedException;
-  
+
+  /**
+   * Blocking call which returns when any of the specified Inputs is ready for
+   * consumption.
+   *
+   * There can be multiple parallel invocations of this function - where each
+   * invocation blocks on the Inputs that it specifies.
+   *
+   * If multiple Inputs are ready, any one of them may be returned by this
+   * method - including an Input which may have been returned in a previous
+   * call. If invoking this method multiple times, it's recommended to remove
+   * previously completed Inputs from the invocation list.
+   *
+   * @param inputs
+   *          the list of Inputs to monitor
+   * @param timeoutMillis
+   *          timeout to return in milliseconds. If this value is negative,
+   *          this function will wait forever until any of the inputs gets
+   *          ready or the calling thread is interrupted.
+   * @return the Input which is ready for consumption, or null if the 
timeout expires before any Input becomes ready.
+   * @throws InterruptedException
+   */
+  public Input waitForAnyInputReady(Collection<Input> inputs, long 
timeoutMillis) throws InterruptedException;
+
   /**
    * Blocking call which returns only after all of the specified Inputs are
    * ready for consumption.
@@ -76,4 +99,22 @@ public interface ProcessorContext extends TaskContext {
    * @throws InterruptedException
    */
   public void waitForAllInputsReady(Collection<Input> inputs) throws 
InterruptedException;
+
+  /**
+   * Blocking call which returns only after all of the specified Inputs are
+   * ready for consumption with timeout.
+   *
+   * There can be multiple parallel invocations of this function - where each
+   * invocation blocks on the Inputs that it specifies.
+   *
+   * @param inputs
+   *          the list of Inputs to monitor
+   * @param timeoutMillis
+   *          timeout to return in milliseconds. If this value is negative,
+   *          this function will wait forever until all inputs get ready
+   *          or the calling thread is interrupted.
+   * @return true if all inputs are ready; false if the timeout expired first.
+   * @throws InterruptedException
+   */
+  public boolean waitForAllInputsReady(Collection<Input> inputs, long 
timeoutMillis) throws InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/d2b9222f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java
 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java
index 93035ba..ba4fe1d 100644
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -77,17 +79,36 @@ public class InputReadyTracker {
   }
 
   public Input waitForAnyInputReady(Collection<Input> inputs) throws 
InterruptedException {
+    return waitForAnyInputReady(inputs, -1);
+  }
+
+  public Input waitForAnyInputReady(Collection<Input> inputs, long 
timeoutMillis) throws InterruptedException {
     Preconditions.checkArgument(inputs != null && inputs.size() > 0,
         "At least one input should be specified");
     InputReadyMonitor inputReadyMonitor = new InputReadyMonitor(inputs, true);
-    return inputReadyMonitor.awaitCondition();
+    try {
+      return inputReadyMonitor.awaitCondition(timeoutMillis);
+    } catch (TimeoutException e) {
+      return null;
+    }
   }
 
   public void waitForAllInputsReady(Collection<Input> inputs) throws 
InterruptedException {
+    waitForAllInputsReady(inputs, -1);
+  }
+
+  public boolean waitForAllInputsReady(Collection<Input> inputs, long 
timeoutMillis) throws InterruptedException {
     Preconditions.checkArgument(inputs != null && inputs.size() > 0,
         "At least one input should be specified");
+    boolean succeeded = true;
     InputReadyMonitor inputReadyMonitor = new InputReadyMonitor(inputs, false);
-    inputReadyMonitor.awaitCondition();
+
+    try {
+      inputReadyMonitor.awaitCondition(timeoutMillis);
+    } catch (TimeoutException e) {
+      succeeded = false;
+    }
+    return succeeded;
   }
 
   private class InputReadyMonitor {
@@ -101,7 +122,7 @@ public class InputReadyTracker {
       this.selectOne = anyOne;
     }
 
-    public Input awaitCondition() throws InterruptedException {
+    public Input awaitCondition(long timeoutMillis) throws 
InterruptedException, TimeoutException {
       lock.lock();
       try {
         while (pendingInputs.size() > 0) {
@@ -117,7 +138,14 @@ public class InputReadyTracker {
             }
           }
           if (pendingInputs.size() > 0) {
-            condition.await();
+            if (timeoutMillis >= 0) {
+              boolean succeeded = condition.await(timeoutMillis, 
TimeUnit.MILLISECONDS);
+              if (!succeeded) {
+                throw new TimeoutException("pending Inputs timeout");
+              }
+            } else { // timeout < 0
+              condition.await();
+            }
           }
         }
       } finally {

http://git-wip-us.apache.org/repos/asf/tez/blob/d2b9222f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index 607bbf1..d7c2d3e 100644
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -120,12 +120,22 @@ public class TezProcessorContextImpl extends 
TezTaskContextImpl implements Proce
 
   @Override
   public Input waitForAnyInputReady(Collection<Input> inputs) throws 
InterruptedException {
-    return inputReadyTracker.waitForAnyInputReady(inputs);
+    return waitForAnyInputReady(inputs, -1);
+  }
+
+  @Override
+  public Input waitForAnyInputReady(Collection<Input> inputs, long 
timeoutMillis) throws InterruptedException {
+    return inputReadyTracker.waitForAnyInputReady(inputs, timeoutMillis);
   }
 
   @Override
   public void waitForAllInputsReady(Collection<Input> inputs) throws 
InterruptedException {
-    inputReadyTracker.waitForAllInputsReady(inputs);
+    waitForAllInputsReady(inputs, -1);
+  }
+
+  @Override
+  public boolean waitForAllInputsReady(Collection<Input> inputs, long 
timeoutMillis) throws InterruptedException {
+    return inputReadyTracker.waitForAllInputsReady(inputs, timeoutMillis);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/d2b9222f/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
index 29c5023..1846354 100644
--- 
a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
+++ 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
@@ -19,6 +19,7 @@
 package org.apache.tez.runtime;
 
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 
@@ -43,9 +44,9 @@ import com.google.common.collect.Sets;
 
 public class TestInputReadyTracker {
 
-  private static final long SLEEP_TIME = 500l;
+  private static final long SLEEP_TIME = 2000l;
   
-  @Test(timeout = 5000)
+  @Test(timeout = 20000)
   public void testWithoutGrouping1() throws InterruptedException {
     InputReadyTracker inputReadyTracker = new InputReadyTracker();
 
@@ -66,7 +67,8 @@ public class TestInputReadyTracker {
     
     startTime = System.nanoTime();
     setDelayedInputReady(input2);
-    inputReadyTracker.waitForAllInputsReady(requestList);
+    assertFalse(inputReadyTracker.waitForAllInputsReady(requestList, 0));
+    assertTrue(inputReadyTracker.waitForAllInputsReady(requestList, -1));
     readyTime = System.nanoTime();
     // Should have moved into ready state - only happens when the setReady 
function is invoked.
     // Ensure the method returned only after the specific Input was told it is 
ready
@@ -75,7 +77,7 @@ public class TestInputReadyTracker {
     assertTrue(input1.isReady);
   }
 
-  @Test(timeout = 5000)
+  @Test(timeout = 20000)
   public void testWithoutGrouping2() throws InterruptedException {
     InputReadyTracker inputReadyTracker = new InputReadyTracker();
 
@@ -124,7 +126,9 @@ public class TestInputReadyTracker {
     requestList.add(input3);
     startTime = System.nanoTime();
     setDelayedInputReady(input3);
-    readyInput = inputReadyTracker.waitForAnyInputReady(requestList);
+    readyInput = inputReadyTracker.waitForAnyInputReady(requestList, 0);
+    assertNull(readyInput);
+    readyInput = inputReadyTracker.waitForAnyInputReady(requestList, -1);
     assertEquals(input3, readyInput);
     readyTime = System.nanoTime();
     // Should have moved into ready state - only happens when the setReady 
function is invoked.
@@ -135,7 +139,7 @@ public class TestInputReadyTracker {
     assertTrue(input2.isReady);
   }
 
-  @Test(timeout = 5000)
+  @Test(timeout = 20000)
   public void testGrouped() throws InterruptedException {
     InputReadyTracker inputReadyTracker = new InputReadyTracker();
 

Reply via email to