Author: aching
Date: Tue Aug 21 06:33:59 2012
New Revision: 1375393

URL: http://svn.apache.org/viewvc?rev=1375393&view=rev
Log:
GIRAPH-246: Periodic worker calls to context.progress() will prevent
timeout on some Hadoop clusters during barrier waits. (Eli Reisman via
aching)

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java
    giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java

Modified: giraph/trunk/CHANGELOG
URL: 
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1375393&r1=1375392&r2=1375393&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Aug 21 06:33:59 2012
@@ -2,6 +2,10 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-246: Periodic worker calls to context.progress() will prevent
+  timeout on some Hadoop clusters during barrier waits. (Eli Reisman
+  via aching)
+
   GIRAPH-295: Additional Example Algorithm to compute Outdegree and
   Indegree. (Sean Choi via aching)
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1375393&r1=1375392&r2=1375393&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Aug 21 06:33:59 2012
@@ -211,9 +211,11 @@ public class BspServiceWorker<I extends 
       localitySorter.getPrioritizedLocalInputSplits();
     String reservedInputSplitPath = null;
     Stat reservedStat = null;
+    final Mapper<?, ?, ?, ?>.Context context = getContext();
     while (true) {
       int finishedInputSplits = 0;
       for (int i = 0; i < inputSplitPathList.size(); ++i) {
+        context.progress();
         String tmpInputSplitFinishedPath =
             inputSplitPathList.get(i) + INPUT_SPLIT_FINISHED_NODE;
         reservedStat =
@@ -273,6 +275,7 @@ public class BspServiceWorker<I extends 
       }
       // Wait for either a reservation to go away or a notification that
       // an InputSplit has finished.
+      context.progress();
       getInputSplitsStateChangedEvent().waitMsecs(60 * 1000);
       getInputSplitsStateChangedEvent().reset();
     }
@@ -309,6 +312,7 @@ public class BspServiceWorker<I extends 
     for (Entry<PartitionOwner, Partition<I, V, E, M>> entry :
       inputSplitCache.entrySet()) {
       if (!entry.getValue().getVertices().isEmpty()) {
+        getContext().progress();
         commService.sendPartitionRequest(entry.getKey().getWorkerInfo(),
             entry.getValue());
       }
@@ -474,8 +478,8 @@ public class BspServiceWorker<I extends 
       ++totalVerticesLoaded;
       totalEdgesLoaded += readerVertex.getNumEdges();
 
-      // Update status every half a million vertices
-      if ((totalVerticesLoaded % 500000) == 0) {
+      // Update status every 250k vertices
+      if ((totalVerticesLoaded % 250000) == 0) {
         String status = "readVerticesFromInputSplit: Loaded " +
             totalVerticesLoaded + " vertices and " +
             totalEdgesLoaded + " edges " +

Modified: giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java?rev=1375393&r1=1375392&r2=1375393&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java Tue Aug 21 06:33:59 2012
@@ -76,47 +76,37 @@ public class PredicateLock implements Bs
 
   @Override
   public boolean waitMsecs(int msecs) {
-    if (msecs < -1) {
-      throw new RuntimeException("msecs < -1");
+    if (msecs < 0) {
+      throw new RuntimeException("msecs cannot be negative!");
     }
-
     long maxMsecs = System.currentTimeMillis() + msecs;
-    long curMsecTimeout = 0;
+    int curMsecTimeout = 0;
     lock.lock();
     try {
       while (!eventOccurred) {
-        if (msecs == -1) {
-          try {
-            cond.await();
-          } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "waitMsecs: Caught interrupted " +
-                    "exception on cond.await()", e);
-          }
-        } else {
-          // Keep the wait non-negative
-          curMsecTimeout =
-              Math.max(maxMsecs - System.currentTimeMillis(), 0);
+        curMsecTimeout =
+            Math.min(msecs, MSEC_PERIOD);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("waitMsecs: Wait for " + curMsecTimeout);
+        }
+        try {
+          boolean signaled =
+              cond.await(curMsecTimeout, TimeUnit.MILLISECONDS);
           if (LOG.isDebugEnabled()) {
-            LOG.debug("waitMsecs: Wait for " + curMsecTimeout);
-          }
-          try {
-            boolean signaled =
-                cond.await(curMsecTimeout, TimeUnit.MILLISECONDS);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("waitMsecs: Got timed signaled of " +
-                  signaled);
-            }
-          } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "waitMsecs: Caught interrupted " +
-                    "exception on cond.await() " +
-                    curMsecTimeout, e);
-          }
-          if (System.currentTimeMillis() > maxMsecs) {
-            return false;
+            LOG.debug("waitMsecs: Got timed signaled of " +
+              signaled);
           }
+        } catch (InterruptedException e) {
+          throw new IllegalStateException(
+            "waitMsecs: Caught interrupted " +
+            "exception on cond.await() " +
+            curMsecTimeout, e);
+        }
+        if (System.currentTimeMillis() > maxMsecs) {
+          return false;
         }
+        msecs = Math.max(0, msecs - curMsecTimeout);
+        progressable.progress(); // go around again
       }
     } finally {
       lock.unlock();

Modified: giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java?rev=1375393&r1=1375392&r2=1375393&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java Tue Aug 21 06:33:59 2012
@@ -74,13 +74,29 @@ public class TestPredicateLock {
     public void testEvent() {
         BspEvent event = new PredicateLock(getStubProgressable());
         event.signal();
-        boolean gotPredicate = event.waitMsecs(-1);
-        assertTrue(gotPredicate );
+        boolean gotPredicate = event.waitMsecs(50);
+        assertTrue(gotPredicate);
         event.reset();
         gotPredicate = event.waitMsecs(0);
         assertFalse(gotPredicate);
     }
 
+  /**
+   * Simple test for {@link PredicateLock#waitForever()}
+   */
+  @Test
+  public void testWaitForever() {
+    BspEvent event = new PredicateLock(getStubProgressable());
+    Thread signalThread = new SignalThread(event);
+    signalThread.start();
+    event.waitForever();
+    try {
+      signalThread.join();
+    } catch (InterruptedException e) {
+    }
+    assertTrue(event.waitMsecs(0));
+  }
+
     /**
      * Make sure the the event is signaled correctly
      * @throws InterruptedException


Reply via email to