Updated Branches:
  refs/heads/trunk e6c4c4170 -> d1a061e1a

GIRAPH-831: waitUntilAllTasksDone waits forever (without debug information) (aching)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d1a061e1
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d1a061e1
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d1a061e1

Branch: refs/heads/trunk
Commit: d1a061e1a5382a12d03407d6bb1c19801fcc7fb1
Parents: e6c4c41
Author: Avery Ching <[email protected]>
Authored: Sun Jan 26 21:54:42 2014 -0800
Committer: Avery Ching <[email protected]>
Committed: Mon Jan 27 23:02:28 2014 -0800

----------------------------------------------------------------------
 CHANGELOG                                       |  2 +
 .../apache/giraph/conf/GiraphConfiguration.java | 20 +++++
 .../org/apache/giraph/conf/GiraphConstants.java |  9 +++
 .../apache/giraph/zk/ComputationDoneName.java   | 84 ++++++++++++++++++++
 .../org/apache/giraph/zk/ZooKeeperManager.java  | 51 ++++++++----
 .../giraph/zk/ComputationDoneNameTest.java      | 40 ++++++++++
 6 files changed, 191 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/d1a061e1/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 45551fd..7435a92 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-831: waitUntilAllTasksDone waits forever (without debug information) (aching)
+
   GIRAPH-830: directMemory used in netty message (pavanka via aching)
 
   GIRAPH-829: Compilation error due to wrong rexster jar file (armax00 via claudio)

http://git-wip-us.apache.org/repos/asf/giraph/blob/d1a061e1/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index e378147..8cf403a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -1177,4 +1177,24 @@ public class GiraphConfiguration extends Configuration
   public void setCreateSourceVertex(boolean createVertex) {
     CREATE_EDGE_SOURCE_VERTICES.set(this, createVertex);
   }
+
+  /**
+   * Get the maximum timeout (in milliseconds) for waiting for all tasks
+   * to complete after the job is done.
+   *
+   * @return Wait task done timeout in milliseconds.
+   */
+  public int getWaitTaskDoneTimeoutMs() {
+    return WAIT_TASK_DONE_TIMEOUT_MS.get(this);
+  }
+
+  /**
+   * Set the maximum timeout (in milliseconds) for waiting for all tasks
+   * to complete after the job is done.
+   *
+   * @param ms Milliseconds to set
+   */
+  public void setWaitTaskDoneTimeoutMs(int ms) {
+    WAIT_TASK_DONE_TIMEOUT_MS.set(this, ms);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d1a061e1/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index aca0c16..4e68308 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -1004,5 +1004,14 @@ public interface GiraphConstants {
       HaltApplicationUtils.DefaultHaltInstructionsWriter.class,
       HaltApplicationUtils.HaltInstructionsWriter.class,
       "Class used to write instructions on how to halt the application");
+
+  /**
+   * Maximum timeout (in milliseconds) for waiting for all tasks
+   * to complete after the job is done.  Defaults to 15 minutes.
+   */
+  IntConfOption WAIT_TASK_DONE_TIMEOUT_MS =
+      new IntConfOption("giraph.waitTaskDoneTimeoutMs", MINUTES.toMillis(15),
+          "Maximum timeout (in ms) for waiting for all all tasks to " +
+              "complete");
 }
 // CHECKSTYLE: resume InterfaceIsTypeCheck

