This is an automated email from the ASF dual-hosted git repository.

lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 21562bba0e [ISSUE #10462] Fix concurrency bugs in tiered storage 
(#10471)
21562bba0e is described below

commit 21562bba0e9989dbc2a0052bb408267840f8624e
Author: lizhimins <[email protected]>
AuthorDate: Thu Jun 11 15:29:56 2026 +0800

    [ISSUE #10462] Fix concurrency bugs in tiered storage (#10471)
---
 .../tieredstore/file/FlatCommitLogFile.java        |  7 +++-
 .../rocketmq/tieredstore/file/FlatMessageFile.java |  4 +-
 .../rocketmq/tieredstore/index/IndexStoreFile.java | 21 ++++++----
 .../tieredstore/provider/PosixFileSegment.java     |  3 +-
 .../tieredstore/stream/CommitLogInputStream.java   |  2 +-
 .../tieredstore/index/IndexStoreFileTest.java      | 49 ++++++++++++++++++++++
 6 files changed, 72 insertions(+), 14 deletions(-)

diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java
index bdf0bf375e..c316968739 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java
@@ -54,8 +54,11 @@ public class FlatCommitLogFile extends FlatAppendFile {
     }
 
     public long getMinOffsetFromFile() {
-        return firstOffset.get() == GET_OFFSET_ERROR ?
-            this.getMinOffsetFromFileAsync().join() : firstOffset.get();
+        long cached = firstOffset.get();
+        if (cached != GET_OFFSET_ERROR) {
+            return cached;
+        }
+        return getMinOffsetFromFileAsync().join();
     }
 
     public CompletableFuture<Long> getMinOffsetFromFileAsync() {
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
index a7505b4bf4..89d6a00abd 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
@@ -48,8 +48,8 @@ public class FlatMessageFile implements FlatFileInterface {
     protected static final Logger log = 
LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
     protected volatile boolean closed = false;
 
-    protected TopicMetadata topicMetadata;
-    protected QueueMetadata queueMetadata;
+    protected volatile TopicMetadata topicMetadata;
+    protected volatile QueueMetadata queueMetadata;
 
     protected final String filePath;
     protected final ReentrantLock fileLock;
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
index e0a3c5cd0a..ed624ae620 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
@@ -161,9 +161,12 @@ public class IndexStoreFile implements IndexFile {
         return String.format("%s#%s", topic, key);
     }
 
+    /**
+     * Equivalent to {@code 
org.apache.rocketmq.store.index.IndexFile#indexKeyHashMethod}.
+     * Bitmask ensures non-negative result, including Integer.MIN_VALUE → 0.
+     */
     protected int hashCode(String keyStr) {
-        int keyHash = keyStr.hashCode();
-        return (keyHash < 0) ? -keyHash : keyHash;
+        return keyStr.hashCode() & 0x7FFFFFFF;
     }
 
     protected void flushNewMetadata(ByteBuffer byteBuffer, boolean end) throws 
IOException {
@@ -234,11 +237,13 @@ public class IndexStoreFile implements IndexFile {
                 return AppendResult.FILE_FULL;
             }
 
+            ByteBuffer slotBuffer = ByteBuffer.allocate(Integer.BYTES);
             for (String key : keySet) {
                 int hashCode = this.hashCode(this.buildKey(topic, key));
                 int slotPosition = this.getSlotPosition(hashCode % 
this.hashSlotMaxCount);
                 int slotOldValue = this.getSlotValue(slotPosition);
-                int timeDiff = (int) ((timestamp - this.beginTimestamp.get()) 
/ 1000L);
+                int timeDiff = (int) Math.max(0, Math.min(Integer.MAX_VALUE,
+                    (timestamp - this.beginTimestamp.get()) / 1000L));
 
                 IndexItem indexItem = new IndexItem(
                     topicId, queueId, offset, size, hashCode, timeDiff, 
slotOldValue);
@@ -251,10 +256,8 @@ public class IndexStoreFile implements IndexFile {
                     fileChannel.position(itemPosition);
                     fileChannel.write(itemBuffer);
 
-                    ByteBuffer slotBuffer = ByteBuffer.allocate(Integer.BYTES);
                     slotBuffer.putInt(0, itemIndex);
                     slotBuffer.position(0);
-                    slotBuffer.limit(Integer.BYTES);
                     fileChannel.position(slotPosition);
                     fileChannel.write(slotBuffer);
                 } else {
@@ -306,6 +309,7 @@ public class IndexStoreFile implements IndexFile {
         String key, int maxCount, long beginTime, long endTime) {
 
         List<IndexItem> result = new ArrayList<>();
+        boolean held = false;
         try {
             fileReadWriteLock.readLock().lock();
             if (!UNSEALED.equals(this.fileStatus.get()) && 
!SEALED.equals(this.fileStatus.get())) {
@@ -315,6 +319,7 @@ public class IndexStoreFile implements IndexFile {
             if (mappedFile == null || !mappedFile.hold()) {
                 return CompletableFuture.completedFuture(result);
             }
+            held = true;
 
             int hashCode = this.hashCode(key);
             int slotPosition = this.getSlotPosition(hashCode % 
this.hashSlotMaxCount);
@@ -334,7 +339,7 @@ public class IndexStoreFile implements IndexFile {
                 if (hashCode == indexItem.getHashCode() &&
                     beginTime <= storeTimestamp && storeTimestamp <= endTime) {
                     result.add(indexItem);
-                    if (result.size() > maxCount) {
+                    if (result.size() >= maxCount) {
                         break;
                     }
                 }
@@ -350,7 +355,9 @@ public class IndexStoreFile implements IndexFile {
                 "key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(), key, 
maxCount, beginTime, endTime, e);
         } finally {
             fileReadWriteLock.readLock().unlock();
-            mappedFile.release();
+            if (held) {
+                mappedFile.release();
+            }
         }
 
         return CompletableFuture.completedFuture(result);
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
index 3ab5914161..3302d0457d 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
@@ -173,8 +173,7 @@ public class PosixFileSegment extends FileSegment {
         return CompletableFuture.supplyAsync((Supplier<ByteBuffer>) () -> {
             ByteBuffer byteBuffer = ByteBuffer.allocate(length);
             try {
-                readFileChannel.position(position);
-                readFileChannel.read(byteBuffer);
+                readFileChannel.read(byteBuffer, position);
                 byteBuffer.flip();
                 byteBuffer.limit(length);
 
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/stream/CommitLogInputStream.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/stream/CommitLogInputStream.java
index e2d7354755..5175e6b4da 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/stream/CommitLogInputStream.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/stream/CommitLogInputStream.java
@@ -147,9 +147,9 @@ public class CommitLogInputStream extends 
FileSegmentInputStream {
                 posInCurBuffer += readLen;
                 continue;
             }
+            curBuf = bufferList.get(bufIndex);
             remaining = curBuf.remaining() - posInCurBuffer;
             readLen = Math.min(remaining, needRead);
-            curBuf = bufferList.get(bufIndex);
             if (posInCurBuffer < MessageFormatUtil.PHYSICAL_OFFSET_POSITION) {
                 realReadLen = 
Math.min(MessageFormatUtil.PHYSICAL_OFFSET_POSITION - posInCurBuffer, readLen);
                 // read from commitLog buffer
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
index 10014ba76a..dbad2d1aba 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Paths;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -293,4 +294,52 @@ public class IndexStoreFileTest {
             TOPIC_NAME + "1", KEY, 64, timestamp, 
System.currentTimeMillis()).get();
         Assert.assertEquals(3, itemList.size());
     }
+
+    @Test
+    public void hashCodeAndMultiKeyPutTest() throws Exception {
+        // hashCode must never return negative, including Integer.MIN_VALUE 
edge case.
+        // Old code (keyHash < 0 ? -keyHash : keyHash) overflows on MIN_VALUE.
+        for (int i = 0; i < 10000; i++) {
+            
Assert.assertTrue(indexStoreFile.hashCode(UUID.randomUUID().toString()) >= 0);
+        }
+        String minValKey = "polygenelubricants";
+        if (minValKey.hashCode() == Integer.MIN_VALUE) {
+            Assert.assertEquals(0, indexStoreFile.hashCode(minValKey));
+        }
+
+        // All keys in a multi-key set must be queryable after putKey.
+        // Catches slotBuffer reuse bug where 2nd+ keys had 0-byte slot writes.
+        long timestamp = indexStoreFile.getTimestamp();
+        Set<String> multiKeys = new HashSet<>(Arrays.asList("key1", "key2", 
"key3"));
+        Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+            TOPIC_NAME, TOPIC_ID, QUEUE_ID, multiKeys, MESSAGE_OFFSET, 
MESSAGE_SIZE, timestamp));
+        for (String key : multiKeys) {
+            List<IndexItem> items = indexStoreFile.queryAsync(
+                TOPIC_NAME, key, 10, timestamp, timestamp + 1000).get();
+            Assert.assertEquals("Key should be queryable: " + key, 1, 
items.size());
+        }
+    }
+
+    @Test
+    public void queryMaxCountAndTimeDiffClampTest() throws Exception {
+        long timestamp = indexStoreFile.getTimestamp();
+
+        // queryAsync must return at most maxCount items (not maxCount + 1).
+        // Old code used result.size() > maxCount which allowed one extra.
+        for (int i = 0; i < 10; i++) {
+            Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+                TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET + i, 
MESSAGE_SIZE, timestamp));
+        }
+        List<IndexItem> items = indexStoreFile.queryAsync(
+            TOPIC_NAME, KEY, 3, timestamp, timestamp + 1000).get();
+        Assert.assertEquals(3, items.size());
+
+        // timeDiff clamped to [0, Integer.MAX_VALUE] to prevent overflow.
+        long futureTimestamp = timestamp + 1000;
+        Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+            TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, 
MESSAGE_SIZE, futureTimestamp));
+        items = indexStoreFile.queryAsync(
+            TOPIC_NAME, KEY, 10, timestamp, futureTimestamp + 1000).get();
+        Assert.assertTrue(items.size() >= 1);
+    }
 }
\ No newline at end of file

Reply via email to