This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c8938e0c6c [ISSUE #8829] feat: provide ConfigManagerV2 to make best
uses of RocksDB (#8856)
c8938e0c6c is described below
commit c8938e0c6c263cdb73b593ee9984841115604679
Author: Zhanhui Li <[email protected]>
AuthorDate: Mon Oct 28 10:00:33 2024 +0800
[ISSUE #8829] feat: provide ConfigManagerV2 to make best uses of RocksDB
(#8856)
* feat: provide ConfigManagerV2 to make best uses of RocksDB
Signed-off-by: Li Zhanhui <[email protected]>
* fix: release RocksDB objects using try-with-resource
Signed-off-by: Li Zhanhui <[email protected]>
---------
Signed-off-by: Li Zhanhui <[email protected]>
---
.../apache/rocketmq/broker/BrokerController.java | 39 +-
.../v1}/RocksDBConsumerOffsetManager.java | 3 +-
.../v1}/RocksDBLmqConsumerOffsetManager.java | 2 +-
.../v1}/RocksDBLmqSubscriptionGroupManager.java | 2 +-
.../v1}/RocksDBLmqTopicConfigManager.java | 2 +-
.../v1}/RocksDBOffsetSerializeWrapper.java | 2 +-
.../v1}/RocksDBSubscriptionGroupManager.java | 3 +-
.../v1}/RocksDBTopicConfigManager.java | 3 +-
.../rocketmq/broker/config/v2/ConfigHelper.java | 132 +++++++
.../rocketmq/broker/config/v2/ConfigStorage.java | 122 ++++++
.../broker/config/v2/ConsumerOffsetManagerV2.java | 426 +++++++++++++++++++++
.../v2/RecordPrefix.java} | 21 +-
.../v2/SerializationType.java} | 32 +-
.../config/v2/SubscriptionGroupManagerV2.java | 171 +++++++++
.../v2/TableId.java} | 26 +-
.../v2/TablePrefix.java} | 20 +-
.../broker/config/v2/TopicConfigManagerV2.java | 191 +++++++++
.../v2/package-info.java} | 26 +-
.../broker/offset/ConsumerOffsetManager.java | 2 +-
.../subscription/SubscriptionGroupManager.java | 4 +-
.../rocketmq/broker/topic/TopicConfigManager.java | 4 +-
.../offset/RocksDBConsumerOffsetManagerTest.java | 1 +
.../RocksDBLmqConsumerOffsetManagerTest.java | 1 +
.../offset/RocksDBOffsetSerializeWrapperTest.java | 1 +
.../offset/RocksdbTransferOffsetAndCqTest.java | 1 +
.../broker/processor/AdminBrokerProcessorTest.java | 4 +-
.../RocksdbGroupConfigTransferTest.java | 1 +
.../topic/RocksdbTopicConfigManagerTest.java | 1 +
.../topic/RocksdbTopicConfigTransferTest.java | 1 +
.../org/apache/rocketmq/common/BrokerConfig.java | 14 +
.../common/config/ConfigManagerVersion.java | 21 +-
31 files changed, 1186 insertions(+), 93 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 05a00a5005..ee211e1b80 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -76,8 +76,8 @@ import
org.apache.rocketmq.broker.offset.BroadcastOffsetManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
-import org.apache.rocketmq.broker.offset.RocksDBConsumerOffsetManager;
-import org.apache.rocketmq.broker.offset.RocksDBLmqConsumerOffsetManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBLmqConsumerOffsetManager;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.processor.AckMessageProcessor;
@@ -99,12 +99,16 @@ import
org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
-import
org.apache.rocketmq.broker.subscription.RocksDBLmqSubscriptionGroupManager;
-import org.apache.rocketmq.broker.subscription.RocksDBSubscriptionGroupManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBLmqSubscriptionGroupManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+import org.apache.rocketmq.broker.config.v2.ConsumerOffsetManagerV2;
+import org.apache.rocketmq.broker.config.v2.SubscriptionGroupManagerV2;
+import org.apache.rocketmq.broker.config.v2.TopicConfigManagerV2;
+import org.apache.rocketmq.broker.config.v2.ConfigStorage;
import org.apache.rocketmq.broker.topic.LmqTopicConfigManager;
-import org.apache.rocketmq.broker.topic.RocksDBLmqTopicConfigManager;
-import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBLmqTopicConfigManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
@@ -124,6 +128,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.config.ConfigManagerVersion;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageExt;
@@ -239,6 +244,11 @@ public class BrokerController {
protected RemotingServer remotingServer;
protected CountDownLatch remotingServerStartLatch;
protected RemotingServer fastRemotingServer;
+
+ /**
+ * If {Topic, SubscriptionGroup, Offset}ManagerV2 are used, config entries
are stored in RocksDB.
+ */
+ protected ConfigStorage configStorage;
protected TopicConfigManager topicConfigManager;
protected SubscriptionGroupManager subscriptionGroupManager;
protected TopicQueueMappingManager topicQueueMappingManager;
@@ -334,7 +344,12 @@ public class BrokerController {
this.setStoreHost(new
InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort()));
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new
LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(),
this.brokerConfig.isEnableDetailStat()) : new
BrokerStatsManager(this.brokerConfig.getBrokerClusterName(),
this.brokerConfig.isEnableDetailStat());
this.broadcastOffsetManager = new BroadcastOffsetManager(this);
- if (this.messageStoreConfig.isEnableRocksDBStore()) {
+ if
(ConfigManagerVersion.V2.getVersion().equals(brokerConfig.getConfigManagerVersion()))
{
+ this.configStorage = new
ConfigStorage(messageStoreConfig.getStorePathRootDir());
+ this.topicConfigManager = new TopicConfigManagerV2(this,
configStorage);
+ this.subscriptionGroupManager = new
SubscriptionGroupManagerV2(this, configStorage);
+ this.consumerOffsetManager = new ConsumerOffsetManagerV2(this,
configStorage);
+ } else if (this.messageStoreConfig.isEnableRocksDBStore()) {
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new
RocksDBLmqTopicConfigManager(this) : new RocksDBTopicConfigManager(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ?
new RocksDBLmqSubscriptionGroupManager(this) : new
RocksDBSubscriptionGroupManager(this);
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ?
new RocksDBLmqConsumerOffsetManager(this) : new
RocksDBConsumerOffsetManager(this);
@@ -771,7 +786,11 @@ public class BrokerController {
}
public boolean initializeMetadata() {
- boolean result = this.topicConfigManager.load();
+ boolean result = true;
+ if (null != configStorage) {
+ result = configStorage.start();
+ }
+ result = result && this.topicConfigManager.load();
result = result && this.topicQueueMappingManager.load();
result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
@@ -1547,6 +1566,10 @@ public class BrokerController {
this.consumerOffsetManager.stop();
}
+ if (null != configStorage) {
+ configStorage.shutdown();
+ }
+
if (this.authenticationMetadataManager != null) {
this.authenticationMetadataManager.shutdown();
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
similarity index 98%
rename from
broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
rename to
broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
index 1e7cda71ee..8066fe769a 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.config.v1;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
@@ -24,6 +24,7 @@ import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.RocksDBConfigManager;
+import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.DataConverter;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqConsumerOffsetManager.java
similarity index 98%
rename from
broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManager.java
rename to
broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqConsumerOffsetManager.java
index d0faa66140..e961c6c635 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqConsumerOffsetManager.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.config.v1;
import java.util.HashMap;
import java.util.Map;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBLmqSubscriptionGroupManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqSubscriptionGroupManager.java
similarity index 97%
rename from
broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBLmqSubscriptionGroupManager.java
rename to
broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqSubscriptionGroupManager.java
index e4de25756b..05f3f7d2ec 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBLmqSubscriptionGroupManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqSubscriptionGroupManager.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.subscription;
+package org.apache.rocketmq.broker.config.v1;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.MixAll;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBLmqTopicConfigManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqTopicConfigManager.java
similarity index 97%
rename from
broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBLmqTopicConfigManager.java
rename to
broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqTopicConfigManager.java
index d049a8dbcd..7b27801396 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBLmqTopicConfigManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqTopicConfigManager.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.topic;
+package org.apache.rocketmq.broker.config.v1;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.MixAll;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBOffsetSerializeWrapper.java
similarity index 96%
copy from
broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
copy to
broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBOffsetSerializeWrapper.java
index 7a90fd62fb..4801cfc681 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBOffsetSerializeWrapper.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.config.v1;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
similarity index 98%
rename from
broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
rename to
broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
index 5119f78672..8175d63cce 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.subscription;
+package org.apache.rocketmq.broker.config.v1;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.RocksDBConfigManager;
+import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.remoting.protocol.DataVersion;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
similarity index 98%
rename from
broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
rename to
broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
index 466e6416f9..bce67392f6 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.topic;
+package org.apache.rocketmq.broker.config.v1;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.RocksDBConfigManager;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.utils.DataConverter;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java
new file mode 100644
index 0000000000..8183a1f835
--- /dev/null
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.config.v2;
+
+import com.alibaba.fastjson2.JSON;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+
+/**
+ * Static helpers shared by the V2 config managers for building RocksDB keys/values and for
+ * reading and writing the per-table data version record.
+ */
+public class ConfigHelper {
+
+    /**
+     * Loads the raw data version record of the given table.
+     *
+     * <p>
+     * Layout of data version key:
+     * [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte][data-version-bytes]
+     * </p>
+     *
+     * <p>
+     * Layout of data version value:
+     * [state-machine-version, 8 bytes][timestamp, 8 bytes][sequence counter, 8 bytes]
+     * </p>
+     *
+     * @param configStorage storage to read from
+     * @param tableId       table whose data version record is requested
+     * @return the stored value wrapped in an unpooled buffer, or empty if no record exists
+     * @throws RocksDBException if RocksDB raises an error
+     */
+    public static Optional<ByteBuf> loadDataVersion(ConfigStorage configStorage, TableId tableId)
+        throws RocksDBException {
+        ByteBuf keyBuf = dataVersionKeyOf(tableId);
+        try {
+            byte[] valueBytes = configStorage.get(keyBuf.nioBuffer());
+            if (null != valueBytes) {
+                return Optional.of(Unpooled.wrappedBuffer(valueBytes));
+            }
+        } finally {
+            keyBuf.release();
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Advances {@code dataVersion} and appends its record to {@code writeBatch}.
+     *
+     * <p>
+     * This overload always writes the record under {@link TableId#CONSUMER_OFFSET}. Use
+     * {@link #stampDataVersion(WriteBatch, TableId, DataVersion, long)} to stamp another table.
+     * NOTE(review): confirm that callers managing other tables do not rely on this overload.
+     * </p>
+     *
+     * @param writeBatch          batch that receives the data version record
+     * @param dataVersion         data version to advance and persist
+     * @param stateMachineVersion state machine version passed to {@code DataVersion#nextVersion}
+     * @throws RocksDBException if RocksDB raises an error
+     */
+    public static void stampDataVersion(WriteBatch writeBatch, DataVersion dataVersion, long stateMachineVersion)
+        throws RocksDBException {
+        stampDataVersion(writeBatch, TableId.CONSUMER_OFFSET, dataVersion, stateMachineVersion);
+    }
+
+    /**
+     * Advances {@code dataVersion} and appends its record, keyed by {@code tableId}, to {@code writeBatch}.
+     *
+     * <p>
+     * The key and value follow the layouts documented on
+     * {@link #loadDataVersion(ConfigStorage, TableId)}.
+     * </p>
+     *
+     * @param writeBatch          batch that receives the data version record
+     * @param tableId             table the data version belongs to
+     * @param dataVersion         data version to advance and persist
+     * @param stateMachineVersion state machine version passed to {@code DataVersion#nextVersion}
+     * @throws RocksDBException if RocksDB raises an error
+     */
+    public static void stampDataVersion(WriteBatch writeBatch, TableId tableId, DataVersion dataVersion,
+        long stateMachineVersion) throws RocksDBException {
+        Preconditions.checkNotNull(tableId);
+        // Increase data version
+        dataVersion.nextVersion(stateMachineVersion);
+
+        ByteBuf keyBuf = dataVersionKeyOf(tableId);
+        try {
+            // Allocated inside the outer try so that keyBuf is released even if this allocation fails
+            ByteBuf valueBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(Long.BYTES * 3);
+            try {
+                valueBuf.writeLong(dataVersion.getStateVersion());
+                valueBuf.writeLong(dataVersion.getTimestamp());
+                valueBuf.writeLong(dataVersion.getCounter().get());
+                writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
+            } finally {
+                valueBuf.release();
+            }
+        } finally {
+            keyBuf.release();
+        }
+    }
+
+    /**
+     * Populates {@code dataVersion} from a buffer produced by
+     * {@link #loadDataVersion(ConfigStorage, TableId)} and releases the buffer.
+     *
+     * <p>
+     * The buffer is decoded only if it holds exactly three longs; otherwise {@code dataVersion}
+     * is left untouched. The buffer is released in either case.
+     * </p>
+     *
+     * @param buf         serialized data version; released by this method
+     * @param dataVersion target to update
+     */
+    public static void onDataVersionLoad(ByteBuf buf, DataVersion dataVersion) {
+        try {
+            if (buf.readableBytes() == 8 /* state machine version */ + 8 /* timestamp */ + 8 /* counter */) {
+                long stateMachineVersion = buf.readLong();
+                long timestamp = buf.readLong();
+                long counter = buf.readLong();
+                dataVersion.setStateVersion(stateMachineVersion);
+                dataVersion.setTimestamp(timestamp);
+                dataVersion.setCounter(new AtomicLong(counter));
+            }
+        } finally {
+            buf.release();
+        }
+    }
+
+    /**
+     * Builds the key of a data record.
+     *
+     * <p>
+     * Layout: [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte][name-length, 2 bytes][name bytes]
+     * </p>
+     *
+     * @param tableId table the record belongs to
+     * @param name    record name, encoded as UTF-8; must not be null
+     * @return a pooled buffer holding the key; the caller is responsible for releasing it
+     */
+    public static ByteBuf keyBufOf(TableId tableId, final String name) {
+        Preconditions.checkNotNull(name);
+        byte[] bytes = name.getBytes(StandardCharsets.UTF_8);
+        int keyLen = 1 /* table-prefix */ + 2 /* table-id */ + 1 /* record-type-prefix */ + 2 /* name-length */
+            + bytes.length;
+        ByteBuf keyBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        keyBuf.writeByte(TablePrefix.TABLE.getValue());
+        keyBuf.writeShort(tableId.getValue());
+        keyBuf.writeByte(RecordPrefix.DATA.getValue());
+        keyBuf.writeShort(bytes.length);
+        keyBuf.writeBytes(bytes);
+        return keyBuf;
+    }
+
+    /**
+     * Serializes a config object into a value buffer.
+     *
+     * <p>
+     * Layout: [serialization-type, 1 byte][payload]
+     * </p>
+     *
+     * @param config            object to serialize
+     * @param serializationType serialization format; only {@link SerializationType#JSON} is handled
+     * @return a pooled buffer holding the value; the caller is responsible for releasing it
+     * @throws RuntimeException if {@code serializationType} is not supported
+     */
+    public static ByteBuf valueBufOf(final Object config, SerializationType serializationType) {
+        if (SerializationType.JSON == serializationType) {
+            byte[] payload = JSON.toJSONBytes(config);
+            ByteBuf valueBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(1 + payload.length);
+            valueBuf.writeByte(SerializationType.JSON.getValue());
+            valueBuf.writeBytes(payload);
+            return valueBuf;
+        }
+        throw new RuntimeException("Unsupported serialization type: " + serializationType);
+    }
+
+    /**
+     * Copies all readable bytes of {@code buf} into a new array, consuming them from the buffer.
+     *
+     * @param buf buffer to drain
+     * @return the bytes that were readable when the method was called
+     */
+    public static byte[] readBytes(final ByteBuf buf) {
+        byte[] bytes = new byte[buf.readableBytes()];
+        buf.readBytes(bytes);
+        return bytes;
+    }
+
+    /**
+     * Builds the data version key of a table:
+     * [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte][data-version-bytes].
+     *
+     * @param tableId table the data version belongs to
+     * @return a pooled buffer holding the key; the caller is responsible for releasing it
+     */
+    private static ByteBuf dataVersionKeyOf(TableId tableId) {
+        int keyLen = 1 /* table-prefix */ + Short.BYTES /* table-id */ + 1 /* record-prefix */
+            + ConfigStorage.DATA_VERSION_KEY_BYTES.length;
+        ByteBuf keyBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        keyBuf.writeByte(TablePrefix.TABLE.getValue());
+        keyBuf.writeShort(tableId.getValue());
+        keyBuf.writeByte(RecordPrefix.DATA_VERSION.getValue());
+        keyBuf.writeBytes(ConfigStorage.DATA_VERSION_KEY_BYTES);
+        return keyBuf;
+    }
+}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java
new file mode 100644
index 0000000000..af259aaa37
--- /dev/null
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.config.v2;
+
+import io.netty.util.internal.PlatformDependent;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
+import org.apache.rocketmq.common.config.ConfigHelper;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DirectSlice;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * RocksDB storage for broker config entries, kept under {@code <storePath>/config/rdb} in the
+ * default column family.
+ *
+ * <p>
+ * Reference on mapping tables onto a key-value store:
+ * https://book.tidb.io/session1/chapter3/tidb-kv-to-relation.html
+ * </p>
+ */
+public class ConfigStorage extends AbstractRocksDBStorage {
+
+    public static final String DATA_VERSION_KEY = "data_version";
+    public static final byte[] DATA_VERSION_KEY_BYTES = DATA_VERSION_KEY.getBytes(StandardCharsets.UTF_8);
+
+    /**
+     * @param storePath store root directory; the database lives in {@code config/rdb} beneath it
+     */
+    public ConfigStorage(String storePath) {
+        super(storePath + File.separator + "config" + File.separator + "rdb");
+    }
+
+    /**
+     * Opens the RocksDB instance with the default column family only.
+     *
+     * @return {@code false} if {@code Unsafe} is unavailable or opening the database fails,
+     * {@code true} otherwise
+     */
+    @Override
+    protected boolean postLoad() {
+        if (!PlatformDependent.hasUnsafe()) {
+            LOGGER.error("Unsafe not available and POOLED_ALLOCATOR cannot work correctly");
+            return false;
+        }
+        try {
+            UtilAll.ensureDirOK(this.dbPath);
+            initOptions();
+            List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
+
+            ColumnFamilyOptions defaultOptions = ConfigHelper.createConfigOptions();
+            this.cfOptions.add(defaultOptions);
+            cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions));
+
+            // Start RocksDB instance
+            open(cfDescriptors);
+
+            this.defaultCFHandle = cfHandles.get(0);
+        } catch (final Exception e) {
+            LOGGER.error("postLoad Failed. {}", this.dbPath, e);
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    protected void preShutdown() {
+        // Nothing to do before shutdown
+    }
+
+    /**
+     * Installs the config-specific DB options before delegating to the superclass.
+     */
+    @Override
+    protected void initOptions() {
+        this.options = ConfigHelper.createConfigDBOptions();
+        super.initOptions();
+    }
+
+    @Override
+    protected void initAbleWalWriteOptions() {
+        this.ableWalWriteOptions = new WriteOptions();
+
+        // For metadata, prioritize data integrity
+        this.ableWalWriteOptions.setSync(true);
+
+        // We need WAL for config changes
+        this.ableWalWriteOptions.setDisableWAL(false);
+
+        // Do not fail fast when the write would stall; wait for it to go through instead
+        this.ableWalWriteOptions.setNoSlowdown(false);
+    }
+
+    /**
+     * Reads the value stored under {@code key} in the default column family.
+     *
+     * <p>
+     * The remaining bytes of {@code key} are copied out, which advances its position to its limit.
+     * </p>
+     *
+     * @param key key to look up; consumed by this call
+     * @return the value returned by the underlying lookup
+     * @throws RocksDBException if RocksDB raises an error
+     */
+    public byte[] get(ByteBuffer key) throws RocksDBException {
+        byte[] keyBytes = new byte[key.remaining()];
+        key.get(keyBytes);
+        return super.get(getDefaultCFHandle(), totalOrderReadOptions, keyBytes);
+    }
+
+    /**
+     * Applies {@code writeBatch} using the WAL-enabled, synchronous write options.
+     *
+     * @param writeBatch batch to apply
+     * @throws RocksDBException if RocksDB raises an error
+     */
+    public void write(WriteBatch writeBatch) throws RocksDBException {
+        db.write(ableWalWriteOptions, writeBatch);
+    }
+
+    /**
+     * Creates an iterator over the default column family bounded by the given keys and
+     * positions it at the first entry.
+     *
+     * <p>
+     * NOTE(review): the {@code ReadOptions} is closed when this method returns while the iterator
+     * is handed to the caller, and the two {@code DirectSlice} bounds are never closed. Confirm
+     * against the RocksJava documentation that the bounds stay valid for the iterator's lifetime.
+     * Presumably both buffers must be direct and must outlive the iterator - verify with callers.
+     * </p>
+     *
+     * @param beginKey lower bound of the iteration
+     * @param endKey   upper bound of the iteration
+     * @return an iterator that the caller is expected to close
+     */
+    public RocksIterator iterate(ByteBuffer beginKey, ByteBuffer endKey) {
+        try (ReadOptions readOptions = new ReadOptions()) {
+            readOptions.setTotalOrderSeek(true);
+            readOptions.setTailing(false);
+            readOptions.setAutoPrefixMode(true);
+            readOptions.setIterateLowerBound(new DirectSlice(beginKey));
+            readOptions.setIterateUpperBound(new DirectSlice(endKey));
+            RocksIterator iterator = db.newIterator(defaultCFHandle, readOptions);
+            iterator.seekToFirst();
+            return iterator;
+        }
+    }
+}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
new file mode 100644
index 0000000000..5b0885c491
--- /dev/null
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.config.v2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.PlatformDependent;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
+import org.apache.rocketmq.store.MessageStore;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+
+/**
+ * <p>
+ * Layout of consumer offset key:
+ * [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte][group-len,
2 bytes][group bytes][CTRL_1, 1 byte]
+ * [topic-len, 2 bytes][topic bytes][CTRL_1, 1 byte][queue-id, 4 bytes]
+ * </p>
+ *
+ * <p>
+ * Layout of consumer offset value: [offset, 8 bytes]
+ * </p>
+ */
+public class ConsumerOffsetManagerV2 extends ConsumerOffsetManager {
+
+ private final ConfigStorage configStorage;
+
+ /**
+ * Creates a consumer offset manager that writes offsets through the given config storage.
+ *
+ * @param brokerController broker controller handed to the superclass constructor
+ * @param configStorage storage kept for use by this manager's methods
+ */
+ public ConsumerOffsetManagerV2(BrokerController brokerController,
ConfigStorage configStorage) {
+ super(brokerController);
+ this.configStorage = configStorage;
+ }
+
+    /**
+     * Removes every consumer offset stored for one topic/group pair.
+     *
+     * <p>
+     * For non-LMQ entries the superclass removal runs first. The RocksDB records are then dropped
+     * with a single range delete over
+     * [table-prefix][table-id][record-prefix][group-len][group-bytes][CTRL_1][topic-len][topic-bytes],
+     * from the key terminated by CTRL_1 (inclusive) to the key terminated by CTRL_2 (exclusive).
+     * RocksDB failures are logged and not propagated.
+     * </p>
+     *
+     * @param topicAtGroup topic and group joined by {@code TOPIC_GROUP_SEPARATOR}
+     */
+    @Override
+    protected void removeConsumerOffset(String topicAtGroup) {
+        if (!MixAll.isLmq(topicAtGroup)) {
+            super.removeConsumerOffset(topicAtGroup);
+        }
+
+        String[] topicGroup = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+        if (topicGroup.length != 2) {
+            LOG.error("Invalid topic group: {}", topicAtGroup);
+            return;
+        }
+
+        byte[] topicBytes = topicGroup[0].getBytes(StandardCharsets.UTF_8);
+        byte[] groupBytes = topicGroup[1].getBytes(StandardCharsets.UTF_8);
+
+        int keyLen = 1 /* table-prefix */ + Short.BYTES /* table-id */ + 1 /* record-prefix */
+            + Short.BYTES /* group-len */ + groupBytes.length + 1 /* CTRL_1 */
+            + Short.BYTES /* topic-len */ + topicBytes.length + 1 /* CTRL_1 or CTRL_2 */;
+
+        // Both buffers are allocated inside the try so that whatever was allocated is always released
+        ByteBuf beginKey = null;
+        ByteBuf endKey = null;
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            beginKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+            beginKey.writeByte(TablePrefix.TABLE.getValue());
+            beginKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
+            beginKey.writeByte(RecordPrefix.DATA.getValue());
+            beginKey.writeShort(groupBytes.length);
+            beginKey.writeBytes(groupBytes);
+            beginKey.writeByte(AbstractRocksDBStorage.CTRL_1);
+            beginKey.writeShort(topicBytes.length);
+            beginKey.writeBytes(topicBytes);
+            beginKey.writeByte(AbstractRocksDBStorage.CTRL_1);
+
+            endKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+            endKey.writeByte(TablePrefix.TABLE.getValue());
+            endKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
+            endKey.writeByte(RecordPrefix.DATA.getValue());
+            endKey.writeShort(groupBytes.length);
+            endKey.writeBytes(groupBytes);
+            endKey.writeByte(AbstractRocksDBStorage.CTRL_1);
+            endKey.writeShort(topicBytes.length);
+            endKey.writeBytes(topicBytes);
+            endKey.writeByte(AbstractRocksDBStorage.CTRL_2);
+
+            // TODO: we have to make a copy here as WriteBatch lacks ByteBuffer API here
+            writeBatch.deleteRange(ConfigHelper.readBytes(beginKey), ConfigHelper.readBytes(endKey));
+            // Read the message store once so the null check and the call see the same reference
+            MessageStore messageStore = brokerController.getMessageStore();
+            long stateMachineVersion = messageStore != null ? messageStore.getStateMachineVersion() : 0;
+            ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
+            configStorage.write(writeBatch);
+        } catch (RocksDBException e) {
+            LOG.error("Failed to removeConsumerOffset, topicAtGroup={}", topicAtGroup, e);
+        } finally {
+            if (null != beginKey) {
+                beginKey.release();
+            }
+            if (null != endKey) {
+                endKey.release();
+            }
+        }
+    }
+
+    /**
+     * Removes every consumer offset stored for a group, across all of its topics.
+     *
+     * <p>
+     * For non-LMQ groups the superclass removal runs first. The RocksDB records are then dropped
+     * with a single range delete over
+     * [table-prefix][table-id][record-prefix][group-len][group-bytes], from the key terminated by
+     * CTRL_1 (inclusive) to the key terminated by CTRL_2 (exclusive). RocksDB failures are logged
+     * and not propagated.
+     * </p>
+     *
+     * @param group consumer group name
+     */
+    @Override
+    public void removeOffset(String group) {
+        if (!MixAll.isLmq(group)) {
+            super.removeOffset(group);
+        }
+
+        byte[] groupBytes = group.getBytes(StandardCharsets.UTF_8);
+        int keyLen = 1 /* table-prefix */ + Short.BYTES /* table-id */ + 1 /* record-prefix */
+            + Short.BYTES /* group-len */ + groupBytes.length + 1 /* CTRL_1 or CTRL_2 */;
+
+        // Both buffers are allocated inside the try so that whatever was allocated is always released
+        ByteBuf beginKey = null;
+        ByteBuf endKey = null;
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            beginKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+            beginKey.writeByte(TablePrefix.TABLE.getValue());
+            beginKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
+            beginKey.writeByte(RecordPrefix.DATA.getValue());
+            beginKey.writeShort(groupBytes.length);
+            beginKey.writeBytes(groupBytes);
+            beginKey.writeByte(AbstractRocksDBStorage.CTRL_1);
+
+            endKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+            endKey.writeByte(TablePrefix.TABLE.getValue());
+            endKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
+            endKey.writeByte(RecordPrefix.DATA.getValue());
+            endKey.writeShort(groupBytes.length);
+            endKey.writeBytes(groupBytes);
+            endKey.writeByte(AbstractRocksDBStorage.CTRL_2);
+
+            // TODO: we have to make a copy here as WriteBatch lacks ByteBuffer API here
+            writeBatch.deleteRange(ConfigHelper.readBytes(beginKey), ConfigHelper.readBytes(endKey));
+            MessageStore messageStore = brokerController.getMessageStore();
+            long stateMachineVersion = messageStore != null ? messageStore.getStateMachineVersion() : 0;
+            ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
+            configStorage.write(writeBatch);
+        } catch (RocksDBException e) {
+            LOG.error("Failed to remove consumer offsets by group={}", group, e);
+        } finally {
+            if (null != beginKey) {
+                beginKey.release();
+            }
+            if (null != endKey) {
+                endKey.release();
+            }
+        }
+    }
+
+    /**
+     * Records the consumer offset of one queue, both in memory (non-LMQ topics only) and in RocksDB.
+     *
+     * <p>
+     * Layout of consumer offset key:
+     * [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte][group-len, 2 bytes][group bytes][CTRL_1, 1 byte]
+     * [topic-len, 2 bytes][topic bytes][CTRL_1, 1 byte][queue-id, 4 bytes]
+     * </p>
+     *
+     * <p>
+     * Layout of consumer offset value:
+     * [offset, 8 bytes]
+     * </p>
+     *
+     * <p>
+     * RocksDB failures are logged and not propagated.
+     * </p>
+     *
+     * @param clientHost The client that submits consumer offsets
+     * @param group      Group name
+     * @param topic      Topic name
+     * @param queueId    Queue ID
+     * @param offset     Consumer offset of the specified queue
+     */
+    @Override
+    public void commitOffset(String clientHost, String group, String topic, int queueId, long offset) {
+        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+
+        // We maintain a copy of classic consumer offset table in memory as they take very limited memory footprint.
+        // For LMQ offsets, given the volume and number of these type of records, they are maintained in RocksDB
+        // directly. Frequently used LMQ consumer offsets should reside either in block-cache or MemTable, so
+        // read/write should be blazingly fast.
+        if (!MixAll.isLmq(topic)) {
+            // Atomic get-or-create: a separate containsKey/get pair could observe a concurrent removal
+            ConcurrentMap<Integer, Long> queueOffsets =
+                offsetTable.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
+            queueOffsets.put(queueId, offset);
+        }
+
+        ByteBuf keyBuf = keyOfConsumerOffset(group, topic, queueId);
+        // Allocated inside the try so that keyBuf is released even if this allocation fails
+        ByteBuf valueBuf = null;
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            valueBuf = ConfigStorage.POOLED_ALLOCATOR.buffer(Long.BYTES);
+            valueBuf.writeLong(offset);
+            writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
+            MessageStore messageStore = brokerController.getMessageStore();
+            long stateMachineVersion = messageStore != null ? messageStore.getStateMachineVersion() : 0;
+            ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
+            configStorage.write(writeBatch);
+        } catch (RocksDBException e) {
+            LOG.error("Failed to commit consumer offset", e);
+        } finally {
+            keyBuf.release();
+            if (null != valueBuf) {
+                valueBuf.release();
+            }
+        }
+    }
+
+ /**
+  * Builds the key of a consumer offset record in the CONSUMER_OFFSET table:
+  * [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte][group-len, 2 bytes][group bytes][CTRL_1, 1 byte]
+  * [topic-len, 2 bytes][topic bytes][CTRL_1, 1 byte][queue-id, 4 bytes]
+  *
+  * @param group Group name, encoded as UTF-8
+  * @param topic Topic name, encoded as UTF-8
+  * @param queueId Queue ID
+  * @return a buffer from the pooled allocator; callers in this class release it after use
+  */
+ private ByteBuf keyOfConsumerOffset(String group, String topic, int queueId) {
+     final byte[] encodedGroup = group.getBytes(StandardCharsets.UTF_8);
+     final byte[] encodedTopic = topic.getBytes(StandardCharsets.UTF_8);
+
+     // table-prefix + table-id + record-prefix
+     final int headerLen = 1 + Short.BYTES + 1;
+     // group-len + group bytes + CTRL_1
+     final int groupPartLen = Short.BYTES + encodedGroup.length + 1;
+     // topic-len + topic bytes + CTRL_1
+     final int topicPartLen = Short.BYTES + encodedTopic.length + 1;
+
+     ByteBuf key = ConfigStorage.POOLED_ALLOCATOR.buffer(headerLen + groupPartLen + topicPartLen + Integer.BYTES);
+     key.writeByte(TablePrefix.TABLE.getValue())
+         .writeShort(TableId.CONSUMER_OFFSET.getValue())
+         .writeByte(RecordPrefix.DATA.getValue());
+     key.writeShort(encodedGroup.length)
+         .writeBytes(encodedGroup)
+         .writeByte(AbstractRocksDBStorage.CTRL_1);
+     key.writeShort(encodedTopic.length)
+         .writeBytes(encodedTopic)
+         .writeByte(AbstractRocksDBStorage.CTRL_1);
+     key.writeInt(queueId);
+     return key;
+ }
+
+ /**
+  * Builds the key of a pull offset record. The layout matches the consumer offset key, except that the
+  * table-id is PULL_OFFSET:
+  * [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte][group-len, 2 bytes][group bytes][CTRL_1, 1 byte]
+  * [topic-len, 2 bytes][topic bytes][CTRL_1, 1 byte][queue-id, 4 bytes]
+  *
+  * @param group Group name, encoded as UTF-8
+  * @param topic Topic name, encoded as UTF-8
+  * @param queueId Queue ID
+  * @return a buffer from the pooled allocator; callers in this class release it after use
+  */
+ private ByteBuf keyOfPullOffset(String group, String topic, int queueId) {
+     final byte[] rawGroup = group.getBytes(StandardCharsets.UTF_8);
+     final byte[] rawTopic = topic.getBytes(StandardCharsets.UTF_8);
+
+     int capacity = 1 /*table prefix*/ + Short.BYTES /*table-id*/ + 1 /*record-prefix*/;
+     capacity += Short.BYTES /*group-len*/ + rawGroup.length + 1 /*CTRL_1*/;
+     capacity += Short.BYTES /*topic-len*/ + rawTopic.length + 1 /*CTRL_1*/;
+     capacity += Integer.BYTES /*queue-id*/;
+
+     final ByteBuf buf = ConfigStorage.POOLED_ALLOCATOR.buffer(capacity);
+     // Record header
+     buf.writeByte(TablePrefix.TABLE.getValue());
+     buf.writeShort(TableId.PULL_OFFSET.getValue());
+     buf.writeByte(RecordPrefix.DATA.getValue());
+     // Length-prefixed group, terminated by CTRL_1
+     buf.writeShort(rawGroup.length);
+     buf.writeBytes(rawGroup);
+     buf.writeByte(AbstractRocksDBStorage.CTRL_1);
+     // Length-prefixed topic, terminated by CTRL_1
+     buf.writeShort(rawTopic.length);
+     buf.writeBytes(rawTopic);
+     buf.writeByte(AbstractRocksDBStorage.CTRL_1);
+     buf.writeInt(queueId);
+     return buf;
+ }
+
+ /**
+  * Loads the data version first and, only if that succeeds, the consumer offsets.
+  *
+  * @return true when both steps report success
+  */
+ @Override
+ public boolean load() {
+     if (!loadDataVersion()) {
+         return false;
+     }
+     return loadConsumerOffsets();
+ }
+
+ /**
+  * Flushes the write-ahead log of the backing config storage. A {@link RocksDBException} raised by the
+  * flush is logged and not rethrown.
+  */
+ @Override
+ public synchronized void persist() {
+ try {
+ configStorage.flushWAL();
+ } catch (RocksDBException e) {
+ LOG.error("Failed to flush RocksDB config instance WAL", e);
+ }
+ }
+
+ /**
+  * Restores {@code dataVersion} from the record stored for the CONSUMER_OFFSET table, if one exists.
+  *
+  * <p>
+  * Layout of data version key:
+  * [table-prefix, 1 byte][table-id, 2 byte][record-prefix, 1 byte][data-version-bytes]
+  * </p>
+  *
+  * <p>
+  * Layout of data version value:
+  * [state-machine-version, 8 bytes][timestamp, 8 bytes][sequence counter, 8 bytes]
+  * </p>
+  *
+  * @return false if reading from the storage raised a {@link RocksDBException}; true otherwise, including
+  * when no data version record is present
+  */
+ public boolean loadDataVersion() {
+     boolean succeeded = true;
+     try {
+         ConfigHelper.loadDataVersion(configStorage, TableId.CONSUMER_OFFSET)
+             .ifPresent(versionBuf -> ConfigHelper.onDataVersionLoad(versionBuf, dataVersion));
+     } catch (RocksDBException e) {
+         LOG.error("Failed to load RocksDB config", e);
+         succeeded = false;
+     }
+     return succeeded;
+ }
+
+ /**
+  * Scans every DATA record of the CONSUMER_OFFSET table and feeds the non-LMQ ones into the in-memory
+  * offset table via {@link #onConsumerOffsetRecordLoad(String, String, int, long)}.
+  *
+  * <p>
+  * The scan range is [prefix(DATA), prefix(DATA + 1)), where prefix is
+  * [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte].
+  * </p>
+  *
+  * @return always true; failures surface as unchecked exceptions
+  */
+ private boolean loadConsumerOffsets() {
+     // [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte]
+     ByteBuf beginKeyBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(4);
+     beginKeyBuf.writeByte(TablePrefix.TABLE.getValue());
+     beginKeyBuf.writeShort(TableId.CONSUMER_OFFSET.getValue());
+     beginKeyBuf.writeByte(RecordPrefix.DATA.getValue());
+
+     ByteBuf endKeyBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(4);
+     endKeyBuf.writeByte(TablePrefix.TABLE.getValue());
+     endKeyBuf.writeShort(TableId.CONSUMER_OFFSET.getValue());
+     endKeyBuf.writeByte(RecordPrefix.DATA.getValue() + 1);
+
+     // We may iterate millions of LMQ consumer offsets here, so reusable direct byte buffers are used to
+     // avoid memory fragmentation. They are declared outside the try block so that the finally block can
+     // free them even when decoding a record throws.
+     ByteBuffer keyBuffer = null;
+     ByteBuffer valueBuffer = null;
+     try {
+         int keyCapacity = 256;
+         keyBuffer = ByteBuffer.allocateDirect(keyCapacity);
+         valueBuffer = ByteBuffer.allocateDirect(Long.BYTES);
+         try (RocksIterator iterator = configStorage.iterate(beginKeyBuf.nioBuffer(), endKeyBuf.nioBuffer())) {
+             while (iterator.isValid()) {
+                 keyBuffer.clear();
+                 valueBuffer.clear();
+
+                 int len = iterator.key(keyBuffer);
+                 if (len > keyCapacity) {
+                     // The key did not fit: grow the buffer and retry the same record.
+                     keyCapacity = len;
+                     ByteBuffer stale = keyBuffer;
+                     // Clear the reference first so the finally block never frees the same buffer twice.
+                     keyBuffer = null;
+                     PlatformDependent.freeDirectBuffer(stale);
+                     keyBuffer = ByteBuffer.allocateDirect(keyCapacity);
+                     continue;
+                 }
+                 len = iterator.value(valueBuffer);
+                 assert len == Long.BYTES;
+
+                 // skip table-prefix, table-id, record-prefix
+                 keyBuffer.position(1 + 2 + 1);
+                 short groupLen = keyBuffer.getShort();
+                 byte[] groupBytes = new byte[groupLen];
+                 keyBuffer.get(groupBytes);
+                 byte ctrl = keyBuffer.get();
+                 assert ctrl == AbstractRocksDBStorage.CTRL_1;
+
+                 short topicLen = keyBuffer.getShort();
+                 byte[] topicBytes = new byte[topicLen];
+                 keyBuffer.get(topicBytes);
+                 String topic = new String(topicBytes, StandardCharsets.UTF_8);
+                 ctrl = keyBuffer.get();
+                 assert ctrl == AbstractRocksDBStorage.CTRL_1;
+
+                 int queueId = keyBuffer.getInt();
+
+                 long offset = valueBuffer.getLong();
+
+                 // LMQ offsets stay in RocksDB only; the group name is decoded lazily for the others.
+                 if (!MixAll.isLmq(topic)) {
+                     String group = new String(groupBytes, StandardCharsets.UTF_8);
+                     onConsumerOffsetRecordLoad(topic, group, queueId, offset);
+                 }
+                 iterator.next();
+             }
+         }
+     } finally {
+         if (null != keyBuffer) {
+             PlatformDependent.freeDirectBuffer(keyBuffer);
+         }
+         if (null != valueBuffer) {
+             PlatformDependent.freeDirectBuffer(valueBuffer);
+         }
+         beginKeyBuf.release();
+         endKeyBuf.release();
+     }
+     return true;
+ }
+
+ /**
+  * Puts one loaded consumer offset record into the in-memory offset table. Records of LMQ topics are
+  * ignored.
+  *
+  * @param topic Topic name
+  * @param group Group name
+  * @param queueId Queue ID
+  * @param offset Consumer offset of the specified queue
+  */
+ private void onConsumerOffsetRecordLoad(String topic, String group, int queueId, long offset) {
+     if (MixAll.isLmq(topic)) {
+         return;
+     }
+     String key = topic + TOPIC_GROUP_SEPARATOR + group;
+     // Get-or-create with one lookup on the common path, instead of containsKey() + putIfAbsent() + get().
+     ConcurrentMap<Integer, Long> queueOffsets = offsetTable.get(key);
+     if (null == queueOffsets) {
+         ConcurrentMap<Integer, Long> created = new ConcurrentHashMap<>();
+         ConcurrentMap<Integer, Long> prev = offsetTable.putIfAbsent(key, created);
+         queueOffsets = null != prev ? prev : created;
+     }
+     queueOffsets.put(queueId, offset);
+ }
+
+ /**
+  * Looks up a consumer offset. Non-LMQ topics are answered by the parent class; LMQ topics are read
+  * from the config storage.
+  *
+  * @param group Group name
+  * @param topic Topic name
+  * @param queueId Queue ID
+  * @return the stored offset, or -1 when the storage holds no record for the key
+  * @throws RuntimeException wrapping the {@link RocksDBException} if the storage read fails
+  */
+ @Override
+ public long queryOffset(String group, String topic, int queueId) {
+     if (!MixAll.isLmq(topic)) {
+         return super.queryOffset(group, topic, queueId);
+     }
+
+     final ByteBuf lookupKey = keyOfConsumerOffset(group, topic, queueId);
+     try {
+         final byte[] stored = configStorage.get(lookupKey.nioBuffer());
+         if (stored != null) {
+             assert stored.length == Long.BYTES;
+             return ByteBuffer.wrap(stored).getLong();
+         }
+         return -1;
+     } catch (RocksDBException e) {
+         throw new RuntimeException(e);
+     } finally {
+         lookupKey.release();
+     }
+ }
+
+ /**
+  * Records the pull offset of a single queue. Non-LMQ topics are additionally handed to the parent
+  * class; every topic is written to the config storage under the PULL_OFFSET table.
+  *
+  * <p>
+  * Value layout: [offset, 8 bytes]. A {@link RocksDBException} raised while writing is logged and not
+  * propagated.
+  * </p>
+  *
+  * @param clientHost The client that submits the pull offset
+  * @param group Group name
+  * @param topic Topic name
+  * @param queueId Queue ID
+  * @param offset Pull offset of the specified queue
+  */
+ @Override
+ public void commitPullOffset(String clientHost, String group, String topic, int queueId, long offset) {
+     if (!MixAll.isLmq(topic)) {
+         super.commitPullOffset(clientHost, group, topic, queueId, offset);
+     }
+
+     ByteBuf keyBuf = keyOfPullOffset(group, topic, queueId);
+     ByteBuf valueBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(Long.BYTES);
+     try (WriteBatch writeBatch = new WriteBatch()) {
+         // The value must carry the offset; queryPullOffset decodes it with getLong().
+         valueBuf.writeLong(offset);
+         writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
+         MessageStore messageStore = brokerController.getMessageStore();
+         long stateMachineVersion = messageStore != null ? messageStore.getStateMachineVersion() : 0;
+         ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
+         // Without this call the batch is discarded when the try block closes it.
+         configStorage.write(writeBatch);
+     } catch (RocksDBException e) {
+         LOG.error("Failed to commit pull offset. group={}, topic={}, queueId={}, offset={}",
+             group, topic, queueId, offset, e);
+     } finally {
+         keyBuf.release();
+         valueBuf.release();
+     }
+ }
+
+ /**
+  * Looks up a pull offset. Non-LMQ topics are answered by the parent class; LMQ topics are read from
+  * the config storage.
+  *
+  * @param group Group name
+  * @param topic Topic name
+  * @param queueId Queue ID
+  * @return the stored offset, or -1 when no record exists or the storage read fails
+  */
+ @Override
+ public long queryPullOffset(String group, String topic, int queueId) {
+     if (!MixAll.isLmq(topic)) {
+         return super.queryPullOffset(group, topic, queueId);
+     }
+
+     ByteBuf keyBuf = keyOfPullOffset(group, topic, queueId);
+     try {
+         byte[] valueBytes = configStorage.get(keyBuf.nioBuffer());
+         if (null == valueBytes) {
+             return -1;
+         }
+         return ByteBuffer.wrap(valueBytes).getLong();
+     } catch (RocksDBException e) {
+         // Pass the exception to the logger so the cause is not lost.
+         LOG.error("Failed to queryPullOffset. group={}, topic={}, queueId={}", group, topic, queueId, e);
+     } finally {
+         keyBuf.release();
+     }
+     return -1;
+ }
+}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/RecordPrefix.java
similarity index 59%
copy from
broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
copy to
broker/src/main/java/org/apache/rocketmq/broker/config/v2/RecordPrefix.java
index 7a90fd62fb..750d454d4e 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/RecordPrefix.java
@@ -14,21 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.config.v2;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+public enum RecordPrefix {
+ UNSPECIFIED((byte)0),
+ DATA_VERSION((byte)1),
+ DATA((byte)2);
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+ private final byte value;
-public class RocksDBOffsetSerializeWrapper extends RemotingSerializable {
- private ConcurrentMap<Integer, Long> offsetTable = new
ConcurrentHashMap(16);
-
- public ConcurrentMap<Integer, Long> getOffsetTable() {
- return offsetTable;
+ RecordPrefix(byte value) {
+ this.value = value;
}
- public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) {
- this.offsetTable = offsetTable;
+ public byte getValue() {
+ return value;
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SerializationType.java
similarity index 57%
copy from
broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
copy to
broker/src/main/java/org/apache/rocketmq/broker/config/v2/SerializationType.java
index 7a90fd62fb..2ee157fdc8 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SerializationType.java
@@ -14,21 +14,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.config.v2;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+public enum SerializationType {
+ UNSPECIFIED((byte) 0),
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+ JSON((byte) 1),
-public class RocksDBOffsetSerializeWrapper extends RemotingSerializable {
- private ConcurrentMap<Integer, Long> offsetTable = new
ConcurrentHashMap(16);
+ PROTOBUF((byte) 2),
- public ConcurrentMap<Integer, Long> getOffsetTable() {
- return offsetTable;
+ FLAT_BUFFERS((byte) 3);
+
+ private final byte value;
+
+ SerializationType(byte value) {
+ this.value = value;
+ }
+
+ public byte getValue() {
+ return value;
}
- public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) {
- this.offsetTable = offsetTable;
+ public static SerializationType valueOf(byte value) {
+ for (SerializationType type : SerializationType.values()) {
+ if (type.getValue() == value) {
+ return type;
+ }
+ }
+ return SerializationType.UNSPECIFIED;
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
new file mode 100644
index 0000000000..8da6f9d2bc
--- /dev/null
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.config.v2;
+
+import com.alibaba.fastjson2.JSON;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.nio.charset.StandardCharsets;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
+import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+
+/**
+ * {@link SubscriptionGroupManager} that stores subscription group configs in a {@link ConfigStorage}.
+ *
+ * <p>
+ * Key layout, as decoded by {@code parseSubscription}:
+ * [table-prefix, 1 byte][table-id, 2 bytes][record-type-prefix, 1 byte][group-len, 2 bytes][group-bytes]
+ * Value layout: [serialization-type, 1 byte][subscription-group-config-bytes]
+ * </p>
+ *
+ * <p>
+ * Groups for which {@link MixAll#isLmq(String)} is true are never stored; they are answered with a
+ * synthesized config.
+ * </p>
+ */
+public class SubscriptionGroupManagerV2 extends SubscriptionGroupManager {
+
+    private final ConfigStorage configStorage;
+
+    public SubscriptionGroupManagerV2(BrokerController brokerController, ConfigStorage configStorage) {
+        super(brokerController);
+        this.configStorage = configStorage;
+    }
+
+    /**
+     * Loads the data version first and, only if that succeeds, the subscription group configs.
+     */
+    @Override
+    public boolean load() {
+        return loadDataVersion() && loadSubscriptions();
+    }
+
+    /**
+     * Restores {@code dataVersion} from the record stored for the SUBSCRIPTION_GROUP table, if any.
+     *
+     * @return false if reading from the storage raised a {@link RocksDBException}, true otherwise
+     */
+    public boolean loadDataVersion() {
+        try {
+            ConfigHelper.loadDataVersion(configStorage, TableId.SUBSCRIPTION_GROUP)
+                .ifPresent(buf -> ConfigHelper.onDataVersionLoad(buf, dataVersion));
+        } catch (RocksDBException e) {
+            log.error("loadDataVersion error", e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Scans every DATA record of the SUBSCRIPTION_GROUP table and registers each decodable config in
+     * memory without persisting it again.
+     *
+     * @return always true
+     */
+    private boolean loadSubscriptions() {
+        int keyLen = 1 /* table prefix */ + 2 /* table-id */ + 1 /* record-type-prefix */;
+        ByteBuf beginKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        beginKey.writeByte(TablePrefix.TABLE.getValue());
+        beginKey.writeShort(TableId.SUBSCRIPTION_GROUP.getValue());
+        beginKey.writeByte(RecordPrefix.DATA.getValue());
+
+        ByteBuf endKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        endKey.writeByte(TablePrefix.TABLE.getValue());
+        endKey.writeShort(TableId.SUBSCRIPTION_GROUP.getValue());
+        endKey.writeByte(RecordPrefix.DATA.getValue() + 1);
+
+        try (RocksIterator iterator = configStorage.iterate(beginKey.nioBuffer(), endKey.nioBuffer())) {
+            while (iterator.isValid()) {
+                SubscriptionGroupConfig subscriptionGroupConfig = parseSubscription(iterator.key(), iterator.value());
+                if (null != subscriptionGroupConfig) {
+                    super.updateSubscriptionGroupConfigWithoutPersist(subscriptionGroupConfig);
+                }
+                // Advance the cursor; without this the loop never leaves the first record.
+                iterator.next();
+            }
+        } finally {
+            beginKey.release();
+            endKey.release();
+        }
+        return true;
+    }
+
+    /**
+     * Decodes one stored record.
+     *
+     * @param key Subscription group key representation in RocksDB
+     * @param value Subscription group value representation in RocksDB
+     * @return the decoded config, or null when the serialization type is not JSON
+     */
+    private SubscriptionGroupConfig parseSubscription(byte[] key, byte[] value) {
+        ByteBuf keyBuf = Unpooled.wrappedBuffer(key);
+        ByteBuf valueBuf = Unpooled.wrappedBuffer(value);
+        try {
+            // Skip table-prefix, table-id, record-type-prefix
+            keyBuf.readerIndex(4);
+            short groupNameLen = keyBuf.readShort();
+            assert groupNameLen == keyBuf.readableBytes();
+            CharSequence groupName = keyBuf.readCharSequence(groupNameLen, StandardCharsets.UTF_8);
+            assert null != groupName;
+            byte serializationType = valueBuf.readByte();
+            if (SerializationType.JSON == SerializationType.valueOf(serializationType)) {
+                CharSequence json = valueBuf.readCharSequence(valueBuf.readableBytes(), StandardCharsets.UTF_8);
+                SubscriptionGroupConfig subscriptionGroupConfig = JSON.parseObject(json.toString(), SubscriptionGroupConfig.class);
+                assert subscriptionGroupConfig != null;
+                assert groupName.equals(subscriptionGroupConfig.getGroupName());
+                return subscriptionGroupConfig;
+            }
+        } finally {
+            keyBuf.release();
+            valueBuf.release();
+        }
+        return null;
+    }
+
+    /**
+     * Flushes the write-ahead log of the backing config storage; a failure is logged and not rethrown.
+     */
+    @Override
+    public synchronized void persist() {
+        try {
+            configStorage.flushWAL();
+        } catch (RocksDBException e) {
+            log.error("Failed to flush RocksDB WAL", e);
+        }
+    }
+
+    /**
+     * Returns a freshly created config carrying only the group name for LMQ groups; other groups are
+     * resolved by the parent class.
+     */
+    @Override
+    public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
+        if (MixAll.isLmq(group)) {
+            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+            subscriptionGroupConfig.setGroupName(group);
+            return subscriptionGroupConfig;
+        }
+        return super.findSubscriptionGroupConfig(group);
+    }
+
+    /**
+     * Writes the config to the storage and then updates the in-memory state. Null configs and LMQ groups
+     * are ignored. A {@link RocksDBException} is logged and the in-memory update still takes place.
+     */
+    @Override
+    public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
+        if (config == null || MixAll.isLmq(config.getGroupName())) {
+            return;
+        }
+        ByteBuf keyBuf = ConfigHelper.keyBufOf(TableId.SUBSCRIPTION_GROUP, config.getGroupName());
+        ByteBuf valueBuf = ConfigHelper.valueBufOf(config, SerializationType.JSON);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
+            long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
+            ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
+            configStorage.write(writeBatch);
+        } catch (RocksDBException e) {
+            log.error("update subscription group config error", e);
+        } finally {
+            keyBuf.release();
+            valueBuf.release();
+        }
+        super.updateSubscriptionGroupConfigWithoutPersist(config);
+    }
+
+    @Override
+    public boolean containsSubscriptionGroup(String group) {
+        if (MixAll.isLmq(group)) {
+            return true;
+        } else {
+            return super.containsSubscriptionGroup(group);
+        }
+    }
+
+    /**
+     * Deletes the stored record of the group and then removes it from the in-memory state through the
+     * parent class. A {@link RocksDBException} is logged and the in-memory removal still takes place.
+     *
+     * @param groupName Name of the group to remove
+     * @return whatever the parent implementation returns for the removal
+     */
+    @Override
+    protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName) {
+        ByteBuf keyBuf = ConfigHelper.keyBufOf(TableId.SUBSCRIPTION_GROUP, groupName);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.delete(ConfigHelper.readBytes(keyBuf));
+            // The message store may not be available yet; fall back to 0 as the other writers do.
+            long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
+            ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
+            // Apply the batch; otherwise the delete is dropped when the batch is closed.
+            configStorage.write(writeBatch);
+        } catch (RocksDBException e) {
+            log.error("Failed to remove subscription group config by group-name={}", groupName, e);
+        } finally {
+            keyBuf.release();
+        }
+        return super.removeSubscriptionGroupConfig(groupName);
+    }
+}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TableId.java
similarity index 59%
copy from
broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
copy to broker/src/main/java/org/apache/rocketmq/broker/config/v2/TableId.java
index 7a90fd62fb..7a61899371 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TableId.java
@@ -14,21 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.config.v2;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+/**
+ * See <a
href="https://book.tidb.io/session1/chapter3/tidb-kv-to-relation.html">Table,
Key Value Mapping</a>
+ */
+public enum TableId {
+ UNSPECIFIED((short) 0),
+ CONSUMER_OFFSET((short) 1),
+ PULL_OFFSET((short) 2),
+ TOPIC((short) 3),
+ SUBSCRIPTION_GROUP((short) 4);
-public class RocksDBOffsetSerializeWrapper extends RemotingSerializable {
- private ConcurrentMap<Integer, Long> offsetTable = new
ConcurrentHashMap(16);
+ private final short value;
- public ConcurrentMap<Integer, Long> getOffsetTable() {
- return offsetTable;
+ TableId(short value) {
+ this.value = value;
}
- public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) {
- this.offsetTable = offsetTable;
+ public short getValue() {
+ return value;
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TablePrefix.java
similarity index 59%
copy from
broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
copy to
broker/src/main/java/org/apache/rocketmq/broker/config/v2/TablePrefix.java
index 7a90fd62fb..d16c14d275 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TablePrefix.java
@@ -14,21 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.config.v2;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+public enum TablePrefix {
+ UNSPECIFIED((byte) 0),
+ TABLE((byte) 1);
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+ private final byte value;
-public class RocksDBOffsetSerializeWrapper extends RemotingSerializable {
- private ConcurrentMap<Integer, Long> offsetTable = new
ConcurrentHashMap(16);
-
- public ConcurrentMap<Integer, Long> getOffsetTable() {
- return offsetTable;
+ TablePrefix(byte value) {
+ this.value = value;
}
- public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) {
- this.offsetTable = offsetTable;
+ public byte getValue() {
+ return value;
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java
new file mode 100644
index 0000000000..b1a3d2d85c
--- /dev/null
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.config.v2;
+
+import com.alibaba.fastjson2.JSON;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.nio.charset.StandardCharsets;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
+import org.apache.rocketmq.common.constant.PermName;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+
+/**
+ * {@link TopicConfigManager} that stores topic configs in a {@link ConfigStorage}.
+ *
+ * Key layout: [table-prefix, 1 byte][table-id, 2 bytes][record-type-prefix, 1 byte][topic-len, 2 bytes][topic-bytes]
+ * Value layout: [serialization-type, 1 byte][topic-config-bytes]
+ */
+public class TopicConfigManagerV2 extends TopicConfigManager {
+    private final ConfigStorage configStorage;
+
+    public TopicConfigManagerV2(BrokerController brokerController, ConfigStorage configStorage) {
+        super(brokerController);
+        this.configStorage = configStorage;
+    }
+
+    /**
+     * Loads the data version first and, only if that succeeds, the topic configs.
+     */
+    @Override
+    public boolean load() {
+        if (!loadDataVersion()) {
+            return false;
+        }
+        return loadTopicConfig();
+    }
+
+    /**
+     * Restores {@code dataVersion} from the record stored for the TOPIC table, if any.
+     *
+     * @return false if reading from the storage raised a {@link RocksDBException}, true otherwise
+     */
+    public boolean loadDataVersion() {
+        boolean succeeded = true;
+        try {
+            ConfigHelper.loadDataVersion(configStorage, TableId.TOPIC)
+                .ifPresent(versionBuf -> ConfigHelper.onDataVersionLoad(versionBuf, dataVersion));
+        } catch (RocksDBException e) {
+            log.error("Failed to load data version of topic", e);
+            succeeded = false;
+        }
+        return succeeded;
+    }
+
+    /**
+     * Scans every DATA record of the TOPIC table and registers each decodable config in memory without
+     * persisting it again.
+     *
+     * @return always true
+     */
+    private boolean loadTopicConfig() {
+        final int prefixLen = 1 /* table-prefix */ + 2 /* table-id */ + 1 /* record-type-prefix */;
+
+        // Inclusive lower bound: first possible DATA key of the TOPIC table.
+        final ByteBuf lowerBound = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(prefixLen);
+        lowerBound.writeByte(TablePrefix.TABLE.getValue());
+        lowerBound.writeShort(TableId.TOPIC.getValue());
+        lowerBound.writeByte(RecordPrefix.DATA.getValue());
+
+        // Upper bound: same prefix with the record-type-prefix incremented by one.
+        final ByteBuf upperBound = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(prefixLen);
+        upperBound.writeByte(TablePrefix.TABLE.getValue());
+        upperBound.writeShort(TableId.TOPIC.getValue());
+        upperBound.writeByte(RecordPrefix.DATA.getValue() + 1);
+
+        try (RocksIterator cursor = configStorage.iterate(lowerBound.nioBuffer(), upperBound.nioBuffer())) {
+            for (; cursor.isValid(); cursor.next()) {
+                byte[] rawKey = cursor.key();
+                byte[] rawValue = cursor.value();
+                TopicConfig decoded = parseTopicConfig(rawKey, rawValue);
+                if (decoded != null) {
+                    super.updateSingleTopicConfigWithoutPersist(decoded);
+                }
+            }
+        } finally {
+            lowerBound.release();
+            upperBound.release();
+        }
+        return true;
+    }
+
+    /**
+     * Key layout: [table-prefix, 1 byte][table-id, 2 bytes][record-type-prefix, 1 byte][topic-len, 2 bytes][topic-bytes]
+     * Value layout: [serialization-type, 1 byte][topic-config-bytes]
+     *
+     * @param key Topic config key representation in RocksDB
+     * @param value Topic config value representation in RocksDB
+     * @return decoded topic config, or null when the serialization type is not JSON
+     */
+    private TopicConfig parseTopicConfig(byte[] key, byte[] value) {
+        final ByteBuf keyView = Unpooled.wrappedBuffer(key);
+        final ByteBuf valueView = Unpooled.wrappedBuffer(value);
+        try {
+            // Skip table-prefix, table-id, record-type-prefix
+            keyView.readerIndex(4);
+            short nameLen = keyView.readShort();
+            assert nameLen == keyView.readableBytes();
+            CharSequence topicName = keyView.readCharSequence(nameLen, StandardCharsets.UTF_8);
+            assert null != topicName;
+
+            byte typeCode = valueView.readByte();
+            if (SerializationType.JSON != SerializationType.valueOf(typeCode)) {
+                return null;
+            }
+            CharSequence payload = valueView.readCharSequence(valueView.readableBytes(), StandardCharsets.UTF_8);
+            TopicConfig parsed = JSON.parseObject(payload.toString(), TopicConfig.class);
+            assert parsed != null;
+            assert topicName.equals(parsed.getTopicName());
+            return parsed;
+        } finally {
+            keyView.release();
+            valueView.release();
+        }
+    }
+
+    /**
+     * Flushes the write-ahead log of the backing config storage; a failure is logged and not rethrown.
+     */
+    @Override
+    public synchronized void persist() {
+        try {
+            configStorage.flushWAL();
+        } catch (RocksDBException e) {
+            log.error("Failed to flush WAL", e);
+        }
+    }
+
+    /**
+     * Returns a synthesized config for LMQ topics; other topics are resolved by the parent class.
+     */
+    @Override
+    public TopicConfig selectTopicConfig(final String topic) {
+        return MixAll.isLmq(topic) ? simpleLmqTopicConfig(topic) : super.selectTopicConfig(topic);
+    }
+
+    /**
+     * Updates the in-memory state and then writes the config to the storage. Null configs and LMQ topics
+     * are ignored. A {@link RocksDBException} raised by the write is logged and not rethrown.
+     */
+    @Override
+    public void updateTopicConfig(final TopicConfig topicConfig) {
+        if (topicConfig == null) {
+            return;
+        }
+        if (MixAll.isLmq(topicConfig.getTopicName())) {
+            return;
+        }
+        super.updateSingleTopicConfigWithoutPersist(topicConfig);
+
+        final ByteBuf recordKey = ConfigHelper.keyBufOf(TableId.TOPIC, topicConfig.getTopicName());
+        final ByteBuf recordValue = ConfigHelper.valueBufOf(topicConfig, SerializationType.JSON);
+        try (WriteBatch batch = new WriteBatch()) {
+            batch.put(recordKey.nioBuffer(), recordValue.nioBuffer());
+            long stateMachineVersion = 0;
+            if (brokerController.getMessageStore() != null) {
+                stateMachineVersion = brokerController.getMessageStore().getStateMachineVersion();
+            }
+            ConfigHelper.stampDataVersion(batch, dataVersion, stateMachineVersion);
+            configStorage.write(batch);
+        } catch (RocksDBException e) {
+            log.error("Failed to update topic config", e);
+        } finally {
+            recordKey.release();
+            recordValue.release();
+        }
+    }
+
+    /**
+     * Deletes the stored record of the topic and then removes it from the in-memory state through the
+     * parent class. A {@link RocksDBException} raised by the delete is logged and not rethrown.
+     */
+    @Override
+    protected TopicConfig removeTopicConfig(String topicName) {
+        final ByteBuf recordKey = ConfigHelper.keyBufOf(TableId.TOPIC, topicName);
+        try (WriteBatch batch = new WriteBatch()) {
+            batch.delete(recordKey.nioBuffer());
+            long stateMachineVersion = 0;
+            if (brokerController.getMessageStore() != null) {
+                stateMachineVersion = brokerController.getMessageStore().getStateMachineVersion();
+            }
+            ConfigHelper.stampDataVersion(batch, dataVersion, stateMachineVersion);
+            configStorage.write(batch);
+        } catch (RocksDBException e) {
+            log.error("Failed to delete topic config by topicName={}", topicName, e);
+        } finally {
+            recordKey.release();
+        }
+        return super.removeTopicConfig(topicName);
+    }
+
+    /**
+     * LMQ topics are always reported as present; other topics are checked by the parent class.
+     */
+    @Override
+    public boolean containsTopic(String topic) {
+        return MixAll.isLmq(topic) || super.containsTopic(topic);
+    }
+
+    /**
+     * Creates the config handed out for LMQ topics: one read queue, one write queue, read and write
+     * permission.
+     */
+    private TopicConfig simpleLmqTopicConfig(String topic) {
+        final int readWritePerm = PermName.PERM_READ | PermName.PERM_WRITE;
+        return new TopicConfig(topic, 1, 1, readWritePerm);
+    }
+}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/package-info.java
similarity index 58%
copy from
broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
copy to
broker/src/main/java/org/apache/rocketmq/broker/config/v2/package-info.java
index 7a90fd62fb..1ea216193c 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/package-info.java
@@ -14,21 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.config.v2;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-
-public class RocksDBOffsetSerializeWrapper extends RemotingSerializable {
- private ConcurrentMap<Integer, Long> offsetTable = new
ConcurrentHashMap(16);
-
- public ConcurrentMap<Integer, Long> getOffsetTable() {
- return offsetTable;
- }
-
- public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) {
- this.offsetTable = offsetTable;
- }
-}
+/*
+ * <strong>Endian</strong>: we use network byte order for all integrals, aka,
always big endian.
+ *
+ * Unlike v1 config managers, implementations in this package prioritize data
integrity and reliability.
+ * As a result, the RocksDB write-ahead log is always on and changes are
immediately flushed. Another significant
+ * difference is that the heap-based cache is removed because it is unnecessary
and duplicates the RocksDB
+ * MemTable/BlockCache.
+ */
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index 403324137c..ea46f1d8a1 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -43,7 +43,7 @@ public class ConsumerOffsetManager extends ConfigManager {
protected static final Logger LOG =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
public static final String TOPIC_GROUP_SEPARATOR = "@";
- private DataVersion dataVersion = new DataVersion();
+ protected DataVersion dataVersion = new DataVersion();
protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer,
Long>> offsetTable =
new ConcurrentHashMap<>(512);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index e6855ef9a2..f62a3e4a09 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -49,7 +49,7 @@ public class SubscriptionGroupManager extends ConfigManager {
private ConcurrentMap<String, ConcurrentMap<String, Integer>>
forbiddenTable =
new ConcurrentHashMap<>(4);
- private final DataVersion dataVersion = new DataVersion();
+ protected final DataVersion dataVersion = new DataVersion();
protected transient BrokerController brokerController;
public SubscriptionGroupManager() {
@@ -143,7 +143,7 @@ public class SubscriptionGroupManager extends ConfigManager
{
this.persist();
}
- private void
updateSubscriptionGroupConfigWithoutPersist(SubscriptionGroupConfig config) {
+ protected void
updateSubscriptionGroupConfigWithoutPersist(SubscriptionGroupConfig config) {
Map<String, String> newAttributes = request(config);
Map<String, String> currentAttributes = current(config.getGroupName());
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 25d3218f2a..4530c10002 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -66,7 +66,7 @@ public class TopicConfigManager extends ConfigManager {
private transient final Lock topicConfigTableLock = new ReentrantLock();
protected ConcurrentMap<String, TopicConfig> topicConfigTable = new
ConcurrentHashMap<>(1024);
- private DataVersion dataVersion = new DataVersion();
+ protected DataVersion dataVersion = new DataVersion();
protected transient BrokerController brokerController;
public TopicConfigManager() {
@@ -497,7 +497,7 @@ public class TopicConfigManager extends ConfigManager {
}
}
- private void updateSingleTopicConfigWithoutPersist(final TopicConfig
topicConfig) {
+ protected void updateSingleTopicConfigWithoutPersist(final TopicConfig
topicConfig) {
checkNotNull(topicConfig, "topicConfig shouldn't be null");
Map<String, String> newAttributes = request(topicConfig);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java
index 58b690c9a3..5a7a2c38ac 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java
index ea6528546d..1b9916d6ac 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.broker.offset;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.config.v1.RocksDBLmqConsumerOffsetManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.store.config.MessageStoreConfig;
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java
index dde0401e8a..c01e63f31f 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java
@@ -21,6 +21,7 @@ package org.apache.rocketmq.broker.offset;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.broker.config.v1.RocksDBOffsetSerializeWrapper;
import org.junit.Before;
import org.junit.Test;
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
index b4800aec24..64c505eb77 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.collections.MapUtils;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index 04324043fb..d87f513355 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -36,8 +36,8 @@ import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
-import org.apache.rocketmq.broker.subscription.RocksDBSubscriptionGroupManager;
-import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
index 205e642843..26017af8a6 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.broker.subscription;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.protocol.DataVersion;
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
index b0e0d05736..080e1dd5a3 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
@@ -24,6 +24,7 @@ import java.util.Optional;
import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicAttributes;
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
index 2a72709098..fb345548e4 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.broker.topic;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 2acfdd69a5..c651047661 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.common;
import org.apache.rocketmq.common.annotation.ImportantField;
+import org.apache.rocketmq.common.config.ConfigManagerVersion;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.metrics.MetricsExporterType;
@@ -431,6 +432,11 @@ public class BrokerConfig extends BrokerIdentity {
private boolean appendCkAsync = false;
+ /**
+ * V2 is recommended in cases where the LMQ feature is extensively used.
+ */
+ private String configManagerVersion = ConfigManagerVersion.V1.getVersion();
+
public String getConfigBlackList() {
return configBlackList;
}
@@ -1879,4 +1885,12 @@ public class BrokerConfig extends BrokerIdentity {
public void setAppendCkAsync(boolean appendCkAsync) {
this.appendCkAsync = appendCkAsync;
}
+
+ public String getConfigManagerVersion() {
+ return configManagerVersion;
+ }
+
+ public void setConfigManagerVersion(String configManagerVersion) {
+ this.configManagerVersion = configManagerVersion;
+ }
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigManagerVersion.java
similarity index 59%
rename from
broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
rename to
common/src/main/java/org/apache/rocketmq/common/config/ConfigManagerVersion.java
index 7a90fd62fb..0d5dd6940a 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
+++
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigManagerVersion.java
@@ -14,21 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.offset;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+package org.apache.rocketmq.common.config;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+public enum ConfigManagerVersion {
+ V1("v1"),
+ V2("v2"),
+ ;
+ private final String version;
-public class RocksDBOffsetSerializeWrapper extends RemotingSerializable {
- private ConcurrentMap<Integer, Long> offsetTable = new
ConcurrentHashMap(16);
-
- public ConcurrentMap<Integer, Long> getOffsetTable() {
- return offsetTable;
+ ConfigManagerVersion(String version) {
+ this.version = version;
}
- public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) {
- this.offsetTable = offsetTable;
+ public String getVersion() {
+ return version;
}
}