http://git-wip-us.apache.org/repos/asf/giraph/blob/d1a061e1/giraph-core/src/main/java/org/apache/giraph/zk/ComputationDoneName.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ComputationDoneName.java b/giraph-core/src/main/java/org/apache/giraph/zk/ComputationDoneName.java
new file mode 100644
index 0000000..97e60cc
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ComputationDoneName.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.zk;
+
+import javax.annotation.concurrent.Immutable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * This name is used by each worker as a file to let the ZooKeeper
+ * servers know that they can shutdown.
+ */
+@Immutable
+public class ComputationDoneName {
+  /** Will end the name (easy to detect if this is a name match) */
+  private static final String COMPUTATION_DONE_SUFFIX =
+      ".COMPUTATION_DONE";
+  /** Unique worker id */
+  private final int workerId;
+  /** Name as a string */
+  private final String name;
+
+  /**
+   * Constructor.
+   *
+   * @param workerId Unique worker id
+   */
+  public ComputationDoneName(int workerId) {
+    this.workerId = workerId;
+    this.name = Integer.toString(workerId) + COMPUTATION_DONE_SUFFIX;
+  }
+
+  public int getWorkerId() {
+    return workerId;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Create this object from a name (if possible).  If the name is not
+   * able to be parsed this will throw various runtime exceptions.
+   *
+   * @param name Name to parse
+   * @return ComputationDoneName object that represents this name
+   */
+  public static final ComputationDoneName fromName(String name) {
+    checkNotNull(name, "name is null");
+    checkArgument(name.endsWith(COMPUTATION_DONE_SUFFIX),
+        "Name %s is not a valid ComputationDoneName", name);
+
+    return new ComputationDoneName(
+        Integer.parseInt(name.replace(COMPUTATION_DONE_SUFFIX, "")));
+  }
+
+  /**
+   * Is this string a ComputationDoneName?
+   *
+   * @param name Name to check
+   * @return True if matches the format of a ComputationDoneName,
+   *         false otherwise
+   */
+  public static final boolean isName(String name) {
+    return name.endsWith(COMPUTATION_DONE_SUFFIX);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d1a061e1/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
index b3db28f..66f627f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
@@ -18,9 +18,14 @@
 
 package org.apache.giraph.zk;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
 import org.apache.commons.io.FileUtils;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.time.SystemTime;
+import org.apache.giraph.time.Time;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -30,10 +35,6 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.server.quorum.QuorumPeerMain;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Closeables;
-
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileWriter;
@@ -68,8 +69,6 @@ public class ZooKeeperManager {
   /** The ZooKeeperString filename prefix */
   private static final String ZOOKEEPER_SERVER_LIST_FILE_PREFIX =
       "zkServerList_";
-  /** Denotes that the computation is done for a partition */
-  private static final String COMPUTATION_DONE_SUFFIX = ".COMPUTATION_DONE";
   /** Job context (mainly for progress) */
   private Mapper<?, ?, ?, ?>.Context context;
   /** Hadoop configuration */
@@ -119,6 +118,8 @@ public class ZooKeeperManager {
    * files will go)
    */
   private final String zkDirDefault;
+  /** Time object for tracking timeouts */
+  private final Time time = SystemTime.get();
 
   /** State of the application */
   public enum State {
@@ -150,8 +151,7 @@ public class ZooKeeperManager {
     serverDirectory = new Path(baseDirectory,
         "_zkServer");
     myClosedPath = new Path(taskDirectory,
-        Integer.toString(taskPartition) +
-        COMPUTATION_DONE_SUFFIX);
+        (new ComputationDoneName(taskPartition)).getName());
     pollMsecs = GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.get(conf);
     serverCount = GiraphConstants.ZOOKEEPER_SERVER_COUNT.get(conf);
     String jobLocalDir = conf.get("job.local.dir");
@@ -624,8 +624,7 @@ public class ZooKeeperManager {
         LOG.warn("onlineZooKeeperServers: Failed to delete " +
             "directory " + this.zkDir, e);
       }
-      generateZooKeeperConfigFile(
-          new ArrayList<String>(zkServerPortMap.keySet()));
+      generateZooKeeperConfigFile(new ArrayList<>(zkServerPortMap.keySet()));
       ProcessBuilder processBuilder = new ProcessBuilder();
       List<String> commandList = Lists.newArrayList();
       String javaHome = System.getProperty("java.home");
@@ -752,7 +751,7 @@ public class ZooKeeperManager {
             "task failed) to create filestamp " + myReadyPath, e);
       }
     } else {
-      List<String> foundList = new ArrayList<String>();
+      List<String> foundList = new ArrayList<>();
       int readyRetrievalAttempt = 0;
       while (true) {
         try {
@@ -807,22 +806,29 @@ public class ZooKeeperManager {
   }
 
   /**
-   * Wait for all map tasks to signal completion.
+   * Wait for all map tasks to signal completion.  Will wait up to
+   * WAIT_TASK_DONE_TIMEOUT_MS milliseconds for this to complete before
+   * reporting an error.
    *
    * @param totalMapTasks Number of map tasks to wait for
    */
   private void waitUntilAllTasksDone(int totalMapTasks) {
     int attempt = 0;
+    long maxMs = time.getMilliseconds() +
+        conf.getWaitTaskDoneTimeoutMs();
     while (true) {
+      boolean[] taskDoneArray = new boolean[totalMapTasks];
       try {
         FileStatus [] fileStatusArray =
             fs.listStatus(taskDirectory);
         int totalDone = 0;
         if (fileStatusArray.length > 0) {
-          for (int i = 0; i < fileStatusArray.length; ++i) {
-            if (fileStatusArray[i].getPath().getName().endsWith(
-                COMPUTATION_DONE_SUFFIX)) {
+          for (FileStatus fileStatus : fileStatusArray) {
+            String name = fileStatus.getPath().getName();
+            if (ComputationDoneName.isName(name)) {
               ++totalDone;
+              taskDoneArray[ComputationDoneName.fromName(
+                  name).getWorkerId()] = true;
             }
           }
         }
@@ -835,6 +841,15 @@ public class ZooKeeperManager {
         }
         if (totalDone >= totalMapTasks) {
           break;
+        } else {
+          StringBuilder sb = new StringBuilder();
+          for (int i = 0; i < taskDoneArray.length; ++i) {
+            if (!taskDoneArray[i]) {
+              sb.append(i).append(", ");
+            }
+          }
+          LOG.info("waitUntilAllTasksDone: Still waiting on tasks " +
+              sb.toString());
         }
         ++attempt;
         Thread.sleep(pollMsecs);
@@ -844,6 +859,12 @@ public class ZooKeeperManager {
       } catch (InterruptedException e) {
         LOG.warn("waitUntilAllTasksDone: Got InterruptedException", e);
       }
+
+      if (time.getMilliseconds() > maxMs) {
+        throw new IllegalStateException("waitUntilAllTasksDone: Tasks " +
+            "did not finish by the maximum time of " +
+            conf.getWaitTaskDoneTimeoutMs() + " milliseconds");
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/d1a061e1/giraph-core/src/test/java/org/apache/giraph/zk/ComputationDoneNameTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/zk/ComputationDoneNameTest.java b/giraph-core/src/test/java/org/apache/giraph/zk/ComputationDoneNameTest.java
new file mode 100644
index 0000000..50e77f9
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/zk/ComputationDoneNameTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.zk;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test the name generation and parsing
+ */
+public class ComputationDoneNameTest {
+  @Test
+  public void testGenerateNameAndParseIt() {
+    ComputationDoneName computationDoneName = new ComputationDoneName(7);
+    assertEquals(7, computationDoneName.getWorkerId());
+
+    ComputationDoneName parsedComputationDoneName =
+        ComputationDoneName.fromName(computationDoneName.getName());
+    assertEquals(parsedComputationDoneName.getName(),
+        computationDoneName.getName());
+    assertEquals(7, parsedComputationDoneName.getWorkerId());
+  }
+}

Reply via email to