This is an automated email from the ASF dual-hosted git repository.
lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 21562bba0e [ISSUE #10462] Fix concurrency bugs in tiered storage
(#10471)
21562bba0e is described below
commit 21562bba0e9989dbc2a0052bb408267840f8624e
Author: lizhimins <[email protected]>
AuthorDate: Thu Jun 11 15:29:56 2026 +0800
[ISSUE #10462] Fix concurrency bugs in tiered storage (#10471)
---
.../tieredstore/file/FlatCommitLogFile.java | 7 +++-
.../rocketmq/tieredstore/file/FlatMessageFile.java | 4 +-
.../rocketmq/tieredstore/index/IndexStoreFile.java | 21 ++++++----
.../tieredstore/provider/PosixFileSegment.java | 3 +-
.../tieredstore/stream/CommitLogInputStream.java | 2 +-
.../tieredstore/index/IndexStoreFileTest.java | 49 ++++++++++++++++++++++
6 files changed, 72 insertions(+), 14 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java
index bdf0bf375e..c316968739 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java
@@ -54,8 +54,11 @@ public class FlatCommitLogFile extends FlatAppendFile {
}
public long getMinOffsetFromFile() {
- return firstOffset.get() == GET_OFFSET_ERROR ?
- this.getMinOffsetFromFileAsync().join() : firstOffset.get();
+ long cached = firstOffset.get();
+ if (cached != GET_OFFSET_ERROR) {
+ return cached;
+ }
+ return getMinOffsetFromFileAsync().join();
}
public CompletableFuture<Long> getMinOffsetFromFileAsync() {
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
index a7505b4bf4..89d6a00abd 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
@@ -48,8 +48,8 @@ public class FlatMessageFile implements FlatFileInterface {
protected static final Logger log =
LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
protected volatile boolean closed = false;
- protected TopicMetadata topicMetadata;
- protected QueueMetadata queueMetadata;
+ protected volatile TopicMetadata topicMetadata;
+ protected volatile QueueMetadata queueMetadata;
protected final String filePath;
protected final ReentrantLock fileLock;
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
index e0a3c5cd0a..ed624ae620 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
@@ -161,9 +161,12 @@ public class IndexStoreFile implements IndexFile {
return String.format("%s#%s", topic, key);
}
+ /**
+ * Equivalent to {@code
org.apache.rocketmq.store.index.IndexFile#indexKeyHashMethod}.
+ * Bitmask ensures non-negative result, including Integer.MIN_VALUE → 0.
+ */
protected int hashCode(String keyStr) {
- int keyHash = keyStr.hashCode();
- return (keyHash < 0) ? -keyHash : keyHash;
+ return keyStr.hashCode() & 0x7FFFFFFF;
}
protected void flushNewMetadata(ByteBuffer byteBuffer, boolean end) throws
IOException {
@@ -234,11 +237,13 @@ public class IndexStoreFile implements IndexFile {
return AppendResult.FILE_FULL;
}
+ ByteBuffer slotBuffer = ByteBuffer.allocate(Integer.BYTES);
for (String key : keySet) {
int hashCode = this.hashCode(this.buildKey(topic, key));
int slotPosition = this.getSlotPosition(hashCode %
this.hashSlotMaxCount);
int slotOldValue = this.getSlotValue(slotPosition);
- int timeDiff = (int) ((timestamp - this.beginTimestamp.get())
/ 1000L);
+ int timeDiff = (int) Math.max(0, Math.min(Integer.MAX_VALUE,
+ (timestamp - this.beginTimestamp.get()) / 1000L));
IndexItem indexItem = new IndexItem(
topicId, queueId, offset, size, hashCode, timeDiff,
slotOldValue);
@@ -251,10 +256,8 @@ public class IndexStoreFile implements IndexFile {
fileChannel.position(itemPosition);
fileChannel.write(itemBuffer);
- ByteBuffer slotBuffer = ByteBuffer.allocate(Integer.BYTES);
slotBuffer.putInt(0, itemIndex);
slotBuffer.position(0);
- slotBuffer.limit(Integer.BYTES);
fileChannel.position(slotPosition);
fileChannel.write(slotBuffer);
} else {
@@ -306,6 +309,7 @@ public class IndexStoreFile implements IndexFile {
String key, int maxCount, long beginTime, long endTime) {
List<IndexItem> result = new ArrayList<>();
+ boolean held = false;
try {
fileReadWriteLock.readLock().lock();
if (!UNSEALED.equals(this.fileStatus.get()) &&
!SEALED.equals(this.fileStatus.get())) {
@@ -315,6 +319,7 @@ public class IndexStoreFile implements IndexFile {
if (mappedFile == null || !mappedFile.hold()) {
return CompletableFuture.completedFuture(result);
}
+ held = true;
int hashCode = this.hashCode(key);
int slotPosition = this.getSlotPosition(hashCode %
this.hashSlotMaxCount);
@@ -334,7 +339,7 @@ public class IndexStoreFile implements IndexFile {
if (hashCode == indexItem.getHashCode() &&
beginTime <= storeTimestamp && storeTimestamp <= endTime) {
result.add(indexItem);
- if (result.size() > maxCount) {
+ if (result.size() >= maxCount) {
break;
}
}
@@ -350,7 +355,9 @@ public class IndexStoreFile implements IndexFile {
"key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(), key,
maxCount, beginTime, endTime, e);
} finally {
fileReadWriteLock.readLock().unlock();
- mappedFile.release();
+ if (held) {
+ mappedFile.release();
+ }
}
return CompletableFuture.completedFuture(result);
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
index 3ab5914161..3302d0457d 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
@@ -173,8 +173,7 @@ public class PosixFileSegment extends FileSegment {
return CompletableFuture.supplyAsync((Supplier<ByteBuffer>) () -> {
ByteBuffer byteBuffer = ByteBuffer.allocate(length);
try {
- readFileChannel.position(position);
- readFileChannel.read(byteBuffer);
+ readFileChannel.read(byteBuffer, position);
byteBuffer.flip();
byteBuffer.limit(length);
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/stream/CommitLogInputStream.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/stream/CommitLogInputStream.java
index e2d7354755..5175e6b4da 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/stream/CommitLogInputStream.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/stream/CommitLogInputStream.java
@@ -147,9 +147,9 @@ public class CommitLogInputStream extends
FileSegmentInputStream {
posInCurBuffer += readLen;
continue;
}
+ curBuf = bufferList.get(bufIndex);
remaining = curBuf.remaining() - posInCurBuffer;
readLen = Math.min(remaining, needRead);
- curBuf = bufferList.get(bufIndex);
if (posInCurBuffer < MessageFormatUtil.PHYSICAL_OFFSET_POSITION) {
realReadLen =
Math.min(MessageFormatUtil.PHYSICAL_OFFSET_POSITION - posInCurBuffer, readLen);
// read from commitLog buffer
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
index 10014ba76a..dbad2d1aba 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -293,4 +294,52 @@ public class IndexStoreFileTest {
TOPIC_NAME + "1", KEY, 64, timestamp,
System.currentTimeMillis()).get();
Assert.assertEquals(3, itemList.size());
}
+
+ @Test
+ public void hashCodeAndMultiKeyPutTest() throws Exception {
+ // hashCode must never return negative, including Integer.MIN_VALUE
edge case.
+ // Old code (keyHash < 0 ? -keyHash : keyHash) overflows on MIN_VALUE.
+ for (int i = 0; i < 10000; i++) {
+
Assert.assertTrue(indexStoreFile.hashCode(UUID.randomUUID().toString()) >= 0);
+ }
+ String minValKey = "polygenelubricants";
+ if (minValKey.hashCode() == Integer.MIN_VALUE) {
+ Assert.assertEquals(0, indexStoreFile.hashCode(minValKey));
+ }
+
+ // All keys in a multi-key set must be queryable after putKey.
+ // Catches slotBuffer reuse bug where 2nd+ keys had 0-byte slot writes.
+ long timestamp = indexStoreFile.getTimestamp();
+ Set<String> multiKeys = new HashSet<>(Arrays.asList("key1", "key2",
"key3"));
+ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+ TOPIC_NAME, TOPIC_ID, QUEUE_ID, multiKeys, MESSAGE_OFFSET,
MESSAGE_SIZE, timestamp));
+ for (String key : multiKeys) {
+ List<IndexItem> items = indexStoreFile.queryAsync(
+ TOPIC_NAME, key, 10, timestamp, timestamp + 1000).get();
+ Assert.assertEquals("Key should be queryable: " + key, 1,
items.size());
+ }
+ }
+
+ @Test
+ public void queryMaxCountAndTimeDiffClampTest() throws Exception {
+ long timestamp = indexStoreFile.getTimestamp();
+
+ // queryAsync must return at most maxCount items (not maxCount + 1).
+ // Old code used result.size() > maxCount which allowed one extra.
+ for (int i = 0; i < 10; i++) {
+ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET + i,
MESSAGE_SIZE, timestamp));
+ }
+ List<IndexItem> items = indexStoreFile.queryAsync(
+ TOPIC_NAME, KEY, 3, timestamp, timestamp + 1000).get();
+ Assert.assertEquals(3, items.size());
+
+ // timeDiff clamped to [0, Integer.MAX_VALUE] to prevent overflow.
+ long futureTimestamp = timestamp + 1000;
+ Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+ TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET,
MESSAGE_SIZE, futureTimestamp));
+ items = indexStoreFile.queryAsync(
+ TOPIC_NAME, KEY, 10, timestamp, futureTimestamp + 1000).get();
+ Assert.assertTrue(items.size() >= 1);
+ }
}
\ No newline at end of file