Author: maja
Date: Wed Dec  5 20:52:02 2012
New Revision: 1417641

URL: http://svn.apache.org/viewvc?rev=1417641&view=rev
Log:
GIRAPH-443: Properly size netty buffers when encoding requests

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Dec  5 20:52:02 2012
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-443: Properly size netty buffers when encoding requests (majakabiljo)
+
   GIRAPH-395: No need to make HashWorkerPartitioner thread-safe. (aching)
 
   GIRAPH-441: Keep track of connected channels in NettyServer (majakabiljo)

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java Wed Dec  5 20:52:02 2012
@@ -67,13 +67,28 @@ public class RequestEncoder extends OneT
       startEncodingNanoseconds = TIME.getNanoseconds();
     }
     WritableRequest writableRequest = (WritableRequest) msg;
-    ChannelBufferOutputStream outputStream =
-        new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer(
-            bufferStartingSize,
-            ctx.getChannel().getConfig().getBufferFactory()));
+    int requestSize = writableRequest.getSerializedSize();
+    ChannelBufferOutputStream outputStream;
+    if (requestSize == WritableRequest.UNKNOWN_SIZE) {
+      outputStream =
+          new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer(
+              bufferStartingSize,
+              ctx.getChannel().getConfig().getBufferFactory()));
+    } else {
+      requestSize += LENGTH_PLACEHOLDER.length + 1;
+      outputStream = new ChannelBufferOutputStream(
+          ChannelBuffers.directBuffer(requestSize));
+    }
     outputStream.write(LENGTH_PLACEHOLDER);
     outputStream.writeByte(writableRequest.getType().ordinal());
-    writableRequest.write(outputStream);
+    try {
+      writableRequest.write(outputStream);
+    } catch (IndexOutOfBoundsException e) {
+      LOG.error("encode: Most likely the size of request was not properly " +
+          "specified - see getSerializedSize() in " +
+          writableRequest.getType().getRequestClass());
+      throw new IllegalStateException(e);
+    }
     outputStream.flush();
     outputStream.close();
 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java Wed Dec  5 20:52:02 2012
@@ -76,4 +76,10 @@ public abstract class ByteArrayRequest e
     output.writeInt(data.length);
     output.write(data);
   }
+
+  @Override
+  public int getSerializedSize() {
+    // 4 for the length of data, plus number of data bytes
+    return super.getSerializedSize() + 4 + data.length;
+  }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java Wed Dec  5 20:52:02 2012
@@ -103,4 +103,9 @@ public class SaslTokenMessageRequest ext
     output.writeInt(token.length);
     output.write(token);
   }
+
+  @Override
+  public int getSerializedSize() {
+    return super.getSerializedSize() + 4 + token.length;
+  }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java Wed Dec  5 20:52:02 2012
@@ -88,4 +88,10 @@ public class SendPartitionCurrentMessage
       throw new RuntimeException("doRequest: Got IOException ", e);
     }
   }
+
+  @Override
+  public int getSerializedSize() {
+    return super.getSerializedSize() + 4 +
+        vertexIdMessageMap.getSerializedSize();
+  }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java Wed Dec  5 20:52:02 2012
@@ -126,4 +126,9 @@ public class SendPartitionMutationsReque
       }
     }
   }
+
+  @Override
+  public int getSerializedSize() {
+    return WritableRequest.UNKNOWN_SIZE;
+  }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java Wed Dec  5 20:52:02 2012
@@ -80,5 +80,10 @@ public class SendVertexRequest<I extends
   public void doRequest(ServerData<I, V, E, M> serverData) {
     serverData.getPartitionStore().addPartition(partition);
   }
+
+  @Override
+  public int getSerializedSize() {
+    return WritableRequest.UNKNOWN_SIZE;
+  }
 }
 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java Wed Dec  5 20:52:02 2012
@@ -116,4 +116,16 @@ public class SendWorkerMessagesRequest<I
       }
     }
   }
+
+  @Override
+  public int getSerializedSize() {
+    int size = super.getSerializedSize() + 4;
+    PairList<Integer, ByteArrayVertexIdMessages<I, M>>.Iterator
+        iterator = partitionVertexMessages.getIterator();
+    while (iterator.hasNext()) {
+      iterator.next();
+      size += 4 + iterator.getCurrentSecond().getSerializedSize();
+    }
+    return size;
+  }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java Wed Dec  5 20:52:02 2012
@@ -38,6 +38,12 @@ public abstract class WritableRequest<I 
     V extends Writable, E extends Writable, M extends Writable>
     implements Writable,
     ImmutableClassesGiraphConfigurable<I, V, E, M> {
+  /**
+   * Value to use when size of the request in serialized form is not known
+   * or too expensive to calculate
+   */
+  public static final int UNKNOWN_SIZE = -1;
+
   /** Configuration */
   private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
   /** Client id */
@@ -62,6 +68,20 @@ public abstract class WritableRequest<I 
   }
 
   /**
+   * Get the size of the request in serialized form. The number returned by
+   * this function can't be less than the actual size - if the size can't be
+   * calculated correctly return WritableRequest.UNKNOWN_SIZE.
+   *
+   * @return The size (in bytes) of serialized request,
+   * or WritableRequest.UNKNOWN_SIZE if the size is not known
+   * or too expensive to calculate.
+   */
+  public int getSerializedSize() {
+    // 4 for clientId, 8 for requestId
+    return 4 + 8;
+  }
+
+  /**
    * Get the type of the request
    *
    * @return Request type

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java Wed Dec  5 20:52:02 2012
@@ -182,6 +182,15 @@ public class ByteArrayVertexIdMessages<I
   }
 
   /**
+   * Get the size of this object in serialized form.
+   *
+   * @return The size (in bytes) of serialized object
+   */
+  public int getSerializedSize() {
+    return 1 + 4 + getSize();
+  }
+
+  /**
    * Common implementation for VertexIdMessageIterator
    * and VertexIdMessageBytesIterator
    */


Reply via email to