Updated Branches:
  refs/heads/trunk b022dce9f -> 1c967b8e0

GIRAPH-498: We should check input splits status from ZooKeeper once per 
worker, not once per split thread (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/1c967b8e
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/1c967b8e
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/1c967b8e

Branch: refs/heads/trunk
Commit: 1c967b8e0b282deb9a3d49231f410ffcabd9e9de
Parents: b022dce
Author: Maja Kabiljo <[email protected]>
Authored: Tue Feb 5 10:34:54 2013 -0800
Committer: Maja Kabiljo <[email protected]>
Committed: Tue Feb 5 10:36:21 2013 -0800

----------------------------------------------------------------------
 CHANGELOG                                          |    3 +
 .../apache/giraph/conf/GiraphConfiguration.java    |   10 +
 .../org/apache/giraph/worker/BspServiceWorker.java |   28 ++-
 .../giraph/worker/EdgeInputSplitsCallable.java     |   14 +-
 .../worker/EdgeInputSplitsCallableFactory.java     |   21 +-
 .../giraph/worker/InputSplitPathOrganizer.java     |  103 +-------
 .../apache/giraph/worker/InputSplitsCallable.java  |  167 +------------
 .../apache/giraph/worker/InputSplitsHandler.java   |  199 +++++++++++++++
 .../giraph/worker/VertexInputSplitsCallable.java   |   14 +-
 .../worker/VertexInputSplitsCallableFactory.java   |   21 +-
 .../test/java/org/apache/giraph/TestBspBasic.java  |    8 +-
 11 files changed, 280 insertions(+), 308 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index d50e1e5..926e404 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-498: We should check input splits status from ZooKeeper once per worker,
+  not once per split thread (majakabiljo)
+
   GIRAPH-497: Limiting number of open requests doesn't work with multithreading (majakabiljo via ereisman)
 
   GIRAPH-461: Convert static assignment of in-memory partitions with LRU cache (claudio)

http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 796047d..79b12d3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -645,4 +645,14 @@ public class GiraphConfiguration extends Configuration
       set("mapreduce.job.credentials.binary", hadoopTokenFilePath);
     }
   }
+
+  /**
+   * Check if we want to prioritize input splits which reside on the host.
+   *
+   * @return True iff we want to use input split locality
+   */
+  public boolean useInputSplitLocality() {
+    return getBoolean(GiraphConstants.USE_INPUT_SPLIT_LOCALITY,
+        GiraphConstants.USE_INPUT_SPLIT_LOCALITY_DEFAULT);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index a48c5ea..ff1033e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -297,14 +297,24 @@ public class BspServiceWorker<I extends WritableComparable,
         INPUT_SUPERSTEP, 0, 0, getContext(), getGraphTaskManager(),
         null, null);
 
+    InputSplitPathOrganizer splitOrganizer =
+        new InputSplitPathOrganizer(getZkExt(),
+            inputSplitPathList, getWorkerInfo().getHostname(),
+            getConfiguration().useInputSplitLocality());
+    InputSplitsHandler splitsHandler = new InputSplitsHandler(
+        splitOrganizer,
+        getZkExt(),
+        getContext(),
+        BspService.VERTEX_INPUT_SPLIT_RESERVED_NODE,
+        BspService.VERTEX_INPUT_SPLIT_FINISHED_NODE);
+
     VertexInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
         new VertexInputSplitsCallableFactory<I, V, E, M>(
             getContext(),
             graphState,
             getConfiguration(),
             this,
-            inputSplitPathList,
-            getWorkerInfo(),
+            splitsHandler,
             getZkExt());
 
     return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory);
@@ -324,14 +334,24 @@ public class BspServiceWorker<I extends WritableComparable,
         INPUT_SUPERSTEP, 0, 0, getContext(), getGraphTaskManager(),
         null, null);
 
+    InputSplitPathOrganizer splitOrganizer =
+        new InputSplitPathOrganizer(getZkExt(),
+            inputSplitPathList, getWorkerInfo().getHostname(),
+            getConfiguration().useInputSplitLocality());
+    InputSplitsHandler splitsHandler = new InputSplitsHandler(
+        splitOrganizer,
+        getZkExt(),
+        getContext(),
+        BspService.EDGE_INPUT_SPLIT_RESERVED_NODE,
+        BspService.EDGE_INPUT_SPLIT_FINISHED_NODE);
+
     EdgeInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
         new EdgeInputSplitsCallableFactory<I, V, E, M>(
             getContext(),
             graphState,
             getConfiguration(),
             this,
-            inputSplitPathList,
-            getWorkerInfo(),
+            splitsHandler,
             getZkExt());
 
     return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory).

