Author: jghoman
Date: Wed Aug  8 23:00:02 2012
New Revision: 1371009

URL: http://svn.apache.org/viewvc?rev=1371009&view=rev
Log:
GIRAPH-256. Partitioning outgoing graph data during INPUT_SUPERSTEP by # of 
vertices results in wide variance in RPC message sizes.  Contributed by Eli 
Reisman.

Added:
    giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
    giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java

Modified: giraph/trunk/CHANGELOG
URL: 
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1371009&r1=1371008&r2=1371009&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Aug  8 23:00:02 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-256: Partitioning outgoing graph data during INPUT_SUPERSTEP by # of 
+  vertices results in wide variance in RPC message sizes. (Eli Reisman via jghoman)
+  
   GIRAPH-290: Add committer information for Alessandro Presta to pom.xml
   (apresta)
 

Modified: 
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1371009&r1=1371008&r2=1371009&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java 
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java 
Wed Aug  8 23:00:02 2012
@@ -108,8 +108,11 @@ public class BspServiceWorker<I extends 
       new HashMap<Integer, Partition<I, V, E, M>>();
   /** Have the partition exchange children (workers) changed? */
   private final BspEvent partitionExchangeChildrenChanged;
-  /** Max vertices per partition before sending */
-  private final int maxVerticesPerPartition;
+  /** Regulates the size of outgoing Collections of vertices read
+   * by the local worker during INPUT_SUPERSTEP that are to be
+   * transferred from <code>inputSplitCache</code> to the owner
+   * of their initial, master-assigned Partition.*/
+  private GiraphTransferRegulator transferRegulator;
   /** Worker Context */
   private final WorkerContext workerContext;
   /** Total vertices loaded */
@@ -141,10 +144,8 @@ public class BspServiceWorker<I extends 
     super(serverPortList, sessionMsecTimeout, context, graphMapper);
     partitionExchangeChildrenChanged = new PredicateLock(context);
     registerBspEvent(partitionExchangeChildrenChanged);
-    maxVerticesPerPartition =
-        getConfiguration().getInt(
-            GiraphJob.MAX_VERTICES_PER_PARTITION,
-            GiraphJob.MAX_VERTICES_PER_PARTITION_DEFAULT);
+    transferRegulator =
+        new GiraphTransferRegulator(getConfiguration());
     inputSplitMaxVertices =
         getConfiguration().getLong(
             GiraphJob.INPUT_SPLIT_MAX_VERTICES,
@@ -160,8 +161,11 @@ public class BspServiceWorker<I extends 
           new RPCCommunications<I, V, E, M>(context, this, graphState);
     }
     if (LOG.isInfoEnabled()) {
-      LOG.info("BspServiceWorker: maxVerticesPerPartition = " +
-          maxVerticesPerPartition + " useNetty = " + useNetty);
+      LOG.info("BspServiceWorker: maxVerticesPerTransfer = " +
+          transferRegulator.getMaxVerticesPerTransfer());
+      LOG.info("BspServiceWorker: maxEdgesPerTransfer = " +
+          transferRegulator.getMaxEdgesPerTransfer() +
+          " useNetty = " + useNetty);
     }
 
     workerInfo = new WorkerInfo(
@@ -280,6 +284,7 @@ public class BspServiceWorker<I extends 
             " InputSplits are finished.");
       }
       if (finishedInputSplits == inputSplitPathList.size()) {
+        transferRegulator = null; // don't need this anymore
         return null;
       }
       // Wait for either a reservation to go away or a notification that
@@ -445,8 +450,7 @@ public class BspServiceWorker<I extends 
     VertexReader<I, V, E, M> vertexReader =
         vertexInputFormat.createVertexReader(inputSplit, getContext());
     vertexReader.initialize(inputSplit, getContext());
