Author: maja
Date: Wed Dec 5 20:52:02 2012
New Revision: 1417641
URL: http://svn.apache.org/viewvc?rev=1417641&view=rev
Log:
GIRAPH-443: Properly size netty buffers when encoding requests
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
Modified: giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Dec 5 20:52:02 2012
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-443: Properly size netty buffers when encoding requests (majakabiljo)
+
GIRAPH-395: No need to make HashWorkerPartitioner thread-safe. (aching)
GIRAPH-441: Keep track of connected channels in NettyServer (majakabiljo)
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
Wed Dec 5 20:52:02 2012
@@ -67,13 +67,28 @@ public class RequestEncoder extends OneT
startEncodingNanoseconds = TIME.getNanoseconds();
}
WritableRequest writableRequest = (WritableRequest) msg;
- ChannelBufferOutputStream outputStream =
- new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer(
- bufferStartingSize,
- ctx.getChannel().getConfig().getBufferFactory()));
+ int requestSize = writableRequest.getSerializedSize();
+ ChannelBufferOutputStream outputStream;
+ if (requestSize == WritableRequest.UNKNOWN_SIZE) {
+ outputStream =
+ new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer(
+ bufferStartingSize,
+ ctx.getChannel().getConfig().getBufferFactory()));
+ } else {
+ requestSize += LENGTH_PLACEHOLDER.length + 1;
+ outputStream = new ChannelBufferOutputStream(
+ ChannelBuffers.directBuffer(requestSize));
+ }
outputStream.write(LENGTH_PLACEHOLDER);
outputStream.writeByte(writableRequest.getType().ordinal());
- writableRequest.write(outputStream);
+ try {
+ writableRequest.write(outputStream);
+ } catch (IndexOutOfBoundsException e) {
+ LOG.error("encode: Most likely the size of request was not properly " +
+ "specified - see getSerializedSize() in " +
+ writableRequest.getType().getRequestClass());
+ throw new IllegalStateException(e);
+ }
outputStream.flush();
outputStream.close();
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java
Wed Dec 5 20:52:02 2012
@@ -76,4 +76,10 @@ public abstract class ByteArrayRequest e
output.writeInt(data.length);
output.write(data);
}
+
+ @Override
+ public int getSerializedSize() {
+ // 4 for the length of data, plus number of data bytes
+ return super.getSerializedSize() + 4 + data.length;
+ }
}
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java
Wed Dec 5 20:52:02 2012
@@ -103,4 +103,9 @@ public class SaslTokenMessageRequest ext
output.writeInt(token.length);
output.write(token);
}
+
+ @Override
+ public int getSerializedSize() {
+ return super.getSerializedSize() + 4 + token.length;
+ }
}
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
Wed Dec 5 20:52:02 2012
@@ -88,4 +88,10 @@ public class SendPartitionCurrentMessage
throw new RuntimeException("doRequest: Got IOException ", e);
}
}
+
+ @Override
+ public int getSerializedSize() {
+ return super.getSerializedSize() + 4 +
+ vertexIdMessageMap.getSerializedSize();
+ }
}
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
Wed Dec 5 20:52:02 2012
@@ -126,4 +126,9 @@ public class SendPartitionMutationsReque
}
}
}
+
+ @Override
+ public int getSerializedSize() {
+ return WritableRequest.UNKNOWN_SIZE;
+ }
}
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
Wed Dec 5 20:52:02 2012
@@ -80,5 +80,10 @@ public class SendVertexRequest<I extends
public void doRequest(ServerData<I, V, E, M> serverData) {
serverData.getPartitionStore().addPartition(partition);
}
+
+ @Override
+ public int getSerializedSize() {
+ return WritableRequest.UNKNOWN_SIZE;
+ }
}
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
Wed Dec 5 20:52:02 2012
@@ -116,4 +116,16 @@ public class SendWorkerMessagesRequest<I
}
}
}
+
+ @Override
+ public int getSerializedSize() {
+ int size = super.getSerializedSize() + 4;
+ PairList<Integer, ByteArrayVertexIdMessages<I, M>>.Iterator
+ iterator = partitionVertexMessages.getIterator();
+ while (iterator.hasNext()) {
+ iterator.next();
+ size += 4 + iterator.getCurrentSecond().getSerializedSize();
+ }
+ return size;
+ }
}
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
Wed Dec 5 20:52:02 2012
@@ -38,6 +38,12 @@ public abstract class WritableRequest<I
V extends Writable, E extends Writable, M extends Writable>
implements Writable,
ImmutableClassesGiraphConfigurable<I, V, E, M> {
+ /**
+ * Value to use when size of the request in serialized form is not known
+ * or too expensive to calculate
+ */
+ public static final int UNKNOWN_SIZE = -1;
+
/** Configuration */
private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
/** Client id */
@@ -62,6 +68,20 @@ public abstract class WritableRequest<I
}
/**
+ * Get the size of the request in serialized form. The number returned by
+ * this function can't be less than the actual size - if the size can't be
+ * calculated correctly return WritableRequest.UNKNOWN_SIZE.
+ *
+ * @return The size (in bytes) of serialized request,
+ * or WritableRequest.UNKNOWN_SIZE if the size is not known
+ * or too expensive to calculate.
+ */
+ public int getSerializedSize() {
+ // 4 for clientId, 8 for requestId
+ return 4 + 8;
+ }
+
+ /**
* Get the type of the request
*
* @return Request type
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java?rev=1417641&r1=1417640&r2=1417641&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
Wed Dec 5 20:52:02 2012
@@ -182,6 +182,15 @@ public class ByteArrayVertexIdMessages<I
}
/**
+ * Get the size of this object in serialized form.
+ *
+ * @return The size (in bytes) of serialized object
+ */
+ public int getSerializedSize() {
+ return 1 + 4 + getSize();
+ }
+
+ /**
* Common implementation for VertexIdMessageIterator
* and VertexIdMessageBytesIterator
*/