http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 7d40dfb..80c341c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -39,7 +39,6 @@ import org.apache.log4j.Logger;
 import com.yammer.metrics.core.Counter;
 
 import java.io.IOException;
-import java.util.List;
 
 /**
  * Load as many edge input splits as possible.
@@ -73,9 +72,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
    * @param graphState Graph state
    * @param configuration Configuration
    * @param bspServiceWorker service worker
-   * @param inputSplitPathList List of the paths of the input splits
-   * @param workerInfo This worker's info
-   * @param threadId Id of input split thread
+   * @param splitsHandler Handler for input splits
    * @param zooKeeperExt Handle to ZooKeeperExt
    */
   public EdgeInputSplitsCallable(
@@ -83,15 +80,10 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
       GraphState<I, V, E, M> graphState,
       ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
       BspServiceWorker<I, V, E, M> bspServiceWorker,
-      List<String> inputSplitPathList,
-      WorkerInfo workerInfo,
-      int threadId,
+      InputSplitsHandler splitsHandler,
       ZooKeeperExt zooKeeperExt)  {
     super(context, graphState, configuration, bspServiceWorker,
-        inputSplitPathList, workerInfo, threadId, zooKeeperExt,
-        BspServiceWorker.EDGE_INPUT_SPLIT_RESERVED_NODE,
-        BspServiceWorker.EDGE_INPUT_SPLIT_FINISHED_NODE,
-        bspServiceWorker.getEdgeInputSplitsEvents());
+        splitsHandler, zooKeeperExt);
 
     inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
 
b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
index 1adcd73..9297ac1 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
@@ -25,8 +25,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
-import java.util.List;
-
 /**
  * Factory for {@link EdgeInputSplitsCallable}s.
  *
@@ -46,10 +44,8 @@ public class EdgeInputSplitsCallableFactory<I extends 
WritableComparable,
   private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
   /** {@link BspServiceWorker} we're running on. */
   private final BspServiceWorker<I, V, E, M> bspServiceWorker;
-  /** List of input split paths. */
-  private final List<String> inputSplitPathList;
-  /** Worker info. */
-  private final WorkerInfo workerInfo;
+  /** Handler for input splits */
+  private final InputSplitsHandler splitsHandler;
   /** {@link ZooKeeperExt} for this worker. */
   private final ZooKeeperExt zooKeeperExt;
 
@@ -60,8 +56,7 @@ public class EdgeInputSplitsCallableFactory<I extends 
WritableComparable,
    * @param graphState Graph state
    * @param configuration Configuration
    * @param bspServiceWorker Calling {@link BspServiceWorker}
-   * @param inputSplitPathList List of input split paths
-   * @param workerInfo Worker info
+   * @param splitsHandler Handler for input splits
    * @param zooKeeperExt {@link ZooKeeperExt} for this worker
    */
   public EdgeInputSplitsCallableFactory(
@@ -69,16 +64,14 @@ public class EdgeInputSplitsCallableFactory<I extends 
WritableComparable,
       GraphState<I, V, E, M> graphState,
       ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
       BspServiceWorker<I, V, E, M> bspServiceWorker,
-      List<String> inputSplitPathList,
-      WorkerInfo workerInfo,
+      InputSplitsHandler splitsHandler,
       ZooKeeperExt zooKeeperExt) {
     this.context = context;
     this.graphState = graphState;
     this.configuration = configuration;
     this.bspServiceWorker = bspServiceWorker;
-    this.inputSplitPathList = inputSplitPathList;
-    this.workerInfo = workerInfo;
     this.zooKeeperExt = zooKeeperExt;
+    this.splitsHandler = splitsHandler;
   }
 
   @Override
@@ -88,9 +81,7 @@ public class EdgeInputSplitsCallableFactory<I extends 
WritableComparable,
         graphState,
         configuration,
         bspServiceWorker,
