Author: jghoman
Date: Wed Aug 8 23:00:02 2012
New Revision: 1371009
URL: http://svn.apache.org/viewvc?rev=1371009&view=rev
Log:
GIRAPH-256. Partitioning outgoing graph data during INPUT_SUPERSTEP by # of
vertices results in wide variance in RPC message sizes. Contributed by Eli
Reisman.
Added:
giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java
Modified: giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1371009&r1=1371008&r2=1371009&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Aug 8 23:00:02 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-256: Partitioning outgoing graph data during INPUT_SUPERSTEP by # of
+ vertices results in wide variance in RPC message sizes. (Eli Reisman via
jghoman)
+
GIRAPH-290: Add committer information for Alessandro Presta to pom.xml
(apresta)
Modified:
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1371009&r1=1371008&r2=1371009&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
Wed Aug 8 23:00:02 2012
@@ -108,8 +108,11 @@ public class BspServiceWorker<I extends
new HashMap<Integer, Partition<I, V, E, M>>();
/** Have the partition exchange children (workers) changed? */
private final BspEvent partitionExchangeChildrenChanged;
- /** Max vertices per partition before sending */
- private final int maxVerticesPerPartition;
+ /** Regulates the size of outgoing Collections of vertices read
+ * by the local worker during INPUT_SUPERSTEP that are to be
+ * transferred from <code>inputSplitCache</code> to the owner
+ * of their initial, master-assigned Partition.*/
+ private GiraphTransferRegulator transferRegulator;
/** Worker Context */
private final WorkerContext workerContext;
/** Total vertices loaded */
@@ -141,10 +144,8 @@ public class BspServiceWorker<I extends
super(serverPortList, sessionMsecTimeout, context, graphMapper);
partitionExchangeChildrenChanged = new PredicateLock(context);
registerBspEvent(partitionExchangeChildrenChanged);
- maxVerticesPerPartition =
- getConfiguration().getInt(
- GiraphJob.MAX_VERTICES_PER_PARTITION,
- GiraphJob.MAX_VERTICES_PER_PARTITION_DEFAULT);
+ transferRegulator =
+ new GiraphTransferRegulator(getConfiguration());
inputSplitMaxVertices =
getConfiguration().getLong(
GiraphJob.INPUT_SPLIT_MAX_VERTICES,
@@ -160,8 +161,11 @@ public class BspServiceWorker<I extends
new RPCCommunications<I, V, E, M>(context, this, graphState);
}
if (LOG.isInfoEnabled()) {
- LOG.info("BspServiceWorker: maxVerticesPerPartition = " +
- maxVerticesPerPartition + " useNetty = " + useNetty);
+ LOG.info("BspServiceWorker: maxVerticesPerTransfer = " +
+ transferRegulator.getMaxVerticesPerTransfer());
+ LOG.info("BspServiceWorker: maxEdgesPerTransfer = " +
+ transferRegulator.getMaxEdgesPerTransfer() +
+ " useNetty = " + useNetty);
}
workerInfo = new WorkerInfo(
@@ -280,6 +284,7 @@ public class BspServiceWorker<I extends
" InputSplits are finished.");
}
if (finishedInputSplits == inputSplitPathList.size()) {
+ transferRegulator = null; // don't need this anymore
return null;
}
// Wait for either a reservation to go away or a notification that
@@ -445,8 +450,7 @@ public class BspServiceWorker<I extends
VertexReader<I, V, E, M> vertexReader =
vertexInputFormat.createVertexReader(inputSplit, getContext());
vertexReader.initialize(inputSplit, getContext());
- long vertexCount = 0;
- long edgeCount = 0;
+ transferRegulator.clearCounters();
while (vertexReader.nextVertex()) {
Vertex<I, V, E, M> readerVertex =
vertexReader.getCurrentVertex();
@@ -476,17 +480,16 @@ public class BspServiceWorker<I extends
LOG.warn("readVertices: Replacing vertex " + oldVertex +
" with " + readerVertex);
}
- if (partition.getVertices().size() >= maxVerticesPerPartition) {
+ getContext().progress(); // do this before potential data transfer
+ transferRegulator.incrementCounters(partitionOwner, readerVertex);
+ if (transferRegulator.transferThisPartition(partitionOwner)) {
commService.sendPartitionRequest(partitionOwner.getWorkerInfo(),
partition);
partition.getVertices().clear();
}
- ++vertexCount;
- edgeCount += readerVertex.getNumEdges();
- getContext().progress();
-
++totalVerticesLoaded;
totalEdgesLoaded += readerVertex.getNumEdges();
+
// Update status every half a million vertices
if ((totalVerticesLoaded % 500000) == 0) {
String status = "readVerticesFromInputSplit: Loaded " +
@@ -504,19 +507,21 @@ public class BspServiceWorker<I extends
// For sampling, or to limit outlier input splits, the number of
// records per input split can be limited
- if ((inputSplitMaxVertices > 0) &&
- (vertexCount >= inputSplitMaxVertices)) {
+ if (inputSplitMaxVertices > 0 &&
+ transferRegulator.getTotalVertices() >=
+ inputSplitMaxVertices) {
if (LOG.isInfoEnabled()) {
LOG.info("readVerticesFromInputSplit: Leaving the input " +
"split early, reached maximum vertices " +
- vertexCount);
+ transferRegulator.getTotalVertices());
}
break;
}
}
vertexReader.close();
- return new VertexEdgeCount(vertexCount, edgeCount);
+ return new VertexEdgeCount(transferRegulator.getTotalVertices(),
+ transferRegulator.getTotalEdges());
}
@Override
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1371009&r1=1371008&r2=1371009&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Wed Aug
8 23:00:02 2012
@@ -207,15 +207,6 @@ public class GiraphJob {
/** Default server receive buffer size of 0.5 MB */
public static final int DEFAULT_SERVER_RECEIVE_BUFFER_SIZE = 512 * 1024;
- /**
- * Maximum number of vertices per partition before sending.
- * (input superstep only).
- */
- public static final String MAX_VERTICES_PER_PARTITION =
- "giraph.maxVerticesPerPartition";
- /** Default maximum number of vertices per partition before sending. */
- public static final int MAX_VERTICES_PER_PARTITION_DEFAULT = 10000;
-
/** Maximum number of messages per peer before flush */
public static final String MSG_SIZE = "giraph.msgSize";
/** Default maximum number of messages per peer before flush */
Added:
giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java?rev=1371009&view=auto
==============================================================================
---
giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
(added)
+++
giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
Wed Aug 8 23:00:02 2012
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import com.google.common.collect.Maps;
+import java.util.Map;
+
+
+/** Utility class to manage data transfers from
+ * a local worker reading InputSplits.
+ * Currently, this class measures # of vertices and edges
+ * per outgoing Collection of graph data (destined for a
+ * particular Partition and remote worker node, preselected
+ * by the master.)
+ *
+ * TODO: implement defaults and configurable options for
+ * measuring the size of input <V> or <E> data
+ * per read vertex, and setting limits on totals per outgoing
+ * graph data Collection etc. (See GIRAPH-260)
+ */
+public class GiraphTransferRegulator {
+ /** Maximum vertices to read from an InputSplit locally that are
+ * to be routed to a remote worker, before sending them. */
+ public static final String MAX_VERTICES_PER_TRANSFER =
+ "giraph.maxVerticesPerTransfer";
+ /** Default maximum number of vertices per
+ * temp partition before sending. */
+ public static final int MAX_VERTICES_PER_TRANSFER_DEFAULT = 10000;
+ /**
+ * Maximum edges to read from an InputSplit locally that are
+ * to be routed to a remote worker, before sending them.
+ */
+ public static final String MAX_EDGES_PER_TRANSFER =
+ "giraph.maxEdgesPerTransfer";
+ /** Default maximum number of edges per
+ * temp partition before sending. */
+ public static final int MAX_EDGES_PER_TRANSFER_DEFAULT = 80000;
+
+ /** Internal state: edge count per partition id, accumulated
+ * since that partition's last transfer, used to measure when
+ * the next Collection of vertices read by the local worker
+ * that owns this regulator is ready to be sent
+ * to the remote worker node that the master
+ * has assigned the vertices to */
+ private Map<Integer, Integer> edgeAccumulator;
+
+ /** Internal state: vertex count per partition id, accumulated
+ * since that partition's last transfer, used to measure when
+ * the next Collection of vertices read by the local worker
+ * that owns this regulator is ready to be sent
+ * to the remote worker node that the master
+ * has assigned the vertices to */
+ private Map<Integer, Integer> vertexAccumulator;
+
+ /** Number of vertices per data transfer */
+ private final int maxVerticesPerTransfer;
+
+ /** Number of edges per data transfer */
+ private final int maxEdgesPerTransfer;
+
+ /** Vertex count total for this InputSplit */
+ private long totalVertexCount;
+
+ /** Edge count total for this InputSplit */
+ private long totalEdgeCount;
+
+ /** Constructor; reads the per-transfer limits from the job configuration.
+ * @param conf the Configuration for this job
+ */
+ public GiraphTransferRegulator(Configuration conf) {
+ vertexAccumulator = Maps.<Integer, Integer>newHashMap();
+ edgeAccumulator = Maps.<Integer, Integer>newHashMap();
+ maxVerticesPerTransfer = conf.getInt(
+ MAX_VERTICES_PER_TRANSFER,
+ MAX_VERTICES_PER_TRANSFER_DEFAULT);
+ maxEdgesPerTransfer = conf.getInt(
+ MAX_EDGES_PER_TRANSFER,
+ MAX_EDGES_PER_TRANSFER_DEFAULT);
+ totalEdgeCount = 0;
+ totalVertexCount = 0;
+ }
+
+ /** Is this outbound data Collection full,
+ * and ready to transfer?
+ * @param owner the partition owner for the outbound data
+ * @return 'true' if the temp partition data is ready to transfer
+ */
+ public boolean transferThisPartition(PartitionOwner owner) {
+ final int partitionId = owner.getPartitionId();
+ if (getEdgesForPartition(partitionId) >=
+ maxEdgesPerTransfer ||
+ getVerticesForPartition(partitionId) >=
+ maxVerticesPerTransfer) {
+ vertexAccumulator.put(partitionId, 0);
+ edgeAccumulator.put(partitionId, 0);
+ return true;
+ }
+ return false;
+ }
+
+ /** get current vertex count for a given Collection of
+ * data soon to be transferred to its permanent home.
+ * @param partId the partition id to check the count on.
+ * @return the count of vertices.
+ */
+ private int getVerticesForPartition(final int partId) {
+ return vertexAccumulator.get(partId) == null ?
+ 0 : vertexAccumulator.get(partId);
+ }
+
+ /** get current edge count for a given Collection of
+ * data soon to be transferred to its permanent home.
+ * @param partId the partition id to check the count on.
+ * @return the count of edges.
+ */
+ private int getEdgesForPartition(final int partId) {
+ return edgeAccumulator.get(partId) == null ?
+ 0 : edgeAccumulator.get(partId);
+ }
+
+ /** Clear storage to reset for reading new InputSplit */
+ public void clearCounters() {
+ totalEdgeCount = 0;
+ totalVertexCount = 0;
+ vertexAccumulator.clear();
+ edgeAccumulator.clear();
+ }
+
+ /** Increment V & E counts for new vertex read, store values
+ * for that outgoing _temporary_ Partition, which shares the
+ * Partition ID for the actual remote Partition the collection
+ * will eventually be processed in.
+ * @param partitionOwner the owner of the Partition this data
+ * will eventually belong to.
+ * @param vertex the vertex to extract counts from.
+ * @param <I> the vertex id type.
+ * @param <V> the vertex value type.
+ * @param <E> the edge value type.
+ * @param <M> the message value type.
+ */
+ public <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable> void
+ incrementCounters(PartitionOwner partitionOwner,
+ Vertex<I, V, E, M> vertex) {
+ final int id = partitionOwner.getPartitionId();
+ // vertex counts
+ vertexAccumulator
+ .put(id, getVerticesForPartition(id) + 1);
+ totalVertexCount++;
+ // edge counts
+ totalEdgeCount += vertex.getNumEdges();
+ edgeAccumulator.put(id, getEdgesForPartition(id) +
+ vertex.getNumEdges());
+ }
+
+ /** Getter for MAX edge count to initiate a transfer
+ * @return max edge count per transfer */
+ public long getMaxEdgesPerTransfer() {
+ return maxEdgesPerTransfer;
+ }
+
+ /** Getter for MAX vertex count to initiate a transfer
+ * @return max vertex count per transfer */
+ public long getMaxVerticesPerTransfer() {
+ return maxVerticesPerTransfer;
+ }
+
+ /** Getter for total edge count for the current InputSplit
+ * @return the # of total edges counted in this InputSplit */
+ public long getTotalEdges() {
+ return totalEdgeCount;
+ }
+
+ /** Getter for total vertex count for the current InputSplit
+ * @return the total # of vertices counted in this InputSplit */
+ public long getTotalVertices() {
+ return totalVertexCount;
+ }
+}
+
Modified:
giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java?rev=1371009&r1=1371008&r2=1371009&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java
Wed Aug 8 23:00:02 2012
@@ -18,6 +18,7 @@
package org.apache.giraph.graph;
import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
@@ -32,10 +33,14 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -156,6 +161,26 @@ public class TestEdgeListVertex {
}
@Test
+ public void testGiraphTransferRegulator() {
+ job.getConfiguration()
+ .setInt(GiraphTransferRegulator.MAX_VERTICES_PER_TRANSFER, 1);
+ job.getConfiguration()
+ .setInt(GiraphTransferRegulator.MAX_EDGES_PER_TRANSFER, 3);
+ Map<IntWritable, DoubleWritable> edgeMap = Maps.newHashMap();
+ edgeMap.put(new IntWritable(2), new DoubleWritable(22));
+ edgeMap.put(new IntWritable(3), new DoubleWritable(33));
+ edgeMap.put(new IntWritable(4), new DoubleWritable(44));
+ vertex.initialize(null, null, edgeMap, null);
+ GiraphTransferRegulator gtr =
+ new GiraphTransferRegulator(job.getConfiguration());
+ PartitionOwner owner = mock(PartitionOwner.class);
+ when(owner.getPartitionId()).thenReturn(57);
+ assertFalse(gtr.transferThisPartition(owner));
+ gtr.incrementCounters(owner, vertex);
+ assertTrue(gtr.transferThisPartition(owner));
+ }
+
+ @Test
public void testSerialize() {
Map<IntWritable, DoubleWritable> edgeMap = Maps.newHashMap();
for (int i = 1000; i > 0; --i) {