Author: maja
Date: Mon Dec 10 18:15:46 2012
New Revision: 1419623

URL: http://svn.apache.org/viewvc?rev=1419623&view=rev
Log:
GIRAPH-445: Max message request size in bytes, initialize buffers to expected 
size

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
    
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
    
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java

Modified: giraph/trunk/CHANGELOG
URL: 
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1419623&r1=1419622&r2=1419623&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Dec 10 18:15:46 2012
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-445: Max message request size in bytes, initialize buffers to 
+  expected size (majakabiljo)
+
   GIRAPH-444: Cleanup VertexResolver (nitay)
 
   GIRAPH-446: Add a proper timeout for waiting for workers to join a

Modified: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1419623&r1=1419622&r2=1419623&view=diff
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java 
(original)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java 
Mon Dec 10 18:15:46 2012
@@ -385,10 +385,10 @@ public class GiraphConfiguration extends
   /** Default server receive buffer size of 0.5 MB */
   public static final int DEFAULT_SERVER_RECEIVE_BUFFER_SIZE = 512 * 1024;
 
-  /** Maximum number of messages per peer before flush */
-  public static final String MSG_SIZE = "giraph.msgSize";
-  /** Default maximum number of messages per peer before flush */
-  public static final int MSG_SIZE_DEFAULT = 2000;
+  /** Maximum size of messages (in bytes) per peer before flush */
+  public static final String MAX_MSG_REQUEST_SIZE = "giraph.msgRequestSize";
+  /** Default maximum size of messages per peer before flush (0.5MB) */
+  public static final int MAX_MSG_REQUEST_SIZE_DEFAULT = 512 * 1024;
 
   /** Maximum number of mutations per partition before flush */
   public static final String MAX_MUTATIONS_PER_REQUEST =
@@ -396,12 +396,6 @@ public class GiraphConfiguration extends
   /** Default maximum number of mutations per partition before flush */
   public static final int MAX_MUTATIONS_PER_REQUEST_DEFAULT = 100;
 
-  /** Maximum number of messages that can be bulk sent during a flush */
-  public static final String MAX_MESSAGES_PER_FLUSH_PUT =
-      "giraph.maxMessagesPerFlushPut";
-  /** Default number of messages that can be bulk sent during a flush */
-  public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 2000;
-
   /**
    * Use message size encoding (typically better for complex objects,
    * not meant for primitive wrapped messages)

Modified: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java?rev=1419623&r1=1419622&r2=1419623&view=diff
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java 
(original)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java 
Mon Dec 10 18:15:46 2012
@@ -22,6 +22,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.graph.WorkerInfo;
@@ -41,10 +43,27 @@ import org.apache.hadoop.io.WritableComp
 @SuppressWarnings("rawtypes")
 public class SendMessageCache<I extends WritableComparable,
     M extends Writable> {
+  /**
+   * How much bigger than the average per partition size to make initial per
+   * partition buffers.
+   * If this value is A, message request size is M,
+   * and a worker has P partitions, then its initial partition buffer size
+   * will be (M / P) * (1 + A).
+   */
+  public static final String ADDITIONAL_MSG_REQUEST_SIZE =
+      "giraph.additionalMsgRequestSize";
+  /**
+   * Default factor (20%) for how much bigger the initial per partition
+   * buffers should be.
+   */
+  public static final float ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT = 0.2f;
+
   /** Internal cache */
   private final ByteArrayVertexIdMessages<I, M>[] messageCache;
-  /** Number of messages in each partition */
-  private final int[] messageCounts;
+  /** Size of messages (in bytes) for each worker */
+  private final int[] messageSizes;
+  /** How big to initially make output streams for each worker's partitions */
+  private final int[] initialBufferSizes;
   /** List of partition ids belonging to a worker */
   private final Map<WorkerInfo, List<Integer>> workerPartitions =
       Maps.newHashMap();
@@ -79,7 +98,19 @@ public class SendMessageCache<I extends 
     for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
       maxWorker = Math.max(maxWorker, workerInfo.getTaskId());
     }
