This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new b1daa3c0f0 [ISSUE# 9333] Use fastjson2 in broker module (#9334)
b1daa3c0f0 is described below
commit b1daa3c0f0e8dd85f181331e4ec4ff276fb361c0
Author: yx9o <[email protected]>
AuthorDate: Tue Apr 22 10:48:56 2025 +0800
[ISSUE# 9333] Use fastjson2 in broker module (#9334)
* [#ISSUE 9333] Use fastjson2 in broker module
* Update
* Fix the serialization failure caused by improper get/set naming
---
broker/BUILD.bazel | 2 -
.../rocketmq/broker/RocksDBConfigManager.java | 7 +-
.../broker/offset/ConsumerOrderInfoManager.java | 19 ++-
.../rocketmq/broker/pop/PopConsumerRecord.java | 8 +-
.../rocketmq/broker/pop/PopConsumerService.java | 35 ++--
.../broker/processor/AckMessageProcessor.java | 7 +-
.../broker/processor/AdminBrokerProcessor.java | 49 +++---
.../processor/ChangeInvisibleTimeProcessor.java | 9 +-
.../broker/processor/PopBufferMergeService.java | 19 ++-
.../broker/processor/PopMessageProcessor.java | 31 ++--
.../broker/processor/PopReviveService.java | 25 +--
.../broker/topic/TopicQueueMappingManager.java | 21 ++-
.../broker/transaction/TransactionMetrics.java | 44 +++--
.../rocketmq/broker/RocksDBConfigManagerTest.java | 75 +++++++++
.../broker/processor/AdminBrokerProcessorTest.java | 79 ++++++++-
.../ChangeInvisibleTimeProcessorTest.java | 57 ++++++-
.../processor/PopBufferMergeServiceTest.java | 182 ++++++++++++++++-----
.../broker/processor/PopMessageProcessorTest.java | 50 +++++-
.../broker/processor/PopReviveServiceTest.java | 83 ++++++++--
.../broker/topic/TopicQueueMappingManagerTest.java | 68 ++++++--
.../transaction/queue/TransactionMetricsTest.java | 59 ++++++-
.../apache/rocketmq/store/pop/PopCheckPoint.java | 5 +-
22 files changed, 715 insertions(+), 219 deletions(-)
diff --git a/broker/BUILD.bazel b/broker/BUILD.bazel
index 77d456bc16..f00d01e8cc 100644
--- a/broker/BUILD.bazel
+++ b/broker/BUILD.bazel
@@ -32,7 +32,6 @@ java_library(
"//tieredstore",
"@maven//:org_slf4j_slf4j_api",
"@maven//:ch_qos_logback_logback_classic",
- "@maven//:com_alibaba_fastjson",
"@maven//:com_alibaba_fastjson2_fastjson2",
"@maven//:com_github_luben_zstd_jni",
"@maven//:com_google_guava_guava",
@@ -84,7 +83,6 @@ java_library(
"//remoting",
"//store",
"//tieredstore",
- "@maven//:com_alibaba_fastjson",
"@maven//:com_alibaba_fastjson2_fastjson2",
"@maven//:com_google_guava_guava",
"@maven//:io_netty_netty_all",
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
index ee2d4e54a6..c59c00c040 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
@@ -16,9 +16,7 @@
*/
package org.apache.rocketmq.broker;
-import com.alibaba.fastjson.JSON;
-import java.nio.charset.StandardCharsets;
-import java.util.function.BiConsumer;
+import com.alibaba.fastjson2.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -31,6 +29,9 @@ import org.rocksdb.RocksIterator;
import org.rocksdb.Statistics;
import org.rocksdb.WriteBatch;
+import java.nio.charset.StandardCharsets;
+import java.util.function.BiConsumer;
+
public class RocksDBConfigManager {
protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
public volatile boolean isStop = false;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
index 120f5b104c..9f173daf46 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
@@ -16,17 +16,9 @@
*/
package org.apache.rocketmq.broker.offset;
-import com.alibaba.fastjson.annotation.JSONField;
+import com.alibaba.fastjson2.annotation.JSONField;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
@@ -37,6 +29,15 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
public class ConsumerOrderInfoManager extends ConfigManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
index 1ee01fea1c..661ace9bcb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
@@ -16,9 +16,9 @@
*/
package org.apache.rocketmq.broker.pop;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import com.alibaba.fastjson.annotation.JSONField;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.annotation.JSONField;
+
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -119,7 +119,7 @@ public class PopConsumerRecord {
}
public static PopConsumerRecord decode(byte[] body) {
- return JSONObject.parseObject(body, PopConsumerRecord.class);
+ return JSON.parseObject(body, PopConsumerRecord.class);
}
public long getPopTime() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index 1138ff4afe..a2198f2560 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -16,25 +16,9 @@
*/
package org.apache.rocketmq.broker.pop;
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
-import java.nio.ByteBuffer;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Objects;
-import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
@@ -65,6 +49,23 @@ import org.apache.rocketmq.store.pop.PopCheckPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
public class PopConsumerService extends ServiceThread {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 23a4f6167c..06a531552a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -16,11 +16,9 @@
*/
package org.apache.rocketmq.broker.processor;
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
-import java.util.BitSet;
-import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
@@ -52,6 +50,9 @@ import org.apache.rocketmq.store.exception.ConsumeQueueException;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.BatchAckMsg;
+import java.nio.charset.StandardCharsets;
+import java.util.BitSet;
+
public class AckMessageProcessor implements NettyRequestProcessor {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 4200f34bde..c747fa15af 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -16,33 +16,12 @@
*/
package org.apache.rocketmq.broker.processor;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.api.common.Attributes;
-import java.io.UnsupportedEncodingException;
-import java.net.UnknownHostException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.AccessValidator;
@@ -236,6 +215,28 @@ import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.util.LibC;
+import java.io.UnsupportedEncodingException;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
@@ -2891,7 +2892,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
} else {
ConsumerFilterData filterData = this.brokerController.getConsumerFilterManager()
.get(requestHeader.getTopic(), requestHeader.getConsumerGroup());
- body.setFilterData(JSON.toJSONString(filterData, true));
+ body.setFilterData(JSON.toJSONString(filterData));
messageFilter = new ExpressionMessageFilter(subscriptionData, filterData,
this.brokerController.getConsumerFilterManager());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index de72ee7baf..f288c001b8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -16,12 +16,9 @@
*/
package org.apache.rocketmq.broker.processor;
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
@@ -50,6 +47,10 @@ import org.apache.rocketmq.store.exception.ConsumeQueueException;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
private final BrokerController brokerController;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index 820388b18d..7c309ec5c4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -16,15 +16,7 @@
*/
package org.apache.rocketmq.broker.processor;
-import com.alibaba.fastjson.JSON;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.alibaba.fastjson2.JSON;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.common.KeyBuilder;
@@ -44,6 +36,15 @@ import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.BatchAckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+
public class PopBufferMergeService extends ServiceThread {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
ConcurrentHashMap<String/*mergeKey*/, PopCheckPointWrapper>
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index d73acc84df..9f55269f39 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -16,27 +16,13 @@
*/
package org.apache.rocketmq.broker.processor;
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
import io.opentelemetry.api.common.Attributes;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
@@ -91,6 +77,21 @@ import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.BatchAckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index e1ead86169..dcffaf50cc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -16,18 +16,8 @@
*/
package org.apache.rocketmq.broker.processor;
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
import io.opentelemetry.api.common.Attributes;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
@@ -37,12 +27,12 @@ import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
@@ -61,6 +51,17 @@ import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.BatchAckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 6b9cf15938..dfbe5d347a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -16,13 +16,8 @@
*/
package org.apache.rocketmq.broker.topic;
-import com.alibaba.fastjson.JSON;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONWriter;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
@@ -40,6 +35,13 @@ import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
public class TopicQueueMappingManager extends ConfigManager {
@@ -149,7 +151,10 @@ public class TopicQueueMappingManager extends ConfigManager {
TopicQueueMappingSerializeWrapper wrapper = new TopicQueueMappingSerializeWrapper();
wrapper.setTopicQueueMappingInfoMap(topicQueueMappingTable);
wrapper.setDataVersion(this.dataVersion);
- return JSON.toJSONString(wrapper, pretty);
+ if (pretty) {
+ return JSON.toJSONString(wrapper, JSONWriter.Feature.PrettyFormat);
+ }
+ return JSON.toJSONString(wrapper);
}
@Override
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java
index ad30c73c60..d8dd811db2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java
@@ -16,16 +16,21 @@
*/
package org.apache.rocketmq.broker.transaction;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONWriter;
import com.google.common.io.Files;
-import java.io.BufferedWriter;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
+import java.io.OutputStream;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -33,14 +38,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.rocketmq.common.ConfigManager;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.remoting.protocol.DataVersion;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-
public class TransactionMetrics extends ConfigManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -90,11 +87,11 @@ public class TransactionMetrics extends ConfigManager {
this.transactionCounts = transactionCounts;
}
- protected void write0(Writer writer) {
+ protected void write0(OutputStream out) {
TransactionMetricsSerializeWrapper wrapper = new TransactionMetricsSerializeWrapper();
wrapper.setTransactionCount(transactionCounts);
wrapper.setDataVersion(dataVersion);
- JSON.writeJSONString(writer, wrapper, SerializerFeature.BrowserCompatible);
+ JSON.writeTo(out, wrapper, JSONWriter.Feature.BrowserCompatible);
}
@Override
@@ -182,7 +179,7 @@ public class TransactionMetrics extends ConfigManager {
String config = configFilePath();
String temp = config + ".tmp";
String backup = config + ".bak";
- BufferedWriter bufferedWriter = null;
+ FileOutputStream outputStream = null;
try {
File tmpFile = new File(temp);
File parentDirectory = tmpFile.getParentFile();
@@ -199,11 +196,10 @@ public class TransactionMetrics extends ConfigManager {
return;
}
}
- bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(tmpFile, false),
- StandardCharsets.UTF_8));
- write0(bufferedWriter);
- bufferedWriter.flush();
- bufferedWriter.close();
+ outputStream = new FileOutputStream(tmpFile, false);
+ write0(outputStream);
+ outputStream.flush();
+ outputStream.close();
log.debug("Finished writing tmp file: {}", temp);
File configFile = new File(config);
@@ -216,9 +212,9 @@ public class TransactionMetrics extends ConfigManager {
} catch (IOException e) {
log.error("Failed to persist {}", temp, e);
} finally {
- if (null != bufferedWriter) {
+ if (null != outputStream) {
try {
- bufferedWriter.close();
+ outputStream.close();
} catch (IOException ignore) {
}
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/RocksDBConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/RocksDBConfigManagerTest.java
new file mode 100644
index 0000000000..d9feb6a782
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/RocksDBConfigManagerTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker;
+
+import com.alibaba.fastjson2.JSON;
+import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+public class RocksDBConfigManagerTest {
+
+ private ConfigRocksDBStorage configRocksDBStorage;
+
+ private RocksDBConfigManager rocksDBConfigManager;
+
+ @Before
+ public void setUp() throws IllegalAccessException {
+ configRocksDBStorage = mock(ConfigRocksDBStorage.class);
+ rocksDBConfigManager = spy(new RocksDBConfigManager("testPath", 1000L, null));
+ rocksDBConfigManager.configRocksDBStorage = configRocksDBStorage;
+ }
+
+ @Test
+ public void testLoadDataVersion() throws Exception {
+ DataVersion expected = new DataVersion();
+ expected.nextVersion();
+ String jsonData = JSON.toJSONString(expected);
+ byte[] mockDataVersion = jsonData.getBytes(StandardCharsets.UTF_8);
+
+ when(rocksDBConfigManager.configRocksDBStorage.getKvDataVersion()).thenReturn(mockDataVersion);
+
+ boolean result = rocksDBConfigManager.loadDataVersion();
+
+ assertTrue(result);
+ assertEquals(expected.getCounter().get(), rocksDBConfigManager.getKvDataVersion().getCounter().get());
+ assertEquals(expected.getTimestamp(), rocksDBConfigManager.getKvDataVersion().getTimestamp());
+ }
+
+ @Test
+ public void testUpdateKvDataVersion() throws Exception {
+ rocksDBConfigManager.updateKvDataVersion();
+
+ DataVersion expectedDataVersion = rocksDBConfigManager.getKvDataVersion();
+ verify(rocksDBConfigManager.configRocksDBStorage, times(1)).updateKvDataVersion(
+ eq(JSON.toJSONString(expectedDataVersion).getBytes(StandardCharsets.UTF_8))
+ );
+ }
+}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index 959b147d9d..90c333b770 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -16,7 +16,8 @@
*/
package org.apache.rocketmq.broker.processor;
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
@@ -34,10 +35,10 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.client.net.Broker2Client;
-import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
-import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
+import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
+import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
@@ -73,6 +74,7 @@ import
org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
+import
org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader;
@@ -87,11 +89,13 @@ import
org.apache.rocketmq.remoting.protocol.header.GetConsumerStatusRequestHead
import
org.apache.rocketmq.remoting.protocol.header.GetEarliestMsgStoretimeRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetSubscriptionGroupConfigRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetTopicConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetUserRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ListAclsRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ListUsersRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.NotifyMinBrokerIdChangeRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.QueryConsumeQueueRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.QueryCorrectionOffsetHeader;
import
org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader;
@@ -104,6 +108,7 @@ import
org.apache.rocketmq.remoting.protocol.header.UpdateAclRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateUserRequestHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import
org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
@@ -111,6 +116,7 @@ import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
+import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
@@ -145,6 +151,8 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -717,7 +725,7 @@ public class AdminBrokerProcessorTest {
consumerOffsetManager = mock(ConsumerOffsetManager.class);
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
ConsumerOffsetManager consumerOffset = new ConsumerOffsetManager();
-
when(consumerOffsetManager.encode()).thenReturn(JSON.toJSONString(consumerOffset,
false));
+
when(consumerOffsetManager.encode()).thenReturn(JSON.toJSONString(consumerOffset));
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
@@ -1328,6 +1336,69 @@ public class AdminBrokerProcessorTest {
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
+ @Test
+ public void testGetSubscriptionGroup() throws RemotingCommandException {
+
brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().put("group",
new SubscriptionGroupConfig());
+ GetSubscriptionGroupConfigRequestHeader requestHeader = new
GetSubscriptionGroupConfigRequestHeader();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_SUBSCRIPTIONGROUP_CONFIG,
requestHeader);
+ requestHeader.setGroup("group");
+ request.makeCustomHeaderToNet();
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertEquals(ResponseCode.SUCCESS, response.getCode());
+ }
+
+ @Test
+ public void testCheckRocksdbCqWriteProgress() throws
RemotingCommandException {
+ CheckRocksdbCqWriteProgressRequestHeader requestHeader = new
CheckRocksdbCqWriteProgressRequestHeader();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS,
requestHeader);
+ requestHeader.setTopic("topic");
+ request.makeCustomHeaderToNet();
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertEquals(ResponseCode.SUCCESS, response.getCode());
+ }
+
+ @Test
+ public void testQueryConsumeQueue() throws RemotingCommandException {
+ messageStore = mock(MessageStore.class);
+ ConsumeQueueInterface consumeQueue = mock(ConsumeQueueInterface.class);
+ when(consumeQueue.getMinOffsetInQueue()).thenReturn(0L);
+ when(consumeQueue.getMaxOffsetInQueue()).thenReturn(1L);
+ when(messageStore.getConsumeQueue(anyString(),
anyInt())).thenReturn(consumeQueue);
+ when(brokerController.getMessageStore()).thenReturn(messageStore);
+ QueryConsumeQueueRequestHeader requestHeader = new
QueryConsumeQueueRequestHeader();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_QUEUE,
requestHeader);
+ requestHeader.setTopic("topic");
+ requestHeader.setQueueId(0);
+ request.makeCustomHeaderToNet();
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+ assertEquals(ResponseCode.SUCCESS, response.getCode());
+ }
+
+ @Test
+ public void testProcessRequest_GetTopicConfig() throws Exception {
+ GetTopicConfigRequestHeader requestHeader = new
GetTopicConfigRequestHeader();
+ requestHeader.setTopic("testTopic");
+
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG,
requestHeader);
+ request.makeCustomHeaderToNet();
+
+ TopicConfig topicConfig = new TopicConfig();
+ topicConfig.setTopicName("testTopic");
+ TopicConfigManager topicConfigManager = mock(TopicConfigManager.class);
+
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
+ when(topicConfigManager.selectTopicConfig("testTopic"))
+ .thenReturn(topicConfig);
+
+ RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
+
+ assertNotNull(response);
+ assertEquals(ResponseCode.SUCCESS, response.getCode());
+
+ String responseBody = new String(response.getBody(),
StandardCharsets.UTF_8);
+ TopicConfigAndQueueMapping result =
JSONObject.parseObject(responseBody, TopicConfigAndQueueMapping.class);
+ assertEquals("testTopic", result.getTopicName());
+ }
+
private ResetOffsetRequestHeader createRequestHeader(String topic,String
group,long timestamp,boolean force,long offset,int queueId) {
ResetOffsetRequestHeader requestHeader = new
ResetOffsetRequestHeader();
requestHeader.setTopic(topic);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
index e15d51b4a8..77490dbd69 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
@@ -18,12 +18,11 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
-import java.lang.reflect.Field;
-import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.failover.EscapeBridge;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageConst;
@@ -41,10 +40,12 @@ import
org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.exception.ConsumeQueueException;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -52,8 +53,13 @@ import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
+
import static
org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
@@ -162,4 +168,51 @@ public class ChangeInvisibleTimeProcessorTest {
assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.NO_MESSAGE);
assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
}
+
+ @Test
+ public void testProcessRequestAsync_JsonParsing() throws Exception {
+ Channel mockChannel = mock(Channel.class);
+ RemotingCommand mockRequest = mock(RemotingCommand.class);
+ BrokerController mockBrokerController = mock(BrokerController.class);
+ TopicConfigManager mockTopicConfigManager =
mock(TopicConfigManager.class);
+ MessageStore mockMessageStore = mock(MessageStore.class);
+ BrokerConfig mockBrokerConfig = mock(BrokerConfig.class);
+ BrokerStatsManager mockBrokerStatsManager =
mock(BrokerStatsManager.class);
+ PopMessageProcessor mockPopMessageProcessor =
mock(PopMessageProcessor.class);
+ PopBufferMergeService mockPopBufferMergeService =
mock(PopBufferMergeService.class);
+
+
when(mockBrokerController.getTopicConfigManager()).thenReturn(mockTopicConfigManager);
+
when(mockBrokerController.getMessageStore()).thenReturn(mockMessageStore);
+
when(mockBrokerController.getBrokerConfig()).thenReturn(mockBrokerConfig);
+
when(mockBrokerController.getBrokerStatsManager()).thenReturn(mockBrokerStatsManager);
+
when(mockBrokerController.getPopMessageProcessor()).thenReturn(mockPopMessageProcessor);
+
when(mockPopMessageProcessor.getPopBufferMergeService()).thenReturn(mockPopBufferMergeService);
+ when(mockPopBufferMergeService.addAk(anyInt(),
any())).thenReturn(false);
+ when(mockBrokerController.getEscapeBridge()).thenReturn(escapeBridge);
+ PutMessageResult mockPutMessageResult = new
PutMessageResult(PutMessageStatus.PUT_OK, null, true);
+
when(mockBrokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(CompletableFuture.completedFuture(mockPutMessageResult));
+
+ TopicConfig topicConfig = new TopicConfig();
+ topicConfig.setReadQueueNums(4);
+
when(mockTopicConfigManager.selectTopicConfig(anyString())).thenReturn(topicConfig);
+ when(mockMessageStore.getMinOffsetInQueue(anyString(),
anyInt())).thenReturn(0L);
+ when(mockMessageStore.getMaxOffsetInQueue(anyString(),
anyInt())).thenReturn(10L);
+
when(mockBrokerConfig.isPopConsumerKVServiceEnable()).thenReturn(false);
+
+ ChangeInvisibleTimeRequestHeader requestHeader = new
ChangeInvisibleTimeRequestHeader();
+ requestHeader.setTopic("TestTopic");
+ requestHeader.setQueueId(1);
+ requestHeader.setOffset(5L);
+ requestHeader.setConsumerGroup("TestGroup");
+ requestHeader.setExtraInfo("0 10000 10000 0 TestBroker 1");
+ requestHeader.setInvisibleTime(60000L);
+
when(mockRequest.decodeCommandCustomHeader(ChangeInvisibleTimeRequestHeader.class)).thenReturn(requestHeader);
+
+ ChangeInvisibleTimeProcessor processor = new
ChangeInvisibleTimeProcessor(mockBrokerController);
+ CompletableFuture<RemotingCommand> futureResponse =
processor.processRequestAsync(mockChannel, mockRequest, true);
+
+ RemotingCommand response = futureResponse.get();
+ assertNotNull(response);
+ assertEquals(ResponseCode.SUCCESS, response.getCode());
+ }
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java
index acc7a3da74..33d6820a7e 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java
@@ -16,19 +16,20 @@
*/
package org.apache.rocketmq.broker.processor;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
+import com.alibaba.fastjson2.JSON;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.broker.failover.EscapeBridge;
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
-import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
@@ -37,56 +38,82 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
-import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
-import static
org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
+import java.lang.reflect.Method;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.Silent.class)
public class PopBufferMergeServiceTest {
- @Spy
- private BrokerController brokerController = new BrokerController(new
BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new
MessageStoreConfig());
+
@Mock
+ private BrokerController brokerController;
+
private PopMessageProcessor popMessageProcessor;
+
+ @Mock
+ private ScheduleMessageService scheduleMessageService;
+
@Mock
- private ChannelHandlerContext handlerContext;
+ private TopicConfigManager topicConfigManager;
+
+ @Mock
+ private ConsumerManager consumerManager;
+
@Mock
private DefaultMessageStore messageStore;
- private ScheduleMessageService scheduleMessageService;
- private ClientChannelInfo clientChannelInfo;
- private String group = "FooBarGroup";
- private String topic = "FooBar";
+
+ @Mock
+ private MessageStoreConfig messageStoreConfig;
+
+ private String defaultGroup = "defaultGroup";
+
+ private String defaultTopic = "defaultTopic";
+
+ private PopBufferMergeService popBufferMergeService;
+
+ @Mock
+ private BrokerConfig brokerConfig;
+
+ @Mock
+ private EscapeBridge escapeBridge;
@Before
public void init() throws Exception {
- FieldUtils.writeField(brokerController.getBrokerConfig(),
"enablePopBufferMerge", true, true);
- brokerController.setMessageStore(messageStore);
+ when(brokerConfig.getBrokerIP1()).thenReturn("127.0.0.1");
+ when(brokerConfig.isEnablePopBufferMerge()).thenReturn(true);
+ when(brokerConfig.getPopCkStayBufferTime()).thenReturn(10 * 1000);
+ when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+ when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
+ when(brokerController.getMessageStore()).thenReturn(messageStore);
+
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
+
when(brokerController.getScheduleMessageService()).thenReturn(scheduleMessageService);
+
when(brokerController.getConsumerManager()).thenReturn(consumerManager);
+
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
popMessageProcessor = new PopMessageProcessor(brokerController);
- scheduleMessageService = new ScheduleMessageService(brokerController);
- scheduleMessageService.parseDelayLevel();
- Channel mockChannel = mock(Channel.class);
-
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new
TopicConfig());
- clientChannelInfo = new ClientChannelInfo(mockChannel);
- ConsumerData consumerData = createConsumerData(group, topic);
- brokerController.getConsumerManager().registerConsumer(
- consumerData.getGroupName(),
- clientChannelInfo,
- consumerData.getConsumeType(),
- consumerData.getMessageModel(),
- consumerData.getConsumeFromWhere(),
- consumerData.getSubscriptionDataSet(),
- false);
+ popBufferMergeService = new PopBufferMergeService(brokerController,
popMessageProcessor);
+ FieldUtils.writeDeclaredField(popBufferMergeService,
"brokerController", brokerController, true);
+ ConcurrentMap<String, TopicConfig> topicConfigTable = new
ConcurrentHashMap<>();
+ topicConfigTable.put(defaultTopic, new TopicConfig());
+
when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable);
}
- @Test(timeout = 10_000)
+ @Test(timeout = 15_000)
public void testBasic() throws Exception {
// This test case fails on Windows in CI pipeline
// Disable it for later fix
Assume.assumeFalse(MixAll.isWindows());
- PopBufferMergeService popBufferMergeService = new
PopBufferMergeService(brokerController, popMessageProcessor);
- popBufferMergeService.start();
PopCheckPoint ck = new PopCheckPoint();
ck.setBitMap(0);
int msgCnt = 1;
@@ -97,8 +124,8 @@ public class PopBufferMergeServiceTest {
ck.setInvisibleTime(invisibleTime);
int offset = 100;
ck.setStartOffset(offset);
- ck.setCId(group);
- ck.setTopic(topic);
+ ck.setCId(defaultGroup);
+ ck.setTopic(defaultTopic);
int queueId = 0;
ck.setQueueId(queueId);
@@ -108,18 +135,93 @@ public class PopBufferMergeServiceTest {
AckMsg ackMsg = new AckMsg();
ackMsg.setAckOffset(ackOffset);
ackMsg.setStartOffset(offset);
- ackMsg.setConsumerGroup(group);
- ackMsg.setTopic(topic);
+ ackMsg.setConsumerGroup(defaultGroup);
+ ackMsg.setTopic(defaultTopic);
ackMsg.setQueueId(queueId);
ackMsg.setPopTime(popTime);
try {
assertThat(popBufferMergeService.addCk(ck, reviveQid, ackOffset,
nextBeginOffset)).isTrue();
- assertThat(popBufferMergeService.getLatestOffset(topic, group,
queueId)).isEqualTo(nextBeginOffset);
+ assertThat(popBufferMergeService.getLatestOffset(defaultTopic,
defaultGroup, queueId)).isEqualTo(nextBeginOffset);
Thread.sleep(1000); // wait background threads of
PopBufferMergeService run for some time
assertThat(popBufferMergeService.addAk(reviveQid,
ackMsg)).isTrue();
- assertThat(popBufferMergeService.getLatestOffset(topic, group,
queueId)).isEqualTo(nextBeginOffset);
+ assertThat(popBufferMergeService.getLatestOffset(defaultTopic,
defaultGroup, queueId)).isEqualTo(nextBeginOffset);
} finally {
popBufferMergeService.shutdown(true);
}
}
+
+ @Test
+ public void testAddCkJustOffset_MergeKeyConflict() {
+ PopCheckPoint point = mock(PopCheckPoint.class);
+ String mergeKey = "testMergeKey";
+ when(point.getTopic()).thenReturn(mergeKey);
+ when(point.getCId()).thenReturn("");
+ when(point.getQueueId()).thenReturn(0);
+ when(point.getStartOffset()).thenReturn(0L);
+ when(point.getPopTime()).thenReturn(0L);
+ when(point.getBrokerName()).thenReturn("");
+ popBufferMergeService.buffer.put(mergeKey + "000",
mock(PopBufferMergeService.PopCheckPointWrapper.class));
+
+ assertFalse(popBufferMergeService.addCkJustOffset(point, 0, 0, 0));
+ }
+
+ @Test
+ public void testAddCkMock() {
+ int queueId = 0;
+ long startOffset = 100L;
+ long invisibleTime = 30_000L;
+ long popTime = System.currentTimeMillis();
+ int reviveQueueId = 0;
+ long nextBeginOffset = 101L;
+ String brokerName = "brokerName";
+ popBufferMergeService.addCkMock(defaultGroup, defaultTopic, queueId,
startOffset, invisibleTime, popTime, reviveQueueId, nextBeginOffset,
brokerName);
+ verify(brokerConfig, times(1)).isEnablePopLog();
+ }
+
+ @Test
+ public void testPutAckToStore() throws Exception {
+ PopCheckPoint point = new PopCheckPoint();
+ point.setStartOffset(100L);
+ point.setCId("testGroup");
+ point.setTopic("testTopic");
+ point.setQueueId(1);
+ point.setPopTime(System.currentTimeMillis());
+ point.setBrokerName("testBroker");
+
+ PopBufferMergeService.PopCheckPointWrapper pointWrapper =
mock(PopBufferMergeService.PopCheckPointWrapper.class);
+ when(pointWrapper.getCk()).thenReturn(point);
+ when(pointWrapper.getReviveQueueId()).thenReturn(0);
+
+ AtomicInteger toStoreBits = new AtomicInteger(0);
+ when(pointWrapper.getToStoreBits()).thenReturn(toStoreBits);
+
+ byte msgIndex = 0;
+ AtomicInteger count = new AtomicInteger(0);
+
+ EscapeBridge escapeBridge = mock(EscapeBridge.class);
+ when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
+
when(brokerController.getBrokerConfig().isAppendAckAsync()).thenReturn(false);
+
+
when(escapeBridge.putMessageToSpecificQueue(any())).thenAnswer(invocation -> {
+ MessageExtBrokerInner capturedMessage = invocation.getArgument(0);
+ AckMsg ackMsg = JSON.parseObject(capturedMessage.getBody(),
AckMsg.class);
+
+ assertEquals(point.ackOffsetByIndex(msgIndex),
ackMsg.getAckOffset());
+ assertEquals(point.getStartOffset(), ackMsg.getStartOffset());
+ assertEquals(point.getCId(), ackMsg.getConsumerGroup());
+ assertEquals(point.getTopic(), ackMsg.getTopic());
+ assertEquals(point.getQueueId(), ackMsg.getQueueId());
+ assertEquals(point.getPopTime(), ackMsg.getPopTime());
+ assertEquals(point.getBrokerName(), ackMsg.getBrokerName());
+
+ PutMessageResult result = mock(PutMessageResult.class);
+
when(result.getPutMessageStatus()).thenReturn(PutMessageStatus.PUT_OK);
+ return result;
+ });
+
+ Method method =
PopBufferMergeService.class.getDeclaredMethod("putAckToStore",
PopBufferMergeService.PopCheckPointWrapper.class, byte.class,
AtomicInteger.class);
+ method.setAccessible(true);
+ method.invoke(popBufferMergeService, pointWrapper, msgIndex, count);
+ verify(escapeBridge,
times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class));
+ }
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
index fdb0690e5d..28476149ab 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
@@ -16,10 +16,9 @@
*/
package org.apache.rocketmq.broker.processor;
+import com.alibaba.fastjson2.JSON;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
-import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.common.BrokerConfig;
@@ -27,6 +26,7 @@ import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.ConsumeInitMode;
import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
@@ -42,7 +42,7 @@ import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.exception.ConsumeQueueException;
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
-import org.junit.Assert;
+import org.apache.rocketmq.store.pop.PopCheckPoint;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -50,8 +50,13 @@ import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+
import static
org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -168,17 +173,17 @@ public class PopMessageProcessorTest {
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
long offset =
brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic,
0);
- Assert.assertEquals(-1, offset);
+ assertEquals(-1, offset);
RemotingCommand request = createPopMsgCommand(newGroup, topic, 0,
ConsumeInitMode.MAX);
popMessageProcessor.processRequest(handlerContext, request);
offset =
brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic,
0);
- Assert.assertEquals(minOffset, offset);
+ assertEquals(minOffset, offset);
when(messageStore.getMinOffsetInQueue(retryTopic,
0)).thenReturn(minOffset * 2);
popMessageProcessor.processRequest(handlerContext, request);
offset =
brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic,
0);
- Assert.assertEquals(minOffset, offset); // will not entry
getInitOffset() again
+ assertEquals(minOffset, offset); // will not entry getInitOffset()
again
messageStore.getMinOffsetInQueue(retryTopic, 0); // prevent
UnnecessaryStubbingException
}
@@ -193,17 +198,17 @@ public class PopMessageProcessorTest {
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
long offset =
brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0);
- Assert.assertEquals(-1, offset);
+ assertEquals(-1, offset);
RemotingCommand request = createPopMsgCommand(newGroup, topic, 0,
ConsumeInitMode.MAX);
popMessageProcessor.processRequest(handlerContext, request);
offset =
brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0);
- Assert.assertEquals(maxOffset - 1, offset); // checkInMem return false
+ assertEquals(maxOffset - 1, offset); // checkInMem return false
when(messageStore.getMaxOffsetInQueue(topic, 0)).thenReturn(maxOffset
* 2);
popMessageProcessor.processRequest(handlerContext, request);
offset =
brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0);
- Assert.assertEquals(maxOffset - 1, offset); // will not entry
getInitOffset() again
+ assertEquals(maxOffset - 1, offset); // will not entry getInitOffset()
again
messageStore.getMaxOffsetInQueue(topic, 0); // prevent
UnnecessaryStubbingException
}
@@ -240,4 +245,31 @@ public class PopMessageProcessorTest {
}
return getMessageResult;
}
+
+ @Test
+ public void testBuildCkMsgJsonParsing() {
+ PopCheckPoint ck = new PopCheckPoint();
+ ck.setTopic("TestTopic");
+ ck.setQueueId(1);
+ ck.setStartOffset(100L);
+ ck.setCId("TestConsumer");
+ ck.setPopTime(System.currentTimeMillis());
+ ck.setBrokerName("TestBroker");
+
+ int reviveQid = 0;
+ PopMessageProcessor processor = new
PopMessageProcessor(brokerController);
+
+ MessageExtBrokerInner result = processor.buildCkMsg(ck, reviveQid);
+
+ String jsonBody = new String(result.getBody(), StandardCharsets.UTF_8);
+ PopCheckPoint actual = JSON.parseObject(jsonBody, PopCheckPoint.class);
+
+ assertEquals(ck.getTopic(), actual.getTopic());
+ assertEquals(ck.getQueueId(), actual.getQueueId());
+ assertEquals(ck.getStartOffset(), actual.getStartOffset());
+ assertEquals(ck.getCId(), actual.getCId());
+ assertEquals(ck.getPopTime(), actual.getPopTime());
+ assertEquals(ck.getBrokerName(), actual.getBrokerName());
+ assertEquals(ck.getReviveTime(), actual.getReviveTime());
+ }
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
index 3010e83610..e6a2cdb6cd 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
@@ -16,13 +16,7 @@
*/
package org.apache.rocketmq.broker.processor;
-import com.alibaba.fastjson.JSON;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import com.alibaba.fastjson2.JSON;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.failover.EscapeBridge;
@@ -40,12 +34,13 @@ import
org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.common.utils.NetworkUtil;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.store.AppendMessageResult;
+import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
-import org.apache.rocketmq.store.AppendMessageResult;
-import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.pop.AckMsg;
+import org.apache.rocketmq.store.pop.BatchAckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.junit.Assert;
@@ -56,18 +51,27 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.any;
import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.Silent.class)
public class PopReviveServiceTest {
@@ -405,6 +409,59 @@ public class PopReviveServiceTest {
verify(messageStore,
times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
}
+ @Test // Runs one BATCH_ACK revive message through consumeReviveMessage and mergeAndRevive, then checks the committed offset.
+ public void testReviveMsgFromBatchAck() throws Throwable {
+ brokerConfig.setEnableSkipLongAwaitingAck(true);
+ when(consumerOffsetManager.queryOffset(PopAckConstants.REVIVE_GROUP,
REVIVE_TOPIC, REVIVE_QUEUE_ID)).thenReturn(0L); // stub: the stored revive offset is 0
+ List<MessageExt> reviveMessageExtList = new ArrayList<>();
+ long basePopTime = System.currentTimeMillis();
+
reviveMessageExtList.add(buildBatchAckMsg(buildBatchAckMsg(Arrays.asList(1L,
2L, 3L), basePopTime), 1, 1, basePopTime)); // one batch ack for offsets 1, 2, 3; deliverMs = 1, queue offset = 1
+ doReturn(reviveMessageExtList, new
ArrayList<>()).when(popReviveService).getReviveMessage(anyLong(), anyInt()); // first call yields the batch ack, later calls an empty list
+
+ PopReviveService.ConsumeReviveObj consumeReviveObj = new
PopReviveService.ConsumeReviveObj();
+ popReviveService.consumeReviveMessage(consumeReviveObj);
+ assertEquals(1, consumeReviveObj.map.size()); // exactly one entry is expected in the revive map
+
+ ArgumentCaptor<Long> commitOffsetCaptor =
ArgumentCaptor.forClass(Long.class);
+ doNothing().when(consumerOffsetManager).commitOffset(anyString(),
anyString(), anyString(), anyInt(), commitOffsetCaptor.capture()); // capture the offset argument handed to commitOffset
+ popReviveService.mergeAndRevive(consumeReviveObj);
+ assertEquals(1, commitOffsetCaptor.getValue().longValue()); // the committed offset must be 1
+ }
+
+    /**
+     * Wraps a batch ack into a message addressed to REVIVE_TOPIC / REVIVE_QUEUE_ID.
+     *
+     * @param batchAckMsg  ack payload; also the input for the ack unique id
+     * @param deliverMs    value applied through setDeliverTimeMs
+     * @param reviveOffset queue offset assigned to the message
+     * @param deliverTime  value stored as the message's store timestamp
+     * @return the populated message
+     */
+    public static MessageExtBrokerInner buildBatchAckMsg(BatchAckMsg batchAckMsg, long deliverMs, long reviveOffset, long deliverTime) {
+        String ackUniqueId = PopMessageProcessor.genAckUniqueId(batchAckMsg);
+        MessageExtBrokerInner reviveMsg = buildBatchAckInnerMessage(REVIVE_TOPIC, batchAckMsg, REVIVE_QUEUE_ID, STORE_HOST, deliverMs, ackUniqueId);
+        reviveMsg.setQueueOffset(reviveOffset);
+        // Same value buildBatchAckInnerMessage was given; applied once more, as before.
+        reviveMsg.setDeliverTimeMs(deliverMs);
+        reviveMsg.setStoreTimestamp(deliverTime);
+        return reviveMsg;
+    }
+
+    /**
+     * Creates a batch ack for GROUP / TOPIC on queue 0 of "broker-a".
+     *
+     * @param offsets offsets appended to the ack offset list
+     * @param popTime pop time recorded on the ack
+     * @return the populated batch ack
+     */
+    public static BatchAckMsg buildBatchAckMsg(Collection<Long> offsets, long popTime) {
+        BatchAckMsg batchAck = new BatchAckMsg();
+        batchAck.setBrokerName("broker-a");
+        batchAck.setTopic(TOPIC);
+        batchAck.setConsumerGroup(GROUP);
+        batchAck.setQueueId(0);
+        batchAck.setPopTime(popTime);
+        batchAck.getAckOffsetList().addAll(offsets);
+        return batchAck;
+    }
+
+    /**
+     * Builds a BATCH_ACK_TAG message whose body is the JSON form of the given ack.
+     *
+     * @param reviveTopic topic set on the message
+     * @param ackMsg      ack serialized with fastjson2 into the UTF-8 body
+     * @param reviveQid   queue id set on the message
+     * @param host        used as both born host and store host
+     * @param deliverMs   value applied through setDeliverTimeMs
+     * @param ackUniqueId stored under PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX
+     * @return the populated message, with its properties string refreshed
+     */
+    public static MessageExtBrokerInner buildBatchAckInnerMessage(String reviveTopic, AckMsg ackMsg, int reviveQid, SocketAddress host, long deliverMs, String ackUniqueId) {
+        MessageExtBrokerInner inner = new MessageExtBrokerInner();
+        inner.setTopic(reviveTopic);
+        String json = JSON.toJSONString(ackMsg);
+        inner.setBody(json.getBytes(DataConverter.CHARSET_UTF8));
+        inner.setQueueId(reviveQid);
+        inner.setTags(PopAckConstants.BATCH_ACK_TAG);
+        inner.setBornTimestamp(System.currentTimeMillis());
+        inner.setBornHost(host);
+        inner.setStoreHost(host);
+        inner.setDeliverTimeMs(deliverMs);
+        inner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, ackUniqueId);
+        // Rebuild the properties string last, after the unique id has been added to the map.
+        String encodedProperties = MessageDecoder.messageProperties2String(inner.getProperties());
+        inner.setPropertiesString(encodedProperties);
+        return inner;
+    }
+
public static PopCheckPoint buildPopCheckPoint(long startOffset, long
popTime, long reviveOffset) {
PopCheckPoint ck = new PopCheckPoint();
ck.setStartOffset(startOffset);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
index b74e57ab93..9b25e0134c 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
@@ -17,15 +17,11 @@
package org.apache.rocketmq.broker.topic;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONWriter;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
+import
org.apache.rocketmq.remoting.protocol.body.TopicQueueMappingSerializeWrapper;
import
org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
import
org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils;
import
org.apache.rocketmq.remoting.protocol.statictopic.TopicRemappingDetailWrapper;
@@ -37,6 +33,16 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -79,9 +85,9 @@ public class TopicQueueMappingManagerTest {
String topic = UUID.randomUUID().toString();
int queueNum = 10;
TopicRemappingDetailWrapper topicRemappingDetailWrapper =
TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, brokers, new
HashMap<>());
- Assert.assertEquals(1,
topicRemappingDetailWrapper.getBrokerConfigMap().size());
+ assertEquals(1,
topicRemappingDetailWrapper.getBrokerConfigMap().size());
TopicQueueMappingDetail topicQueueMappingDetail =
topicRemappingDetailWrapper.getBrokerConfigMap().values().iterator().next().getMappingDetail();
- Assert.assertEquals(queueNum,
topicQueueMappingDetail.getHostedQueues().size());
+ assertEquals(queueNum,
topicQueueMappingDetail.getHostedQueues().size());
mappingDetailMap.put(topic, topicQueueMappingDetail);
}
}
@@ -89,7 +95,7 @@ public class TopicQueueMappingManagerTest {
{
topicQueueMappingManager = new
TopicQueueMappingManager(brokerController);
Assert.assertTrue(topicQueueMappingManager.load());
- Assert.assertEquals(0,
topicQueueMappingManager.getTopicQueueMappingTable().size());
+ assertEquals(0,
topicQueueMappingManager.getTopicQueueMappingTable().size());
for (TopicQueueMappingDetail mappingDetail :
mappingDetailMap.values()) {
for (int i = 0; i < 10; i++) {
topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false, false,
true);
@@ -101,11 +107,49 @@ public class TopicQueueMappingManagerTest {
{
topicQueueMappingManager = new
TopicQueueMappingManager(brokerController);
Assert.assertTrue(topicQueueMappingManager.load());
- Assert.assertEquals(mappingDetailMap.size(),
topicQueueMappingManager.getTopicQueueMappingTable().size());
+ assertEquals(mappingDetailMap.size(),
topicQueueMappingManager.getTopicQueueMappingTable().size());
for (TopicQueueMappingDetail topicQueueMappingDetail:
topicQueueMappingManager.getTopicQueueMappingTable().values()) {
- Assert.assertEquals(topicQueueMappingDetail,
mappingDetailMap.get(topicQueueMappingDetail.getTopic()));
+ assertEquals(topicQueueMappingDetail,
mappingDetailMap.get(topicQueueMappingDetail.getTopic()));
}
}
delete(topicQueueMappingManager);
}
+
+    /**
+     * encode(true) must produce the same text as fastjson2 pretty-printing an equivalent serialize wrapper.
+     */
+    @Test
+    public void testEncodePretty() {
+        final String topic = "testTopic";
+        TopicQueueMappingManager manager = new TopicQueueMappingManager(null);
+        TopicQueueMappingDetail mapping = new TopicQueueMappingDetail();
+        mapping.setTopic(topic);
+        mapping.setBname("testBroker");
+
+        manager.getTopicQueueMappingTable().put(topic, mapping);
+        manager.getDataVersion().nextVersion();
+        String actual = manager.encode(true);
+
+        // Rebuild by hand what the manager is expected to serialize.
+        TopicQueueMappingSerializeWrapper wrapper = new TopicQueueMappingSerializeWrapper();
+        wrapper.setTopicQueueMappingInfoMap(new ConcurrentHashMap<>(manager.getTopicQueueMappingTable()));
+        wrapper.setDataVersion(manager.getDataVersion());
+
+        assertEquals(JSON.toJSONString(wrapper, JSONWriter.Feature.PrettyFormat), actual);
+    }
+
+    /**
+     * encode(false) must produce the same text as fastjson2's default serialization of an equivalent serialize wrapper.
+     */
+    @Test
+    public void testEncodeNonPretty() {
+        final String topic = "testTopic";
+        TopicQueueMappingManager manager = new TopicQueueMappingManager(null);
+        TopicQueueMappingDetail mapping = new TopicQueueMappingDetail();
+        mapping.setTopic(topic);
+        mapping.setBname("testBroker");
+
+        manager.getTopicQueueMappingTable().put(topic, mapping);
+        manager.getDataVersion().nextVersion();
+        String actual = manager.encode(false);
+
+        // Rebuild by hand what the manager is expected to serialize.
+        TopicQueueMappingSerializeWrapper wrapper = new TopicQueueMappingSerializeWrapper();
+        wrapper.setTopicQueueMappingInfoMap(new ConcurrentHashMap<>(manager.getTopicQueueMappingTable()));
+        wrapper.setDataVersion(manager.getDataVersion());
+
+        assertEquals(JSON.toJSONString(wrapper), actual);
+    }
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
index 690b4eabb5..62a6ad8b5b 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
@@ -19,23 +19,40 @@ package org.apache.rocketmq.broker.transaction.queue;
import org.apache.rocketmq.broker.transaction.TransactionMetrics;
import org.apache.rocketmq.broker.transaction.TransactionMetrics.Metric;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Collections;
+import java.util.UUID;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class TransactionMetricsTest {
private TransactionMetrics transactionMetrics;
private String configPath;
+ private Path path;
@Before
- public void setUp() throws Exception {
- configPath = "configPath";
- transactionMetrics = new TransactionMetrics(configPath);
+ public void before() throws Exception { // per-test fixture: fresh temp path plus a spied TransactionMetrics
+ configPath = createBaseDir(); // unique path string under java.io.tmpdir; createBaseDir does not create anything on disk
+ path = Paths.get(configPath);
+ transactionMetrics = spy(new TransactionMetrics(configPath)); // Mockito spy around a real instance so calls can be verified
+ }
+
+ @After
+ public void after() throws Exception { // per-test cleanup of whatever was written at configPath
+ deleteFile(configPath); // removes the file, or the directory tree, if one exists
+ assertFalse(path.toFile().exists()); // the path must be gone after cleanup
}
/**
@@ -80,4 +97,40 @@ public class TransactionMetricsTest {
transactionMetrics.cleanMetrics(Collections.singleton(topic));
assert transactionMetrics.getTransactionCount(topic) == 0;
}
+
+ @Test // persist() is expected to create something on disk at the configured path
+ public void testPersist() {
+ assertFalse(path.toFile().exists()); // precondition: nothing exists at the path yet
+ transactionMetrics.persist();
+ assertTrue(path.toFile().exists());
+ verify(transactionMetrics).persist(); // NOTE(review): always satisfied, since persist() was invoked on the spy two lines above
+ }
+
+    /**
+     * Builds a unique path under the JVM temp directory for this test; despite the name, nothing is created on disk.
+     *
+     * @return a path of the form {@code <java.io.tmpdir>/unitteststore-<uuid>}
+     * @throws IllegalStateException if something already exists at the generated path
+     */
+    private String createBaseDir() {
+        String baseDir = System.getProperty("java.io.tmpdir") + File.separator + "unitteststore-" + UUID.randomUUID();
+        if (new File(baseDir).exists()) {
+            // Fail this test only; System.exit would terminate the whole test JVM and hide the cause.
+            throw new IllegalStateException("Test base dir already exists: " + baseDir);
+        }
+        return baseDir;
+    }
+
+    /**
+     * Convenience overload: deletes whatever exists at the given path.
+     *
+     * @param fileName path of the file or directory to remove
+     */
+    private void deleteFile(String fileName) {
+        File target = new File(fileName);
+        deleteFile(target);
+    }
+
+    /**
+     * Deletes a regular file, or a directory together with everything beneath it. Does nothing if the path does not exist.
+     *
+     * @param file file or directory to remove
+     */
+    private void deleteFile(File file) {
+        if (!file.exists()) {
+            return;
+        }
+        if (file.isFile()) {
+            file.delete();
+        } else if (file.isDirectory()) {
+            File[] children = file.listFiles();
+            // listFiles() returns null on an I/O error or if the directory vanished; skip the walk instead of throwing NPE.
+            if (children != null) {
+                for (File child : children) {
+                    deleteFile(child);
+                }
+            }
+            file.delete();
+        }
+    }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
index 38e0a20752..67cf045cfb 100644
--- a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
+++ b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
@@ -16,7 +16,7 @@
*/
package org.apache.rocketmq.store.pop;
-import com.alibaba.fastjson.annotation.JSONField;
+import com.alibaba.fastjson2.annotation.JSONField;
import java.util.ArrayList;
import java.util.List;
@@ -35,7 +35,6 @@ public class PopCheckPoint implements
Comparable<PopCheckPoint> {
private int queueId;
@JSONField(name = "t")
private String topic;
- @JSONField(name = "c")
private String cid;
@JSONField(name = "ro")
private long reviveOffset;
@@ -114,10 +113,12 @@ public class PopCheckPoint implements
Comparable<PopCheckPoint> {
this.topic = topic;
}
+ @JSONField(name = "c") // binds this getter to JSON key "c"; NOTE(review): presumably needed because getCId does not follow bean naming for field cid — confirm
public String getCId() {
return cid;
}
+ @JSONField(name = "c") // binds this setter to JSON key "c", the same key as on getCId
public void setCId(String cid) {
this.cid = cid;
}