This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new dcc88c65f1 [ISSUE #8166] optimize: make compression type configurable 
in producer client level
dcc88c65f1 is described below

commit dcc88c65f1f29b392fbb300001b386dbf1901afc
Author: Humkum <1109939...@qq.com>
AuthorDate: Thu May 23 13:56:30 2024 +0800

    [ISSUE #8166] optimize: make compression type configurable in producer 
client level
---
 .../impl/producer/DefaultMQProducerImpl.java       | 28 ++--------------
 .../client/producer/DefaultMQProducer.java         | 39 ++++++++++++++++++++++
 .../rocketmq/example/benchmark/BatchProducer.java  |  4 +--
 .../rocketmq/example/benchmark/Producer.java       |  4 +--
 4 files changed, 45 insertions(+), 30 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 6268bcc0a1..7ef3402513 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -70,9 +70,6 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.compression.CompressionType;
-import org.apache.rocketmq.common.compression.Compressor;
-import org.apache.rocketmq.common.compression.CompressorFactory;
 import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageAccessor;
@@ -118,11 +115,6 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
     private MQFaultStrategy mqFaultStrategy;
     private ExecutorService asyncSenderExecutor;
 
-    // compression related
-    private int compressLevel = 
Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
-    private CompressionType compressType = 
CompressionType.of(System.getProperty(MixAll.MESSAGE_COMPRESS_TYPE, "ZLIB"));
-    private final Compressor compressor = 
CompressorFactory.getCompressor(compressType);
-
     // backpressure related
     private Semaphore semaphoreAsyncSendNum;
     private Semaphore semaphoreAsyncSendSize;
@@ -900,7 +892,7 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
                 boolean msgBodyCompressed = false;
                 if (this.tryToCompressMessage(msg)) {
                     sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
-                    sysFlag |= compressType.getCompressionFlag();
+                    sysFlag |= 
this.defaultMQProducer.getCompressType().getCompressionFlag();
                     msgBodyCompressed = true;
                 }
 
@@ -1070,7 +1062,7 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
         if (body != null) {
             if (body.length >= 
this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
                 try {
-                    byte[] data = compressor.compress(body, compressLevel);
+                    byte[] data = 
this.defaultMQProducer.getCompressor().compress(body, 
this.defaultMQProducer.getCompressLevel());
                     if (data != null) {
                         msg.setBody(data);
                         return true;
@@ -1763,22 +1755,6 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
         return topicPublishInfoTable;
     }
 
-    public int getCompressLevel() {
-        return compressLevel;
-    }
-
-    public void setCompressLevel(int compressLevel) {
-        this.compressLevel = compressLevel;
-    }
-
-    public CompressionType getCompressType() {
-        return compressType;
-    }
-
-    public void setCompressType(CompressionType compressType) {
-        this.compressType = compressType;
-    }
-
     public ServiceState getServiceState() {
         return serviceState;
     }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
 
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index b350ba074d..5304887e38 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -36,6 +36,9 @@ import org.apache.rocketmq.client.trace.TraceDispatcher;
 import org.apache.rocketmq.client.trace.hook.EndTransactionTraceHookImpl;
 import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.compression.CompressionType;
+import org.apache.rocketmq.common.compression.Compressor;
+import org.apache.rocketmq.common.compression.CompressorFactory;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageBatch;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
@@ -170,6 +173,21 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
 
     private RPCHook rpcHook = null;
 
+    /**
+     * Compress level of compress algorithm.
+     */
+    private int compressLevel = 
Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
+
+    /**
+     * Compress type of compress algorithm, default using ZLIB.
+     */
+    private CompressionType compressType = 
CompressionType.of(System.getProperty(MixAll.MESSAGE_COMPRESS_TYPE, "ZLIB"));
+
+    /**
+     * Compressor of compress algorithm.
+     */
+    private Compressor compressor = 
CompressorFactory.getCompressor(compressType);
+
     /**
      * Default constructor.
      */
@@ -1344,4 +1362,25 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
         super.setStartDetectorEnable(startDetectorEnable);
         
this.defaultMQProducerImpl.getMqFaultStrategy().setStartDetectorEnable(startDetectorEnable);
     }
+
+    public int getCompressLevel() {
+        return compressLevel;
+    }
+
+    public void setCompressLevel(int compressLevel) {
+        this.compressLevel = compressLevel;
+    }
+
+    public CompressionType getCompressType() {
+        return compressType;
+    }
+
+    public void setCompressType(CompressionType compressType) {
+        this.compressType = compressType;
+        this.compressor = CompressorFactory.getCompressor(compressType);
+    }
+
+    public Compressor getCompressor() {
+        return compressor;
+    }
 }
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
 
b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
index c4a6162a5f..21a4b3b7e7 100644
--- 
a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
+++ 
b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
@@ -102,8 +102,8 @@ public class BatchProducer {
             String compressType = commandLine.hasOption("ct") ? 
commandLine.getOptionValue("ct").trim() : "ZLIB";
             int compressLevel = commandLine.hasOption("cl") ? 
Integer.parseInt(commandLine.getOptionValue("cl")) : 5;
             int compressOverHowMuch = commandLine.hasOption("ch") ? 
Integer.parseInt(commandLine.getOptionValue("ch")) : 4096;
-            
producer.getDefaultMQProducerImpl().setCompressType(CompressionType.of(compressType));
-            
producer.getDefaultMQProducerImpl().setCompressLevel(compressLevel);
+            producer.setCompressType(CompressionType.of(compressType));
+            producer.setCompressLevel(compressLevel);
             producer.setCompressMsgBodyOverHowmuch(compressOverHowMuch);
             System.out.printf("compressType: %s compressLevel: %s%n", 
compressType, compressLevel);
         } else {
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java 
b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index 480d16b758..a945283f57 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -160,8 +160,8 @@ public class Producer {
             String compressType = commandLine.hasOption("ct") ? 
commandLine.getOptionValue("ct").trim() : "ZLIB";
             int compressLevel = commandLine.hasOption("cl") ? 
Integer.parseInt(commandLine.getOptionValue("cl")) : 5;
             int compressOverHowMuch = commandLine.hasOption("ch") ? 
Integer.parseInt(commandLine.getOptionValue("ch")) : 4096;
-            
producer.getDefaultMQProducerImpl().setCompressType(CompressionType.of(compressType));
-            
producer.getDefaultMQProducerImpl().setCompressLevel(compressLevel);
+            producer.setCompressType(CompressionType.of(compressType));
+            producer.setCompressLevel(compressLevel);
             producer.setCompressMsgBodyOverHowmuch(compressOverHowMuch);
             System.out.printf("compressType: %s compressLevel: %s%n", 
compressType, compressLevel);
         } else {

Reply via email to