rhauch commented on a change in pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#discussion_r578919850



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -366,6 +348,44 @@ private void readToLogEnd() {
         }
     }
 
+    // Visible for testing
+    Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment) {
+        log.trace("Reading to end of offset log");
+
+        Map<TopicPartition, Long> endOffsets;
+        // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
+        // That is because it's possible that the consumer is already blocked waiting for new records to appear, when
+        // the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least
+        // one more record becomes available, meaning we can't even check whether we're at the end offset.
+        // Since all we're trying to do here is get the end offset, we should use the supplied admin client
+        // (if available)
+        // (which prevents 'consumer.endOffsets(...)'
+        // from
+
+        // Deprecated constructors do not provide an admin supplier, so the admin is potentially null.
+        if (useAdminForListOffsets) {
+            // Use the admin client to immediately find the end offsets for the assigned topic partitions.
+            // Unlike using the consumer
+            try {
+                endOffsets = admin.endOffsets(assignment);

Review comment:
       I did see a way of reducing the number of returns to 2 and removing one of the duplicated lines. Thanks!
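
       For readers following along, here is a rough, self-contained sketch of the "two returns" shape discussed above. It is not the code from this PR. `EndOffsetLookup` and `OffsetSource` are hypothetical stand-ins for the admin and consumer, partitions are plain strings rather than `TopicPartition`, and using `UnsupportedOperationException` to signal "the admin cannot list offsets" is purely an assumption for illustration.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; none of these names come from KafkaBasedLog.
class EndOffsetLookup {

    /** Stand-in for anything that can report end offsets (admin or consumer). */
    interface OffsetSource {
        Map<String, Long> endOffsets(Set<String> partitions);
    }

    private final OffsetSource admin;     // may be null, e.g. no admin supplier was given
    private final OffsetSource consumer;  // always available
    private boolean useAdmin;

    EndOffsetLookup(OffsetSource admin, OffsetSource consumer) {
        this.admin = admin;
        this.consumer = consumer;
        this.useAdmin = admin != null;
    }

    Map<String, Long> readEndOffsets(Set<String> partitions) {
        if (useAdmin) {
            try {
                // First return: the admin answers immediately.
                return admin.endOffsets(partitions);
            } catch (UnsupportedOperationException e) {
                // Assumed failure mode: remember it and stop trying the admin.
                useAdmin = false;
            }
        }
        // Second return: the single fallback path shared by "no admin"
        // and "admin failed", so the consumer call is written only once.
        return consumer.endOffsets(partitions);
    }
}
```

       The point of this shape is that the consumer call appears once, after the admin branch, instead of being duplicated in both the `else` and the `catch`.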




