Author: maja
Date: Mon Dec 10 18:15:46 2012
New Revision: 1419623
URL: http://svn.apache.org/viewvc?rev=1419623&view=rev
Log:
GIRAPH-445: Max message request size in bytes, initialize buffers to expected
size
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
Modified: giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1419623&r1=1419622&r2=1419623&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Dec 10 18:15:46 2012
@@ -1,6 +1,9 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-445: Max message request size in bytes, initialize buffers to
+ expected size (majakabiljo)
+
GIRAPH-444: Cleanup VertexResolver (nitay)
GIRAPH-446: Add a proper timeout for waiting for workers to join a
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1419623&r1=1419622&r2=1419623&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
Mon Dec 10 18:15:46 2012
@@ -385,10 +385,10 @@ public class GiraphConfiguration extends
/** Default server receive buffer size of 0.5 MB */
public static final int DEFAULT_SERVER_RECEIVE_BUFFER_SIZE = 512 * 1024;
- /** Maximum number of messages per peer before flush */
- public static final String MSG_SIZE = "giraph.msgSize";
- /** Default maximum number of messages per peer before flush */
- public static final int MSG_SIZE_DEFAULT = 2000;
+ /** Maximum size of messages (in bytes) per peer before flush */
+ public static final String MAX_MSG_REQUEST_SIZE = "giraph.msgRequestSize";
+  /** Default maximum size of messages per peer before flush: 0.5MB */
+ public static final int MAX_MSG_REQUEST_SIZE_DEFAULT = 512 * 1024;
/** Maximum number of mutations per partition before flush */
public static final String MAX_MUTATIONS_PER_REQUEST =
@@ -396,12 +396,6 @@ public class GiraphConfiguration extends
/** Default maximum number of mutations per partition before flush */
public static final int MAX_MUTATIONS_PER_REQUEST_DEFAULT = 100;
- /** Maximum number of messages that can be bulk sent during a flush */
- public static final String MAX_MESSAGES_PER_FLUSH_PUT =
- "giraph.maxMessagesPerFlushPut";
- /** Default number of messages that can be bulk sent during a flush */
- public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 2000;
-
/**
* Use message size encoding (typically better for complex objects,
* not meant for primitive wrapped messages)
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java?rev=1419623&r1=1419622&r2=1419623&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
Mon Dec 10 18:15:46 2012
@@ -22,6 +22,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
+
+import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.graph.WorkerInfo;
@@ -41,10 +43,27 @@ import org.apache.hadoop.io.WritableComp
@SuppressWarnings("rawtypes")
public class SendMessageCache<I extends WritableComparable,
M extends Writable> {
+ /**
+ * How much bigger than the average per partition size to make initial per
+ * partition buffers.
+ * If this value is A, message request size is M,
+   * and a worker has P partitions, then its initial partition buffer size
+ * will be (M / P) * (1 + A).
+ */
+ public static final String ADDITIONAL_MSG_REQUEST_SIZE =
+ "giraph.additionalMsgRequestSize";
+  /**
+   * Default factor for how much bigger initial per partition buffers
+   * should be: 20%.
+   */
+ public static final float ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT = 0.2f;
+
/** Internal cache */
private final ByteArrayVertexIdMessages<I, M>[] messageCache;
- /** Number of messages in each partition */
- private final int[] messageCounts;
+ /** Size of messages (in bytes) for each worker */
+ private final int[] messageSizes;
+ /** How big to initially make output streams for each worker's partitions */
+ private final int[] initialBufferSizes;
/** List of partition ids belonging to a worker */
private final Map<WorkerInfo, List<Integer>> workerPartitions =
Maps.newHashMap();
@@ -79,7 +98,19 @@ public class SendMessageCache<I extends
for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
maxWorker = Math.max(maxWorker, workerInfo.getTaskId());
}
- messageCounts = new int[maxWorker + 1];
+ messageSizes = new int[maxWorker + 1];
+
+ float additionalRequestSize =
+ conf.getFloat(ADDITIONAL_MSG_REQUEST_SIZE,
+ ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT);
+ int requestSize = conf.getInt(GiraphConfiguration.MAX_MSG_REQUEST_SIZE,
+ GiraphConfiguration.MAX_MSG_REQUEST_SIZE_DEFAULT);
+ int initialRequestSize = (int) (requestSize * (1 + additionalRequestSize));
+ initialBufferSizes = new int[maxWorker + 1];
+ for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
+ initialBufferSizes[workerInfo.getTaskId()] =
+ initialRequestSize / workerPartitions.get(workerInfo).size();
+ }
}
/**
@@ -90,24 +121,28 @@ public class SendMessageCache<I extends
* @param destVertexId vertex id that is ultimate destination
* @param message Message to be send to remote
* <b>host => partition => vertex</b>
- * @return Number of messages in the partition.
+ * @return Size of messages for the worker.
*/
public int addMessage(WorkerInfo workerInfo,
final int partitionId, I destVertexId, M message) {
// Get the message collection
ByteArrayVertexIdMessages<I, M> partitionMessages =
messageCache[partitionId];
+ int originalSize = 0;
if (partitionMessages == null) {
partitionMessages = new ByteArrayVertexIdMessages<I, M>();
partitionMessages.setConf(conf);
- partitionMessages.initialize();
+ partitionMessages.initialize(initialBufferSizes[workerInfo.getTaskId()]);
messageCache[partitionId] = partitionMessages;
+ } else {
+ originalSize = partitionMessages.getSize();
}
partitionMessages.add(destVertexId, message);
- // Update the number of cached, outgoing messages per worker
- messageCounts[workerInfo.getTaskId()]++;
- return messageCounts[workerInfo.getTaskId()];
+ // Update the size of cached, outgoing messages per worker
+ messageSizes[workerInfo.getTaskId()] +=
+ partitionMessages.getSize() - originalSize;
+ return messageSizes[workerInfo.getTaskId()];
}
/**
@@ -122,14 +157,15 @@ public class SendMessageCache<I extends
removeWorkerMessages(WorkerInfo workerInfo) {
PairList<Integer, ByteArrayVertexIdMessages<I, M>> workerMessages =
new PairList<Integer, ByteArrayVertexIdMessages<I, M>>();
- workerMessages.initialize();
- for (Integer partitionId : workerPartitions.get(workerInfo)) {
+ List<Integer> partitions = workerPartitions.get(workerInfo);
+ workerMessages.initialize(partitions.size());
+ for (Integer partitionId : partitions) {
if (messageCache[partitionId] != null) {
workerMessages.add(partitionId, messageCache[partitionId]);
messageCache[partitionId] = null;
}
}
- messageCounts[workerInfo.getTaskId()] = 0;
+ messageSizes[workerInfo.getTaskId()] = 0;
return workerMessages;
}
@@ -144,7 +180,7 @@ public class SendMessageCache<I extends
ByteArrayVertexIdMessages<I, M>>>
allMessages = new PairList<WorkerInfo,
PairList<Integer, ByteArrayVertexIdMessages<I, M>>>();
- allMessages.initialize();
+ allMessages.initialize(messageSizes.length);
for (WorkerInfo workerInfo : workerPartitions.keySet()) {
PairList<Integer, ByteArrayVertexIdMessages<I,
M>> workerMessages =
@@ -152,7 +188,7 @@ public class SendMessageCache<I extends
if (!workerMessages.isEmpty()) {
allMessages.add(workerInfo, workerMessages);
}
- messageCounts[workerInfo.getTaskId()] = 0;
+ messageSizes[workerInfo.getTaskId()] = 0;
}
return allMessages;
}
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java?rev=1419623&r1=1419622&r2=1419623&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
Mon Dec 10 18:15:46 2012
@@ -35,6 +35,7 @@ import org.apache.giraph.comm.requests.S
import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
import org.apache.giraph.comm.requests.WorkerRequest;
import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.graph.BspService;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
@@ -77,8 +78,8 @@ public class NettyWorkerClientRequestPro
private final WorkerClient<I, V, E, M> workerClient;
/** Messages sent during the last superstep */
private long totalMsgsSentInSuperstep = 0;
- /** Maximum number of messages per remote worker to cache before sending */
- private final int maxMessagesPerWorker;
+ /** Maximum size of messages per remote worker to cache before sending */
+ private final int maxMessagesSizePerWorker;
/** Maximum number of mutations per partition before sending */
private final int maxMutationsPerPartition;
/** Giraph configuration */
@@ -110,9 +111,9 @@ public class NettyWorkerClientRequestPro
configuration);
sendMessageCache =
new SendMessageCache<I, M>(configuration, serviceWorker);
- maxMessagesPerWorker = configuration.getInt(
- GiraphConfiguration.MSG_SIZE,
- GiraphConfiguration.MSG_SIZE_DEFAULT);
+ maxMessagesSizePerWorker = configuration.getInt(
+ GiraphConfiguration.MAX_MSG_REQUEST_SIZE,
+ GiraphConfiguration.MAX_MSG_REQUEST_SIZE_DEFAULT);
maxMutationsPerPartition = configuration.getInt(
GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST,
GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
@@ -138,12 +139,12 @@ public class NettyWorkerClientRequestPro
++totalMsgsSentInSuperstep;
// Add the message to the cache
- int workerMessageCount = sendMessageCache.addMessage(
+ int workerMessageSize = sendMessageCache.addMessage(
workerInfo, partitionId, destVertexId, message);
// Send a request if the cache of outgoing message to
// the remote worker 'workerInfo' is full enough to be flushed
- if (workerMessageCount >= maxMessagesPerWorker) {
+ if (workerMessageSize >= maxMessagesSizePerWorker) {
PairList<Integer, ByteArrayVertexIdMessages<I, M>>
workerMessages =
sendMessageCache.removeWorkerMessages(workerInfo);
@@ -159,7 +160,6 @@ public class NettyWorkerClientRequestPro
@Override
public void sendPartitionRequest(WorkerInfo workerInfo,
Partition<I, V, E, M> partition) {
- final int partitionId = partition.getId();
if (LOG.isTraceEnabled()) {
LOG.trace("sendVertexRequest: Sending to " + workerInfo +
", with partition " + partition);
@@ -170,13 +170,26 @@ public class NettyWorkerClientRequestPro
doRequest(workerInfo, vertexRequest);
// Messages are stored separately
+ if (serviceWorker.getSuperstep() != BspService.INPUT_SUPERSTEP) {
+ sendPartitionMessages(workerInfo, partition);
+ }
+ }
+
+ /**
+ * Send all messages for a partition to another worker.
+ *
+ * @param workerInfo Worker to send the partition messages to
+ * @param partition Partition whose messages to send
+ */
+ private void sendPartitionMessages(WorkerInfo workerInfo,
+ Partition<I, V, E, M> partition) {
+ final int partitionId = partition.getId();
MessageStoreByPartition<I, M> messageStore =
serverData.getCurrentMessageStore();
ByteArrayVertexIdMessages<I, M> vertexIdMessages =
new ByteArrayVertexIdMessages<I, M>();
vertexIdMessages.setConf(configuration);
vertexIdMessages.initialize();
- int messagesInMap = 0;
for (I vertexId :
messageStore.getPartitionDestinationVertices(partitionId)) {
try {
@@ -185,13 +198,12 @@ public class NettyWorkerClientRequestPro
Iterable<M> messages = messageStore.getVertexMessages(vertexId);
for (M message : messages) {
vertexIdMessages.add(vertexId, message);
- ++messagesInMap;
}
} catch (IOException e) {
throw new IllegalStateException(
"sendVertexRequest: Got IOException ", e);
}
- if (messagesInMap > maxMessagesPerWorker) {
+ if (vertexIdMessages.getSize() > maxMessagesSizePerWorker) {
WritableRequest messagesRequest = new
SendPartitionCurrentMessagesRequest<I, V, E, M>(
partitionId, vertexIdMessages);
@@ -200,10 +212,9 @@ public class NettyWorkerClientRequestPro
new ByteArrayVertexIdMessages<I, M>();
vertexIdMessages.setConf(configuration);
vertexIdMessages.initialize();
- messagesInMap = 0;
}
}
- if (vertexIdMessages != null) {
+ if (!vertexIdMessages.isEmpty()) {
WritableRequest messagesRequest = new
SendPartitionCurrentMessagesRequest<I, V, E, M>(
partitionId, vertexIdMessages);