-        inputSplitPathList,
-        workerInfo,
-        threadId,
+        splitsHandler,
         zooKeeperExt);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
 
b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
index bfaefd2..463601c 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
@@ -17,8 +17,8 @@
  */
 package org.apache.giraph.worker;
 
-import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
+
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
@@ -42,15 +42,13 @@ import java.util.List;
  * down on the number of ZK reads workers perform before locating
  * an unclaimed InputSplit.
  */
-public class InputSplitPathOrganizer implements Iterable<String> {
+public class InputSplitPathOrganizer {
   /** The worker's local ZooKeeperExt ref */
   private final ZooKeeperExt zooKeeper;
   /** The List of InputSplit znode paths */
   private final List<String> pathList;
   /** The worker's hostname */
   private final String hostName;
-  /** The adjusted base offset by which to iterate on the path list */
-  private int baseOffset;
 
   /**
    * Constructor
@@ -58,16 +56,13 @@ public class InputSplitPathOrganizer implements 
Iterable<String> {
    * @param zooKeeper the worker's ZkExt
    * @param zkPathList the path to read from
    * @param hostName the worker's host name (for matching)
-   * @param port the port number for this worker
-   * @param threadId id of the input split thread
    * @param useLocality whether to prioritize local input splits
    */
   public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper,
-    final String zkPathList, final String hostName, final int port,
-    final int threadId, final boolean useLocality)
-    throws KeeperException, InterruptedException {
+    final String zkPathList, final String hostName,
+    final boolean useLocality) throws KeeperException, InterruptedException {
     this(zooKeeper, zooKeeper.getChildrenExt(zkPathList, false, false, true),
-        hostName, port, threadId, useLocality);
+        hostName, useLocality);
   }
 
   /**
@@ -76,48 +71,29 @@ public class InputSplitPathOrganizer implements 
Iterable<String> {
    * @param zooKeeper the worker's ZkExt
    * @param inputSplitPathList path of input splits to read from
    * @param hostName the worker's host name (for matching)
-   * @param port the port number for this worker
-   * @param threadId id of the input split thread
    * @param useLocality whether to prioritize local input splits
    */
   public InputSplitPathOrganizer(
       final ZooKeeperExt zooKeeper, final List<String> inputSplitPathList,
-      final String hostName, final int port, final int threadId,
-      final boolean useLocality)
-    throws KeeperException, InterruptedException {
+      final String hostName, final boolean useLocality) {
     this.zooKeeper = zooKeeper;
     this.pathList = Lists.newArrayList(inputSplitPathList);
     this.hostName = hostName;
+    // Shuffle input splits in case several workers exist on this host
+    Collections.shuffle(pathList);
     if (useLocality) {
-      prioritizeLocalInputSplits(port, threadId);
-    } else {
-      this.baseOffset = computeBaseOffset(port, threadId);
+      prioritizeLocalInputSplits();
     }
   }
 
   /**
-   * Compute base offset to start iterating from,
-   * in order to avoid collisions with other workers/threads.
-   *
-   * @param port the port number for this worker
-   * @param threadId id of the input split thread
-   * @return the offset to start iterating from
-   */
-  private int computeBaseOffset(final int port, final int threadId) {
-    return pathList.isEmpty() ? 0 :
-        Math.abs(Objects.hashCode(hostName, port, threadId) % pathList.size());
-  }
-
-  /**
   * Re-order list of InputSplits so files local to this worker node's
   * disk are the first it will iterate over when attempting to claim
   * a split to read. This will increase locality of data reads with greater
   * probability as the % of total nodes in the cluster hosting data and workers
   * BOTH increase towards 100%. Replication increases our chances of a "hit."
-  * @param port the port number to hash against
-  * @param threadId the threadId to hash against
   */
