GitHub user humkum edited a discussion: 为什么客户端重置位点过程中要 sleep 10s?
```public synchronized void resetOffset(String topic, String group,
Map<MessageQueue, Long> offsetTable) {
DefaultMQPushConsumerImpl consumer = null;
try {
MQConsumerInner impl = this.consumerTable.get(group);
if (impl instanceof DefaultMQPushConsumerImpl) {
consumer = (DefaultMQPushConsumerImpl) impl;
} else {
log.info("[reset-offset] consumer does not exist. group={}",
group);
return;
}
consumer.suspend();
ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable =
consumer.getRebalanceImpl().getProcessQueueTable();
for (Map.Entry<MessageQueue, ProcessQueue> entry :
processQueueTable.entrySet()) {
MessageQueue mq = entry.getKey();
if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq))
{
ProcessQueue pq = entry.getValue();
pq.setDropped(true);
pq.clear();
}
}
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException ignored) {
}
Iterator<MessageQueue> iterator =
processQueueTable.keySet().iterator();
while (iterator.hasNext()) {
MessageQueue mq = iterator.next();
Long offset = offsetTable.get(mq);
if (topic.equals(mq.getTopic()) && offset != null) {
try {
consumer.updateConsumeOffset(mq, offset);
consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq,
processQueueTable.get(mq));
iterator.remove();
} catch (Exception e) {
log.warn("reset offset failed. group={}, {}", group,
mq, e);
}
}
}
} finally {
if (consumer != null) {
consumer.resume();
}
}
}
```
这里等待 10s 的目的是什么呢?
GitHub link: https://github.com/apache/rocketmq/discussions/10088
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]