rhauch commented on a change in pull request #10158: URL: https://github.com/apache/kafka/pull/10158#discussion_r579288641
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ########## @@ -366,6 +346,42 @@ private void readToLogEnd() { } } + // Visible for testing + Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment) { + log.trace("Reading to end of offset log"); + + Map<TopicPartition, Long> endOffsets; + // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions. + // That is because it's possible that the consumer is already blocked waiting for new records to appear, when + // the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least + // one more record becomes available, meaning we can't even check whether we're at the end offset. + // Since all we're trying to do here is get the end offset, we should use the supplied admin client + // (if available) + // (which prevents 'consumer.endOffsets(...)' + // from + + // Deprecated constructors do not provide an admin supplier, so the admin is potentially null. + if (useAdminForListOffsets) { + // Use the admin client to immediately find the end offsets for the assigned topic partitions. + // Unlike using the consumer + try { + return admin.endOffsets(assignment); + } catch (UnsupportedVersionException e) { + // This may happen with really old brokers that don't support the auto topic creation + // field in metadata requests + log.debug("Reading to end of log offsets with consumer since admin client is unsupported: {}", e.getMessage()); + useAdminForListOffsets = false; Review comment: I'm not sure the GC will really be any better if we were to use `admin = null` here, since the supplier is in charge of the lifecycle of the `TopicAdmin` instance, and our current implementation is that the `SharedTopicAdmin` is closed only when the herder is closed. I did originally do what you're suggesting though, but changed it since technically we could use the topic admin for something else. Granted, the current code doesn't need it for anything else. I thought that using the variable made it a bit clearer that the variable controlled the behavior of the `readEndOffsets(...)` method using or not using the admin client. But I think you're main point is that this can be simpler by using `admin = null`, and I can't really argue with that. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org