-    messageCounts = new int[maxWorker + 1];
+    messageSizes = new int[maxWorker + 1];
+
+    float additionalRequestSize =
+        conf.getFloat(ADDITIONAL_MSG_REQUEST_SIZE,
+            ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT);
+    int requestSize = conf.getInt(GiraphConfiguration.MAX_MSG_REQUEST_SIZE,
+        GiraphConfiguration.MAX_MSG_REQUEST_SIZE_DEFAULT);
+    int initialRequestSize = (int) (requestSize * (1 + additionalRequestSize));
+    initialBufferSizes = new int[maxWorker + 1];
+    for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
+      initialBufferSizes[workerInfo.getTaskId()] =
+          initialRequestSize / workerPartitions.get(workerInfo).size();
+    }
   }
 
   /**
@@ -90,24 +121,28 @@ public class SendMessageCache<I extends 
    * @param destVertexId vertex id that is ultimate destination
    * @param message Message to be send to remote
    *                <b>host => partition => vertex</b>
-   * @return Number of messages in the partition.
+   * @return Size of messages for the worker.
    */
   public int addMessage(WorkerInfo workerInfo,
     final int partitionId, I destVertexId, M message) {
     // Get the message collection
     ByteArrayVertexIdMessages<I, M> partitionMessages =
         messageCache[partitionId];
+    int originalSize = 0;
     if (partitionMessages == null) {
       partitionMessages = new ByteArrayVertexIdMessages<I, M>();
       partitionMessages.setConf(conf);
-      partitionMessages.initialize();
+      partitionMessages.initialize(initialBufferSizes[workerInfo.getTaskId()]);
       messageCache[partitionId] = partitionMessages;
+    } else {
+      originalSize = partitionMessages.getSize();
     }
     partitionMessages.add(destVertexId, message);
 
-    // Update the number of cached, outgoing messages per worker
-    messageCounts[workerInfo.getTaskId()]++;
-    return messageCounts[workerInfo.getTaskId()];
+    // Update the size of cached, outgoing messages per worker
+    messageSizes[workerInfo.getTaskId()] +=
+        partitionMessages.getSize() - originalSize;
+    return messageSizes[workerInfo.getTaskId()];
   }
 
   /**
@@ -122,14 +157,15 @@ public class SendMessageCache<I extends 
   removeWorkerMessages(WorkerInfo workerInfo) {
     PairList<Integer, ByteArrayVertexIdMessages<I, M>> workerMessages =
         new PairList<Integer, ByteArrayVertexIdMessages<I, M>>();
-    workerMessages.initialize();
-    for (Integer partitionId : workerPartitions.get(workerInfo)) {
+    List<Integer> partitions = workerPartitions.get(workerInfo);
+    workerMessages.initialize(partitions.size());
+    for (Integer partitionId : partitions) {
       if (messageCache[partitionId] != null) {
         workerMessages.add(partitionId, messageCache[partitionId]);
         messageCache[partitionId] = null;
       }
     }
-    messageCounts[workerInfo.getTaskId()] = 0;
+    messageSizes[workerInfo.getTaskId()] = 0;
     return workerMessages;
   }
 
@@ -144,7 +180,7 @@ public class SendMessageCache<I extends 
         ByteArrayVertexIdMessages<I, M>>>
         allMessages = new PairList<WorkerInfo,
         PairList<Integer, ByteArrayVertexIdMessages<I, M>>>();
-    allMessages.initialize();
+    allMessages.initialize(messageSizes.length);
     for (WorkerInfo workerInfo : workerPartitions.keySet()) {
       PairList<Integer, ByteArrayVertexIdMessages<I,
                 M>> workerMessages =
@@ -152,7 +188,7 @@ public class SendMessageCache<I extends 
       if (!workerMessages.isEmpty()) {
         allMessages.add(workerInfo, workerMessages);
       }
-      messageCounts[workerInfo.getTaskId()] = 0;
+      messageSizes[workerInfo.getTaskId()] = 0;
     }
     return allMessages;
   }

Modified: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java?rev=1419623&r1=1419622&r2=1419623&view=diff
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
 (original)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
 Mon Dec 10 18:15:46 2012
@@ -35,6 +35,7 @@ import org.apache.giraph.comm.requests.S
 import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
 import org.apache.giraph.comm.requests.WorkerRequest;
 import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.graph.BspService;
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexMutations;
@@ -77,8 +78,8 @@ public class NettyWorkerClientRequestPro
   private final WorkerClient<I, V, E, M> workerClient;
   /** Messages sent during the last superstep */
   private long totalMsgsSentInSuperstep = 0;
-  /** Maximum number of messages per remote worker to cache before sending */
-  private final int maxMessagesPerWorker;
+  /** Maximum size of messages per remote worker to cache before sending */
+  private final int maxMessagesSizePerWorker;
   /** Maximum number of mutations per partition before sending */
   private final int maxMutationsPerPartition;
   /** Giraph configuration */