-  private void prioritizeLocalInputSplits(final int port, final int threadId) {
+  private void prioritizeLocalInputSplits() {
     List<String> sortedList = new ArrayList<String>();
     String hosts;
     for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) {
@@ -136,13 +112,7 @@ public class InputSplitPathOrganizer implements 
Iterable<String> {
         iterator.remove(); // remove local block from list
       }
     }
-    // shuffle the local blocks in case several workers exist on this host
-    Collections.shuffle(sortedList);
-    // set the base offset for the split iterator based on the insertion
-    // point of the local list items back into the nonlocal split list.
-    baseOffset = computeBaseOffset(port, threadId);
-    // re-insert local paths at "adjusted index zero" for caller to iterate on
-    pathList.addAll(baseOffset, sortedList);
+    pathList.addAll(0, sortedList);
   }
 
   /**
@@ -162,54 +132,11 @@ public class InputSplitPathOrganizer implements 
Iterable<String> {
   }
 
   /**
-   * Utility accessor for Input Split znode path list size
-   *
-   * @return the size of <code>this.pathList</code>
-   */
-  public int getPathListSize() {
-    return this.pathList.size();
-  }
-
-  /**
-   * Iterator for the pathList
+   * Get the ordered input splits paths.
    *
-   * @return an iterator for our list of input split paths
+   * @return Ordered input splits paths
    */
-  public Iterator<String> iterator() {
-    return new PathListIterator();
-  }
-
-  /**
-   * Iterator for path list that handles the locality and hash offsetting.
-   */
-  public class PathListIterator implements Iterator<String> {
-    /** the current iterator index */
-    private int currentIndex = 0;
-
-    /**
-     *  Do we have more list to iterate upon?
-     *
-     *  @return true if more path strings are available
-     */
-    @Override
-    public boolean hasNext() {
-      return currentIndex < pathList.size();
-    }
-
-    /**
-     * Return the next pathList element
-     *
-     * @return the next input split path
-     */
-    @Override
-    public String next() {
-      return pathList.get((baseOffset + currentIndex++) % pathList.size());
-    }
-
-    /** Just a placeholder; should not do anything! */
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException("Remove is not allowed.");
-    }
+  public Iterable<String> getPathList() {
+    return pathList;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java 
b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index d09ca2b..5487ab7 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -20,10 +20,8 @@ package org.apache.giraph.worker;
 
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
-import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphState;
-import org.apache.giraph.graph.InputSplitEvents;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
@@ -36,15 +34,11 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.Stat;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.util.List;
 import java.util.concurrent.Callable;
 
 /**
@@ -82,17 +76,11 @@ public abstract class InputSplitsCallable<I extends 
WritableComparable,
    * Stores and processes the list of InputSplits advertised
    * in a tree of child znodes by the master.
    */
-  private final InputSplitPathOrganizer splitOrganizer;
+  private final InputSplitsHandler splitsHandler;
   /** ZooKeeperExt handle */
   private final ZooKeeperExt zooKeeperExt;
   /** Get the start time in nanos */
   private final long startNanos = TIME.getNanoseconds();
-  /** ZooKeeper input split reserved node. */
-  private final String inputSplitReservedNode;
-  /** ZooKeeper input split finished node. */
-  private final String inputSplitFinishedNode;
-  /** Input split events. */
-  private final InputSplitEvents inputSplitEvents;
   /** Whether to prioritize local input splits. */
   private final boolean useLocality;
 
@@ -104,26 +92,16 @@ public abstract class InputSplitsCallable<I extends 
WritableComparable,
    * @param graphState Graph state
    * @param configuration Configuration
    * @param bspServiceWorker service worker
-   * @param inputSplitPathList List of the paths of the input splits
-   * @param workerInfo This worker's info
-   * @param threadId Id of input split thread
+   * @param splitsHandler Handler for input splits
    * @param zooKeeperExt Handle to ZooKeeperExt
-   * @param inputSplitReservedNode Path to input split reserved
-   * @param inputSplitFinishedNode Path to input split finsished
-   * @param inputSplitEvents Input split events
    */
   public InputSplitsCallable(
       Mapper<?, ?, ?, ?>.Context context,
       GraphState<I, V, E, M> graphState,
       ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
       BspServiceWorker<I, V, E, M> bspServiceWorker,
-      List<String> inputSplitPathList,
-      WorkerInfo workerInfo,
-      int threadId,
-      ZooKeeperExt zooKeeperExt,
-      String inputSplitReservedNode,
-      String inputSplitFinishedNode,
-      InputSplitEvents inputSplitEvents) {
+      InputSplitsHandler splitsHandler,
+      ZooKeeperExt zooKeeperExt) {
     this.zooKeeperExt = zooKeeperExt;
     this.context = context;
     this.workerClientRequestProcessor =
@@ -133,24 +111,9 @@ public abstract class InputSplitsCallable<I extends 
WritableComparable,
         graphState.getTotalNumVertices(), graphState.getTotalNumEdges(),
         context, graphState.getGraphTaskManager(), workerClientRequestProcessor,
         null);
-    this.useLocality = configuration.getBoolean(
-        GiraphConstants.USE_INPUT_SPLIT_LOCALITY,
-        GiraphConstants.USE_INPUT_SPLIT_LOCALITY_DEFAULT);
-    try {
-      splitOrganizer = new InputSplitPathOrganizer(zooKeeperExt,
-          inputSplitPathList, workerInfo.getHostname(), workerInfo.getPort(),
-          threadId, this.useLocality);
-    } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "InputSplitsCallable: KeeperException", e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "InputSplitsCallable: InterruptedException", e);
-    }
+    this.useLocality = configuration.useInputSplitLocality();
+    this.splitsHandler = splitsHandler;
     this.configuration = configuration;
