Updated Branches: refs/heads/trunk e6c4c4170 -> d1a061e1a
GIRAPH-831: waitUntilAllTasksDone waits forever (without debug information) (aching) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d1a061e1 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d1a061e1 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d1a061e1 Branch: refs/heads/trunk Commit: d1a061e1a5382a12d03407d6bb1c19801fcc7fb1 Parents: e6c4c41 Author: Avery Ching <[email protected]> Authored: Sun Jan 26 21:54:42 2014 -0800 Committer: Avery Ching <[email protected]> Committed: Mon Jan 27 23:02:28 2014 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 + .../apache/giraph/conf/GiraphConfiguration.java | 20 +++++ .../org/apache/giraph/conf/GiraphConstants.java | 9 +++ .../apache/giraph/zk/ComputationDoneName.java | 84 ++++++++++++++++++++ .../org/apache/giraph/zk/ZooKeeperManager.java | 53 ++++++++---- .../giraph/zk/ComputationDoneNameTest.java | 40 ++++++++++ 6 files changed, 193 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/d1a061e1/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 45551fd..7435a92 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-831: waitUntilAllTasksDone waits forever (without debug information) (aching) + GIRAPH-830: directMemory used in netty message (pavanka via aching) GIRAPH-829: Compilation error due to wrong rexster jar file (armax00 via claudio) http://git-wip-us.apache.org/repos/asf/giraph/blob/d1a061e1/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index e378147..8cf403a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -1177,4 +1177,24 @@ public class GiraphConfiguration extends Configuration public void setCreateSourceVertex(boolean createVertex) { CREATE_EDGE_SOURCE_VERTICES.set(this, createVertex); } + + /** + * Get the maximum timeout (in milliseconds) for waiting for all tasks + * to complete after the job is done. + * + * @return Wait task done timeout in milliseconds. + */ + public int getWaitTaskDoneTimeoutMs() { + return WAIT_TASK_DONE_TIMEOUT_MS.get(this); + } + + /** + * Set the maximum timeout (in milliseconds) for waiting for all tasks + * to complete after the job is done. + * + * @param ms Milliseconds to set + */ + public void setWaitTaskDoneTimeoutMs(int ms) { + WAIT_TASK_DONE_TIMEOUT_MS.set(this, ms); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/d1a061e1/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index aca0c16..4e68308 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -1004,5 +1004,14 @@ public interface GiraphConstants { HaltApplicationUtils.DefaultHaltInstructionsWriter.class, HaltApplicationUtils.HaltInstructionsWriter.class, "Class used to write instructions on how to halt the application"); + + /** + * Maximum timeout (in milliseconds) for waiting for all tasks + * to complete after the job is done. Defaults to 15 minutes.
+ */ + IntConfOption WAIT_TASK_DONE_TIMEOUT_MS = + new IntConfOption("giraph.waitTaskDoneTimeoutMs", MINUTES.toMillis(15), + "Maximum timeout (in ms) for waiting for all tasks to " + + "complete"); } // CHECKSTYLE: resume InterfaceIsTypeCheck http://git-wip-us.apache.org/repos/asf/giraph/blob/d1a061e1/giraph-core/src/main/java/org/apache/giraph/zk/ComputationDoneName.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ComputationDoneName.java b/giraph-core/src/main/java/org/apache/giraph/zk/ComputationDoneName.java new file mode 100644 index 0000000..97e60cc --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/zk/ComputationDoneName.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.zk; + +import javax.annotation.concurrent.Immutable; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * This name is used by each worker as a file to let the ZooKeeper + * servers know that they can shut down.
+ */ +@Immutable +public class ComputationDoneName { + /** Will end the name (easy to detect if this is a name match) */ + private static final String COMPUTATION_DONE_SUFFIX = + ".COMPUTATION_DONE"; + /** Unique worker id */ + private final int workerId; + /** Name as a string */ + private final String name; + + /** + * Constructor. + * + * @param workerId Unique worker id + */ + public ComputationDoneName(int workerId) { + this.workerId = workerId; + this.name = Integer.toString(workerId) + COMPUTATION_DONE_SUFFIX; + } + + public int getWorkerId() { + return workerId; + } + + public String getName() { + return name; + } + + /** + * Create this object from a name. A null name throws NullPointerException; + * a name that cannot be parsed throws IllegalArgumentException. + * + * @param name Name to parse + * @return ComputationDoneName object that represents this name + */ + public static final ComputationDoneName fromName(String name) { + checkNotNull(name, "name is null"); + checkArgument(name.endsWith(COMPUTATION_DONE_SUFFIX), + "Name %s is not a valid ComputationDoneName", name); + + return new ComputationDoneName(Integer.parseInt(name.substring(0, + name.length() - COMPUTATION_DONE_SUFFIX.length()))); + } + + /** + * Is this string a ComputationDoneName?
+ * + * @param name Name to check + * @return True if matches the format of a ComputationDoneName, + * false otherwise + */ + public static final boolean isName(String name) { + return name != null && name.endsWith(COMPUTATION_DONE_SUFFIX); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/d1a061e1/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java index b3db28f..66f627f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java @@ -18,9 +18,14 @@ package org.apache.giraph.zk; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Closeables; import org.apache.commons.io.FileUtils; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.time.SystemTime; +import org.apache.giraph.time.Time; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -30,10 +35,6 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.zookeeper.server.quorum.QuorumPeerMain; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.io.Closeables; - import java.io.BufferedReader; import java.io.File; import java.io.FileWriter; @@ -68,8 +69,6 @@ public class ZooKeeperManager { /** The ZooKeeperString filename prefix */ private static final String ZOOKEEPER_SERVER_LIST_FILE_PREFIX = "zkServerList_"; - /** Denotes that the computation is done for a partition */ - private static final String COMPUTATION_DONE_SUFFIX = ".COMPUTATION_DONE"; /** Job context (mainly for progress) */ private
Mapper<?, ?, ?, ?>.Context context; /** Hadoop configuration */ @@ -119,6 +118,8 @@ public class ZooKeeperManager { * files will go) */ private final String zkDirDefault; + /** Time object for tracking timeouts */ + private final Time time = SystemTime.get(); /** State of the application */ public enum State { @@ -150,8 +151,7 @@ public class ZooKeeperManager { serverDirectory = new Path(baseDirectory, "_zkServer"); myClosedPath = new Path(taskDirectory, - Integer.toString(taskPartition) + - COMPUTATION_DONE_SUFFIX); + (new ComputationDoneName(taskPartition)).getName()); pollMsecs = GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.get(conf); serverCount = GiraphConstants.ZOOKEEPER_SERVER_COUNT.get(conf); String jobLocalDir = conf.get("job.local.dir"); @@ -624,8 +624,7 @@ public class ZooKeeperManager { LOG.warn("onlineZooKeeperServers: Failed to delete " + "directory " + this.zkDir, e); } - generateZooKeeperConfigFile( - new ArrayList<String>(zkServerPortMap.keySet())); + generateZooKeeperConfigFile(new ArrayList<>(zkServerPortMap.keySet())); ProcessBuilder processBuilder = new ProcessBuilder(); List<String> commandList = Lists.newArrayList(); String javaHome = System.getProperty("java.home"); @@ -752,7 +751,7 @@ public class ZooKeeperManager { "task failed) to create filestamp " + myReadyPath, e); } } else { - List<String> foundList = new ArrayList<String>(); + List<String> foundList = new ArrayList<>(); int readyRetrievalAttempt = 0; while (true) { try { @@ -807,22 +806,31 @@ public class ZooKeeperManager { } /** - * Wait for all map tasks to signal completion. + * Wait for all map tasks to signal completion. Will wait up to + * WAIT_TASK_DONE_TIMEOUT_MS milliseconds for this to complete before + * reporting an error.
+ * + * @param totalMapTasks Number of map tasks to wait for */ private void waitUntilAllTasksDone(int totalMapTasks) { int attempt = 0; + long maxMs = time.getMilliseconds() + + conf.getWaitTaskDoneTimeoutMs(); while (true) { + boolean[] taskDoneArray = new boolean[totalMapTasks]; try { FileStatus [] fileStatusArray = fs.listStatus(taskDirectory); int totalDone = 0; if (fileStatusArray.length > 0) { - for (int i = 0; i < fileStatusArray.length; ++i) { - if (fileStatusArray[i].getPath().getName().endsWith( - COMPUTATION_DONE_SUFFIX)) { + for (FileStatus fileStatus : fileStatusArray) { + String name = fileStatus.getPath().getName(); + if (ComputationDoneName.isName(name)) { ++totalDone; + int workerId = ComputationDoneName.fromName(name).getWorkerId(); + if (workerId >= 0 && workerId < totalMapTasks) { + taskDoneArray[workerId] = true; + } } } } @@ -835,6 +843,15 @@ public class ZooKeeperManager { } if (totalDone >= totalMapTasks) { break; + } else { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < taskDoneArray.length; ++i) { + if (!taskDoneArray[i]) { + sb.append(sb.length() == 0 ? "" : ", ").append(i); + } + } + LOG.info("waitUntilAllTasksDone: Still waiting on tasks " + + sb.toString()); } ++attempt; Thread.sleep(pollMsecs); @@ -844,6 +861,12 @@ public class ZooKeeperManager { } catch (InterruptedException e) { LOG.warn("waitUntilAllTasksDone: Got InterruptedException", e); } + + if (time.getMilliseconds() > maxMs) { + throw new IllegalStateException("waitUntilAllTasksDone: Tasks " + + "did not finish by the maximum time of " + + conf.getWaitTaskDoneTimeoutMs() + " milliseconds"); + } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/d1a061e1/giraph-core/src/test/java/org/apache/giraph/zk/ComputationDoneNameTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/zk/ComputationDoneNameTest.java b/giraph-core/src/test/java/org/apache/giraph/zk/ComputationDoneNameTest.java new file mode 100644 index 0000000..50e77f9 --- /dev/null +++
b/giraph-core/src/test/java/org/apache/giraph/zk/ComputationDoneNameTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.zk; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Test the name generation and parsing + */ +public class ComputationDoneNameTest { + @Test + public void testGenerateNameAndParseIt() { + ComputationDoneName computationDoneName = new ComputationDoneName(7); + assertEquals(7, computationDoneName.getWorkerId()); + + ComputationDoneName parsedComputationDoneName = + ComputationDoneName.fromName(computationDoneName.getName()); + assertEquals(computationDoneName.getName(), + parsedComputationDoneName.getName()); + assertEquals(7, parsedComputationDoneName.getWorkerId()); + } +}
