GitHub user zhaomengit edited a discussion:
Question about how the maxPhysicalPosInLogicQueue variable is assigned in DefaultMessageStore.start
```java
/**
 * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
 * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
 * 3. Calculate the reput offset according to the consume queue;
 * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
 */
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
    for (ConsumeQueue logic : maps.values()) {
        if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
            maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
        }
    }
}
// If maxPhyPos(CQs) < minPhyPos(CommitLog), some newly deleted topics may be re-dispatched into cqs mistakenly.
if (maxPhysicalPosInLogicQueue < 0) {
    maxPhysicalPosInLogicQueue = 0;
}
if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
    maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
    /**
     * This happens in following conditions:
     * 1. If someone removes all the consumequeue files or the disk get damaged.
     * 2. Launch a new broker, and copy the commitlog from other brokers.
     *
     * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
     * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
     */
    log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
}
log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
    maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();
```
After the following code runs, maxPhysicalPosInLogicQueue is already greater than or equal to commitLog.getMinOffset():
```java
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
    for (ConsumeQueue logic : maps.values()) {
        if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
            maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
        }
    }
}
```
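So under what circumstances can the subsequent condition `if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset())` ever be true?
I don't really understand the comment inside that branch: why would removing all the consume queue files, or a damaged disk, lead to this situation?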
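
To make the reasoning concrete, here is a minimal, self-contained sketch of the same control flow. It uses plain `long` values in place of the real `CommitLog` and `ConsumeQueue` objects. The class `ReputOffsetSketch`, its `Result` type, and the `compute` method are hypothetical names for illustration only, not RocketMQ APIs. The sketch also assumes that `getMinOffset()` returns the same value each time it is called during `start()`, and that a negative value such as -1 stands for an empty commit log.

```java
// Hypothetical model of the start-up logic quoted above; not RocketMQ code.
final class ReputOffsetSketch {

    /** Outcome of the simplified calculation. */
    static final class Result {
        final long reputFromOffset;
        final boolean tooSmallBranchTaken;

        Result(long reputFromOffset, boolean tooSmallBranchTaken) {
            this.reputFromOffset = reputFromOffset;
            this.tooSmallBranchTaken = tooSmallBranchTaken;
        }
    }

    /**
     * @param commitLogMinOffset stands in for commitLog.getMinOffset()
     * @param cqMaxPhysicOffsets stands in for each ConsumeQueue's getMaxPhysicOffset()
     */
    static Result compute(long commitLogMinOffset, long... cqMaxPhysicOffsets) {
        // Seeded with the commit log minimum, as in the quoted code.
        long maxPhysicalPosInLogicQueue = commitLogMinOffset;
        for (long cqMax : cqMaxPhysicOffsets) {
            if (cqMax > maxPhysicalPosInLogicQueue) {
                maxPhysicalPosInLogicQueue = cqMax;
            }
        }
        if (maxPhysicalPosInLogicQueue < 0) {
            maxPhysicalPosInLogicQueue = 0;
        }
        boolean tooSmall = false;
        if (maxPhysicalPosInLogicQueue < commitLogMinOffset) {
            // Corresponds to the [TooSmallCqOffset] branch.
            maxPhysicalPosInLogicQueue = commitLogMinOffset;
            tooSmall = true;
        }
        return new Result(maxPhysicalPosInLogicQueue, tooSmall);
    }

    public static void main(String[] args) {
        // Consume queues lag behind the commit log minimum.
        Result lagging = compute(1000L, 500L, 800L);
        // All consume queue files removed: no offsets at all.
        Result noQueues = compute(1000L);
        // Empty commit log, modelled here as a minimum offset of -1.
        Result emptyLog = compute(-1L);

        System.out.println(lagging.reputFromOffset + " " + lagging.tooSmallBranchTaken);
        System.out.println(noQueues.reputFromOffset + " " + noQueues.tooSmallBranchTaken);
        System.out.println(emptyLog.reputFromOffset + " " + emptyLog.tooSmallBranchTaken);
    }
}
```

In this simplified model the `tooSmallBranchTaken` flag stays false for every input, because the running maximum starts at the commit log minimum and can only grow. Whether the real code behaves differently (for example, if the commit log minimum changes between the two calls) is exactly what I am asking about.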
GitHub link: https://github.com/apache/rocketmq/discussions/8303