-    this.inputSplitReservedNode = inputSplitReservedNode;
-    this.inputSplitFinishedNode = inputSplitFinishedNode;
-    this.inputSplitEvents = inputSplitEvents;
   }
   // CHECKSTYLE: resume ParameterNumberCheck
 
@@ -174,7 +137,7 @@ public abstract class InputSplitsCallable<I extends 
WritableComparable,
     String inputSplitPath;
     int inputSplitsProcessed = 0;
     try {
-      while ((inputSplitPath = reserveInputSplit()) != null) {
+      while ((inputSplitPath = splitsHandler.reserveInputSplit()) != null) {
         vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
             loadInputSplit(inputSplitPath,
                 graphState));
@@ -214,120 +177,6 @@ public abstract class InputSplitsCallable<I extends 
WritableComparable,
   }
 
   /**
-   * Try to reserve an InputSplit for loading.  While InputSplits exists that
-   * are not finished, wait until they are.
-   *
-   * NOTE: iterations on the InputSplit list only halt for each worker when it
-   * has scanned the entire list once and found every split marked RESERVED.
-   * When a worker fails, its Ephemeral RESERVED znodes will disappear,
-   * allowing other iterating workers to claim it's previously read splits.
-   * Only when the last worker left iterating on the list fails can a danger
-   * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently
-   * causes job failure, this is OK. As the failure model evolves, this
-   * behavior might need to change.
-   *
-   * @return reserved InputSplit or null if no unfinished InputSplits exist
-   * @throws org.apache.zookeeper.KeeperException
-   * @throws InterruptedException
-   */
-  private String reserveInputSplit()
-    throws KeeperException, InterruptedException {
-    String reservedInputSplitPath = null;
-    Stat reservedStat;
-    while (true) {
-      int reservedInputSplits = 0;
-      for (String nextSplitToClaim : splitOrganizer) {
-        context.progress();
-        String tmpInputSplitReservedPath = nextSplitToClaim +
-            inputSplitReservedNode;
-        reservedStat =
-            zooKeeperExt.exists(tmpInputSplitReservedPath, true);
-        if (reservedStat == null) {
-          try {
-            // Attempt to reserve this InputSplit
-            zooKeeperExt.createExt(tmpInputSplitReservedPath,
-                null,
-                ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                CreateMode.EPHEMERAL,
-                false);
-            reservedInputSplitPath = nextSplitToClaim;
-            if (LOG.isInfoEnabled()) {
-              float percentFinished =
-                  reservedInputSplits * 100.0f /
-                      splitOrganizer.getPathListSize();
-              LOG.info("reserveInputSplit: Reserved input " +
-                  "split path " + reservedInputSplitPath +
-                  ", overall roughly " +
-                  + percentFinished +
-                  "% input splits reserved");
-            }
-            return reservedInputSplitPath;
-          } catch (KeeperException.NodeExistsException e) {
-            LOG.info("reserveInputSplit: Couldn't reserve " +
-                "(already reserved) inputSplit" +
-                " at " + tmpInputSplitReservedPath);
-          } catch (KeeperException e) {
-            throw new IllegalStateException(
-                "reserveInputSplit: KeeperException on reserve", e);
-          } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "reserveInputSplit: InterruptedException " +
-                    "on reserve", e);
-          }
-        } else {
-          ++reservedInputSplits;
-        }
-      }
-      if (LOG.isInfoEnabled()) {
-        LOG.info("reserveInputSplit: reservedPath = " +
-            reservedInputSplitPath + ", " + reservedInputSplits +
-            " of " + splitOrganizer.getPathListSize() +
-            " InputSplits are finished.");
-      }
-      if (reservedInputSplits == splitOrganizer.getPathListSize()) {
-        return null;
-      }
-      context.progress();
-      // Wait for either a reservation to go away or a notification that
-      // an InputSplit has finished.
-      context.progress();
-      inputSplitEvents.getStateChanged().waitMsecs(
-          60 * 1000);
-      inputSplitEvents.getStateChanged().reset();
-    }
-  }
-
-  /**
-   * Mark an input split path as completed by this worker.  This notifies
-   * the master and the other workers that this input split has not only
-   * been reserved, but also marked processed.
-   *
-   * @param inputSplitPath Path to the input split.
-   */
-  private void markInputSplitPathFinished(String inputSplitPath) {
-    String inputSplitFinishedPath =
-        inputSplitPath + inputSplitFinishedNode;
-    try {
-      zooKeeperExt.createExt(inputSplitFinishedPath,
-          null,
-          ZooDefs.Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT,
-          true);
-    } catch (KeeperException.NodeExistsException e) {
-      LOG.warn("markInputSplitPathFinished: " + inputSplitFinishedPath +
-          " already exists!");
-    } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "markInputSplitPathFinished: KeeperException on " +
-              inputSplitFinishedPath, e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "markInputSplitPathFinished: InterruptedException on " +
-              inputSplitFinishedPath, e);
-    }
-  }
-
-  /**
    * Extract vertices from input split, saving them into a mini cache of
    * partitions.  Periodically flush the cache of vertices when a limit is
    * reached in readVerticeFromInputSplit.
@@ -354,7 +203,7 @@ public abstract class InputSplitsCallable<I extends 
WritableComparable,
       LOG.info("loadFromInputSplit: Finished loading " +
           inputSplitPath + " " + vertexEdgeCount);
     }
-    markInputSplitPathFinished(inputSplitPath);
+    splitsHandler.markInputSplitPathFinished(inputSplitPath);
     return vertexEdgeCount;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java 
b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java
new file mode 100644
index 0000000..f7d11a3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Stores the list of input split paths, and provides a thread-safe way for
+ * reserving input splits. The path list grows when a reservation is lost,
+ * so every access to it is made while holding the list's own monitor.
+ */
+public class InputSplitsHandler implements Watcher {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(InputSplitsHandler.class);
+
+  /** The List of InputSplit znode paths; guarded by its own monitor */
+  private final List<String> pathList;
+  /** Current position in the path list */
+  private final AtomicInteger currentIndex;
+  /** The worker's local ZooKeeperExt ref */
+  private final ZooKeeperExt zooKeeper;
+  /** Context for reporting progress */
+  private final Mapper<?, ?, ?, ?>.Context context;
+  /** ZooKeeper input split reserved node. */
+  private final String inputSplitReservedNode;
+  /** ZooKeeper input split finished node. */
+  private final String inputSplitFinishedNode;
+
+  /**
+   * Constructor. Copies the organizer's path list into a private list.
+   *
+   * @param splitOrganizer Input splits organizer
+   * @param zooKeeper The worker's local ZooKeeperExt ref
+   * @param context Context for reporting progress
+   * @param inputSplitReservedNode ZooKeeper input split reserved node
+   * @param inputSplitFinishedNode ZooKeeper input split finished node
+   */
+  public InputSplitsHandler(InputSplitPathOrganizer splitOrganizer,
+      ZooKeeperExt zooKeeper, Mapper<?, ?, ?, ?>.Context context,
+      String inputSplitReservedNode, String inputSplitFinishedNode) {
+    this.pathList = Lists.newArrayList(splitOrganizer.getPathList());
+    this.currentIndex = new AtomicInteger(0);
+    this.zooKeeper = zooKeeper;
+    this.context = context;
+    this.inputSplitReservedNode = inputSplitReservedNode;
+    this.inputSplitFinishedNode = inputSplitFinishedNode;
+  }
+
+  /**
+   * Try to reserve an InputSplit for loading. Splits are tried in list order;
+   * one whose reserved znode already exists is skipped, and the watch left on
+   * that znode lets {@link #process(WatchedEvent)} re-append the split to the
+   * list if the reservation is ever lost.
+   *
+   * NOTE: this returns null once this worker has walked past the end of the
+   * list, even if splits reserved by other workers are not finished yet.
+   * When a worker fails, its Ephemeral RESERVED znodes will disappear,
+   * allowing workers that are still iterating to claim its previously read
+   * splits. Only when the last worker left iterating on the list fails can a
+   * danger of data loss occur. Since worker failure in INPUT_SUPERSTEP
+   * currently causes job failure, this is OK. As the failure model evolves,
+   * this behavior might need to change.
+   *
+   * @return reserved InputSplit path, or null if the list is exhausted
+   * @throws KeeperException if checking a reserved znode fails
+   * @throws InterruptedException if interrupted while checking a znode
+   */
+  public String reserveInputSplit() throws KeeperException,
+      InterruptedException {
+    while (true) {
+      int splitToTry = currentIndex.getAndIncrement();
+      String nextSplitToClaim;
+      int splitCount;
+      // process() may append concurrently, so read under the list's monitor
+      synchronized (pathList) {
+        splitCount = pathList.size();
+        if (splitToTry >= splitCount) {
+          return null;
+        }
+        nextSplitToClaim = pathList.get(splitToTry);
+      }
+      context.progress();
+      String tmpInputSplitReservedPath =
+          nextSplitToClaim + inputSplitReservedNode;
+      Stat reservedStat = zooKeeper.exists(tmpInputSplitReservedPath, this);
+      if (reservedStat == null) {
+        try {
+          // Attempt to reserve this InputSplit
+          zooKeeper.createExt(tmpInputSplitReservedPath,
+              null,
+              ZooDefs.Ids.OPEN_ACL_UNSAFE,
+              CreateMode.EPHEMERAL,
+              false);
+          if (LOG.isInfoEnabled()) {
+            float percentFinished = splitToTry * 100.0f / splitCount;
+            LOG.info("reserveInputSplit: Reserved input " +
+                "split path " + nextSplitToClaim +
+                ", overall roughly " + percentFinished +
+                "% input splits reserved");
+          }
+          return nextSplitToClaim;
+        } catch (KeeperException.NodeExistsException e) {
+          LOG.info("reserveInputSplit: Couldn't reserve " +
+              "(already reserved) inputSplit" +
+              " at " + tmpInputSplitReservedPath);
+        } catch (KeeperException e) {
+          throw new IllegalStateException(
+              "reserveInputSplit: KeeperException on reserve", e);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IllegalStateException(
+              "reserveInputSplit: InterruptedException on reserve", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Mark an input split path as completed by this worker.  This notifies
+   * the master and the other workers that this input split has not only
+   * been reserved, but also marked processed.
+   *
+   * @param inputSplitPath Path to the input split.
+   */
+  public void markInputSplitPathFinished(String inputSplitPath) {
+    String inputSplitFinishedPath = inputSplitPath + inputSplitFinishedNode;
+    try {
+      zooKeeper.createExt(inputSplitFinishedPath,
+          null,
+          ZooDefs.Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.warn("markInputSplitPathFinished: " + inputSplitFinishedPath +
+          " already exists!");
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "markInputSplitPathFinished: KeeperException on " +
+              inputSplitFinishedPath, e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "markInputSplitPathFinished: InterruptedException on " +
+              inputSplitFinishedPath, e);
+    }
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+    if (event.getPath() == null) {
+      LOG.warn("process: Problem with zookeeper, got event with path null, " +
+          "state " + event.getState() + ", event type " + event.getType());
+      return;
+    }
+    // Check if the reservation for the input split was lost
+    // (some worker died)
+    if (event.getPath().endsWith(inputSplitReservedNode) &&
+        event.getType() == Watcher.Event.EventType.NodeDeleted) {
+      synchronized (pathList) {
+        String split = event.getPath();
+        split = split.substring(0, split.indexOf(inputSplitReservedNode));
+        pathList.add(split);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("process: Input split " + split + " lost reservation");
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index a4f98e1..a192aeb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -20,7 +20,6 @@ package org.apache.giraph.worker;
 
 import com.yammer.metrics.core.Counter;
 import java.io.IOException;
-import java.util.List;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexEdgeCount;
@@ -78,9 +77,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
    * @param graphState Graph state
    * @param configuration Configuration
    * @param bspServiceWorker service worker
-   * @param inputSplitPathList List of the paths of the input splits
-   * @param workerInfo This worker's info
-   * @param threadId Id of input split thread
+   * @param splitsHandler Handler for input splits
    * @param zooKeeperExt Handle to ZooKeeperExt
    */
   public VertexInputSplitsCallable(
@@ -88,15 +85,10 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
       GraphState<I, V, E, M> graphState,
       ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
       BspServiceWorker<I, V, E, M> bspServiceWorker,
-      List<String> inputSplitPathList,
-      WorkerInfo workerInfo,
-      int threadId,
+      InputSplitsHandler splitsHandler,
       ZooKeeperExt zooKeeperExt)  {
     super(context, graphState, configuration, bspServiceWorker,
-        inputSplitPathList, workerInfo, threadId, zooKeeperExt,
-        BspServiceWorker.VERTEX_INPUT_SPLIT_RESERVED_NODE,
-        BspServiceWorker.VERTEX_INPUT_SPLIT_FINISHED_NODE,
-        bspServiceWorker.getVertexInputSplitsEvents());
+        splitsHandler, zooKeeperExt);
 
     inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
     this.bspServiceWorker = bspServiceWorker;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
index 0d617dc..aebca81 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
@@ -25,8 +25,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
-import java.util.List;
-
 /**
  * Factory for {@link VertexInputSplitsCallable}s.
  *
@@ -46,10 +44,8 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
   private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
   /** {@link BspServiceWorker} we're running on. */
   private final BspServiceWorker<I, V, E, M> bspServiceWorker;
-  /** List of input split paths. */
-  private final List<String> inputSplitPathList;
-  /** Worker info. */
-  private final WorkerInfo workerInfo;
+  /** Handler for input splits */
+  private final InputSplitsHandler splitsHandler;
   /** {@link ZooKeeperExt} for this worker. */
   private final ZooKeeperExt zooKeeperExt;
 
@@ -60,8 +56,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
    * @param graphState Graph state
    * @param configuration Configuration
    * @param bspServiceWorker Calling {@link BspServiceWorker}
-   * @param inputSplitPathList List of input split paths
-   * @param workerInfo Worker info
+   * @param splitsHandler Handler for input splits
    * @param zooKeeperExt {@link ZooKeeperExt} for this worker
    */
   public VertexInputSplitsCallableFactory(
@@ -69,16 +64,14 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
       GraphState<I, V, E, M> graphState,
       ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
       BspServiceWorker<I, V, E, M> bspServiceWorker,
-      List<String> inputSplitPathList,
-      WorkerInfo workerInfo,
+      InputSplitsHandler splitsHandler,
       ZooKeeperExt zooKeeperExt) {
     this.context = context;
     this.graphState = graphState;
     this.configuration = configuration;
     this.bspServiceWorker = bspServiceWorker;
-    this.inputSplitPathList = inputSplitPathList;
-    this.workerInfo = workerInfo;
     this.zooKeeperExt = zooKeeperExt;
+    this.splitsHandler = splitsHandler;
   }
 
   @Override
@@ -88,9 +81,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
         graphState,
         configuration,
         bspServiceWorker,
-        inputSplitPathList,
-        workerInfo,
-        threadId,
+        splitsHandler,
         zooKeeperExt);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/1c967b8e/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
index 987f51c..faf4126 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -60,6 +60,7 @@ import org.apache.zookeeper.KeeperException;
 import org.junit.Test;
 
 import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
 
@@ -368,11 +369,8 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
     when(zk.getData("remote2", false, null)).thenReturn(remote2);
     when(zk.getData("local", false, null)).thenReturn(local);
     InputSplitPathOrganizer lis =
-      new InputSplitPathOrganizer(zk, testListName, localHost, 0, 0, true);
-    final List<String> resultList = new ArrayList<String>();
-    for (String next : lis) {
-      resultList.add(next);
-    }
+      new InputSplitPathOrganizer(zk, testListName, localHost, true);
+    final List<String> resultList = Lists.newArrayList(lis.getPathList());
     assertEquals("local", resultList.get(0));
   }
 

Reply via email to