Updated Branches: refs/heads/giraph474 [created] 8711d9bbc
GIRAPH-474: Add an option not to use direct byte buffers Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8711d9bb Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8711d9bb Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8711d9bb Branch: refs/heads/giraph474 Commit: 8711d9bbc3f1cb3e304408feae1a4d8ee7ecdee2 Parents: 7a04bfd Author: Maja Kabiljo <[email protected]> Authored: Wed Jan 9 14:14:38 2013 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Wed Jan 9 14:14:38 2013 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 + .../org/apache/giraph/comm/netty/NettyClient.java | 8 +--- .../giraph/comm/netty/handler/RequestEncoder.java | 31 ++++++++++----- .../org/apache/giraph/conf/GiraphConstants.java | 9 ++++ 4 files changed, 34 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/8711d9bb/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index d67df78..6d469b0 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-474: Add an option not to use direct byte buffers (majakabiljo) + GIRAPH-476: SequenceFileVertexOutputFormat (nitay) GIRAPH-409: Refactor / cleanups (nitay) http://git-wip-us.apache.org/repos/asf/giraph/blob/8711d9bb/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java index c66c819..ed92d82 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java @@ -278,9 +278,7 @@ public class NettyClient { // completes (as in non-auth pipeline below). pipeline.addLast("length-field-based-frame-decoder", new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); - pipeline.addLast("request-encoder", new RequestEncoder(conf.getInt( - GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE, - GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT))); + pipeline.addLast("request-encoder", new RequestEncoder(conf)); // The following pipeline component responds to the server's SASL // tokens with its own responses. Both client and server share the // same Hadoop Job token, which is used to create the SASL tokens to @@ -298,9 +296,7 @@ public class NettyClient { pipeline.addLast("clientByteCounter", byteCounter); pipeline.addLast("responseFrameDecoder", new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES)); - pipeline.addLast("requestEncoder", new RequestEncoder(conf.getInt( - GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE, - GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT))); + pipeline.addLast("requestEncoder", new RequestEncoder(conf)); pipeline.addLast("responseClientHandler", new ResponseClientHandler(clientRequestIdRequestInfoMap, conf)); if (executionHandler != null) { http://git-wip-us.apache.org/repos/asf/giraph/blob/8711d9bb/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java index 7fa0a4e..4e739cb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java @@ -19,6 +19,8 @@ package org.apache.giraph.comm.netty.handler; import org.apache.giraph.comm.requests.WritableRequest; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.time.SystemTime; import org.apache.giraph.time.Time; import org.apache.giraph.time.Times; @@ -42,16 +44,23 @@ public class RequestEncoder extends OneToOneEncoder { private static final byte[] LENGTH_PLACEHOLDER = new byte[4]; /** Buffer starting size */ private final int bufferStartingSize; + /** Whether or not to use direct byte buffers */ + private final boolean useDirectBuffers; /** Start nanoseconds for the encoding time */ private long startEncodingNanoseconds = -1; /** * Constructor. * - * @param bufferStartingSize Starting size of the buffer + * @param conf Giraph configuration */ - public RequestEncoder(int bufferStartingSize) { - this.bufferStartingSize = bufferStartingSize; + public RequestEncoder(GiraphConfiguration conf) { + bufferStartingSize = conf.getInt( + GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE, + GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT); + useDirectBuffers = conf.getBoolean( + GiraphConstants.NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS, + GiraphConstants.NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS_DEFAULT); } @Override @@ -68,17 +77,19 @@ public class RequestEncoder extends OneToOneEncoder { } WritableRequest writableRequest = (WritableRequest) msg; int requestSize = writableRequest.getSerializedSize(); - ChannelBufferOutputStream outputStream; + ChannelBuffer channelBuffer; if (requestSize == WritableRequest.UNKNOWN_SIZE) { - outputStream = - new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer( - bufferStartingSize, - ctx.getChannel().getConfig().getBufferFactory())); + channelBuffer = ChannelBuffers.dynamicBuffer( + bufferStartingSize, + ctx.getChannel().getConfig().getBufferFactory()); } else { requestSize += LENGTH_PLACEHOLDER.length + 1; - outputStream = new ChannelBufferOutputStream( - ChannelBuffers.directBuffer(requestSize)); + channelBuffer = useDirectBuffers ? + ChannelBuffers.directBuffer(requestSize) : + ChannelBuffers.buffer(requestSize); } + ChannelBufferOutputStream outputStream = + new ChannelBufferOutputStream(channelBuffer); outputStream.write(LENGTH_PLACEHOLDER); outputStream.writeByte(writableRequest.getType().ordinal()); try { http://git-wip-us.apache.org/repos/asf/giraph/blob/8711d9bb/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 11d4a41..9acc50a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -177,6 +177,15 @@ public interface GiraphConstants { /** Start with 32K */ int NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT = 32 * 1024; + /** Whether or not netty request encoder should use direct byte buffers */ + String NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS = + "giraph.nettyRequestEncoderUseDirectBuffers"; + /** + * By default don't use direct buffers, + * since jobs can take more than allowed heap memory in that case + */ + boolean NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS_DEFAULT = false; + /** Netty client threads */ String NETTY_CLIENT_THREADS = "giraph.nettyClientThreads"; /** Default is 4 */