@@ -110,9 +111,9 @@ public class NettyWorkerClientRequestPro
         configuration);
     sendMessageCache =
         new SendMessageCache<I, M>(configuration, serviceWorker);
-    maxMessagesPerWorker = configuration.getInt(
-        GiraphConfiguration.MSG_SIZE,
-        GiraphConfiguration.MSG_SIZE_DEFAULT);
+    maxMessagesSizePerWorker = configuration.getInt(
+        GiraphConfiguration.MAX_MSG_REQUEST_SIZE,
+        GiraphConfiguration.MAX_MSG_REQUEST_SIZE_DEFAULT);
     maxMutationsPerPartition = configuration.getInt(
         GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST,
         GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
@@ -138,12 +139,12 @@ public class NettyWorkerClientRequestPro
     ++totalMsgsSentInSuperstep;
 
     // Add the message to the cache
-    int workerMessageCount = sendMessageCache.addMessage(
+    int workerMessageSize = sendMessageCache.addMessage(
         workerInfo, partitionId, destVertexId, message);
 
     // Send a request if the cache of outgoing message to
     // the remote worker 'workerInfo' is full enough to be flushed
-    if (workerMessageCount >= maxMessagesPerWorker) {
+    if (workerMessageSize >= maxMessagesSizePerWorker) {
       PairList<Integer, ByteArrayVertexIdMessages<I, M>>
           workerMessages =
           sendMessageCache.removeWorkerMessages(workerInfo);
@@ -159,7 +160,6 @@ public class NettyWorkerClientRequestPro
   @Override
   public void sendPartitionRequest(WorkerInfo workerInfo,
                                    Partition<I, V, E, M> partition) {
-    final int partitionId = partition.getId();
     if (LOG.isTraceEnabled()) {
       LOG.trace("sendVertexRequest: Sending to " + workerInfo +
           ", with partition " + partition);
@@ -170,13 +170,26 @@ public class NettyWorkerClientRequestPro
     doRequest(workerInfo, vertexRequest);
 
     // Messages are stored separately
+    if (serviceWorker.getSuperstep() != BspService.INPUT_SUPERSTEP) {
+      sendPartitionMessages(workerInfo, partition);
+    }
+  }
+
+  /**
+   * Send all messages for a partition to another worker.
+   *
+   * @param workerInfo Worker to send the partition messages to
+   * @param partition Partition whose messages to send
+   */
+  private void sendPartitionMessages(WorkerInfo workerInfo,
+                                     Partition<I, V, E, M> partition) {
+    final int partitionId = partition.getId();
     MessageStoreByPartition<I, M> messageStore =
         serverData.getCurrentMessageStore();
     ByteArrayVertexIdMessages<I, M> vertexIdMessages =
         new ByteArrayVertexIdMessages<I, M>();
     vertexIdMessages.setConf(configuration);
     vertexIdMessages.initialize();
-    int messagesInMap = 0;
     for (I vertexId :
         messageStore.getPartitionDestinationVertices(partitionId)) {
       try {
@@ -185,13 +198,12 @@ public class NettyWorkerClientRequestPro
         Iterable<M> messages = messageStore.getVertexMessages(vertexId);
         for (M message : messages) {
           vertexIdMessages.add(vertexId, message);
-          ++messagesInMap;
         }
       } catch (IOException e) {
         throw new IllegalStateException(
             "sendVertexRequest: Got IOException ", e);
       }
-      if (messagesInMap > maxMessagesPerWorker) {
+      if (vertexIdMessages.getSize() > maxMessagesSizePerWorker) {
         WritableRequest messagesRequest = new
             SendPartitionCurrentMessagesRequest<I, V, E, M>(
             partitionId, vertexIdMessages);
@@ -200,10 +212,9 @@ public class NettyWorkerClientRequestPro
             new ByteArrayVertexIdMessages<I, M>();
         vertexIdMessages.setConf(configuration);
         vertexIdMessages.initialize();
-        messagesInMap = 0;
       }
     }
-    if (vertexIdMessages != null) {
+    if (!vertexIdMessages.isEmpty()) {
       WritableRequest messagesRequest = new
           SendPartitionCurrentMessagesRequest<I, V, E, M>(
           partitionId, vertexIdMessages);


Reply via email to