This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 2c898c9b31 [ISSUE #7689] In Controller mode, messages may lost due to
sharing the same cq offset (#7690)
2c898c9b31 is described below
commit 2c898c9b31bf195174cf1e3a626a7c61f7576381
Author: Ji Juntao <[email protected]>
AuthorDate: Thu Dec 21 14:41:51 2023 +0800
[ISSUE #7689] In Controller mode, messages may lost due to sharing the same
cq offset (#7690)
* fix the reput bug.
* add more logs.
* refactor the method of compensating for HA.
* not modify the imports.
* refactor the log.
* refactor the log.
---
.../rocketmq/store/queue/ConsumeQueueStore.java | 76 ++++++++++++----------
1 file changed, 40 insertions(+), 36 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index 616511b67f..cbe9b4f5ac 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -455,48 +455,52 @@ public class ConsumeQueueStore extends
AbstractConsumeQueueStore {
}
// Correct unSubmit consumeOffset
- if (messageStoreConfig.isDuplicationEnable()) {
- SelectMappedBufferResult lastBuffer = null;
- long startReadOffset =
messageStore.getCommitLog().getConfirmOffset() == -1 ? 0 :
messageStore.getCommitLog().getConfirmOffset();
- while ((lastBuffer =
messageStore.selectOneMessageByOffset(startReadOffset)) != null) {
- try {
- if (lastBuffer.getStartOffset() > startReadOffset) {
- startReadOffset = lastBuffer.getStartOffset();
- continue;
- }
+ if (messageStoreConfig.isDuplicationEnable() ||
messageStore.getBrokerConfig().isEnableControllerMode()) {
+ compensateForHA(cqOffsetTable);
+ }
- ByteBuffer bb = lastBuffer.getByteBuffer();
- int magicCode = bb.getInt(bb.position() + 4);
- if (magicCode == CommitLog.BLANK_MAGIC_CODE) {
- startReadOffset += bb.getInt(bb.position());
- continue;
- } else if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE)
{
- throw new RuntimeException("Unknown magicCode: " +
magicCode);
- }
+ this.setTopicQueueTable(cqOffsetTable);
+ this.setBatchTopicQueueTable(bcqOffsetTable);
+ }
+ private void compensateForHA(ConcurrentMap<String, Long> cqOffsetTable) {
+ SelectMappedBufferResult lastBuffer = null;
+ long startReadOffset = messageStore.getCommitLog().getConfirmOffset()
== -1 ? 0 : messageStore.getCommitLog().getConfirmOffset();
+ log.info("Correct unsubmitted offset...StartReadOffset = {}",
startReadOffset);
+ while ((lastBuffer =
messageStore.selectOneMessageByOffset(startReadOffset)) != null) {
+ try {
+ if (lastBuffer.getStartOffset() > startReadOffset) {
+ startReadOffset = lastBuffer.getStartOffset();
+ continue;
+ }
- lastBuffer.getByteBuffer().mark();
- DispatchRequest dispatchRequest =
messageStore.getCommitLog().checkMessageAndReturnSize(lastBuffer.getByteBuffer(),
true, true, true);
- if (!dispatchRequest.isSuccess())
- break;
- lastBuffer.getByteBuffer().reset();
-
- MessageExt msg =
MessageDecoder.decode(lastBuffer.getByteBuffer(), true, false, false, false,
true);
- if (msg == null)
- break;
-
- String key = msg.getTopic() + "-" + msg.getQueueId();
- cqOffsetTable.put(key, msg.getQueueOffset() + 1);
- startReadOffset += msg.getStoreSize();
- } finally {
- if (lastBuffer != null)
- lastBuffer.release();
+ ByteBuffer bb = lastBuffer.getByteBuffer();
+ int magicCode = bb.getInt(bb.position() + 4);
+ if (magicCode == CommitLog.BLANK_MAGIC_CODE) {
+ startReadOffset += bb.getInt(bb.position());
+ continue;
+ } else if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE) {
+ throw new RuntimeException("Unknown magicCode: " +
magicCode);
}
+ lastBuffer.getByteBuffer().mark();
+ DispatchRequest dispatchRequest =
messageStore.getCommitLog().checkMessageAndReturnSize(lastBuffer.getByteBuffer(),
true, messageStoreConfig.isDuplicationEnable(), true);
+ if (!dispatchRequest.isSuccess())
+ break;
+ lastBuffer.getByteBuffer().reset();
+
+ MessageExt msg =
MessageDecoder.decode(lastBuffer.getByteBuffer(), true, false, false, false,
true);
+ if (msg == null)
+ break;
+
+ String key = msg.getTopic() + "-" + msg.getQueueId();
+ cqOffsetTable.put(key, msg.getQueueOffset() + 1);
+ startReadOffset += msg.getStoreSize();
+ log.info("Correcting. Key:{}, start read Offset: {}", key,
startReadOffset);
+ } finally {
+ if (lastBuffer != null)
+ lastBuffer.release();
}
}
-
- this.setTopicQueueTable(cqOffsetTable);
- this.setBatchTopicQueueTable(bcqOffsetTable);
}
@Override