GIRAPH-713: Provide an option to do request compression (pavanka)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/4223ccc0 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/4223ccc0 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/4223ccc0 Branch: refs/heads/release-1.1 Commit: 4223ccc08bcd3689bddb310dddedab0485f7a6bd Parents: 666d5fd Author: Pavan Kumar <[email protected]> Authored: Mon Jul 7 16:48:08 2014 -0700 Committer: Pavan Kumar <[email protected]> Committed: Mon Jul 7 16:48:08 2014 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../apache/giraph/comm/netty/NettyClient.java | 21 +++++++- .../apache/giraph/comm/netty/NettyServer.java | 20 ++++++++ .../org/apache/giraph/conf/GiraphConstants.java | 5 ++ .../ImmutableClassesGiraphConfiguration.java | 54 ++++++++++++++++++++ 5 files changed, 101 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/4223ccc0/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 43aea7a..ea2f911 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-713: Provide an option to do request compression (pavanka) + GIRAPH-923: Upgrade Netty version to a latest stable one (pavanka) GIRAPH-916: Wrong number of vertices stored reported to command line (majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/4223ccc0/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java index ae40c3b..5bb5545 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java @@ -268,11 +268,20 @@ public class NettyClient { PipelineUtils.addLastWithExecutorCheck("clientInboundByteCounter", inboundByteCounter, handlerToUseExecutionGroup, executionGroup, ch); + if (conf.doCompression()) { + PipelineUtils.addLastWithExecutorCheck("compressionDecoder", + conf.getNettyCompressionDecoder(), + handlerToUseExecutionGroup, executionGroup, ch); + } PipelineUtils.addLastWithExecutorCheck( "clientOutboundByteCounter", outboundByteCounter, handlerToUseExecutionGroup, executionGroup, ch); - + if (conf.doCompression()) { + PipelineUtils.addLastWithExecutorCheck("compressionEncoder", + conf.getNettyCompressionEncoder(), + handlerToUseExecutionGroup, executionGroup, ch); + } // The following pipeline component is needed to decode the // server's SASL tokens. It is replaced with a // FixedLengthFrameDecoder (same as used with the @@ -303,10 +312,20 @@ public class NettyClient { PipelineUtils.addLastWithExecutorCheck("clientInboundByteCounter", inboundByteCounter, handlerToUseExecutionGroup, executionGroup, ch); + if (conf.doCompression()) { + PipelineUtils.addLastWithExecutorCheck("compressionDecoder", + conf.getNettyCompressionDecoder(), + handlerToUseExecutionGroup, executionGroup, ch); + } PipelineUtils.addLastWithExecutorCheck( "clientOutboundByteCounter", outboundByteCounter, handlerToUseExecutionGroup, executionGroup, ch); + if (conf.doCompression()) { + PipelineUtils.addLastWithExecutorCheck("compressionEncoder", + conf.getNettyCompressionEncoder(), + handlerToUseExecutionGroup, executionGroup, ch); + } PipelineUtils.addLastWithExecutorCheck( "fixed-length-frame-decoder", new FixedLengthFrameDecoder( http://git-wip-us.apache.org/repos/asf/giraph/blob/4223ccc0/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java index 14d4ea8..8162857 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java @@ -240,8 +240,18 @@ public class NettyServer { // configuration except for the presence of the Authorize component. PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter", inByteCounter, handlerToUseExecutionGroup, executionGroup, ch); + if (conf.doCompression()) { + PipelineUtils.addLastWithExecutorCheck("compressionDecoder", + conf.getNettyCompressionDecoder(), + handlerToUseExecutionGroup, executionGroup, ch); + } PipelineUtils.addLastWithExecutorCheck("serverOutboundByteCounter", outByteCounter, handlerToUseExecutionGroup, executionGroup, ch); + if (conf.doCompression()) { + PipelineUtils.addLastWithExecutorCheck("compressionEncoder", + conf.getNettyCompressionEncoder(), + handlerToUseExecutionGroup, executionGroup, ch); + } PipelineUtils.addLastWithExecutorCheck("requestFrameDecoder", new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4), handlerToUseExecutionGroup, executionGroup, ch); @@ -280,8 +290,18 @@ public class NettyServer { }); PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter", inByteCounter, handlerToUseExecutionGroup, executionGroup, ch); + if (conf.doCompression()) { + PipelineUtils.addLastWithExecutorCheck("compressionDecoder", + conf.getNettyCompressionDecoder(), + handlerToUseExecutionGroup, executionGroup, ch); + } PipelineUtils.addLastWithExecutorCheck("serverOutboundByteCounter", outByteCounter, handlerToUseExecutionGroup, executionGroup, ch); + if (conf.doCompression()) { + PipelineUtils.addLastWithExecutorCheck("compressionEncoder", + conf.getNettyCompressionEncoder(), + handlerToUseExecutionGroup, executionGroup, ch); + } PipelineUtils.addLastWithExecutorCheck("requestFrameDecoder", new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4), handlerToUseExecutionGroup, executionGroup, ch); http://git-wip-us.apache.org/repos/asf/giraph/blob/4223ccc0/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 7d7ceb2..1879a25 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -613,6 +613,11 @@ public interface GiraphConstants { new BooleanConfOption("giraph.nettySimulateFirstResponseFailed", false, "Netty simulate a first response failed"); + /** Netty - set which compression to use */ + StrConfOption NETTY_COMPRESSION_ALGORITHM = + new StrConfOption("giraph.nettyCompressionAlgorithm", "", + "Which compression algorithm to use in netty"); + /** Max resolve address attempts */ IntConfOption MAX_RESOLVE_ADDRESS_ATTEMPTS = new IntConfOption("giraph.maxResolveAddressAttempts", 5, http://git-wip-us.apache.org/repos/asf/giraph/blob/4223ccc0/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java index 3d7b3db..3121fa8 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java @@ -19,6 +19,12 @@ package org.apache.giraph.conf; import com.google.common.base.Preconditions; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.handler.codec.compression.JdkZlibDecoder; +import io.netty.handler.codec.compression.JdkZlibEncoder; +import io.netty.handler.codec.compression.SnappyFramedDecoder; +import io.netty.handler.codec.compression.SnappyFramedEncoder; import org.apache.giraph.aggregators.AggregatorWriter; import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.edge.Edge; @@ -1220,4 +1226,52 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, } classes.setMessageCombiner(superstepClasses.getMessageCombinerClass()); } + + /** + * Has the user enabled compression in netty client & server + * + * @return true if ok to do compression of netty requests + */ + public boolean doCompression() { + switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) { + case "SNAPPY": + return true; + case "INFLATE": + return true; + default: + return false; + } + } + + /** + * Get encoder for message compression in netty + * + * @return message to byte encoder + */ + public MessageToByteEncoder getNettyCompressionEncoder() { + switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) { + case "SNAPPY": + return new SnappyFramedEncoder(); + case "INFLATE": + return new JdkZlibEncoder(); + default: + return null; + } + } + + /** + * Get decoder for message decompression in netty + * + * @return byte to message decoder + */ + public ByteToMessageDecoder getNettyCompressionDecoder() { + switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) { + case "SNAPPY": + return new SnappyFramedDecoder(true); + case "INFLATE": + return new JdkZlibDecoder(); + default: + return null; + } + } }
