This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new b1daa3c0f0 [ISSUE# 9333] Use fastjson2 in broker module (#9334)
b1daa3c0f0 is described below

commit b1daa3c0f0e8dd85f181331e4ec4ff276fb361c0
Author: yx9o <[email protected]>
AuthorDate: Tue Apr 22 10:48:56 2025 +0800

    [ISSUE# 9333] Use fastjson2 in broker module (#9334)
    
    * [#ISSUE 9333] Use fastjson2 in broker module
    
    * Update
    
    * Fix the serialization failure caused by improper get/set naming
---
 broker/BUILD.bazel                                 |   2 -
 .../rocketmq/broker/RocksDBConfigManager.java      |   7 +-
 .../broker/offset/ConsumerOrderInfoManager.java    |  19 ++-
 .../rocketmq/broker/pop/PopConsumerRecord.java     |   8 +-
 .../rocketmq/broker/pop/PopConsumerService.java    |  35 ++--
 .../broker/processor/AckMessageProcessor.java      |   7 +-
 .../broker/processor/AdminBrokerProcessor.java     |  49 +++---
 .../processor/ChangeInvisibleTimeProcessor.java    |   9 +-
 .../broker/processor/PopBufferMergeService.java    |  19 ++-
 .../broker/processor/PopMessageProcessor.java      |  31 ++--
 .../broker/processor/PopReviveService.java         |  25 +--
 .../broker/topic/TopicQueueMappingManager.java     |  21 ++-
 .../broker/transaction/TransactionMetrics.java     |  44 +++--
 .../rocketmq/broker/RocksDBConfigManagerTest.java  |  75 +++++++++
 .../broker/processor/AdminBrokerProcessorTest.java |  79 ++++++++-
 .../ChangeInvisibleTimeProcessorTest.java          |  57 ++++++-
 .../processor/PopBufferMergeServiceTest.java       | 182 ++++++++++++++++-----
 .../broker/processor/PopMessageProcessorTest.java  |  50 +++++-
 .../broker/processor/PopReviveServiceTest.java     |  83 ++++++++--
 .../broker/topic/TopicQueueMappingManagerTest.java |  68 ++++++--
 .../transaction/queue/TransactionMetricsTest.java  |  59 ++++++-
 .../apache/rocketmq/store/pop/PopCheckPoint.java   |   5 +-
 22 files changed, 715 insertions(+), 219 deletions(-)

diff --git a/broker/BUILD.bazel b/broker/BUILD.bazel
index 77d456bc16..f00d01e8cc 100644
--- a/broker/BUILD.bazel
+++ b/broker/BUILD.bazel
@@ -32,7 +32,6 @@ java_library(
         "//tieredstore",
         "@maven//:org_slf4j_slf4j_api",
         "@maven//:ch_qos_logback_logback_classic",
-        "@maven//:com_alibaba_fastjson",
         "@maven//:com_alibaba_fastjson2_fastjson2",
         "@maven//:com_github_luben_zstd_jni",
         "@maven//:com_google_guava_guava",
@@ -84,7 +83,6 @@ java_library(
         "//remoting",
         "//store",
         "//tieredstore",
-        "@maven//:com_alibaba_fastjson",
         "@maven//:com_alibaba_fastjson2_fastjson2",
         "@maven//:com_google_guava_guava",
         "@maven//:io_netty_netty_all",
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
index ee2d4e54a6..c59c00c040 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
@@ -16,9 +16,7 @@
  */
 package org.apache.rocketmq.broker;
 
-import com.alibaba.fastjson.JSON;
-import java.nio.charset.StandardCharsets;
-import java.util.function.BiConsumer;
+import com.alibaba.fastjson2.JSON;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -31,6 +29,9 @@ import org.rocksdb.RocksIterator;
 import org.rocksdb.Statistics;
 import org.rocksdb.WriteBatch;
 
+import java.nio.charset.StandardCharsets;
+import java.util.function.BiConsumer;
+
 public class RocksDBConfigManager {
     protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     public volatile boolean isStop = false;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
index 120f5b104c..9f173daf46 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
@@ -16,17 +16,9 @@
  */
 package org.apache.rocketmq.broker.offset;
 
-import com.alibaba.fastjson.annotation.JSONField;
+import com.alibaba.fastjson2.annotation.JSONField;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
@@ -37,6 +29,15 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
 public class ConsumerOrderInfoManager extends ConfigManager {
 
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
index 1ee01fea1c..661ace9bcb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
@@ -16,9 +16,9 @@
  */
 package org.apache.rocketmq.broker.pop;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import com.alibaba.fastjson.annotation.JSONField;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.annotation.JSONField;
+
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 
@@ -119,7 +119,7 @@ public class PopConsumerRecord {
     }
 
     public static PopConsumerRecord decode(byte[] body) {
-        return JSONObject.parseObject(body, PopConsumerRecord.class);
+        return JSON.parseObject(body, PopConsumerRecord.class);
     }
 
     public long getPopTime() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index 1138ff4afe..a2198f2560 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -16,25 +16,9 @@
  */
 package org.apache.rocketmq.broker.pop;
 
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
-import java.nio.ByteBuffer;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Objects;
-import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.rocketmq.broker.BrokerController;
@@ -65,6 +49,23 @@ import org.apache.rocketmq.store.pop.PopCheckPoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 public class PopConsumerService extends ServiceThread {
 
     private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 23a4f6167c..06a531552a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -16,11 +16,9 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
-import java.util.BitSet;
-import java.nio.charset.StandardCharsets;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.metrics.PopMetricsManager;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
@@ -52,6 +50,9 @@ import org.apache.rocketmq.store.exception.ConsumeQueueException;
 import org.apache.rocketmq.store.pop.AckMsg;
 import org.apache.rocketmq.store.pop.BatchAckMsg;
 
+import java.nio.charset.StandardCharsets;
+import java.util.BitSet;
+
 public class AckMessageProcessor implements NettyRequestProcessor {
 
     private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 4200f34bde..c747fa15af 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -16,33 +16,12 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
 import com.google.common.collect.Sets;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.opentelemetry.api.common.Attributes;
-import java.io.UnsupportedEncodingException;
-import java.net.UnknownHostException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.acl.AccessValidator;
@@ -236,6 +215,28 @@ import org.apache.rocketmq.store.timer.TimerCheckpoint;
 import org.apache.rocketmq.store.timer.TimerMessageStore;
 import org.apache.rocketmq.store.util.LibC;
 
+import java.io.UnsupportedEncodingException;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
 import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
@@ -2891,7 +2892,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
             } else {
                 ConsumerFilterData filterData = this.brokerController.getConsumerFilterManager()
                     .get(requestHeader.getTopic(), requestHeader.getConsumerGroup());
-                body.setFilterData(JSON.toJSONString(filterData, true));
+                body.setFilterData(JSON.toJSONString(filterData));
 
                 messageFilter = new ExpressionMessageFilter(subscriptionData, filterData,
                     this.brokerController.getConsumerFilterManager());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index de72ee7baf..f288c001b8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -16,12 +16,9 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.nio.charset.StandardCharsets;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.metrics.PopMetricsManager;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
@@ -50,6 +47,10 @@ import org.apache.rocketmq.store.exception.ConsumeQueueException;
 import org.apache.rocketmq.store.pop.AckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
 
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
 public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor {
     private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
     private final BrokerController brokerController;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index 820388b18d..7c309ec5c4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -16,15 +16,7 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import com.alibaba.fastjson.JSON;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.alibaba.fastjson2.JSON;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.metrics.PopMetricsManager;
 import org.apache.rocketmq.common.KeyBuilder;
@@ -44,6 +36,15 @@ import org.apache.rocketmq.store.pop.AckMsg;
 import org.apache.rocketmq.store.pop.BatchAckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+
 public class PopBufferMergeService extends ServiceThread {
     private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
     ConcurrentHashMap<String/*mergeKey*/, PopCheckPointWrapper>
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index d73acc84df..9f55269f39 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -16,27 +16,13 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
 import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.FileRegion;
 import io.opentelemetry.api.common.Attributes;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
@@ -91,6 +77,21 @@ import org.apache.rocketmq.store.pop.AckMsg;
 import org.apache.rocketmq.store.pop.BatchAckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
 
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index e1ead86169..dcffaf50cc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -16,18 +16,8 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
 import io.opentelemetry.api.common.Attributes;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.rocketmq.broker.BrokerController;
@@ -37,12 +27,12 @@ import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.PopAckConstants;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -61,6 +51,17 @@ import org.apache.rocketmq.store.pop.AckMsg;
 import org.apache.rocketmq.store.pop.BatchAckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
 
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 6b9cf15938..dfbe5d347a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -16,13 +16,8 @@
  */
 package org.apache.rocketmq.broker.topic;
 
-import com.alibaba.fastjson.JSON;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONWriter;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
@@ -40,6 +35,13 @@ import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;
 
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
 
 public class TopicQueueMappingManager extends ConfigManager {
@@ -149,7 +151,10 @@ public class TopicQueueMappingManager extends ConfigManager {
         TopicQueueMappingSerializeWrapper wrapper = new TopicQueueMappingSerializeWrapper();
         wrapper.setTopicQueueMappingInfoMap(topicQueueMappingTable);
         wrapper.setDataVersion(this.dataVersion);
-        return JSON.toJSONString(wrapper, pretty);
+        if (pretty) {
+            return JSON.toJSONString(wrapper, JSONWriter.Feature.PrettyFormat);
+        }
+        return JSON.toJSONString(wrapper);
     }
 
     @Override
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java
index ad30c73c60..d8dd811db2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java
@@ -16,16 +16,21 @@
  */
 package org.apache.rocketmq.broker.transaction;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONWriter;
 import com.google.common.io.Files;
-import java.io.BufferedWriter;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
+import java.io.OutputStream;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
@@ -33,14 +38,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.rocketmq.common.ConfigManager;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.remoting.protocol.DataVersion;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-
 public class TransactionMetrics extends ConfigManager {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
@@ -90,11 +87,11 @@ public class TransactionMetrics extends ConfigManager {
         this.transactionCounts = transactionCounts;
     }
 
-    protected void write0(Writer writer) {
+    protected void write0(OutputStream out) {
         TransactionMetricsSerializeWrapper wrapper = new TransactionMetricsSerializeWrapper();
         wrapper.setTransactionCount(transactionCounts);
         wrapper.setDataVersion(dataVersion);
-        JSON.writeJSONString(writer, wrapper, SerializerFeature.BrowserCompatible);
+        JSON.writeTo(out, wrapper, JSONWriter.Feature.BrowserCompatible);
     }
 
     @Override
@@ -182,7 +179,7 @@ public class TransactionMetrics extends ConfigManager {
         String config = configFilePath();
         String temp = config + ".tmp";
         String backup = config + ".bak";
-        BufferedWriter bufferedWriter = null;
+        FileOutputStream outputStream = null;
         try {
             File tmpFile = new File(temp);
             File parentDirectory = tmpFile.getParentFile();
@@ -199,11 +196,10 @@ public class TransactionMetrics extends ConfigManager {
                     return;
                 }
             }
-            bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(tmpFile, false),
-                    StandardCharsets.UTF_8));
-            write0(bufferedWriter);
-            bufferedWriter.flush();
-            bufferedWriter.close();
+            outputStream = new FileOutputStream(tmpFile, false);
+            write0(outputStream);
+            outputStream.flush();
+            outputStream.close();
             log.debug("Finished writing tmp file: {}", temp);
 
             File configFile = new File(config);
@@ -216,9 +212,9 @@ public class TransactionMetrics extends ConfigManager {
         } catch (IOException e) {
             log.error("Failed to persist {}", temp, e);
         } finally {
-            if (null != bufferedWriter) {
+            if (null != outputStream) {
                 try {
-                    bufferedWriter.close();
+                    outputStream.close();
                 } catch (IOException ignore) {
                 }
             }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/RocksDBConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/RocksDBConfigManagerTest.java
new file mode 100644
index 0000000000..d9feb6a782
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/RocksDBConfigManagerTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker;
+
+import com.alibaba.fastjson2.JSON;
+import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+public class RocksDBConfigManagerTest {
+
+    private ConfigRocksDBStorage configRocksDBStorage;
+
+    private RocksDBConfigManager rocksDBConfigManager;
+
+    @Before
+    public void setUp() throws IllegalAccessException {
+        configRocksDBStorage = mock(ConfigRocksDBStorage.class);
+        rocksDBConfigManager = spy(new RocksDBConfigManager("testPath", 1000L, null));
+        rocksDBConfigManager.configRocksDBStorage = configRocksDBStorage;
+    }
+
+    @Test
+    public void testLoadDataVersion() throws Exception {
+        DataVersion expected = new DataVersion();
+        expected.nextVersion();
+        String jsonData = JSON.toJSONString(expected);
+        byte[] mockDataVersion = jsonData.getBytes(StandardCharsets.UTF_8);
+
+        when(rocksDBConfigManager.configRocksDBStorage.getKvDataVersion()).thenReturn(mockDataVersion);
+
+        boolean result = rocksDBConfigManager.loadDataVersion();
+
+        assertTrue(result);
+        assertEquals(expected.getCounter().get(), rocksDBConfigManager.getKvDataVersion().getCounter().get());
+        assertEquals(expected.getTimestamp(), rocksDBConfigManager.getKvDataVersion().getTimestamp());
+    }
+
+    @Test
+    public void testUpdateKvDataVersion() throws Exception {
+        rocksDBConfigManager.updateKvDataVersion();
+
+        DataVersion expectedDataVersion = rocksDBConfigManager.getKvDataVersion();
+        verify(rocksDBConfigManager.configRocksDBStorage, times(1)).updateKvDataVersion(
+                eq(JSON.toJSONString(expectedDataVersion).getBytes(StandardCharsets.UTF_8))
+        );
+    }
+}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index 959b147d9d..90c333b770 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -16,7 +16,8 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
 import com.google.common.collect.Sets;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
@@ -34,10 +35,10 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.broker.client.ConsumerManager;
 import org.apache.rocketmq.broker.client.net.Broker2Client;
-import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
-import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
 import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
 import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
+import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
+import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.BrokerConfig;
@@ -73,6 +74,7 @@ import 
org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody;
 import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.UserInfo;
+import 
org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader;
@@ -87,11 +89,13 @@ import 
org.apache.rocketmq.remoting.protocol.header.GetConsumerStatusRequestHead
 import 
org.apache.rocketmq.remoting.protocol.header.GetEarliestMsgStoretimeRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetSubscriptionGroupConfigRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetTopicConfigRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetUserRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.ListAclsRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.ListUsersRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.NotifyMinBrokerIdChangeRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.QueryConsumeQueueRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.QueryCorrectionOffsetHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader;
@@ -104,6 +108,7 @@ import 
org.apache.rocketmq.remoting.protocol.header.UpdateAclRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.UpdateUserRequestHeader;
 import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import 
org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.store.CommitLog;
 import org.apache.rocketmq.store.DefaultMessageStore;
@@ -111,6 +116,7 @@ import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.logfile.DefaultMappedFile;
+import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
 import org.apache.rocketmq.store.stats.BrokerStats;
 import org.apache.rocketmq.store.timer.TimerCheckpoint;
 import org.apache.rocketmq.store.timer.TimerMessageStore;
@@ -145,6 +151,8 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.LongAdder;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -717,7 +725,7 @@ public class AdminBrokerProcessorTest {
         consumerOffsetManager = mock(ConsumerOffsetManager.class);
         
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
         ConsumerOffsetManager consumerOffset = new ConsumerOffsetManager();
-        
when(consumerOffsetManager.encode()).thenReturn(JSON.toJSONString(consumerOffset,
 false));
+        
when(consumerOffsetManager.encode()).thenReturn(JSON.toJSONString(consumerOffset));
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
         RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
@@ -1328,6 +1336,69 @@ public class AdminBrokerProcessorTest {
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
     }
 
+    /**
+     * Registers a subscription group named "group" and checks that a
+     * GET_SUBSCRIPTIONGROUP_CONFIG request for it is answered with SUCCESS.
+     */
+    @Test
+    public void testGetSubscriptionGroup() throws RemotingCommandException {
+        final String groupName = "group";
+        brokerController.getSubscriptionGroupManager()
+            .getSubscriptionGroupTable()
+            .put(groupName, new SubscriptionGroupConfig());
+
+        GetSubscriptionGroupConfigRequestHeader header = new GetSubscriptionGroupConfigRequestHeader();
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SUBSCRIPTIONGROUP_CONFIG, header);
+        header.setGroup(groupName);
+        // Copy the populated header fields into the command before it is processed.
+        request.makeCustomHeaderToNet();
+
+        int responseCode = adminBrokerProcessor.processRequest(handlerContext, request).getCode();
+        assertEquals(ResponseCode.SUCCESS, responseCode);
+    }
+
+    /**
+     * Sends a CHECK_ROCKSDB_CQ_WRITE_PROGRESS request for "topic" and expects
+     * the processor to answer with SUCCESS.
+     */
+    @Test
+    public void testCheckRocksdbCqWriteProgress() throws RemotingCommandException {
+        CheckRocksdbCqWriteProgressRequestHeader header = new CheckRocksdbCqWriteProgressRequestHeader();
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, header);
+        header.setTopic("topic");
+        request.makeCustomHeaderToNet();
+
+        int responseCode = adminBrokerProcessor.processRequest(handlerContext, request).getCode();
+        assertEquals(ResponseCode.SUCCESS, responseCode);
+    }
+
+    /**
+     * Checks that a QUERY_CONSUME_QUEUE request for queue 0 of "topic" succeeds
+     * when the message store returns a consume queue spanning offsets 0 to 1.
+     */
+    @Test
+    public void testQueryConsumeQueue() throws RemotingCommandException {
+        ConsumeQueueInterface stubbedQueue = mock(ConsumeQueueInterface.class);
+        when(stubbedQueue.getMinOffsetInQueue()).thenReturn(0L);
+        when(stubbedQueue.getMaxOffsetInQueue()).thenReturn(1L);
+
+        // Swap the fixture's store for a mock that serves the stubbed queue for any topic/queue id.
+        messageStore = mock(MessageStore.class);
+        when(messageStore.getConsumeQueue(anyString(), anyInt())).thenReturn(stubbedQueue);
+        when(brokerController.getMessageStore()).thenReturn(messageStore);
+
+        QueryConsumeQueueRequestHeader header = new QueryConsumeQueueRequestHeader();
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_QUEUE, header);
+        header.setTopic("topic");
+        header.setQueueId(0);
+        request.makeCustomHeaderToNet();
+
+        int responseCode = adminBrokerProcessor.processRequest(handlerContext, request).getCode();
+        assertEquals(ResponseCode.SUCCESS, responseCode);
+    }
+
+    /**
+     * Checks that GET_TOPIC_CONFIG returns SUCCESS and that the response body
+     * deserializes to a {@link TopicConfigAndQueueMapping} carrying the
+     * requested topic name.
+     */
+    @Test
+    public void testProcessRequest_GetTopicConfig() throws Exception {
+        final String topicName = "testTopic";
+
+        GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
+        header.setTopic(topicName);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG, header);
+        request.makeCustomHeaderToNet();
+
+        // Serve a config with the requested name from a mocked topic config manager.
+        TopicConfig storedConfig = new TopicConfig();
+        storedConfig.setTopicName(topicName);
+        TopicConfigManager topicConfigManager = mock(TopicConfigManager.class);
+        when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
+        when(topicConfigManager.selectTopicConfig(topicName)).thenReturn(storedConfig);
+
+        RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+
+        assertNotNull(response);
+        assertEquals(ResponseCode.SUCCESS, response.getCode());
+
+        String bodyJson = new String(response.getBody(), StandardCharsets.UTF_8);
+        TopicConfigAndQueueMapping parsed = JSONObject.parseObject(bodyJson, TopicConfigAndQueueMapping.class);
+        assertEquals(topicName, parsed.getTopicName());
+    }
+
     private ResetOffsetRequestHeader createRequestHeader(String topic,String 
group,long timestamp,boolean force,long offset,int queueId) {
         ResetOffsetRequestHeader requestHeader = new 
ResetOffsetRequestHeader();
         requestHeader.setTopic(topic);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
index e15d51b4a8..77490dbd69 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
@@ -18,12 +18,11 @@ package org.apache.rocketmq.broker.processor;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
-import java.lang.reflect.Field;
-import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.net.Broker2Client;
 import org.apache.rocketmq.broker.failover.EscapeBridge;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -41,10 +40,12 @@ import 
org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
 import org.apache.rocketmq.store.AppendMessageResult;
 import org.apache.rocketmq.store.AppendMessageStatus;
 import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.exception.ConsumeQueueException;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -52,8 +53,13 @@ import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
+
 import static 
org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -162,4 +168,51 @@ public class ChangeInvisibleTimeProcessorTest {
         
assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.NO_MESSAGE);
         
assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
     }
+
+    /**
+     * Drives {@code processRequestAsync} against a fully mocked broker and
+     * expects a SUCCESS response.
+     *
+     * <p>The wait on the returned future is bounded so that a future which
+     * never completes fails this test instead of hanging the whole run.
+     */
+    @Test
+    public void testProcessRequestAsync_JsonParsing() throws Exception {
+        Channel mockChannel = mock(Channel.class);
+        RemotingCommand mockRequest = mock(RemotingCommand.class);
+        BrokerController mockBrokerController = mock(BrokerController.class);
+        TopicConfigManager mockTopicConfigManager = mock(TopicConfigManager.class);
+        MessageStore mockMessageStore = mock(MessageStore.class);
+        BrokerConfig mockBrokerConfig = mock(BrokerConfig.class);
+        BrokerStatsManager mockBrokerStatsManager = mock(BrokerStatsManager.class);
+        PopMessageProcessor mockPopMessageProcessor = mock(PopMessageProcessor.class);
+        PopBufferMergeService mockPopBufferMergeService = mock(PopBufferMergeService.class);
+
+        when(mockBrokerController.getTopicConfigManager()).thenReturn(mockTopicConfigManager);
+        when(mockBrokerController.getMessageStore()).thenReturn(mockMessageStore);
+        when(mockBrokerController.getBrokerConfig()).thenReturn(mockBrokerConfig);
+        when(mockBrokerController.getBrokerStatsManager()).thenReturn(mockBrokerStatsManager);
+        when(mockBrokerController.getPopMessageProcessor()).thenReturn(mockPopMessageProcessor);
+        when(mockPopMessageProcessor.getPopBufferMergeService()).thenReturn(mockPopBufferMergeService);
+        // The buffer refuses the ack, so the stubbed escape-bridge put below supplies the store result.
+        when(mockPopBufferMergeService.addAk(anyInt(), any())).thenReturn(false);
+        when(mockBrokerController.getEscapeBridge()).thenReturn(escapeBridge);
+        PutMessageResult mockPutMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, null, true);
+        when(escapeBridge.asyncPutMessageToSpecificQueue(any(MessageExtBrokerInner.class)))
+            .thenReturn(CompletableFuture.completedFuture(mockPutMessageResult));
+
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setReadQueueNums(4);
+        when(mockTopicConfigManager.selectTopicConfig(anyString())).thenReturn(topicConfig);
+        when(mockMessageStore.getMinOffsetInQueue(anyString(), anyInt())).thenReturn(0L);
+        when(mockMessageStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(10L);
+        when(mockBrokerConfig.isPopConsumerKVServiceEnable()).thenReturn(false);
+
+        ChangeInvisibleTimeRequestHeader requestHeader = new ChangeInvisibleTimeRequestHeader();
+        requestHeader.setTopic("TestTopic");
+        requestHeader.setQueueId(1);
+        requestHeader.setOffset(5L);
+        requestHeader.setConsumerGroup("TestGroup");
+        requestHeader.setExtraInfo("0 10000 10000 0 TestBroker 1");
+        requestHeader.setInvisibleTime(60000L);
+        when(mockRequest.decodeCommandCustomHeader(ChangeInvisibleTimeRequestHeader.class)).thenReturn(requestHeader);
+
+        ChangeInvisibleTimeProcessor processor = new ChangeInvisibleTimeProcessor(mockBrokerController);
+        CompletableFuture<RemotingCommand> futureResponse = processor.processRequestAsync(mockChannel, mockRequest, true);
+
+        // Bounded wait: a TimeoutException fails the test rather than blocking forever.
+        RemotingCommand response = futureResponse.get(5, java.util.concurrent.TimeUnit.SECONDS);
+        assertNotNull(response);
+        assertEquals(ResponseCode.SUCCESS, response.getCode());
+    }
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java
index acc7a3da74..33d6820a7e 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java
@@ -16,19 +16,20 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
+import com.alibaba.fastjson2.JSON;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.broker.failover.EscapeBridge;
 import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
-import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.pop.AckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
@@ -37,56 +38,82 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
-import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import static 
org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
+import java.lang.reflect.Method;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.Silent.class)
 public class PopBufferMergeServiceTest {
-    @Spy
-    private BrokerController brokerController = new BrokerController(new 
BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new 
MessageStoreConfig());
+
     @Mock
+    private BrokerController brokerController;
+
     private PopMessageProcessor popMessageProcessor;
+
+    @Mock
+    private ScheduleMessageService scheduleMessageService;
+
     @Mock
-    private ChannelHandlerContext handlerContext;
+    private TopicConfigManager topicConfigManager;
+
+    @Mock
+    private ConsumerManager consumerManager;
+
     @Mock
     private DefaultMessageStore messageStore;
-    private ScheduleMessageService scheduleMessageService;
-    private ClientChannelInfo clientChannelInfo;
-    private String group = "FooBarGroup";
-    private String topic = "FooBar";
+
+    @Mock
+    private MessageStoreConfig messageStoreConfig;
+
+    private String defaultGroup = "defaultGroup";
+
+    private String defaultTopic = "defaultTopic";
+
+    private PopBufferMergeService popBufferMergeService;
+
+    @Mock
+    private BrokerConfig brokerConfig;
+
+    @Mock
+    private EscapeBridge escapeBridge;
 
     @Before
     public void init() throws Exception {
-        FieldUtils.writeField(brokerController.getBrokerConfig(), 
"enablePopBufferMerge", true, true);
-        brokerController.setMessageStore(messageStore);
+        when(brokerConfig.getBrokerIP1()).thenReturn("127.0.0.1");
+        when(brokerConfig.isEnablePopBufferMerge()).thenReturn(true);
+        when(brokerConfig.getPopCkStayBufferTime()).thenReturn(10 * 1000);
+        when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+        when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
+        when(brokerController.getMessageStore()).thenReturn(messageStore);
+        
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
+        
when(brokerController.getScheduleMessageService()).thenReturn(scheduleMessageService);
+        
when(brokerController.getConsumerManager()).thenReturn(consumerManager);
+        
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
         popMessageProcessor = new PopMessageProcessor(brokerController);
-        scheduleMessageService = new ScheduleMessageService(brokerController);
-        scheduleMessageService.parseDelayLevel();
-        Channel mockChannel = mock(Channel.class);
-        
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new 
TopicConfig());
-        clientChannelInfo = new ClientChannelInfo(mockChannel);
-        ConsumerData consumerData = createConsumerData(group, topic);
-        brokerController.getConsumerManager().registerConsumer(
-            consumerData.getGroupName(),
-            clientChannelInfo,
-            consumerData.getConsumeType(),
-            consumerData.getMessageModel(),
-            consumerData.getConsumeFromWhere(),
-            consumerData.getSubscriptionDataSet(),
-            false);
+        popBufferMergeService = new PopBufferMergeService(brokerController, 
popMessageProcessor);
+        FieldUtils.writeDeclaredField(popBufferMergeService, 
"brokerController", brokerController, true);
+        ConcurrentMap<String, TopicConfig> topicConfigTable = new 
ConcurrentHashMap<>();
+        topicConfigTable.put(defaultTopic, new TopicConfig());
+        
when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable);
     }
 
-    @Test(timeout = 10_000)
+    @Test(timeout = 15_000)
     public void testBasic() throws Exception {
         // This test case fails on Windows in CI pipeline
         // Disable it for later fix
         Assume.assumeFalse(MixAll.isWindows());
-        PopBufferMergeService popBufferMergeService = new 
PopBufferMergeService(brokerController, popMessageProcessor);
-        popBufferMergeService.start();
         PopCheckPoint ck = new PopCheckPoint();
         ck.setBitMap(0);
         int msgCnt = 1;
@@ -97,8 +124,8 @@ public class PopBufferMergeServiceTest {
         ck.setInvisibleTime(invisibleTime);
         int offset = 100;
         ck.setStartOffset(offset);
-        ck.setCId(group);
-        ck.setTopic(topic);
+        ck.setCId(defaultGroup);
+        ck.setTopic(defaultTopic);
         int queueId = 0;
         ck.setQueueId(queueId);
 
@@ -108,18 +135,93 @@ public class PopBufferMergeServiceTest {
         AckMsg ackMsg = new AckMsg();
         ackMsg.setAckOffset(ackOffset);
         ackMsg.setStartOffset(offset);
-        ackMsg.setConsumerGroup(group);
-        ackMsg.setTopic(topic);
+        ackMsg.setConsumerGroup(defaultGroup);
+        ackMsg.setTopic(defaultTopic);
         ackMsg.setQueueId(queueId);
         ackMsg.setPopTime(popTime);
         try {
             assertThat(popBufferMergeService.addCk(ck, reviveQid, ackOffset, 
nextBeginOffset)).isTrue();
-            assertThat(popBufferMergeService.getLatestOffset(topic, group, 
queueId)).isEqualTo(nextBeginOffset);
+            assertThat(popBufferMergeService.getLatestOffset(defaultTopic, 
defaultGroup, queueId)).isEqualTo(nextBeginOffset);
             Thread.sleep(1000); // wait background threads of 
PopBufferMergeService run for some time
             assertThat(popBufferMergeService.addAk(reviveQid, 
ackMsg)).isTrue();
-            assertThat(popBufferMergeService.getLatestOffset(topic, group, 
queueId)).isEqualTo(nextBeginOffset);
+            assertThat(popBufferMergeService.getLatestOffset(defaultTopic, 
defaultGroup, queueId)).isEqualTo(nextBeginOffset);
         } finally {
             popBufferMergeService.shutdown(true);
         }
     }
+
+    /**
+     * Pre-populates the buffer under the key {@code "testMergeKey000"} and
+     * expects {@code addCkJustOffset} to return false for a check point
+     * stubbed with topic "testMergeKey", empty consumer id and broker name,
+     * and zero queue id, start offset and pop time.
+     */
+    @Test
+    public void testAddCkJustOffset_MergeKeyConflict() {
+        String mergeKey = "testMergeKey";
+        PopBufferMergeService.PopCheckPointWrapper existingWrapper = mock(PopBufferMergeService.PopCheckPointWrapper.class);
+        popBufferMergeService.buffer.put(mergeKey + "000", existingWrapper);
+
+        PopCheckPoint conflicting = mock(PopCheckPoint.class);
+        when(conflicting.getTopic()).thenReturn(mergeKey);
+        when(conflicting.getCId()).thenReturn("");
+        when(conflicting.getQueueId()).thenReturn(0);
+        when(conflicting.getStartOffset()).thenReturn(0L);
+        when(conflicting.getPopTime()).thenReturn(0L);
+        when(conflicting.getBrokerName()).thenReturn("");
+
+        boolean added = popBufferMergeService.addCkJustOffset(conflicting, 0, 0, 0);
+        assertFalse(added);
+    }
+
+    /**
+     * Calls {@code addCkMock} for the default group and topic and verifies
+     * that the broker config's {@code isEnablePopLog()} was consulted exactly
+     * once.
+     */
+    @Test
+    public void testAddCkMock() {
+        final int queueId = 0;
+        final int reviveQueueId = 0;
+        final long startOffset = 100L;
+        final long nextBeginOffset = 101L;
+        final long invisibleTime = 30_000L;
+        final long popTime = System.currentTimeMillis();
+        final String brokerName = "brokerName";
+
+        popBufferMergeService.addCkMock(defaultGroup, defaultTopic, queueId, startOffset, invisibleTime, popTime, reviveQueueId, nextBeginOffset, brokerName);
+
+        verify(brokerConfig, times(1)).isEnablePopLog();
+    }
+
+    /**
+     * Invokes the private {@code putAckToStore} reflectively with synchronous
+     * ack appending and checks that the message handed to the escape bridge
+     * carries an {@link AckMsg} JSON body matching the check point, and that
+     * exactly one message is put.
+     *
+     * <p>Uses the class-level {@code escapeBridge} and {@code brokerConfig}
+     * mocks that {@code init()} wires into {@code brokerController}, rather
+     * than a local mock that shadows the field and re-stubs the same getter.
+     */
+    @Test
+    public void testPutAckToStore() throws Exception {
+        PopCheckPoint point = new PopCheckPoint();
+        point.setStartOffset(100L);
+        point.setCId("testGroup");
+        point.setTopic("testTopic");
+        point.setQueueId(1);
+        point.setPopTime(System.currentTimeMillis());
+        point.setBrokerName("testBroker");
+
+        PopBufferMergeService.PopCheckPointWrapper pointWrapper = mock(PopBufferMergeService.PopCheckPointWrapper.class);
+        when(pointWrapper.getCk()).thenReturn(point);
+        when(pointWrapper.getReviveQueueId()).thenReturn(0);
+
+        AtomicInteger toStoreBits = new AtomicInteger(0);
+        when(pointWrapper.getToStoreBits()).thenReturn(toStoreBits);
+
+        byte msgIndex = 0;
+        AtomicInteger count = new AtomicInteger(0);
+
+        // Force the synchronous branch so the put goes through putMessageToSpecificQueue.
+        when(brokerConfig.isAppendAckAsync()).thenReturn(false);
+
+        when(escapeBridge.putMessageToSpecificQueue(any())).thenAnswer(invocation -> {
+            // Decode the body that was written and compare it field by field with the check point.
+            MessageExtBrokerInner capturedMessage = invocation.getArgument(0);
+            AckMsg ackMsg = JSON.parseObject(capturedMessage.getBody(), AckMsg.class);
+
+            assertEquals(point.ackOffsetByIndex(msgIndex), ackMsg.getAckOffset());
+            assertEquals(point.getStartOffset(), ackMsg.getStartOffset());
+            assertEquals(point.getCId(), ackMsg.getConsumerGroup());
+            assertEquals(point.getTopic(), ackMsg.getTopic());
+            assertEquals(point.getQueueId(), ackMsg.getQueueId());
+            assertEquals(point.getPopTime(), ackMsg.getPopTime());
+            assertEquals(point.getBrokerName(), ackMsg.getBrokerName());
+
+            PutMessageResult result = mock(PutMessageResult.class);
+            when(result.getPutMessageStatus()).thenReturn(PutMessageStatus.PUT_OK);
+            return result;
+        });
+
+        Method method = PopBufferMergeService.class.getDeclaredMethod("putAckToStore", PopBufferMergeService.PopCheckPointWrapper.class, byte.class, AtomicInteger.class);
+        method.setAccessible(true);
+        method.invoke(popBufferMergeService, pointWrapper, msgIndex, count);
+        verify(escapeBridge, times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class));
+    }
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
index fdb0690e5d..28476149ab 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
@@ -16,10 +16,9 @@
  */
 package org.apache.rocketmq.broker.processor;
 
+import com.alibaba.fastjson2.JSON;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.embedded.EmbeddedChannel;
-import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.common.BrokerConfig;
@@ -27,6 +26,7 @@ import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.ConsumeInitMode;
 import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
@@ -42,7 +42,7 @@ import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.exception.ConsumeQueueException;
 import org.apache.rocketmq.store.logfile.DefaultMappedFile;
-import org.junit.Assert;
+import org.apache.rocketmq.store.pop.PopCheckPoint;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -50,8 +50,13 @@ import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+
 import static 
org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -168,17 +173,17 @@ public class PopMessageProcessorTest {
                 
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
 
         long offset = 
brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic, 
0);
-        Assert.assertEquals(-1, offset);
+        assertEquals(-1, offset);
 
         RemotingCommand request = createPopMsgCommand(newGroup, topic, 0, 
ConsumeInitMode.MAX);
         popMessageProcessor.processRequest(handlerContext, request);
         offset = 
brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic, 
0);
-        Assert.assertEquals(minOffset, offset);
+        assertEquals(minOffset, offset);
 
         when(messageStore.getMinOffsetInQueue(retryTopic, 
0)).thenReturn(minOffset * 2);
         popMessageProcessor.processRequest(handlerContext, request);
         offset = 
brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic, 
0);
-        Assert.assertEquals(minOffset, offset); // will not entry 
getInitOffset() again
+        assertEquals(minOffset, offset); // will not entry getInitOffset() 
again
         messageStore.getMinOffsetInQueue(retryTopic, 0); // prevent 
UnnecessaryStubbingException
     }
 
@@ -193,17 +198,17 @@ public class PopMessageProcessorTest {
                 
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
 
         long offset = 
brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0);
-        Assert.assertEquals(-1, offset);
+        assertEquals(-1, offset);
 
         RemotingCommand request = createPopMsgCommand(newGroup, topic, 0, 
ConsumeInitMode.MAX);
         popMessageProcessor.processRequest(handlerContext, request);
         offset = 
brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0);
-        Assert.assertEquals(maxOffset - 1, offset); // checkInMem return false
+        assertEquals(maxOffset - 1, offset); // checkInMem return false
 
         when(messageStore.getMaxOffsetInQueue(topic, 0)).thenReturn(maxOffset 
* 2);
         popMessageProcessor.processRequest(handlerContext, request);
         offset = 
brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0);
-        Assert.assertEquals(maxOffset - 1, offset); // will not entry 
getInitOffset() again
+        assertEquals(maxOffset - 1, offset); // will not entry getInitOffset() 
again
         messageStore.getMaxOffsetInQueue(topic, 0); // prevent 
UnnecessaryStubbingException
     }
 
@@ -240,4 +245,31 @@ public class PopMessageProcessorTest {
         }
         return getMessageResult;
     }
+
+    @Test
+    public void testBuildCkMsgJsonParsing() {
+        PopCheckPoint ck = new PopCheckPoint();
+        ck.setTopic("TestTopic");
+        ck.setQueueId(1);
+        ck.setStartOffset(100L);
+        ck.setCId("TestConsumer");
+        ck.setPopTime(System.currentTimeMillis());
+        ck.setBrokerName("TestBroker");
+
+        int reviveQid = 0;
+        PopMessageProcessor processor = new PopMessageProcessor(brokerController);
+
+        MessageExtBrokerInner result = processor.buildCkMsg(ck, reviveQid);
+
+        String jsonBody = new String(result.getBody(), StandardCharsets.UTF_8);
+        PopCheckPoint actual = JSON.parseObject(jsonBody, PopCheckPoint.class);
+
+        assertEquals(ck.getTopic(), actual.getTopic());
+        assertEquals(ck.getQueueId(), actual.getQueueId());
+        assertEquals(ck.getStartOffset(), actual.getStartOffset());
+        assertEquals(ck.getCId(), actual.getCId());
+        assertEquals(ck.getPopTime(), actual.getPopTime());
+        assertEquals(ck.getBrokerName(), actual.getBrokerName());
+        assertEquals(ck.getReviveTime(), actual.getReviveTime());
+    }
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
index 3010e83610..e6a2cdb6cd 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
@@ -16,13 +16,7 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import com.alibaba.fastjson.JSON;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import com.alibaba.fastjson2.JSON;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.failover.EscapeBridge;
@@ -40,12 +34,13 @@ import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.utils.DataConverter;
 import org.apache.rocketmq.common.utils.NetworkUtil;
 import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.store.AppendMessageResult;
+import org.apache.rocketmq.store.AppendMessageStatus;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
-import org.apache.rocketmq.store.AppendMessageResult;
-import org.apache.rocketmq.store.AppendMessageStatus;
 import org.apache.rocketmq.store.pop.AckMsg;
+import org.apache.rocketmq.store.pop.BatchAckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
 import org.apache.rocketmq.store.timer.TimerMessageStore;
 import org.junit.Assert;
@@ -56,18 +51,27 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
 import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.Silent.class)
 public class PopReviveServiceTest {
@@ -405,6 +409,59 @@ public class PopReviveServiceTest {
         verify(messageStore, times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
     }
 
+    @Test
+    public void testReviveMsgFromBatchAck() throws Throwable {
+        brokerConfig.setEnableSkipLongAwaitingAck(true);
+        when(consumerOffsetManager.queryOffset(PopAckConstants.REVIVE_GROUP, REVIVE_TOPIC, REVIVE_QUEUE_ID)).thenReturn(0L);
+        List<MessageExt> reviveMessageExtList = new ArrayList<>();
+        long basePopTime = System.currentTimeMillis();
+        reviveMessageExtList.add(buildBatchAckMsg(buildBatchAckMsg(Arrays.asList(1L, 2L, 3L), basePopTime), 1, 1, basePopTime));
+        doReturn(reviveMessageExtList, new ArrayList<>()).when(popReviveService).getReviveMessage(anyLong(), anyInt());
+
+        PopReviveService.ConsumeReviveObj consumeReviveObj = new PopReviveService.ConsumeReviveObj();
+        popReviveService.consumeReviveMessage(consumeReviveObj);
+        assertEquals(1, consumeReviveObj.map.size());
+
+        ArgumentCaptor<Long> commitOffsetCaptor = ArgumentCaptor.forClass(Long.class);
+        doNothing().when(consumerOffsetManager).commitOffset(anyString(), anyString(), anyString(), anyInt(), commitOffsetCaptor.capture());
+        popReviveService.mergeAndRevive(consumeReviveObj);
+        assertEquals(1, commitOffsetCaptor.getValue().longValue());
+    }
+
+    public static MessageExtBrokerInner buildBatchAckMsg(BatchAckMsg batchAckMsg, long deliverMs, long reviveOffset, long deliverTime) {
+        MessageExtBrokerInner result = buildBatchAckInnerMessage(REVIVE_TOPIC, batchAckMsg, REVIVE_QUEUE_ID, STORE_HOST, deliverMs, PopMessageProcessor.genAckUniqueId(batchAckMsg));
+        result.setQueueOffset(reviveOffset);
+        result.setDeliverTimeMs(deliverMs);
+        result.setStoreTimestamp(deliverTime);
+        return result;
+    }
+
+    public static BatchAckMsg buildBatchAckMsg(Collection<Long> offsets, long popTime) {
+        BatchAckMsg result = new BatchAckMsg();
+        result.setConsumerGroup(GROUP);
+        result.setTopic(TOPIC);
+        result.setQueueId(0);
+        result.setPopTime(popTime);
+        result.setBrokerName("broker-a");
+        result.getAckOffsetList().addAll(offsets);
+        return result;
+    }
+
+    public static MessageExtBrokerInner buildBatchAckInnerMessage(String reviveTopic, AckMsg ackMsg, int reviveQid, SocketAddress host, long deliverMs, String ackUniqueId) {
+        MessageExtBrokerInner result = new MessageExtBrokerInner();
+        result.setTopic(reviveTopic);
+        result.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.CHARSET_UTF8));
+        result.setQueueId(reviveQid);
+        result.setTags(PopAckConstants.BATCH_ACK_TAG);
+        result.setBornTimestamp(System.currentTimeMillis());
+        result.setBornHost(host);
+        result.setStoreHost(host);
+        result.setDeliverTimeMs(deliverMs);
+        result.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, ackUniqueId);
+        result.setPropertiesString(MessageDecoder.messageProperties2String(result.getProperties()));
+        return result;
+    }
+
     public static PopCheckPoint buildPopCheckPoint(long startOffset, long popTime, long reviveOffset) {
         PopCheckPoint ck = new PopCheckPoint();
         ck.setStartOffset(startOffset);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
index b74e57ab93..9b25e0134c 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
@@ -17,15 +17,11 @@
 
 package org.apache.rocketmq.broker.topic;
 
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONWriter;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.remoting.protocol.body.TopicQueueMappingSerializeWrapper;
 import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
 import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.remoting.protocol.statictopic.TopicRemappingDetailWrapper;
@@ -37,6 +33,16 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -79,9 +85,9 @@ public class TopicQueueMappingManagerTest {
                 String topic = UUID.randomUUID().toString();
                 int queueNum = 10;
                 TopicRemappingDetailWrapper topicRemappingDetailWrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, brokers, new HashMap<>());
-                Assert.assertEquals(1, topicRemappingDetailWrapper.getBrokerConfigMap().size());
+                assertEquals(1, topicRemappingDetailWrapper.getBrokerConfigMap().size());
                 TopicQueueMappingDetail topicQueueMappingDetail  = topicRemappingDetailWrapper.getBrokerConfigMap().values().iterator().next().getMappingDetail();
-                Assert.assertEquals(queueNum, topicQueueMappingDetail.getHostedQueues().size());
+                assertEquals(queueNum, topicQueueMappingDetail.getHostedQueues().size());
                 mappingDetailMap.put(topic, topicQueueMappingDetail);
             }
         }
@@ -89,7 +95,7 @@ public class TopicQueueMappingManagerTest {
         {
             topicQueueMappingManager = new TopicQueueMappingManager(brokerController);
             Assert.assertTrue(topicQueueMappingManager.load());
-            Assert.assertEquals(0, topicQueueMappingManager.getTopicQueueMappingTable().size());
+            assertEquals(0, topicQueueMappingManager.getTopicQueueMappingTable().size());
             for (TopicQueueMappingDetail mappingDetail : mappingDetailMap.values()) {
                 for (int i = 0; i < 10; i++) {
                     topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false, false, true);
@@ -101,11 +107,49 @@ public class TopicQueueMappingManagerTest {
         {
             topicQueueMappingManager = new TopicQueueMappingManager(brokerController);
             Assert.assertTrue(topicQueueMappingManager.load());
-            Assert.assertEquals(mappingDetailMap.size(), topicQueueMappingManager.getTopicQueueMappingTable().size());
+            assertEquals(mappingDetailMap.size(), topicQueueMappingManager.getTopicQueueMappingTable().size());
             for (TopicQueueMappingDetail topicQueueMappingDetail: topicQueueMappingManager.getTopicQueueMappingTable().values()) {
-                Assert.assertEquals(topicQueueMappingDetail, mappingDetailMap.get(topicQueueMappingDetail.getTopic()));
+                assertEquals(topicQueueMappingDetail, mappingDetailMap.get(topicQueueMappingDetail.getTopic()));
             }
         }
         delete(topicQueueMappingManager);
     }
+
+    @Test
+    public void testEncodePretty() {
+        TopicQueueMappingManager topicQueueMappingManager = new TopicQueueMappingManager(null);
+        TopicQueueMappingDetail detail = new TopicQueueMappingDetail();
+        detail.setTopic("testTopic");
+        detail.setBname("testBroker");
+
+        topicQueueMappingManager.getTopicQueueMappingTable().put("testTopic", detail);
+        topicQueueMappingManager.getDataVersion().nextVersion();
+
+        String actual = topicQueueMappingManager.encode(true);
+        TopicQueueMappingSerializeWrapper expectedWrapper = new TopicQueueMappingSerializeWrapper();
+        expectedWrapper.setTopicQueueMappingInfoMap(new ConcurrentHashMap<>(topicQueueMappingManager.getTopicQueueMappingTable()));
+        expectedWrapper.setDataVersion(topicQueueMappingManager.getDataVersion());
+        String expected = JSON.toJSONString(expectedWrapper, JSONWriter.Feature.PrettyFormat);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testEncodeNonPretty() {
+        TopicQueueMappingManager topicQueueMappingManager = new TopicQueueMappingManager(null);
+        TopicQueueMappingDetail detail = new TopicQueueMappingDetail();
+        detail.setTopic("testTopic");
+        detail.setBname("testBroker");
+
+        topicQueueMappingManager.getTopicQueueMappingTable().put("testTopic", detail);
+        topicQueueMappingManager.getDataVersion().nextVersion();
+
+        String actual = topicQueueMappingManager.encode(false);
+        TopicQueueMappingSerializeWrapper expectedWrapper = new TopicQueueMappingSerializeWrapper();
+        expectedWrapper.setTopicQueueMappingInfoMap(new ConcurrentHashMap<>(topicQueueMappingManager.getTopicQueueMappingTable()));
+        expectedWrapper.setDataVersion(topicQueueMappingManager.getDataVersion());
+        String expected = JSON.toJSONString(expectedWrapper);
+
+        assertEquals(expected, actual);
+    }
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
index 690b4eabb5..62a6ad8b5b 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
@@ -19,23 +19,40 @@ package org.apache.rocketmq.broker.transaction.queue;
 
 import org.apache.rocketmq.broker.transaction.TransactionMetrics;
 import org.apache.rocketmq.broker.transaction.TransactionMetrics.Metric;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Collections;
+import java.util.UUID;
 
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 @RunWith(MockitoJUnitRunner.class)
 public class TransactionMetricsTest {
     private TransactionMetrics transactionMetrics;
     private String configPath;
+    private Path path;
 
     @Before
-    public void setUp() throws Exception {
-        configPath = "configPath";
-        transactionMetrics = new TransactionMetrics(configPath);
+    public void before() throws Exception {
+        configPath = createBaseDir();
+        path = Paths.get(configPath);
+        transactionMetrics = spy(new TransactionMetrics(configPath));
+    }
+
+    @After
+    public void after() throws Exception {
+        deleteFile(configPath);
+        assertFalse(path.toFile().exists());
     }
 
     /**
@@ -80,4 +97,40 @@ public class TransactionMetricsTest {
         transactionMetrics.cleanMetrics(Collections.singleton(topic));
         assert transactionMetrics.getTransactionCount(topic) == 0;
     }
+
+    @Test
+    public void testPersist() {
+        assertFalse(path.toFile().exists());
+        transactionMetrics.persist();
+        assertTrue(path.toFile().exists());
+        verify(transactionMetrics).persist();
+    }
+
+    private String createBaseDir() {
+        String baseDir = System.getProperty("java.io.tmpdir") + File.separator + "unitteststore-" + UUID.randomUUID();
+        final File file = new File(baseDir);
+        if (file.exists()) {
+            System.exit(1);
+        }
+        return baseDir;
+    }
+
+    private void deleteFile(String fileName) {
+        deleteFile(new File(fileName));
+    }
+
+    private void deleteFile(File file) {
+        if (!file.exists()) {
+            return;
+        }
+        if (file.isFile()) {
+            file.delete();
+        } else if (file.isDirectory()) {
+            File[] files = file.listFiles();
+            for (File file1 : files) {
+                deleteFile(file1);
+            }
+            file.delete();
+        }
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
index 38e0a20752..67cf045cfb 100644
--- a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
+++ b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
@@ -16,7 +16,7 @@
  */
 package org.apache.rocketmq.store.pop;
 
-import com.alibaba.fastjson.annotation.JSONField;
+import com.alibaba.fastjson2.annotation.JSONField;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -35,7 +35,6 @@ public class PopCheckPoint implements Comparable<PopCheckPoint> {
     private int queueId;
     @JSONField(name = "t")
     private String topic;
-    @JSONField(name = "c")
     private String cid;
     @JSONField(name = "ro")
     private long reviveOffset;
@@ -114,10 +113,12 @@ public class PopCheckPoint implements Comparable<PopCheckPoint> {
         this.topic = topic;
     }
 
+    @JSONField(name = "c")
     public String getCId() {
         return cid;
     }
 
+    @JSONField(name = "c")
     public void setCId(String cid) {
         this.cid = cid;
     }

Reply via email to