Updated Branches:
  refs/heads/trunk 4bc133632 -> bda0f8b95

GIRAPH-488: ArrayOutOfBoundsException in org.apache.giraph.worker.InputSplitPathOrganizer (ereisman)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/bda0f8b9
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/bda0f8b9
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/bda0f8b9

Branch: refs/heads/trunk
Commit: bda0f8b95539ff5f84b72857c80eeebb5037acd6
Parents: 4bc1336
Author: Eli Reisman <[email protected]>
Authored: Wed Jan 23 17:22:56 2013 -0800
Committer: Eli Reisman <[email protected]>
Committed: Wed Jan 23 17:22:56 2013 -0800

----------------------------------------------------------------------
 CHANGELOG                                          |    2 ++
 .../giraph/worker/InputSplitPathOrganizer.java     |   15 ++++++++++-----
 2 files changed, 12 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/bda0f8b9/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 8888c32..71c21cd 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-488: ArrayOutOfBoundsException in org.apache.giraph.worker.InputSplitPathOrganizer (ereisman)
+
   GIRAPH-418: Create maven profile for CDH 4.1.2 (ekoontz)
 
   GIRAPH-487: VertexInputPath in GiraphRunner refers to EdgeInputPath (taguan via apresta)

http://git-wip-us.apache.org/repos/asf/giraph/blob/bda0f8b9/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
index f5b054d..bfaefd2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
@@ -88,9 +88,10 @@ public class InputSplitPathOrganizer implements Iterable<String> {
     this.zooKeeper = zooKeeper;
     this.pathList = Lists.newArrayList(inputSplitPathList);
     this.hostName = hostName;
-    this.baseOffset = computeBaseOffset(port, threadId);
     if (useLocality) {
-      prioritizeLocalInputSplits();
+      prioritizeLocalInputSplits(port, threadId);
+    } else {
+      this.baseOffset = computeBaseOffset(port, threadId);
     }
   }
 
@@ -102,7 +103,7 @@ public class InputSplitPathOrganizer implements Iterable<String> {
    * @param threadId id of the input split thread
    * @return the offset to start iterating from
    */
-  private int computeBaseOffset(int port, int threadId) {
+  private int computeBaseOffset(final int port, final int threadId) {
     return pathList.isEmpty() ? 0 :
         Math.abs(Objects.hashCode(hostName, port, threadId) % pathList.size());
   }
@@ -113,9 +114,10 @@ public class InputSplitPathOrganizer implements Iterable<String> {
   * a split to read. This will increase locality of data reads with greater
   * probability as the % of total nodes in the cluster hosting data and workers
   * BOTH increase towards 100%. Replication increases our chances of a "hit."
-  *
+  * @param port the port number to hash against
+  * @param threadId the threadId to hash against
   */
-  private void prioritizeLocalInputSplits() {
+  private void prioritizeLocalInputSplits(final int port, final int threadId) {
     List<String> sortedList = new ArrayList<String>();
     String hosts;
     for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) {
@@ -136,6 +138,9 @@ public class InputSplitPathOrganizer implements Iterable<String> {
     }
     // shuffle the local blocks in case several workers exist on this host
     Collections.shuffle(sortedList);
+    // set the base offset for the split iterator based on the insertion
+    // point of the local list items back into the nonlocal split list.
+    baseOffset = computeBaseOffset(port, threadId);
     // re-insert local paths at "adjusted index zero" for caller to iterate on
     pathList.addAll(baseOffset, sortedList);
   }

Reply via email to