Author: aching
Date: Tue Aug 21 06:33:59 2012
New Revision: 1375393
URL: http://svn.apache.org/viewvc?rev=1375393&view=rev
Log:
GIRAPH-246: Periodic worker calls to context.progress() will prevent
timeout on some Hadoop clusters during barrier waits. (Eli Reisman via
aching)
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java
giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java
Modified: giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1375393&r1=1375392&r2=1375393&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Aug 21 06:33:59 2012
@@ -2,6 +2,10 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-246: Periodic worker calls to context.progress() will prevent
+ timeout on some Hadoop clusters during barrier waits. (Eli Reisman
+ via aching)
+
GIRAPH-295: Additional Example Algorithm to compute Outdegree and
Indegree. (Sean Choi via aching)
Modified:
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1375393&r1=1375392&r2=1375393&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
Tue Aug 21 06:33:59 2012
@@ -211,9 +211,11 @@ public class BspServiceWorker<I extends
localitySorter.getPrioritizedLocalInputSplits();
String reservedInputSplitPath = null;
Stat reservedStat = null;
+ final Mapper<?, ?, ?, ?>.Context context = getContext();
while (true) {
int finishedInputSplits = 0;
for (int i = 0; i < inputSplitPathList.size(); ++i) {
+ context.progress();
String tmpInputSplitFinishedPath =
inputSplitPathList.get(i) + INPUT_SPLIT_FINISHED_NODE;
reservedStat =
@@ -273,6 +275,7 @@ public class BspServiceWorker<I extends
}
// Wait for either a reservation to go away or a notification that
// an InputSplit has finished.
+ context.progress();
getInputSplitsStateChangedEvent().waitMsecs(60 * 1000);
getInputSplitsStateChangedEvent().reset();
}
@@ -309,6 +312,7 @@ public class BspServiceWorker<I extends
for (Entry<PartitionOwner, Partition<I, V, E, M>> entry :
inputSplitCache.entrySet()) {
if (!entry.getValue().getVertices().isEmpty()) {
+ getContext().progress();
commService.sendPartitionRequest(entry.getKey().getWorkerInfo(),
entry.getValue());
}
@@ -474,8 +478,8 @@ public class BspServiceWorker<I extends
++totalVerticesLoaded;
totalEdgesLoaded += readerVertex.getNumEdges();
- // Update status every half a million vertices
- if ((totalVerticesLoaded % 500000) == 0) {
+ // Update status every 250k vertices
+ if ((totalVerticesLoaded % 250000) == 0) {
String status = "readVerticesFromInputSplit: Loaded " +
totalVerticesLoaded + " vertices and " +
totalEdgesLoaded + " edges " +
Modified: giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java?rev=1375393&r1=1375392&r2=1375393&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java Tue Aug
21 06:33:59 2012
@@ -76,47 +76,37 @@ public class PredicateLock implements Bs
@Override
public boolean waitMsecs(int msecs) {
- if (msecs < -1) {
- throw new RuntimeException("msecs < -1");
+ if (msecs < 0) {
+ throw new RuntimeException("msecs cannot be negative!");
}
-
long maxMsecs = System.currentTimeMillis() + msecs;
- long curMsecTimeout = 0;
+ int curMsecTimeout = 0;
lock.lock();
try {
while (!eventOccurred) {
- if (msecs == -1) {
- try {
- cond.await();
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "waitMsecs: Caught interrupted " +
- "exception on cond.await()", e);
- }
- } else {
- // Keep the wait non-negative
- curMsecTimeout =
- Math.max(maxMsecs - System.currentTimeMillis(), 0);
+ curMsecTimeout =
+ Math.min(msecs, MSEC_PERIOD);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("waitMsecs: Wait for " + curMsecTimeout);
+ }
+ try {
+ boolean signaled =
+ cond.await(curMsecTimeout, TimeUnit.MILLISECONDS);
if (LOG.isDebugEnabled()) {
- LOG.debug("waitMsecs: Wait for " + curMsecTimeout);
- }
- try {
- boolean signaled =
- cond.await(curMsecTimeout, TimeUnit.MILLISECONDS);
- if (LOG.isDebugEnabled()) {
- LOG.debug("waitMsecs: Got timed signaled of " +
- signaled);
- }
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "waitMsecs: Caught interrupted " +
- "exception on cond.await() " +
- curMsecTimeout, e);
- }
- if (System.currentTimeMillis() > maxMsecs) {
- return false;
+ LOG.debug("waitMsecs: Got timed signaled of " +
+ signaled);
}
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "waitMsecs: Caught interrupted " +
+ "exception on cond.await() " +
+ curMsecTimeout, e);
+ }
+ if (System.currentTimeMillis() > maxMsecs) {
+ return false;
}
+ msecs = Math.max(0, msecs - curMsecTimeout);
+ progressable.progress(); // go around again
}
} finally {
lock.unlock();
Modified: giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java?rev=1375393&r1=1375392&r2=1375393&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java Tue Aug
21 06:33:59 2012
@@ -74,13 +74,29 @@ public class TestPredicateLock {
public void testEvent() {
BspEvent event = new PredicateLock(getStubProgressable());
event.signal();
- boolean gotPredicate = event.waitMsecs(-1);
- assertTrue(gotPredicate );
+ boolean gotPredicate = event.waitMsecs(50);
+ assertTrue(gotPredicate);
event.reset();
gotPredicate = event.waitMsecs(0);
assertFalse(gotPredicate);
}
+ /**
+ * Simple test for {@link PredicateLock#waitForever()}
+ */
+ @Test
+ public void testWaitForever() {
+ BspEvent event = new PredicateLock(getStubProgressable());
+ Thread signalThread = new SignalThread(event);
+ signalThread.start();
+ event.waitForever();
+ try {
+ signalThread.join();
+ } catch (InterruptedException e) {
+ }
+ assertTrue(event.waitMsecs(0));
+ }
+
/**
* Make sure the event is signaled correctly
* @throws InterruptedException