This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 152a955ca4 [ISSUE #9069] Fix the IndexFile
ConcurrentModificationException in tiered storage (#9071)
152a955ca4 is described below
commit 152a955ca467482d2f04a614c2cf23c9a9778e69
Author: wangshaojie4039 <[email protected]>
AuthorDate: Wed Dec 25 17:02:06 2024 +0800
[ISSUE #9069] Fix the IndexFile ConcurrentModificationException in tiered
storage (#9071)
---
.../tieredstore/common/GroupCommitContext.java | 70 ++++++++++++
.../core/MessageStoreDispatcherImpl.java | 93 ++++++++++++---
.../tieredstore/file/FlatFileInterface.java | 5 +-
.../rocketmq/tieredstore/file/FlatMessageFile.java | 42 ++++---
.../tieredstore/common/GroupCommitContextTest.java | 54 +++++++++
.../core/MessageStoreDispatcherImplTest.java | 125 +++++++++++++++++++++
.../tieredstore/file/FlatMessageFileTest.java | 8 ++
7 files changed, 359 insertions(+), 38 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GroupCommitContext.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GroupCommitContext.java
new file mode 100644
index 0000000000..f677e7c934
--- /dev/null
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GroupCommitContext.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.common;
+
+import java.util.List;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+
+public class GroupCommitContext {
+
+ private long endOffset;
+
+ private List<SelectMappedBufferResult> bufferList;
+
+ private List<DispatchRequest> dispatchRequests;
+
+ public long getEndOffset() {
+ return endOffset;
+ }
+
+ public void setEndOffset(long endOffset) {
+ this.endOffset = endOffset;
+ }
+
+ public List<SelectMappedBufferResult> getBufferList() {
+ return bufferList;
+ }
+
+ public void setBufferList(List<SelectMappedBufferResult> bufferList) {
+ this.bufferList = bufferList;
+ }
+
+ public List<DispatchRequest> getDispatchRequests() {
+ return dispatchRequests;
+ }
+
+ public void setDispatchRequests(List<DispatchRequest> dispatchRequests) {
+ this.dispatchRequests = dispatchRequests;
+ }
+
+ public void release() {
+ if (bufferList != null) {
+ for (SelectMappedBufferResult bufferResult : bufferList) {
+ bufferResult.release();
+ }
+ bufferList.clear();
+ bufferList = null;
+ }
+ if (dispatchRequests != null) {
+ dispatchRequests.clear();
+ dispatchRequests = null;
+ }
+
+ }
+}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
index 9b1e53564d..bcc4e225da 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
@@ -16,15 +16,20 @@
*/
package org.apache.rocketmq.tieredstore.core;
+import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
@@ -42,6 +47,7 @@ import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.TieredMessageStore;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
+import org.apache.rocketmq.tieredstore.common.GroupCommitContext;
import org.apache.rocketmq.tieredstore.file.FlatFileInterface;
import org.apache.rocketmq.tieredstore.file.FlatFileStore;
import org.apache.rocketmq.tieredstore.index.IndexService;
@@ -65,6 +71,7 @@ public class MessageStoreDispatcherImpl extends ServiceThread
implements Message
protected final MessageStoreFilter topicFilter;
protected final Semaphore semaphore;
protected final IndexService indexService;
+ protected final Map<FlatFileInterface, GroupCommitContext>
failedGroupCommitMap;
public MessageStoreDispatcherImpl(TieredMessageStore messageStore) {
this.messageStore = messageStore;
@@ -77,6 +84,7 @@ public class MessageStoreDispatcherImpl extends ServiceThread
implements Message
this.flatFileStore = messageStore.getFlatFileStore();
this.storeExecutor = messageStore.getStoreExecutor();
this.indexService = messageStore.getIndexService();
+ this.failedGroupCommitMap = new ConcurrentHashMap<>();
}
@Override
@@ -84,6 +92,11 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
return MessageStoreDispatcher.class.getSimpleName();
}
+ @VisibleForTesting
+ public Map<FlatFileInterface, GroupCommitContext>
getFailedGroupCommitMap() {
+ return failedGroupCommitMap;
+ }
+
public void dispatchWithSemaphore(FlatFileInterface flatFile) {
try {
if (stopped) {
@@ -153,10 +166,22 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
// If the previous commit fails, attempt to trigger a commit
directly.
if (commitOffset < currentOffset) {
- this.commitAsync(flatFile);
+ this.commitAsync(flatFile).whenComplete((result, throwable) ->
{
+ if (throwable != null) {
+ log.error("MessageDispatcher#flatFile commitOffset
less than currentOffset, commitAsync again failed. topic: {}, queueId: {} ",
topic, queueId, throwable);
+ }
+ });
return CompletableFuture.completedFuture(false);
}
+ if (failedGroupCommitMap.containsKey(flatFile)) {
+ GroupCommitContext failedCommit =
failedGroupCommitMap.get(flatFile);
+ if (failedCommit.getEndOffset() <= commitOffset) {
+ failedGroupCommitMap.remove(flatFile);
+ constructIndexFile(flatFile.getTopicId(), failedCommit);
+ }
+ }
+
if (currentOffset < minOffsetInQueue) {
log.warn("MessageDispatcher#dispatch, current offset is too
small, topic={}, queueId={}, offset={}-{}, current={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue,
currentOffset);
@@ -224,6 +249,8 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
}
long offset = currentOffset;
+ List<SelectMappedBufferResult> appendingBufferList = new
ArrayList<>();
+ List<DispatchRequest> dispatchRequestList = new ArrayList<>();
for (; offset < targetOffset; offset++) {
cqUnit = consumeQueue.get(offset);
bufferSize += cqUnit.getSize();
@@ -231,6 +258,7 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
break;
}
message =
defaultStore.selectOneMessageByOffset(cqUnit.getPos(), cqUnit.getSize());
+ appendingBufferList.add(message);
ByteBuffer byteBuffer = message.getByteBuffer();
AppendResult result = flatFile.appendCommitLog(message);
@@ -251,13 +279,20 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
result = flatFile.appendConsumeQueue(dispatchRequest);
if (!AppendResult.SUCCESS.equals(result)) {
break;
+ } else {
+ dispatchRequestList.add(dispatchRequest);
}
}
+ GroupCommitContext groupCommitContext = new GroupCommitContext();
+ groupCommitContext.setEndOffset(offset);
+ groupCommitContext.setBufferList(appendingBufferList);
+ groupCommitContext.setDispatchRequests(dispatchRequestList);
+
// If there are many messages waiting to be uploaded, call the
upload logic immediately.
boolean repeat = timeout || maxOffsetInQueue - offset >
storeConfig.getTieredStoreGroupCommitCount();
- if (!flatFile.getDispatchRequestList().isEmpty()) {
+ if (!dispatchRequestList.isEmpty()) {
Attributes attributes =
TieredStoreMetricsManager.newAttributesBuilder()
.put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
.put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, queueId)
@@ -265,8 +300,19 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
.build();
TieredStoreMetricsManager.messagesDispatchTotal.add(offset -
currentOffset, attributes);
- this.commitAsync(flatFile).whenComplete((unused, throwable) ->
{
- if (repeat) {
+ this.commitAsync(flatFile).whenComplete((success, throwable)
-> {
+ if (success) {
+ constructIndexFile(flatFile.getTopicId(),
groupCommitContext);
+ }
+ else {
+ //next commit async,execute constructIndexFile.
+ GroupCommitContext oldCommit =
failedGroupCommitMap.put(flatFile, groupCommitContext);
+ if (oldCommit != null) {
+ log.warn("MessageDispatcher#commitAsync
failed,flatFile old failed commit context not release, topic={}, queueId={} ",
topic, queueId);
+ oldCommit.release();
+ }
+ }
+ if (success && repeat) {
storeExecutor.commonExecutor.submit(() ->
dispatchWithSemaphore(flatFile));
}
}
@@ -282,22 +328,28 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
return CompletableFuture.completedFuture(false);
}
- public CompletableFuture<Void> commitAsync(FlatFileInterface flatFile) {
- return flatFile.commitAsync().thenAcceptAsync(success -> {
- if (success) {
- if (storeConfig.isMessageIndexEnable()) {
- flatFile.getDispatchRequestList().forEach(
- request -> constructIndexFile(flatFile.getTopicId(),
request));
+ public CompletableFuture<Boolean> commitAsync(FlatFileInterface flatFile) {
+ return flatFile.commitAsync();
+ }
+
+ public void constructIndexFile(long topicId, GroupCommitContext
groupCommitContext) {
+ MessageStoreExecutor.getInstance().bufferCommitExecutor.submit(() -> {
+ if (storeConfig.isMessageIndexEnable()) {
+ try {
+ groupCommitContext.getDispatchRequests().forEach(request
-> constructIndexFile0(topicId, request));
+ }
+ catch (Throwable e) {
+ log.error("constructIndexFile error {}", topicId, e);
}
- flatFile.release();
}
- }, storeExecutor.bufferCommitExecutor);
+ groupCommitContext.release();
+ });
}
/**
* Building indexes with offsetId is no longer supported because offsetId
has changed in tiered storage
*/
- public void constructIndexFile(long topicId, DispatchRequest request) {
+ public void constructIndexFile0(long topicId, DispatchRequest request) {
Set<String> keySet = new HashSet<>();
if (StringUtils.isNotBlank(request.getUniqKey())) {
keySet.add(request.getUniqKey());
@@ -309,12 +361,27 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
request.getCommitLogOffset(), request.getMsgSize(),
request.getStoreTimestamp());
}
+ public void releaseClosedPendingGroupCommit() {
+ Iterator<Map.Entry<FlatFileInterface, GroupCommitContext>> iterator =
failedGroupCommitMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<FlatFileInterface, GroupCommitContext> entry =
iterator.next();
+ if (entry.getKey().isClosed()) {
+ entry.getValue().release();
+ iterator.remove();
+ }
+ }
+ }
+
+
@Override
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore);
+
+ releaseClosedPendingGroupCommit();
+
this.waitForRunning(Duration.ofSeconds(20).toMillis());
} catch (Throwable t) {
log.error("MessageStore dispatch error", t);
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileInterface.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileInterface.java
index 619470fbc2..01e7f25a46 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileInterface.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileInterface.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.tieredstore.file;
import java.nio.ByteBuffer;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import org.apache.rocketmq.common.BoundaryType;
@@ -58,8 +57,6 @@ public interface FlatFileInterface {
*/
AppendResult appendConsumeQueue(DispatchRequest request);
- List<DispatchRequest> getDispatchRequestList();
-
void release();
long getMinStoreTimestamp();
@@ -143,6 +140,8 @@ public interface FlatFileInterface {
*/
CompletableFuture<Long> getQueueOffsetByTimeAsync(long timestamp,
BoundaryType boundaryType);
+ boolean isClosed();
+
/**
* Shutdown process
*/
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
index d5675976cb..4510a8a127 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
@@ -17,12 +17,14 @@
package org.apache.rocketmq.tieredstore.file;
import com.alibaba.fastjson.JSON;
+import com.google.common.annotations.VisibleForTesting;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -51,14 +53,13 @@ public class FlatMessageFile implements FlatFileInterface {
protected final String filePath;
protected final ReentrantLock fileLock;
+ protected final Semaphore commitLock = new Semaphore(1);
protected final MessageStoreConfig storeConfig;
protected final MetadataStore metadataStore;
protected final FlatCommitLogFile commitLog;
protected final FlatConsumeQueueFile consumeQueue;
protected final AtomicLong lastDestroyTime;
- protected final List<SelectMappedBufferResult> bufferResultList;
- protected final List<DispatchRequest> dispatchRequestList;
protected final ConcurrentMap<String, CompletableFuture<?>>
inFlightRequestMap;
public FlatMessageFile(FlatFileFactory fileFactory, String topic, int
queueId) {
@@ -76,8 +77,6 @@ public class FlatMessageFile implements FlatFileInterface {
this.commitLog = fileFactory.createFlatFileForCommitLog(filePath);
this.consumeQueue =
fileFactory.createFlatFileForConsumeQueue(filePath);
this.lastDestroyTime = new AtomicLong();
- this.bufferResultList = new ArrayList<>();
- this.dispatchRequestList = new ArrayList<>();
this.inFlightRequestMap = new ConcurrentHashMap<>();
}
@@ -127,6 +126,11 @@ public class FlatMessageFile implements FlatFileInterface {
return this.fileLock;
}
+ @VisibleForTesting
+ public Semaphore getCommitLock() {
+ return commitLock;
+ }
+
@Override
public boolean rollingFile(long interval) {
return this.commitLog.tryRollingFile(interval);
@@ -156,7 +160,6 @@ public class FlatMessageFile implements FlatFileInterface {
if (closed) {
return AppendResult.FILE_CLOSED;
}
- this.bufferResultList.add(message);
return this.appendCommitLog(message.getByteBuffer());
}
@@ -172,29 +175,14 @@ public class FlatMessageFile implements FlatFileInterface
{
buffer.putLong(request.getTagsCode());
buffer.flip();
- this.dispatchRequestList.add(request);
return consumeQueue.append(buffer, request.getStoreTimestamp());
}
- @Override
- public List<DispatchRequest> getDispatchRequestList() {
- return dispatchRequestList;
- }
+
@Override
public void release() {
- for (SelectMappedBufferResult bufferResult : bufferResultList) {
- bufferResult.release();
- }
-
- if (queueMetadata != null) {
- log.trace("FlatMessageFile release, topic={}, queueId={},
bufferSize={}, requestListSize={}",
- queueMetadata.getQueue().getTopic(),
queueMetadata.getQueue().getQueueId(),
- bufferResultList.size(), dispatchRequestList.size());
- }
- bufferResultList.clear();
- dispatchRequestList.clear();
}
@Override
@@ -246,13 +234,18 @@ public class FlatMessageFile implements FlatFileInterface
{
@Override
public CompletableFuture<Boolean> commitAsync() {
+ // acquire lock
+ if (commitLock.drainPermits() <= 0) {
+ return CompletableFuture.completedFuture(false);
+ }
+
return this.commitLog.commitAsync()
.thenCompose(result -> {
if (result) {
return consumeQueue.commitAsync();
}
return CompletableFuture.completedFuture(false);
- });
+ }).whenComplete((result, throwable) -> commitLock.release());
}
@Override
@@ -363,6 +356,11 @@ public class FlatMessageFile implements FlatFileInterface {
return StringUtils.equals(filePath, ((FlatMessageFile) obj).filePath);
}
+ @Override
+ public boolean isClosed() {
+ return closed;
+ }
+
@Override
public void shutdown() {
closed = true;
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GroupCommitContextTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GroupCommitContextTest.java
new file mode 100644
index 0000000000..e692360761
--- /dev/null
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GroupCommitContextTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.common;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class GroupCommitContextTest {
+
+ @Test
+ public void groupCommitContextTest() {
+ GroupCommitContext releaseGroupCommitContext = new
GroupCommitContext();
+ releaseGroupCommitContext.release();
+
+ long endOffset = 1000;
+ List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+ dispatchRequestList.add(new DispatchRequest(1000));
+ List<SelectMappedBufferResult> selectMappedBufferResultList = new
ArrayList<>();
+ selectMappedBufferResultList.add(new SelectMappedBufferResult(100,
ByteBuffer.allocate(10), 1000, null));
+ GroupCommitContext groupCommitContext = new GroupCommitContext();
+ groupCommitContext.setEndOffset(endOffset);
+ groupCommitContext.setBufferList(selectMappedBufferResultList);
+ groupCommitContext.setDispatchRequests(dispatchRequestList);
+
+ Assert.assertTrue(groupCommitContext.getEndOffset() == endOffset);
+
Assert.assertTrue(groupCommitContext.getBufferList().equals(selectMappedBufferResultList));
+
Assert.assertTrue(groupCommitContext.getDispatchRequests().equals(dispatchRequestList));
+ groupCommitContext.release();
+ Assert.assertTrue(groupCommitContext.getDispatchRequests() == null);
+ Assert.assertTrue(groupCommitContext.getBufferList() == null);
+ Assert.assertTrue(dispatchRequestList.isEmpty());
+ Assert.assertTrue(selectMappedBufferResultList.isEmpty());
+ }
+
+}
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
index 92e989e596..6b96076948 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
@@ -35,6 +35,7 @@ import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.TieredMessageStore;
+import org.apache.rocketmq.tieredstore.common.GroupCommitContext;
import org.apache.rocketmq.tieredstore.file.FlatFileFactory;
import org.apache.rocketmq.tieredstore.file.FlatFileStore;
import org.apache.rocketmq.tieredstore.file.FlatMessageFile;
@@ -157,6 +158,130 @@ public class MessageStoreDispatcherImplTest {
Assert.assertEquals(200L, flatFile.getConsumeQueueCommitOffset());
}
+ @Test
+ public void dispatchCommitFailedTest() throws Exception {
+ MessageStore defaultStore = Mockito.mock(MessageStore.class);
+ Mockito.when(defaultStore.getMinOffsetInQueue(anyString(),
anyInt())).thenReturn(100L);
+ Mockito.when(defaultStore.getMaxOffsetInQueue(anyString(),
anyInt())).thenReturn(200L);
+
+ messageStore = Mockito.mock(TieredMessageStore.class);
+ IndexService indexService =
+ new IndexStoreService(new FlatFileFactory(metadataStore,
storeConfig), storePath);
+ indexService.start();
+ Mockito.when(messageStore.getDefaultStore()).thenReturn(defaultStore);
+ Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
+ Mockito.when(messageStore.getStoreExecutor()).thenReturn(executor);
+ Mockito.when(messageStore.getFlatFileStore()).thenReturn(fileStore);
+ Mockito.when(messageStore.getIndexService()).thenReturn(indexService);
+
+ // mock message
+ ByteBuffer buffer = MessageFormatUtilTest.buildMockedMessageBuffer();
+ MessageExt messageExt = MessageDecoder.decode(buffer);
+ messageExt.setKeys("Key");
+ MessageAccessor.putProperty(
+ messageExt, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
"uk");
+ messageExt.setBody(new byte[10]);
+ messageExt.setStoreSize(0);
+ buffer = ByteBuffer.wrap(MessageDecoder.encode(messageExt, false));
+ buffer.putInt(0, buffer.remaining());
+
+ DispatchRequest request = new DispatchRequest(mq.getTopic(),
mq.getQueueId(),
+ MessageFormatUtil.getCommitLogOffset(buffer), buffer.remaining(),
0L,
+ MessageFormatUtil.getStoreTimeStamp(buffer), 0L,
+ "", "", 0, 0L, new HashMap<>());
+
+ // construct flat file
+ MessageStoreDispatcher dispatcher = new
MessageStoreDispatcherImpl(messageStore);
+ dispatcher.dispatch(request);
+ FlatMessageFile flatFile = fileStore.getFlatFile(mq);
+ Assert.assertNotNull(flatFile);
+
+ // init offset
+ dispatcher.doScheduleDispatch(flatFile, true).join();
+ Assert.assertEquals(100L, flatFile.getConsumeQueueMinOffset());
+ Assert.assertEquals(100L, flatFile.getConsumeQueueMaxOffset());
+ Assert.assertEquals(100L, flatFile.getConsumeQueueCommitOffset());
+
+ ConsumeQueueInterface cq = Mockito.mock(ConsumeQueueInterface.class);
+ Mockito.when(defaultStore.getConsumeQueue(anyString(),
anyInt())).thenReturn(cq);
+ Mockito.when(cq.get(anyLong())).thenReturn(
+ new CqUnit(100, 1000, buffer.remaining(), 0L));
+ Mockito.when(defaultStore.selectOneMessageByOffset(anyLong(),
anyInt())).thenReturn(
+ new SelectMappedBufferResult(0L, buffer.asReadOnlyBuffer(),
buffer.remaining(), null));
+ flatFile.getCommitLock().drainPermits();
+ dispatcher.doScheduleDispatch(flatFile, true).join();
+ GroupCommitContext groupCommitContext =
((MessageStoreDispatcherImpl)dispatcher).getFailedGroupCommitMap().get(flatFile);
+ Assert.assertTrue(groupCommitContext != null);
+ Assert.assertTrue(groupCommitContext.getEndOffset() == 200);
+ flatFile.getCommitLock().release();
+ flatFile.commitAsync().join();
+ dispatcher.doScheduleDispatch(flatFile, true).join();
+
Assert.assertTrue(((MessageStoreDispatcherImpl)dispatcher).getFailedGroupCommitMap().get(flatFile)
== null);
+ ((MessageStoreDispatcherImpl)dispatcher).flatFileStore.destroyFile(mq);
+
((MessageStoreDispatcherImpl)dispatcher).releaseClosedPendingGroupCommit();
+
+ }
+
+ @Test
+ public void dispatchFailedGroupCommitMapReleaseTest() throws Exception {
+ MessageStore defaultStore = Mockito.mock(MessageStore.class);
+ Mockito.when(defaultStore.getMinOffsetInQueue(anyString(),
anyInt())).thenReturn(100L);
+ Mockito.when(defaultStore.getMaxOffsetInQueue(anyString(),
anyInt())).thenReturn(200L);
+
+ messageStore = Mockito.mock(TieredMessageStore.class);
+ IndexService indexService =
+ new IndexStoreService(new FlatFileFactory(metadataStore,
storeConfig), storePath);
+ indexService.start();
+ Mockito.when(messageStore.getDefaultStore()).thenReturn(defaultStore);
+ Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
+ Mockito.when(messageStore.getStoreExecutor()).thenReturn(executor);
+ Mockito.when(messageStore.getFlatFileStore()).thenReturn(fileStore);
+ Mockito.when(messageStore.getIndexService()).thenReturn(indexService);
+
+ // mock message
+ ByteBuffer buffer = MessageFormatUtilTest.buildMockedMessageBuffer();
+ MessageExt messageExt = MessageDecoder.decode(buffer);
+ messageExt.setKeys("Key");
+ MessageAccessor.putProperty(
+ messageExt, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
"uk");
+ messageExt.setBody(new byte[10]);
+ messageExt.setStoreSize(0);
+ buffer = ByteBuffer.wrap(MessageDecoder.encode(messageExt, false));
+ buffer.putInt(0, buffer.remaining());
+
+ DispatchRequest request = new DispatchRequest(mq.getTopic(),
mq.getQueueId(),
+ MessageFormatUtil.getCommitLogOffset(buffer), buffer.remaining(),
0L,
+ MessageFormatUtil.getStoreTimeStamp(buffer), 0L,
+ "", "", 0, 0L, new HashMap<>());
+
+ // construct flat file
+ MessageStoreDispatcher dispatcher = new
MessageStoreDispatcherImpl(messageStore);
+ dispatcher.dispatch(request);
+ FlatMessageFile flatFile = fileStore.getFlatFile(mq);
+ Assert.assertNotNull(flatFile);
+
+ // init offset
+ dispatcher.doScheduleDispatch(flatFile, true).join();
+ Assert.assertEquals(100L, flatFile.getConsumeQueueMinOffset());
+ Assert.assertEquals(100L, flatFile.getConsumeQueueMaxOffset());
+ Assert.assertEquals(100L, flatFile.getConsumeQueueCommitOffset());
+
+ ConsumeQueueInterface cq = Mockito.mock(ConsumeQueueInterface.class);
+ Mockito.when(defaultStore.getConsumeQueue(anyString(),
anyInt())).thenReturn(cq);
+ Mockito.when(cq.get(anyLong())).thenReturn(
+ new CqUnit(100, 1000, buffer.remaining(), 0L));
+ Mockito.when(defaultStore.selectOneMessageByOffset(anyLong(),
anyInt())).thenReturn(
+ new SelectMappedBufferResult(0L, buffer.asReadOnlyBuffer(),
buffer.remaining(), null));
+ flatFile.getCommitLock().drainPermits();
+ dispatcher.doScheduleDispatch(flatFile, true).join();
+ GroupCommitContext groupCommitContext =
((MessageStoreDispatcherImpl)dispatcher).getFailedGroupCommitMap().get(flatFile);
+ Assert.assertTrue(groupCommitContext != null);
+ ((MessageStoreDispatcherImpl)dispatcher).flatFileStore.destroyFile(mq);
+
((MessageStoreDispatcherImpl)dispatcher).releaseClosedPendingGroupCommit();
+
Assert.assertTrue(((MessageStoreDispatcherImpl)dispatcher).getFailedGroupCommitMap().get(flatFile)
== null);
+
+ }
+
@Test
public void dispatchServiceTest() {
MessageStore defaultStore = Mockito.mock(MessageStore.class);
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
index 8a417f54a7..8208d27741 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
@@ -216,4 +216,12 @@ public class FlatMessageFileTest {
flatFile.destroy();
}
+
+ @Test
+ public void testCommitLock() {
+ String topic = "CommitLogTest";
+ FlatMessageFile flatFile = new FlatMessageFile(flatFileFactory, topic,
0);
+ flatFile.getCommitLock().drainPermits();
+ Assert.assertFalse(flatFile.commitAsync().join());
+ }
}