Repository: helix Updated Branches: refs/heads/master 3cb08dbba -> 0cf986e17
Cleaned up ByteBuf usage in IPC Removed dead code in IPC Made max frame length configurable (also bumped to 128MB default) Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0cf986e1 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0cf986e1 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0cf986e1 Branch: refs/heads/master Commit: 0cf986e1734a8e2261f14d0e5a4463e8826c4457 Parents: 3cb08db Author: Greg Brandt <[email protected]> Authored: Wed Jul 1 15:50:04 2015 -0700 Committer: Greg Brandt <[email protected]> Committed: Sat Jul 18 12:54:21 2015 -0700 ---------------------------------------------------------------------- .../ipc/netty/NettyHelixIPCCallbackHandler.java | 31 +++----------------- .../helix/ipc/netty/NettyHelixIPCService.java | 16 ++++++---- .../helix/ipc/netty/NettyHelixIPCUtils.java | 4 ++- 3 files changed, 18 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/0cf986e1/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java ---------------------------------------------------------------------- diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java index 164e6d1..90fc7a2 100644 --- a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java +++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java @@ -54,75 +54,52 @@ public class NettyHelixIPCCallbackHandler extends SimpleChannelInboundHandler<By @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception { try { - int idx = 0; - // Message length int messageLength = byteBuf.readInt(); - idx += 4; // Message version @SuppressWarnings("unused") int messageVersion = byteBuf.readInt(); - idx += 4; // Message type int messageType = byteBuf.readInt(); - idx += 4; // Message ID UUID messageId = new UUID(byteBuf.readLong(), byteBuf.readLong()); - idx += 16; // Cluster - byteBuf.readerIndex(idx); int clusterSize = byteBuf.readInt(); - idx += 4; checkLength("clusterSize", clusterSize, messageLength); String clusterName = toNonEmptyString(clusterSize, byteBuf); - idx += clusterSize; // Resource - byteBuf.readerIndex(idx); int resourceSize = byteBuf.readInt(); - idx += 4; checkLength("resourceSize", resourceSize, messageLength); String resourceName = toNonEmptyString(resourceSize, byteBuf); - idx += resourceSize; // Partition - byteBuf.readerIndex(idx); int partitionSize = byteBuf.readInt(); - idx += 4; checkLength("partitionSize", partitionSize, messageLength); String partitionName = toNonEmptyString(partitionSize, byteBuf); - idx += partitionSize; // State - byteBuf.readerIndex(idx); int stateSize = byteBuf.readInt(); - idx += 4; checkLength("stateSize", stateSize, messageLength); String state = toNonEmptyString(stateSize, byteBuf); - idx += stateSize; // Source instance - byteBuf.readerIndex(idx); int srcInstanceSize = byteBuf.readInt(); - idx += 4; checkLength("srcInstanceSize", srcInstanceSize, messageLength); String srcInstance = toNonEmptyString(srcInstanceSize, byteBuf); - idx += srcInstanceSize; // Destination instance - byteBuf.readerIndex(idx); int dstInstanceSize = byteBuf.readInt(); - idx += 4; checkLength("dstInstanceSize", dstInstanceSize, messageLength); String dstInstance = toNonEmptyString(dstInstanceSize, byteBuf); - idx += dstInstanceSize; - // Position at message - byteBuf.readerIndex(idx + 4); + // Message + int messageSize = byteBuf.readInt(); + ByteBuf message = byteBuf.slice(byteBuf.readerIndex(), messageSize); // Error check if (dstInstance == null) { @@ -147,7 +124,7 @@ public class NettyHelixIPCCallbackHandler extends SimpleChannelInboundHandler<By } // Handle callback - callback.onMessage(scope, messageId, byteBuf); + callback.onMessage(scope, messageId, message); // Stats statRxMsg.mark(); http://git-wip-us.apache.org/repos/asf/helix/blob/0cf986e1/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java ---------------------------------------------------------------------- diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java index 68f6fbc..c6393df 100644 --- a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java +++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java @@ -96,7 +96,6 @@ public class NettyHelixIPCService implements HelixIPCService { // Parameters for length header field of message (tells decoder to interpret but preserve length // field in message) - private static final int MAX_FRAME_LENGTH = 1024 * 1024; private static final int LENGTH_FIELD_OFFSET = 0; private static final int LENGTH_FIELD_LENGTH = 4; private static final int LENGTH_ADJUSTMENT = -4; @@ -158,7 +157,7 @@ public class NettyHelixIPCService implements HelixIPCService { protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast( new LengthFieldBasedFrameDecoder( - MAX_FRAME_LENGTH, + config.getMaxFrameLength(), LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, @@ -231,9 +230,6 @@ public class NettyHelixIPCService implements HelixIPCService { synchronized (channelMap) { channel = channels.get(idx); if (channel == null || !channel.isOpen()) { - if (channel != null && channel.isOpen()) { - channel.close(); - } channel = clientBootstrap.connect(destination.getSocketAddress()).sync().channel(); channels.set(idx, channel); statChannelOpen.inc(); @@ -302,6 +298,7 @@ public class NettyHelixIPCService implements HelixIPCService { private String instanceName; private int port; private int numConnections = 1; + private int maxFrameLength = 128 * 1024 * 1024; public Config setInstanceName(String instanceName) { this.instanceName = instanceName; @@ -318,6 +315,11 @@ public class NettyHelixIPCService implements HelixIPCService { return this; } + public Config setMaxFrameLength(int maxFrameLength) { + this.maxFrameLength = maxFrameLength; + return this; + } + public String getInstanceName() { return instanceName; } @@ -329,5 +331,9 @@ public class NettyHelixIPCService implements HelixIPCService { public int getNumConnections() { return numConnections; } + + public int getMaxFrameLength() { + return maxFrameLength; + } } } http://git-wip-us.apache.org/repos/asf/helix/blob/0cf986e1/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java ---------------------------------------------------------------------- diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java index 77b9123..19e4d87 100644 --- a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java +++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java @@ -45,7 +45,9 @@ public class NettyHelixIPCUtils { /** Given a byte buf w/ a certain reader index, encodes the next length bytes as a String */ public static String toNonEmptyString(int length, ByteBuf byteBuf) { if (byteBuf.readableBytes() >= length) { - return byteBuf.toString(byteBuf.readerIndex(), length, Charset.defaultCharset()); + String string = byteBuf.toString(byteBuf.readerIndex(), length, Charset.defaultCharset()); + byteBuf.readerIndex(byteBuf.readerIndex() + length); + return string; } return null; }
