GitHub user zhaomengit edited a discussion:
Question about the assignment of maxPhysicalPosInLogicQueue in DefaultMessageStore.start

```java
            /**
             * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
             * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
             * 3. Calculate the reput offset according to the consume queue;
             * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
             */
            long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
            for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
                for (ConsumeQueue logic : maps.values()) {
                    if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                        maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
                    }
                }
            }
            // If maxPhyPos(CQs) < minPhyPos(CommitLog), some newly deleted topics may be re-dispatched into cqs mistakenly.
            if (maxPhysicalPosInLogicQueue < 0) {
                maxPhysicalPosInLogicQueue = 0;
            }
            if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
                maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
                /**
                 * This happens in following conditions:
                 * 1. If someone removes all the consumequeue files or the disk get damaged.
                 * 2. Launch a new broker, and copy the commitlog from other brokers.
                 *
                 * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
                 * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
                 */
                log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
            }
            log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
                maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
            this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
            this.reputMessageService.start();
```

Once the following code has run, maxPhysicalPosInLogicQueue is already at least commitLog.getMinOffset():
```java
            long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
            for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
                for (ConsumeQueue logic : maps.values()) {
                    if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                        maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
                    }
                }
            }
```
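So under what circumstances can the condition `if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset())` that comes after this loop ever be true? I don't really understand the comment inside that branch: why would removing all the consume queue files, or a damaged disk, lead to this situation?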
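
To make the reasoning concrete, here is a minimal standalone sketch of the loop, not RocketMQ code. The class `ReputOffsetSketch`, the method `maxPhysicalPos`, and the offset values are all hypothetical, and it assumes `getMinOffset()` returns the same value each time it is called during `start()`. Under that assumption, a maximum seeded with the commit log's minimum offset cannot end up below it, even when there are no consume queues at all:

```java
import java.util.List;

public class ReputOffsetSketch {
    // Hypothetical stand-in for the loop in DefaultMessageStore.start():
    // seed with the commit log's minimum offset, then take the maximum
    // over the max physical offsets reported by the consume queues.
    static long maxPhysicalPos(long commitLogMinOffset, List<Long> queueMaxOffsets) {
        long maxPhysicalPosInLogicQueue = commitLogMinOffset;
        for (long queueMax : queueMaxOffsets) {
            if (queueMax > maxPhysicalPosInLogicQueue) {
                maxPhysicalPosInLogicQueue = queueMax;
            }
        }
        return maxPhysicalPosInLogicQueue;
    }

    public static void main(String[] args) {
        long min = 1024L; // made-up value standing in for commitLog.getMinOffset()

        // No consume queues (for example, all files removed): result stays at min.
        System.out.println(maxPhysicalPos(min, List.of()));                 // prints 1024

        // Every queue is behind the commit log's min offset: result is still min.
        System.out.println(maxPhysicalPos(min, List.of(-1L, 0L, 512L)));    // prints 1024

        // One queue is ahead of min: the larger value wins.
        System.out.println(maxPhysicalPos(min, List.of(512L, 4096L)));      // prints 4096
    }
}
```

In all three cases the result is at least `min`, which is why the later `< this.commitLog.getMinOffset()` check looks unreachable to me with this initial value.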

GitHub link: https://github.com/apache/rocketmq/discussions/8303
