This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new dcc88c65f1 [ISSUE #8166] optimize: make compression type configurable in producer client level dcc88c65f1 is described below commit dcc88c65f1f29b392fbb300001b386dbf1901afc Author: Humkum <1109939...@qq.com> AuthorDate: Thu May 23 13:56:30 2024 +0800 [ISSUE #8166] optimize: make compression type configurable in producer client level --- .../impl/producer/DefaultMQProducerImpl.java | 28 ++-------------- .../client/producer/DefaultMQProducer.java | 39 ++++++++++++++++++++++ .../rocketmq/example/benchmark/BatchProducer.java | 4 +-- .../rocketmq/example/benchmark/Producer.java | 4 +-- 4 files changed, 45 insertions(+), 30 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 6268bcc0a1..7ef3402513 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -70,9 +70,6 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.compression.CompressionType; -import org.apache.rocketmq.common.compression.Compressor; -import org.apache.rocketmq.common.compression.CompressorFactory; import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageAccessor; @@ -118,11 +115,6 @@ public class DefaultMQProducerImpl implements MQProducerInner { private MQFaultStrategy mqFaultStrategy; private ExecutorService asyncSenderExecutor; - // compression related - private int compressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5")); - private 
CompressionType compressType = CompressionType.of(System.getProperty(MixAll.MESSAGE_COMPRESS_TYPE, "ZLIB")); - private final Compressor compressor = CompressorFactory.getCompressor(compressType); - // backpressure related private Semaphore semaphoreAsyncSendNum; private Semaphore semaphoreAsyncSendSize; @@ -900,7 +892,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { boolean msgBodyCompressed = false; if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; - sysFlag |= compressType.getCompressionFlag(); + sysFlag |= this.defaultMQProducer.getCompressType().getCompressionFlag(); msgBodyCompressed = true; } @@ -1070,7 +1062,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (body != null) { if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) { try { - byte[] data = compressor.compress(body, compressLevel); + byte[] data = this.defaultMQProducer.getCompressor().compress(body, this.defaultMQProducer.getCompressLevel()); if (data != null) { msg.setBody(data); return true; @@ -1763,22 +1755,6 @@ public class DefaultMQProducerImpl implements MQProducerInner { return topicPublishInfoTable; } - public int getCompressLevel() { - return compressLevel; - } - - public void setCompressLevel(int compressLevel) { - this.compressLevel = compressLevel; - } - - public CompressionType getCompressType() { - return compressType; - } - - public void setCompressType(CompressionType compressType) { - this.compressType = compressType; - } - public ServiceState getServiceState() { return serviceState; } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index b350ba074d..5304887e38 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -36,6 +36,9 @@ import 
org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.client.trace.hook.EndTransactionTraceHookImpl; import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.compression.CompressionType; +import org.apache.rocketmq.common.compression.Compressor; +import org.apache.rocketmq.common.compression.CompressorFactory; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageBatch; import org.apache.rocketmq.common.message.MessageClientIDSetter; @@ -170,6 +173,21 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { private RPCHook rpcHook = null; + /** + * Compress level of compress algorithm. + */ + private int compressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5")); + + /** + * Compress type of compress algorithm, default using ZLIB. + */ + private CompressionType compressType = CompressionType.of(System.getProperty(MixAll.MESSAGE_COMPRESS_TYPE, "ZLIB")); + + /** + * Compressor of compress algorithm. + */ + private Compressor compressor = CompressorFactory.getCompressor(compressType); + /** * Default constructor. 
*/ @@ -1344,4 +1362,25 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { super.setStartDetectorEnable(startDetectorEnable); this.defaultMQProducerImpl.getMqFaultStrategy().setStartDetectorEnable(startDetectorEnable); } + + public int getCompressLevel() { + return compressLevel; + } + + public void setCompressLevel(int compressLevel) { + this.compressLevel = compressLevel; + } + + public CompressionType getCompressType() { + return compressType; + } + + public void setCompressType(CompressionType compressType) { + this.compressType = compressType; + this.compressor = CompressorFactory.getCompressor(compressType); + } + + public Compressor getCompressor() { + return compressor; + } } diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java index c4a6162a5f..21a4b3b7e7 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java @@ -102,8 +102,8 @@ public class BatchProducer { String compressType = commandLine.hasOption("ct") ? commandLine.getOptionValue("ct").trim() : "ZLIB"; int compressLevel = commandLine.hasOption("cl") ? Integer.parseInt(commandLine.getOptionValue("cl")) : 5; int compressOverHowMuch = commandLine.hasOption("ch") ? 
Integer.parseInt(commandLine.getOptionValue("ch")) : 4096; - producer.getDefaultMQProducerImpl().setCompressType(CompressionType.of(compressType)); - producer.getDefaultMQProducerImpl().setCompressLevel(compressLevel); + producer.setCompressType(CompressionType.of(compressType)); + producer.setCompressLevel(compressLevel); producer.setCompressMsgBodyOverHowmuch(compressOverHowMuch); System.out.printf("compressType: %s compressLevel: %s%n", compressType, compressLevel); } else { diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index 480d16b758..a945283f57 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -160,8 +160,8 @@ public class Producer { String compressType = commandLine.hasOption("ct") ? commandLine.getOptionValue("ct").trim() : "ZLIB"; int compressLevel = commandLine.hasOption("cl") ? Integer.parseInt(commandLine.getOptionValue("cl")) : 5; int compressOverHowMuch = commandLine.hasOption("ch") ? Integer.parseInt(commandLine.getOptionValue("ch")) : 4096; - producer.getDefaultMQProducerImpl().setCompressType(CompressionType.of(compressType)); - producer.getDefaultMQProducerImpl().setCompressLevel(compressLevel); + producer.setCompressType(CompressionType.of(compressType)); + producer.setCompressLevel(compressLevel); producer.setCompressMsgBodyOverHowmuch(compressOverHowMuch); System.out.printf("compressType: %s compressLevel: %s%n", compressType, compressLevel); } else {