GitHub user Bngel edited a discussion:
在ConsumerOffsetManager进行消费位点提交的时候如果在这个情况下有高并发会有一定概率丢失其中的一个队列的消费位点吗?
`ConsumerOffsetManager.java`
```java
private void commitOffset(final String clientHost, final String key, final int
queueId, final long offset) {
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
if (null == map) {
map = new ConcurrentHashMap<Integer, Long>(32);
map.put(queueId, offset);
this.offsetTable.put(key, map);
} else {
Long storeOffset = map.put(queueId, offset);
if (storeOffset != null && offset < storeOffset) {
LOG.warn("[NOTIFYME]update consumer offset less than store.
clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}",
clientHost, key, queueId, offset, storeOffset);
}
}
if (versionChangeCounter.incrementAndGet() %
brokerController.getBrokerConfig().getConsumerOffsetUpdateVersionStep() == 0) {
long stateMachineVersion = brokerController.getMessageStore() != null ?
brokerController.getMessageStore().getStateMachineVersion() : 0;
dataVersion.nextVersion(stateMachineVersion);
}
}
```
在这段代码里,假设两个请求同时到达,这个时候 `offsetTable`
里没有任何数据,两个请求来自两个监听同一个`topic@group`的队列,那么这个时候假设一个队列先 ` ConcurrentMap<Integer,
Long> map = this.offsetTable.get(key);`,这个时候 `null ==
map`成立,进行对应put,另一个队列同理,这样的话是不是覆盖掉了其中的一个`offset`?是不是因为只需要对offset的大小进行比较,如果较小的被覆盖掉也无所谓,所以这个地方并不需要保证强一致性?
GitHub link: https://github.com/apache/rocketmq/discussions/9448
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]