This is an automated email from the ASF dual-hosted git repository.
RongtongJin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new bfbbb2a955 [#10421] Fix Timer message rocksdb use wrong cache key.
(#10422)
bfbbb2a955 is described below
commit bfbbb2a95535374d285e97bf2d159e00b6e26dc8
Author: echooymxq <[email protected]>
AuthorDate: Sat Jun 20 10:44:49 2026 +0800
[#10421] Fix Timer message rocksdb use wrong cache key. (#10422)
---
.../store/rocksdb/MessageRocksDBStorage.java | 10 +-
.../store/rocksdb/MessageRocksDBStorageTest.java | 143 +++++++++++++++++++++
2 files changed, 150 insertions(+), 3 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/MessageRocksDBStorage.java
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/MessageRocksDBStorage.java
index 8d32998bdc..d55596a293 100644
---
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/MessageRocksDBStorage.java
+++
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/MessageRocksDBStorage.java
@@ -84,7 +84,7 @@ public class MessageRocksDBStorage extends
AbstractRocksDBStorage {
private volatile ColumnFamilyHandle transCFHandle;
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
- private static final Cache<byte[], byte[]> DELETE_KEY_CACHE_FOR_TIMER =
CacheBuilder.newBuilder()
+ private static final Cache<String, byte[]> DELETE_KEY_CACHE_FOR_TIMER =
CacheBuilder.newBuilder()
.maximumSize(10000)
.expireAfterWrite(60, TimeUnit.MINUTES)
.build();
@@ -354,9 +354,9 @@ public class MessageRocksDBStorage extends
AbstractRocksDBStorage {
writeBatch.put(cfHandle, keyBytes, valueBytes);
} else if (record.getActionFlag() == TIMER_ROCKSDB_DELETE)
{
writeBatch.delete(cfHandle, keyBytes);
- DELETE_KEY_CACHE_FOR_TIMER.put(keyBytes,
DELETE_VAL_FLAG);
+
DELETE_KEY_CACHE_FOR_TIMER.put(getTimerCacheKey(record.getDelayTime(),
record.getUniqKey()), DELETE_VAL_FLAG);
} else if (record.getActionFlag() == TIMER_ROCKSDB_UPDATE)
{
- byte[] deleteByte =
DELETE_KEY_CACHE_FOR_TIMER.getIfPresent(keyBytes);
+ byte[] deleteByte =
DELETE_KEY_CACHE_FOR_TIMER.getIfPresent(getTimerCacheKey(record.getDelayTime(),
record.getUniqKey()));
if (null == deleteByte) {
writeBatch.put(cfHandle, keyBytes, valueBytes);
}
@@ -373,6 +373,10 @@ public class MessageRocksDBStorage extends
AbstractRocksDBStorage {
}
}
+ private static String getTimerCacheKey(long delayTime, String uniqKey) {
+ return delayTime + ":" + uniqKey;
+ }
+
public List<TimerRocksDBRecord> scanRecordsForTimer(byte[] columnFamily,
long lowerTime, long upperTime, int size, byte[] startKey) {
ColumnFamilyHandle cfHandle = getColumnFamily(columnFamily);
if (null == cfHandle || lowerTime <= 0L || upperTime <= 0L ||
lowerTime > upperTime || size <= 0) {
diff --git
a/store/src/test/java/org/apache/rocketmq/store/rocksdb/MessageRocksDBStorageTest.java
b/store/src/test/java/org/apache/rocketmq/store/rocksdb/MessageRocksDBStorageTest.java
new file mode 100644
index 0000000000..d28ef19f54
--- /dev/null
+++
b/store/src/test/java/org/apache/rocketmq/store/rocksdb/MessageRocksDBStorageTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.rocksdb;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.timer.rocksdb.TimerRocksDBRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import static
org.apache.rocketmq.store.rocksdb.MessageRocksDBStorage.TIMER_COLUMN_FAMILY;
+
+public class MessageRocksDBStorageTest {
+
+ private MessageRocksDBStorage storage;
+ private String storePath;
+
+ @Before
+ public void setUp() throws Exception {
+ storePath = System.getProperty("java.io.tmpdir") + File.separator +
"message_rocksdb_test_" + System.currentTimeMillis();
+ MessageStoreConfig config = new MessageStoreConfig();
+ config.setStorePathRootDir(storePath);
+ storage = new MessageRocksDBStorage(config);
+ }
+
+ @After
+ public void tearDown() {
+ if (null != storage) {
+ storage.shutdown();
+ }
+ UtilAll.deleteFile(new File(storePath));
+ }
+
+ @Test
+ public void testPutThenDelete() {
+ long delayTime = System.currentTimeMillis() + 3600000L;
+ String uniqKey = "0A0A0A0A00002A9F0000000000000003";
+
+ TimerRocksDBRecord putRecord = new TimerRocksDBRecord(delayTime,
uniqKey, 100L, 200, 0L, null);
+ putRecord.setActionFlag(TimerRocksDBRecord.TIMER_ROCKSDB_PUT);
+
+ List<TimerRocksDBRecord> putList = new ArrayList<>();
+ putList.add(putRecord);
+ storage.writeRecordsForTimer(TIMER_COLUMN_FAMILY, putList);
+
+ TimerRocksDBRecord deleteRecord = new TimerRocksDBRecord(delayTime,
uniqKey, 100L, 200, 0L, null);
+ deleteRecord.setActionFlag(TimerRocksDBRecord.TIMER_ROCKSDB_DELETE);
+
+ List<TimerRocksDBRecord> deleteList = new ArrayList<>();
+ deleteList.add(deleteRecord);
+ storage.writeRecordsForTimer(TIMER_COLUMN_FAMILY, deleteList);
+
+ List<TimerRocksDBRecord> result = storage.scanRecordsForTimer(
+ TIMER_COLUMN_FAMILY, delayTime - 1, delayTime + 1, 10, null);
+
+ Assert.assertTrue(null == result || result.isEmpty());
+ }
+
+ @Test
+ public void testPutThenUpdate() {
+ long delayTime = System.currentTimeMillis() + 3600000L;
+ String uniqKey = "0A0A0A0A00002A9F0000000000000004";
+
+ TimerRocksDBRecord putRecord = new TimerRocksDBRecord(delayTime,
uniqKey, 100L, 200, 0L, null);
+ putRecord.setActionFlag(TimerRocksDBRecord.TIMER_ROCKSDB_PUT);
+
+ List<TimerRocksDBRecord> putList = new ArrayList<>();
+ putList.add(putRecord);
+ storage.writeRecordsForTimer(TIMER_COLUMN_FAMILY, putList);
+
+ TimerRocksDBRecord updateRecord = new TimerRocksDBRecord(delayTime,
uniqKey, 200L, 300, 1L, null);
+ updateRecord.setActionFlag(TimerRocksDBRecord.TIMER_ROCKSDB_UPDATE);
+
+ List<TimerRocksDBRecord> updateList = new ArrayList<>();
+ updateList.add(updateRecord);
+ storage.writeRecordsForTimer(TIMER_COLUMN_FAMILY, updateList);
+
+ List<TimerRocksDBRecord> result = storage.scanRecordsForTimer(
+ TIMER_COLUMN_FAMILY, delayTime - 1, delayTime + 1, 10, null);
+
+ Assert.assertNotNull("PUT then UPDATE should have 1 record", result);
+ Assert.assertEquals(1, result.size());
+ Assert.assertEquals(200L, result.get(0).getOffsetPy());
+ Assert.assertEquals(300, result.get(0).getSizePy());
+ }
+
+ @Test
+ public void testDeleteThenUpdate() {
+ long delayTime = System.currentTimeMillis() + 3600000L;
+ String uniqKey = "0A0A0A0A00002A9F0000000000000001";
+
+ TimerRocksDBRecord putRecord = new TimerRocksDBRecord(delayTime,
uniqKey, 100L, 200, 0L, null);
+ putRecord.setActionFlag(TimerRocksDBRecord.TIMER_ROCKSDB_PUT);
+
+ List<TimerRocksDBRecord> putList = new ArrayList<>();
+ putList.add(putRecord);
+ storage.writeRecordsForTimer(TIMER_COLUMN_FAMILY, putList);
+
+ List<TimerRocksDBRecord> scanAfterPut = storage.scanRecordsForTimer(
+ TIMER_COLUMN_FAMILY, delayTime - 1, delayTime + 1, 10, null);
+ Assert.assertNotNull("PUT should create a record in RocksDB",
scanAfterPut);
+ Assert.assertEquals(1, scanAfterPut.size());
+
+ TimerRocksDBRecord deleteRecord = new TimerRocksDBRecord(delayTime,
uniqKey, 100L, 200, 0L, null);
+ deleteRecord.setActionFlag(TimerRocksDBRecord.TIMER_ROCKSDB_DELETE);
+
+ TimerRocksDBRecord updateRecord = new TimerRocksDBRecord(delayTime,
uniqKey, 200L, 300, 1L, null);
+ updateRecord.setActionFlag(TimerRocksDBRecord.TIMER_ROCKSDB_UPDATE);
+
+ List<TimerRocksDBRecord> cudList = new ArrayList<>();
+ cudList.add(deleteRecord);
+ cudList.add(updateRecord);
+ storage.writeRecordsForTimer(TIMER_COLUMN_FAMILY, cudList);
+
+ List<TimerRocksDBRecord> resultAfterDeleteUpdate =
storage.scanRecordsForTimer(
+ TIMER_COLUMN_FAMILY, delayTime - 1, delayTime + 1, 10, null);
+
+ int recordCount = null == resultAfterDeleteUpdate ? 0 :
resultAfterDeleteUpdate.size();
+ Assert.assertEquals(0, recordCount);
+ }
+
+}