Justinwins created KAFKA-14131:
----------------------------------
Summary: KafkaBasedLog#readToLogEnd() may accidentally fall into an
infinite loop
Key: KAFKA-14131
URL: https://issues.apache.org/jira/browse/KAFKA-14131
Project: Kafka
Issue Type: Bug
Components: mirrormaker
Reporter: Justinwins
When a herder starts, its KafkaOffsetBackingStore calls readToLogEnd() on the
DistributedHerder.herderExecutor thread, which is named "Distrubuted-connect-", e.g.
Distrubuted-connect-28-1. This read may take a few minutes.
If another thread tries to shut down this herder, it blocks for
"task.shutdown.graceful.timeout.ms" before the
DistributedHerder.herderExecutor is interrupted.
If the thread in DistributedHerder.herderExecutor is interrupted,
KafkaOffsetBackingStore.readToLogEnd() still calls poll(Integer.MAX_VALUE), which logs
"Error polling" because the read has been interrupted. "consumer.position" then
does not advance, and readToLogEnd() falls into an infinite loop.
```
private void readToLogEnd() {
    Set<TopicPartition> assignment = consumer.assignment();
    Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
    log.trace("Reading to end of log offsets {}", endOffsets);
    while (!endOffsets.isEmpty()) { // this loop never exits once the thread is interrupted
        Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TopicPartition, Long> entry = it.next();
            TopicPartition topicPartition = entry.getKey();
            long endOffset = entry.getValue();
            // when the thread is in interrupted status, consumer.position will not advance
            long lastConsumedOffset = consumer.position(topicPartition);
            if (lastConsumedOffset >= endOffset) {
                log.trace("Read to end offset {} for {}", endOffset, topicPartition);
                it.remove();
            } else {
                log.trace("Behind end offset {} for {}; last-read offset is {}",
                        endOffset, topicPartition, lastConsumedOffset);
                // here, poll() catches the InterruptedException and logs it without rethrowing it
                poll(Integer.MAX_VALUE);
                break;
            }
        }
    }
}
```
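One possible way to break out of the loop is to check the thread's interrupt status on every pass. The sketch below is only an illustration of that idea, not a patch against KafkaBasedLog and not necessarily the fix the project should adopt. It has no Kafka dependency: FakeConsumer, the String partition names, and the boolean return value are all hypothetical stand-ins, and FakeConsumer.poll() simply mimics the behavior described above (no progress while the thread is interrupted).

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class ReadToLogEndSketch {

    /** Hypothetical stand-in for the consumer: one position per partition name. */
    static class FakeConsumer {
        private final Map<String, Long> positions = new HashMap<>();

        long position(String partition) {
            return positions.getOrDefault(partition, 0L);
        }

        /** Mimics the reported behavior: no progress while the thread is interrupted. */
        void poll(String partition) {
            if (Thread.currentThread().isInterrupted()) {
                return; // the error is swallowed and the position does not advance
            }
            positions.merge(partition, 1L, Long::sum); // consume one record
        }
    }

    /**
     * Reads until every partition reaches its end offset.
     * Returns true when caught up, false when it gave up because the thread was interrupted.
     */
    static boolean readToLogEnd(FakeConsumer consumer, Map<String, Long> endOffsets) {
        Map<String, Long> remaining = new HashMap<>(endOffsets);
        while (!remaining.isEmpty()) {
            // Assumed guard: stop instead of spinning; the interrupt flag stays set for the caller.
            if (Thread.currentThread().isInterrupted()) {
                return false;
            }
            Iterator<Map.Entry<String, Long>> it = remaining.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                if (consumer.position(entry.getKey()) >= entry.getValue()) {
                    it.remove(); // this partition has been read to its end offset
                } else {
                    consumer.poll(entry.getKey());
                    break; // re-check all remaining partitions from the start
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Long> endOffsets = new HashMap<>();
        endOffsets.put("offsets-0", 3L);

        // Normal case: the loop catches up and returns true.
        System.out.println(readToLogEnd(new FakeConsumer(), endOffsets));

        // Interrupted case: the guard returns false instead of looping forever.
        Thread.currentThread().interrupt();
        System.out.println(readToLogEnd(new FakeConsumer(), endOffsets));
        Thread.interrupted(); // clear the flag before exiting
    }
}
```

Without the interrupt check at the top of the outer loop, the second call in main() would never return, which matches the behavior reported here.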
--
This message was sent by Atlassian Jira
(v8.20.10#820010)