-    long vertexCount = 0;
-    long edgeCount = 0;
+    transferRegulator.clearCounters();
     while (vertexReader.nextVertex()) {
       Vertex<I, V, E, M> readerVertex =
           vertexReader.getCurrentVertex();
@@ -476,17 +480,16 @@ public class BspServiceWorker<I extends 
         LOG.warn("readVertices: Replacing vertex " + oldVertex +
             " with " + readerVertex);
       }
-      if (partition.getVertices().size() >= maxVerticesPerPartition) {
+      getContext().progress(); // do this before potential data transfer
+      transferRegulator.incrementCounters(partitionOwner, readerVertex);
+      if (transferRegulator.transferThisPartition(partitionOwner)) {
         commService.sendPartitionRequest(partitionOwner.getWorkerInfo(),
             partition);
         partition.getVertices().clear();
       }
-      ++vertexCount;
-      edgeCount += readerVertex.getNumEdges();
-      getContext().progress();
-
       ++totalVerticesLoaded;
       totalEdgesLoaded += readerVertex.getNumEdges();
+
       // Update status every half a million vertices
       if ((totalVerticesLoaded % 500000) == 0) {
         String status = "readVerticesFromInputSplit: Loaded " +
@@ -504,19 +507,21 @@ public class BspServiceWorker<I extends 
 
       // For sampling, or to limit outlier input splits, the number of
       // records per input split can be limited
-      if ((inputSplitMaxVertices > 0) &&
-          (vertexCount >= inputSplitMaxVertices)) {
+      if (inputSplitMaxVertices > 0 &&
+        transferRegulator.getTotalVertices() >=
+        inputSplitMaxVertices) {
         if (LOG.isInfoEnabled()) {
           LOG.info("readVerticesFromInputSplit: Leaving the input " +
               "split early, reached maximum vertices " +
-              vertexCount);
+              transferRegulator.getTotalVertices());
         }
         break;
       }
     }
     vertexReader.close();
 
-    return new VertexEdgeCount(vertexCount, edgeCount);
+    return new VertexEdgeCount(transferRegulator.getTotalVertices(),
+      transferRegulator.getTotalEdges());
   }
 
   @Override

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1371009&r1=1371008&r2=1371009&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Wed Aug  
8 23:00:02 2012
@@ -207,15 +207,6 @@ public class GiraphJob {
   /** Default server receive buffer size of 0.5 MB */
   public static final int DEFAULT_SERVER_RECEIVE_BUFFER_SIZE = 512 * 1024;
 
-  /**
-   *  Maximum number of vertices per partition before sending.
-   *  (input superstep only).
-   */
-  public static final String MAX_VERTICES_PER_PARTITION =
-      "giraph.maxVerticesPerPartition";
-  /** Default maximum number of vertices per partition before sending. */
-  public static final int MAX_VERTICES_PER_PARTITION_DEFAULT = 10000;
-
   /** Maximum number of messages per peer before flush */
   public static final String MSG_SIZE = "giraph.msgSize";
   /** Default maximum number of messages per peer before flush */

Added: 
giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java?rev=1371009&view=auto
==============================================================================
--- 
giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java 
(added)
+++ 
giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java 
Wed Aug  8 23:00:02 2012
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import com.google.common.collect.Maps;
+import java.util.Map;
+
+
+/** Utility class to manage data transfers from
+ * a local worker reading InputSplits.
+ * It counts the vertices and edges accumulated per partition id
+ * since that partition's last transfer, and reports a transfer as
+ * due once either count reaches its configured maximum. It also
+ * keeps running vertex and edge totals until counters are cleared.
+ *
+ * TODO: implement defaults and configurable options for
+ * measuring the size of input &lt;V&gt; or &lt;E&gt; data
+ * per read vertex, and setting limits on totals per outgoing
+ * graph data Collection etc. (See GIRAPH-260)
+ */
+public class GiraphTransferRegulator {
+  /** Configuration key: maximum vertices to accumulate for one
+   * partition before its pending data is reported ready to send. */
+  public static final String MAX_VERTICES_PER_TRANSFER =
+    "giraph.maxVerticesPerTransfer";
+  /** Default maximum number of vertices per
+   * temp partition before sending. */
+  public static final int MAX_VERTICES_PER_TRANSFER_DEFAULT = 10000;
+  /**
+   * Configuration key: maximum edges to accumulate for one
+   * partition before its pending data is reported ready to send.
+   */
+  public static final String MAX_EDGES_PER_TRANSFER =
+    "giraph.maxEdgesPerTransfer";
+  /** Default maximum number of edges per
+   * temp partition before sending. */
+  public static final int MAX_EDGES_PER_TRANSFER_DEFAULT = 80000;
+
+  /** Edges counted per partition id since that
+   * partition's counters were last reset. A
+   * missing key is read as a count of zero.
+   * Compared against the per-transfer edge
+   * maximum to decide when the pending data
+   * for a partition is ready to be sent. */
+  private Map<Integer, Integer> edgeAccumulator;
+
+  /** Vertices counted per partition id since that
+   * partition's counters were last reset. A
+   * missing key is read as a count of zero.
+   * Compared against the per-transfer vertex
+   * maximum to decide when the pending data
+   * for a partition is ready to be sent. */
+  private Map<Integer, Integer> vertexAccumulator;
+
+  /** Maximum number of vertices per data transfer */
+  private final int maxVerticesPerTransfer;
+
+  /** Maximum number of edges per data transfer */
+  private final int maxEdgesPerTransfer;
+
+  /** Vertex count total since the counters were last cleared */
+  private long totalVertexCount;
+
+  /** Edge count total since the counters were last cleared */
+  private long totalEdgeCount;
+
+  /** Reads both transfer limits from conf; all counters start empty.
+   * @param conf the Configuration for this job
+   */
+  public GiraphTransferRegulator(Configuration conf) {
+    vertexAccumulator = Maps.<Integer, Integer>newHashMap();
+    edgeAccumulator = Maps.<Integer, Integer>newHashMap();
+    maxVerticesPerTransfer = conf.getInt(
+        MAX_VERTICES_PER_TRANSFER,
+        MAX_VERTICES_PER_TRANSFER_DEFAULT);
+    maxEdgesPerTransfer = conf.getInt(
+        MAX_EDGES_PER_TRANSFER,
+        MAX_EDGES_PER_TRANSFER_DEFAULT);
+    totalEdgeCount = 0;
+    totalVertexCount = 0;
+  }
+
+  /** Is this outbound data Collection full, and ready to transfer?
+   * Answering 'true' also resets this partition's counts to zero.
+   * @param owner the partition owner for the outbound data
+   * @return 'true' if the edge or the vertex maximum has been reached
+   */
+  public boolean transferThisPartition(PartitionOwner owner) {
+    final int partitionId = owner.getPartitionId();
+    if (getEdgesForPartition(partitionId) >=
+      maxEdgesPerTransfer ||
+      getVerticesForPartition(partitionId) >=
+      maxVerticesPerTransfer) {
+      vertexAccumulator.put(partitionId, 0);
+      edgeAccumulator.put(partitionId, 0);
+      return true;
+    }
+    return false;
+  }
+
+  /** Get the current vertex count for a given Collection of
+   * data soon to be transferred to its permanent home.
+   * @param partId the partition id to check the count on.
+   * @return the count of vertices, or 0 if none is recorded.
+   */
+  private int getVerticesForPartition(final int partId) {
+    return vertexAccumulator.get(partId) == null ?
+      0 : vertexAccumulator.get(partId);
+  }
+
+  /** Get the current edge count for a given Collection of
+   * data soon to be transferred to its permanent home.
+   * @param partId the partition id to check the count on.
+   * @return the count of edges, or 0 if none is recorded.
+   */
+  private int getEdgesForPartition(final int partId) {
+    return edgeAccumulator.get(partId) == null ?
+      0 : edgeAccumulator.get(partId);
+  }
+
+  /** Zero both totals and drop every per-partition count */
+  public void clearCounters() {
+    totalEdgeCount = 0;
+    totalVertexCount = 0;
+    vertexAccumulator.clear();
+    edgeAccumulator.clear();
+  }
+
+  /** Increment V & E counts for new vertex read, store values
+   * for that outgoing _temporary_ Partition, which shares the
+   * Partition ID for the actual remote Partition the collection
+   * will eventually be processed in. Totals are updated as well.
+   * @param partitionOwner the owner of the Partition this data
+   *  will eventually belong to.
+   * @param vertex the vertex to extract counts from.
+   * @param <I> the vertex id type.
+   * @param <V> the vertex value type.
+   * @param <E> the edge value type.
+   * @param <M> the message value type.
+   */
+  public <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable> void
+  incrementCounters(PartitionOwner partitionOwner,
+    Vertex<I, V, E, M> vertex) {
+    final int id = partitionOwner.getPartitionId();
+    // vertex counts: one more vertex for this partition and overall
+    vertexAccumulator
+      .put(id, getVerticesForPartition(id) + 1);
+    totalVertexCount++;
+    // edge counts: add this vertex's edges for the partition and overall
+    totalEdgeCount += vertex.getNumEdges();
+    edgeAccumulator.put(id, getEdgesForPartition(id) +
+      vertex.getNumEdges());
+  }
+
+  /** Getter for MAX edge count to initiate a transfer
+    * @return max edge count per transfer */
+  public long getMaxEdgesPerTransfer() {
+    return maxEdgesPerTransfer;
+  }
+
+  /** Getter for MAX vertex count to initiate a transfer
+   * @return max vertex count per transfer */
+  public long getMaxVerticesPerTransfer() {
+    return maxVerticesPerTransfer;
+  }
+
+  /** Getter for total edge count for the current InputSplit
+    * @return the # of total edges counted since the last clear */
+  public long getTotalEdges() {
+    return totalEdgeCount;
+  }
+
+  /** Getter for total vertex count for the current InputSplit
+   * @return the total # of vertices counted since the last clear */
+  public long getTotalVertices() {
+    return totalVertexCount;
+  }
+}
+

Modified: 
giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java?rev=1371009&r1=1371008&r2=1371009&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java 
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java 
Wed Aug  8 23:00:02 2012
@@ -18,6 +18,7 @@
 package org.apache.giraph.graph;
 
 import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
@@ -32,10 +33,14 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -156,6 +161,26 @@ public class TestEdgeListVertex {
   }
 
   @Test
+  public void testGiraphTransferRegulator() {
+     job.getConfiguration() // job and vertex are fixtures from outside this hunk
+       .setInt(GiraphTransferRegulator.MAX_VERTICES_PER_TRANSFER, 1);
+     job.getConfiguration()
+       .setInt(GiraphTransferRegulator.MAX_EDGES_PER_TRANSFER, 3);
+     Map<IntWritable, DoubleWritable> edgeMap = Maps.newHashMap();
+     edgeMap.put(new IntWritable(2), new DoubleWritable(22));
+     edgeMap.put(new IntWritable(3), new DoubleWritable(33));
+     edgeMap.put(new IntWritable(4), new DoubleWritable(44));
+     vertex.initialize(null, null, edgeMap, null); // three edge entries supplied
+     GiraphTransferRegulator gtr =
+       new GiraphTransferRegulator(job.getConfiguration());
+     PartitionOwner owner = mock(PartitionOwner.class);
+     when(owner.getPartitionId()).thenReturn(57); // arbitrary partition id
+     assertFalse(gtr.transferThisPartition(owner)); // nothing counted yet
+     gtr.incrementCounters(owner, vertex); // counts one vertex for partition 57
+     assertTrue(gtr.transferThisPartition(owner)); // vertex limit of 1 is reached
+  }
+
+  @Test
   public void testSerialize() {
     Map<IntWritable, DoubleWritable> edgeMap = Maps.newHashMap();
     for (int i = 1000; i > 0; --i) {


Reply via email to