rhauch commented on a change in pull request #10158: URL: https://github.com/apache/kafka/pull/10158#discussion_r578919850
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -366,6 +348,44 @@ private void readToLogEnd() {
         }
     }
 
+    // Visible for testing
+    Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment) {
+        log.trace("Reading to end of offset log");
+
+        Map<TopicPartition, Long> endOffsets;
+        // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
+        // That is because it's possible that the consumer is already blocked waiting for new records to appear, when
+        // the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least
+        // one more record becomes available, meaning we can't even check whether we're at the end offset.
+        // Since all we're trying to do here is get the end offset, we should use the supplied admin client
+        // (if available)
+        // (which prevents 'consumer.endOffsets(...)'
+        // from
+
+        // Deprecated constructors do not provide an admin supplier, so the admin is potentially null.
+        if (useAdminForListOffsets) {
+            // Use the admin client to immediately find the end offsets for the assigned topic partitions.
+            // Unlike using the consumer
+            try {
+                endOffsets = admin.endOffsets(assignment);

Review comment:
I did see a way of reducing the # of returns to 2, and removing one of the duplicated lines. Thanks!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
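
For readers following the thread, here is a rough, self-contained sketch of the control flow the diff comment describes: prefer the admin client for end offsets when one was supplied, otherwise use the consumer, with two return statements in total. This is not the code from the pull request. `EndOffsetsSketch`, `OffsetSource`, and the use of plain `String` partition names are hypothetical stand-ins so the example runs without Kafka on the classpath. The fallback on `UnsupportedOperationException` is an assumption about what the truncated `try` block might guard against.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; none of these types are Kafka classes.
class EndOffsetsSketch {

    /** Stand-in for anything that can report end offsets (an admin client or a consumer). */
    interface OffsetSource {
        Map<String, Long> endOffsets(Set<String> partitions);
    }

    /**
     * Returns end offsets for the assignment, preferring the admin source.
     * The admin may be null, mirroring the diff's note that deprecated
     * constructors do not provide an admin supplier.
     */
    static Map<String, Long> readEndOffsets(OffsetSource admin,
                                            OffsetSource consumer,
                                            Set<String> assignment) {
        if (admin != null) {
            try {
                // First return: the admin path answers without involving the consumer.
                return admin.endOffsets(assignment);
            } catch (UnsupportedOperationException e) {
                // Assumption for this sketch: the admin path may be unsupported,
                // in which case we fall through to the consumer below.
            }
        }
        // Second return: no usable admin, so ask the consumer.
        return consumer.endOffsets(assignment);
    }
}
```

Keeping the consumer call as the single fall-through path is one way to avoid duplicating it in both the "no admin" and "admin failed" branches.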