kkonstantine commented on a change in pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#discussion_r578998452



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -366,6 +346,42 @@ private void readToLogEnd() {
         }
     }
 
+    // Visible for testing
+    Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment) {
+        log.trace("Reading to end of offset log");
+
+        Map<TopicPartition, Long> endOffsets;
+        // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
+        // That is because it's possible that the consumer is already blocked waiting for new records to appear, when
+        // the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least
+        // one more record becomes available, meaning we can't even check whether we're at the end offset.
+        // Since all we're trying to do here is get the end offset, we should use the supplied admin client
+        // (if available)
+        // (which prevents 'consumer.endOffsets(...)'
+        // from
+
+        // Deprecated constructors do not provide an admin supplier, so the admin is potentially null.
+        if (useAdminForListOffsets) {
+            // Use the admin client to immediately find the end offsets for the assigned topic partitions.
+            // Unlike using the consumer
+            try {
+                return admin.endOffsets(assignment);
+            } catch (UnsupportedVersionException e) {
+                // This may happen with really old brokers that don't support the auto topic creation
+                // field in metadata requests
+                log.debug("Reading to end of log offsets with consumer since admin client is unsupported: {}", e.getMessage());
+                useAdminForListOffsets = false;

Review comment:
       Using `admin = null` here allows the unused admin instance to be garbage collected earlier, right?
   Not a big gain, but I also don't see much benefit in using a separate variable such as `useAdminForListOffsets`.
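   A minimal sketch of the `admin = null` alternative, assuming hypothetical stand-in types rather than the real Kafka classes: `EndOffsetSource` plays the role of both the admin client and the consumer, partitions are plain strings instead of `TopicPartition`, and `UnsupportedVersionSketchException` stands in for Kafka's `UnsupportedVersionException`. The null check on `admin` replaces the `useAdminForListOffsets` flag. The sketch does not model how or when the admin client is closed.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for anything that can report end offsets
// (the admin client or the consumer in KafkaBasedLog).
interface EndOffsetSource {
    Map<String, Long> endOffsets(Set<String> partitions);
}

// Hypothetical stand-in for Kafka's UnsupportedVersionException.
class UnsupportedVersionSketchException extends RuntimeException {
    UnsupportedVersionSketchException(String message) {
        super(message);
    }
}

class EndOffsetReader {
    // null if no admin was supplied, or once the broker has rejected the admin request
    private EndOffsetSource admin;
    private final EndOffsetSource consumer;

    EndOffsetReader(EndOffsetSource admin, EndOffsetSource consumer) {
        this.admin = admin;
        this.consumer = consumer;
    }

    Map<String, Long> readEndOffsets(Set<String> assignment) {
        if (admin != null) {
            try {
                return admin.endOffsets(assignment);
            } catch (UnsupportedVersionSketchException e) {
                // Clear the reference instead of flipping a separate boolean flag:
                // the null check above sends every later call to the consumer, and
                // this object no longer keeps the admin instance reachable.
                admin = null;
            }
        }
        return consumer.endOffsets(assignment);
    }

    boolean usesAdmin() {
        return admin != null;
    }
}
```

   With this shape there is only one piece of state to keep consistent: "admin is usable" and "admin is non-null" can no longer disagree.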

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -161,6 +164,7 @@ public void start() {
 
         // Create the topic admin client and initialize the topic ...
         admin = topicAdminSupplier.get();   // may be null
+        useAdminForListOffsets = admin != null;

Review comment:
       It's not obvious to me why we have to wait until `start` to call `topicAdminSupplier.get();`.
   
   I think we can call it in the constructor and, instead of keeping `topicAdminSupplier` as a field, keep only `admin` as a member field. Unfortunately, `admin` can't be declared `final`, because it may be reset (set to `null`) in the exception handling below. Still, using fewer member fields seems reasonable and doesn't reduce the usability of this class, IMO.
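   A minimal sketch of that suggestion, assuming simplified hypothetical types (`AdminHandle` standing in for `TopicAdmin`, and `EagerAdminLog` in place of `KafkaBasedLog`): the supplier is resolved once in the constructor, and only the non-final `admin` field is kept.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the TopicAdmin used by KafkaBasedLog.
class AdminHandle {
}

class EagerAdminLog {
    // Not final: the unsupported-version handling may reset it to null later.
    private AdminHandle admin;

    EagerAdminLog(Supplier<AdminHandle> topicAdminSupplier) {
        // Resolve the supplier once, here; the supplier itself is not kept as a field.
        this.admin = topicAdminSupplier.get(); // may be null
    }

    void start() {
        // start() would use 'admin' directly (if non-null) to create topics and
        // read end offsets; that logic is omitted from this sketch.
    }

    void onUnsupportedVersion() {
        // Mirrors the exception handling in readEndOffsets: stop using the admin.
        admin = null;
    }

    boolean hasAdmin() {
        return admin != null;
    }
}
```

   One trade-off to weigh: the supplier then runs at construction time rather than in `start()`, so the admin instance exists from construction onward, even if `start()` is never called.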

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -366,6 +346,42 @@ private void readToLogEnd() {
         }
     }
 
+    // Visible for testing
+    Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment) {
+        log.trace("Reading to end of offset log");
+
+        Map<TopicPartition, Long> endOffsets;
+        // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
+        // That is because it's possible that the consumer is already blocked waiting for new records to appear, when
+        // the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least
+        // one more record becomes available, meaning we can't even check whether we're at the end offset.
+        // Since all we're trying to do here is get the end offset, we should use the supplied admin client
+        // (if available)
+        // (which prevents 'consumer.endOffsets(...)'
+        // from

Review comment:
       Since we are moving this comment, it's probably a good idea to fix the incomplete sentence here too.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

