Author: aching
Date: Fri Dec 7 21:23:39 2012
New Revision: 1418483
URL: http://svn.apache.org/viewvc?rev=1418483&view=rev
Log:
GIRAPH-446: Add a proper timeout for waiting for workers to join a
superstep. (aching)
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
Modified: giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1418483&r1=1418482&r2=1418483&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Fri Dec 7 21:23:39 2012
@@ -1,6 +1,9 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-446: Add a proper timeout for waiting for workers to join a
+ superstep. (aching)
+
GIRAPH-443: Properly size netty buffers when encoding requests (majakabiljo)
GIRAPH-395: No need to make HashWorkerPartitioner thread-safe. (aching)
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1418483&r1=1418482&r2=1418483&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
Fri Dec 7 21:23:39 2012
@@ -135,11 +135,6 @@ public class GiraphConfiguration extends
/** Default 100% response rate for workers */
public static final float MIN_PERCENT_RESPONDED_DEFAULT = 100.0f;
- /** Polling timeout to check on the number of responded tasks (int) */
- public static final String POLL_MSECS = "giraph.pollMsecs";
- /** Default poll msecs (30 seconds) */
- public static final int POLL_MSECS_DEFAULT = 30 * 1000;
-
/** Enable the Metrics system **/
public static final String METRICS_ENABLE = "giraph.metrics.enable";
@@ -303,12 +298,34 @@ public class GiraphConfiguration extends
/** Default max resolve address attempts */
public static final int MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT = 5;
- /** Msecs to wait between waiting for all requests to finish */
+ /** Milliseconds to wait between waiting for all requests to finish */
public static final String WAITING_REQUEST_MSECS =
"giraph.waitingRequestMsecs";
- /** Default msecs to wait between waiting for all requests to finish */
+ /**
+ * Default milliseconds to wait between waiting for all requests to finish
+ */
public static final int WAITING_REQUEST_MSECS_DEFAULT = 15000;
+ /** Milliseconds to wait for an event before continuing */
+ public static final String EVENT_WAIT_MSECS = "giraph.eventWaitMsecs";
+ /**
+ * Default milliseconds to wait for an event before continuing (30 seconds)
+ */
+ public static final int EVENT_WAIT_MSECS_DEFAULT = 30 * 1000;
+
+ /**
+ * Maximum milliseconds to wait before giving up trying to get the minimum
+ * number of workers before a superstep (int).
+ */
+ public static final String MAX_MASTER_SUPERSTEP_WAIT_MSECS =
+ "giraph.maxMasterSuperstepWaitMsecs";
+ /**
+ * Default maximum milliseconds to wait before giving up trying to get
+ * the minimum number of workers before a superstep (10 minutes).
+ */
+ public static final int MAX_MASTER_SUPERSTEP_WAIT_MSECS_DEFAULT =
+ 10 * 60 * 1000;
+
/** Milliseconds for a request to complete (or else resend) */
public static final String MAX_REQUEST_MILLISECONDS =
"giraph.maxRequestMilliseconds";
@@ -413,11 +430,6 @@ public class GiraphConfiguration extends
/** Default number of threads for input splits loading */
public static final int NUM_INPUT_SPLITS_THREADS_DEFAULT = 1;
- /** Number of poll attempts prior to failing the job (int) */
- public static final String POLL_ATTEMPTS = "giraph.pollAttempts";
- /** Default poll attempts */
- public static final int POLL_ATTEMPTS_DEFAULT = 10;
-
/** Number of minimum vertices in each vertex range */
public static final String MIN_VERTICES_PER_RANGE =
"giraph.minVerticesPerRange";
@@ -1170,4 +1182,47 @@ public class GiraphConfiguration extends
public int getMaxTaskAttempts() {
return getInt(MAX_TASK_ATTEMPTS, -1);
}
+
+ /**
+ * Get the number of milliseconds to wait for an event before continuing on
+ *
+ * @return Number of milliseconds to wait for an event before continuing on
+ */
+ public int getEventWaitMsecs() {
+ return getInt(EVENT_WAIT_MSECS, EVENT_WAIT_MSECS_DEFAULT);
+ }
+
+ /**
+ * Set the number of milliseconds to wait for an event before continuing on
+ *
+ * @param eventWaitMsecs Number of milliseconds to wait for an event before
+ * continuing on
+ */
+ public void setEventWaitMsecs(int eventWaitMsecs) {
+ setInt(EVENT_WAIT_MSECS, eventWaitMsecs);
+ }
+
+ /**
+ * Get the maximum milliseconds to wait before giving up trying to get the
+ * minimum number of workers before a superstep.
+ *
+ * @return Maximum milliseconds to wait before giving up trying to get the
+ * minimum number of workers before a superstep
+ */
+ public int getMaxMasterSuperstepWaitMsecs() {
+ return getInt(MAX_MASTER_SUPERSTEP_WAIT_MSECS,
+ MAX_MASTER_SUPERSTEP_WAIT_MSECS_DEFAULT);
+ }
+
+ /**
+ * Set the maximum milliseconds to wait before giving up trying to get the
+ * minimum number of workers before a superstep.
+ *
+ * @param maxMasterSuperstepWaitMsecs Maximum milliseconds to wait before
+ * giving up trying to get the minimum
+ * number of workers before a superstep
+ */
+ public void setMaxMasterSuperstepWaitMsecs(int maxMasterSuperstepWaitMsecs) {
+ setInt(MAX_MASTER_SUPERSTEP_WAIT_MSECS, maxMasterSuperstepWaitMsecs);
+ }
}
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1418483&r1=1418482&r2=1418483&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
Fri Dec 7 21:23:39 2012
@@ -125,10 +125,10 @@ public class BspServiceMaster<I extends
private final int minWorkers;
/** Min % responded workers */
private final float minPercentResponded;
- /** Poll period in msecs */
- private final int msecsPollPeriod;
- /** Max number of poll attempts */
- private final int maxPollAttempts;
+ /** Msecs to wait for an event */
+ private final int eventWaitMsecs;
+ /** Max msecs to wait for a superstep to get enough workers */
+ private final int maxSuperstepWaitMsecs;
/** Min number of long tails before printing */
private final int partitionLongTailMinPrint;
/** Last finalized checkpoint */
@@ -188,10 +188,8 @@ public class BspServiceMaster<I extends
minWorkers = conf.getInt(GiraphConfiguration.MIN_WORKERS, -1);
minPercentResponded = conf.getFloat(
GiraphConfiguration.MIN_PERCENT_RESPONDED, 100.0f);
- msecsPollPeriod = conf.getInt(GiraphConfiguration.POLL_MSECS,
- GiraphConfiguration.POLL_MSECS_DEFAULT);
- maxPollAttempts = conf.getInt(GiraphConfiguration.POLL_ATTEMPTS,
- GiraphConfiguration.POLL_ATTEMPTS_DEFAULT);
+ eventWaitMsecs = conf.getEventWaitMsecs();
+ maxSuperstepWaitMsecs = conf.getMaxMasterSuperstepWaitMsecs();
partitionLongTailMinPrint = conf.getInt(
GiraphConfiguration.PARTITION_LONG_TAIL_MIN_PRINT,
GiraphConfiguration.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
@@ -419,11 +417,13 @@ public class BspServiceMaster<I extends
*/
private List<WorkerInfo> checkWorkers() {
boolean failJob = true;
- int pollAttempt = 0;
+ long failWorkerCheckMsecs =
+ SystemTime.get().getMilliseconds() + maxSuperstepWaitMsecs;
List<WorkerInfo> healthyWorkerInfoList = new ArrayList<WorkerInfo>();
List<WorkerInfo> unhealthyWorkerInfoList = new ArrayList<WorkerInfo>();
int totalResponses = -1;
- while (pollAttempt < maxPollAttempts) {
+ while (SystemTime.get().getMilliseconds() < failWorkerCheckMsecs) {
+ getContext().progress();
getAllWorkerInfos(
getSuperstep(), healthyWorkerInfoList, unhealthyWorkerInfoList);
totalResponses = healthyWorkerInfoList.size() +
@@ -440,7 +440,7 @@ public class BspServiceMaster<I extends
" needed to start superstep " +
getSuperstep());
if (getWorkerHealthRegistrationChangedEvent().waitMsecs(
- msecsPollPeriod)) {
+ eventWaitMsecs)) {
if (LOG.isDebugEnabled()) {
LOG.debug("checkWorkers: Got event that health " +
"registration changed, not using poll attempt");
@@ -452,9 +452,10 @@ public class BspServiceMaster<I extends
LOG.info("checkWorkers: Only found " + totalResponses +
" responses of " + maxWorkers +
" needed to start superstep " +
- getSuperstep() + ". Sleeping for " +
- msecsPollPeriod + " msecs and used " + pollAttempt +
- " of " + maxPollAttempts + " attempts.");
+ getSuperstep() + ". Reporting every" +
+ eventWaitMsecs + " msecs, " +
+ (failWorkerCheckMsecs - SystemTime.get().getMilliseconds()) +
+ " more msecs left before giving up.");
// Find the missing workers if there are only a few
if ((maxWorkers - totalResponses) <=
partitionLongTailMinPrint) {
@@ -462,15 +463,14 @@ public class BspServiceMaster<I extends
unhealthyWorkerInfoList);
}
}
- ++pollAttempt;
}
if (failJob) {
LOG.error("checkWorkers: Did not receive enough processes in " +
"time (only " + totalResponses + " of " +
- minWorkers + " required). This occurs if you do not " +
- "have enough map tasks available simultaneously on " +
- "your Hadoop instance to fulfill the number of " +
- "requested workers.");
+ minWorkers + " required) after waiting " + maxSuperstepWaitMsecs +
+ "msecs). This occurs if you do not have enough map tasks " +
+ "available simultaneously on your Hadoop instance to fulfill " +
+ "the number of requested workers.");
return null;
}
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1418483&r1=1418482&r2=1418483&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
Fri Dec 7 21:23:39 2012
@@ -348,11 +348,11 @@ public class BspServiceWorker<I extends
inputSplitsReadyStat = getZkExt().exists(
inputSplitPaths.getAllReadyPath(), true);
} catch (KeeperException e) {
- throw new IllegalStateException(
- "setup: KeeperException waiting on input splits", e);
+ throw new IllegalStateException("ensureInputSplitsReady: " +
+ "KeeperException waiting on input splits", e);
} catch (InterruptedException e) {
- throw new IllegalStateException(
- "setup: InterruptedException waiting on input splits", e);
+ throw new IllegalStateException("ensureInputSplitsReady: " +
+ "InterruptedException waiting on input splits", e);
}
if (inputSplitsReadyStat != null) {
break;
@@ -380,12 +380,11 @@ public class BspServiceWorker<I extends
CreateMode.PERSISTENT,
true);
} catch (KeeperException e) {
- throw new IllegalStateException(
- "setup: KeeperException creating worker done splits", e);
+ throw new IllegalStateException("waitForOtherWorkers: " +
+ "KeeperException creating worker done splits", e);
} catch (InterruptedException e) {
- throw new IllegalStateException(
- "setup: InterruptedException creating worker done splits",
- e);
+ throw new IllegalStateException("waitForOtherWorkers: " +
+ "InterruptedException creating worker done splits", e);
}
while (true) {
Stat inputSplitsDoneStat;
@@ -394,11 +393,11 @@ public class BspServiceWorker<I extends
getZkExt().exists(inputSplitPaths.getAllDonePath(),
true);
} catch (KeeperException e) {
- throw new IllegalStateException(
- "setup: KeeperException waiting on worker done splits", e);
+ throw new IllegalStateException("waitForOtherWorkers: " +
+ "KeeperException waiting on worker done splits", e);
} catch (InterruptedException e) {
- throw new IllegalStateException(
- "setup: InterruptedException waiting on worker done splits", e);
+ throw new IllegalStateException("waitForOtherWorkers: " +
+ "InterruptedException waiting on worker done splits", e);
}
if (inputSplitsDoneStat != null) {
break;
Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java?rev=1418483&r1=1418482&r2=1418483&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java Fri Dec 7
21:23:39 2012
@@ -100,8 +100,8 @@ public class BspCase implements Watcher
// Single node testing
conf.setBoolean(GiraphConfiguration.SPLIT_MASTER_WORKER, false);
}
- conf.setInt(GiraphConfiguration.POLL_ATTEMPTS, 10);
- conf.setInt(GiraphConfiguration.POLL_MSECS, 3 * 1000);
+ conf.setMaxMasterSuperstepWaitMsecs(30 * 1000);
+ conf.setEventWaitMsecs(3 * 1000);
conf.setInt(GiraphConfiguration.ZOOKEEPER_SERVERLIST_POLL_MSECS, 500);
if (getZooKeeperList() != null) {
conf.setZooKeeperConfiguration(getZooKeeperList());
Modified:
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestAutoCheckpoint.java?rev=1418483&r1=1418482&r2=1418483&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
(original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
Fri Dec 7 21:23:39 2012
@@ -66,8 +66,9 @@ public class TestAutoCheckpoint extends
conf.setBoolean(SimpleCheckpointVertex.ENABLE_FAULT, true);
conf.setInt("mapred.map.max.attempts", 4);
// Trigger failure faster
- conf.setInt("mapred.task.timeout", 30000);
- conf.setInt(GiraphConfiguration.POLL_MSECS, 5000);
+ conf.setInt("mapred.task.timeout", 10000);
+ conf.setMaxMasterSuperstepWaitMsecs(10000);
+ conf.setEventWaitMsecs(1000);
conf.setCheckpointFrequency(2);
conf.set(GiraphConfiguration.CHECKPOINT_DIRECTORY,
getTempPath("_singleFaultCheckpoints").toString());
Modified:
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java?rev=1418483&r1=1418482&r2=1418483&view=diff
==============================================================================
---
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
(original)
+++
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
Fri Dec 7 21:23:39 2012
@@ -18,10 +18,7 @@
package org.apache.giraph;
-import static org.junit.Assert.assertFalse;
-
import java.io.IOException;
-
import org.apache.giraph.examples.SimpleCheckpointVertex;
import
org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
import
org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
@@ -29,44 +26,46 @@ import org.apache.giraph.graph.GiraphJob
import org.apache.hadoop.fs.Path;
import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+
/**
* Unit test for not enough map tasks
*/
public class TestNotEnoughMapTasks extends BspCase {
- public TestNotEnoughMapTasks() {
- super(TestNotEnoughMapTasks.class.getName());
- }
-
- /**
- * This job should always fail gracefully with not enough map tasks.
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public void testNotEnoughMapTasks()
- throws IOException, InterruptedException, ClassNotFoundException {
- if (!runningInDistributedMode()) {
- System.out.println(
- "testNotEnoughMapTasks: Ignore this test in local mode.");
- return;
- }
- Path outputPath = getTempPath(getCallingMethodName());
- GiraphJob job = prepareJob(getCallingMethodName(),
- SimpleCheckpointVertex.SimpleCheckpointComputation.class,
- SimpleSuperstepVertexInputFormat.class,
- SimpleSuperstepVertexOutputFormat.class, outputPath);
-
- // An unlikely impossible number of workers to achieve
- final int unlikelyWorkers = Short.MAX_VALUE;
- job.getConfiguration().setWorkerConfiguration(unlikelyWorkers,
- unlikelyWorkers,
- 100.0f);
- // Only one poll attempt of one second to make failure faster
- job.getConfiguration().setInt(GiraphConfiguration.POLL_ATTEMPTS, 1);
- job.getConfiguration().setInt(GiraphConfiguration.POLL_MSECS, 1);
- assertFalse(job.run(false));
+ public TestNotEnoughMapTasks() {
+ super(TestNotEnoughMapTasks.class.getName());
+ }
+
+ /**
+ * This job should always fail gracefully with not enough map tasks.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testNotEnoughMapTasks()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ if (!runningInDistributedMode()) {
+ System.out.println(
+ "testNotEnoughMapTasks: Ignore this test in local mode.");
+ return;
}
+ Path outputPath = getTempPath(getCallingMethodName());
+ GiraphJob job = prepareJob(getCallingMethodName(),
+ SimpleCheckpointVertex.SimpleCheckpointComputation.class,
+ SimpleSuperstepVertexInputFormat.class,
+ SimpleSuperstepVertexOutputFormat.class, outputPath);
+
+ // A number of workers that is practically impossible to achieve
+ final int unlikelyWorkers = Short.MAX_VALUE;
+ job.getConfiguration().setWorkerConfiguration(unlikelyWorkers,
+ unlikelyWorkers,
+ 100.0f);
+ // Give up after one second (one event wait of one second) to fail faster
+ job.getConfiguration().setMaxMasterSuperstepWaitMsecs(1000);
+ job.getConfiguration().setEventWaitMsecs(1000);
+ assertFalse(job.run(false));
+ }
}