chia7712 commented on a change in pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#discussion_r578907382



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -366,6 +348,44 @@ private void readToLogEnd() {
         }
     }
 
+    // Visible for testing
+    Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment) {
+        log.trace("Reading to end of offset log");
+
+        Map<TopicPartition, Long> endOffsets;
+        // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
+        // That is because it's possible that the consumer is already blocked waiting for new records to appear, when
+        // the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least
+        // one more record becomes available, meaning we can't even check whether we're at the end offset.
+        // Since all we're trying to do here is get the end offset, we should use the supplied admin client
+        // (if available)
+        // (which prevents 'consumer.endOffsets(...)'
+        // from
+
+        // Deprecated constructors do not provide an admin supplier, so the admin is potentially null.
+        if (useAdminForListOffsets) {
+            // Use the admin client to immediately find the end offsets for the assigned topic partitions.
+            // Unlike using the consumer
+            try {
+                endOffsets = admin.endOffsets(assignment);

Review comment:
       How about `return admin.endOffsets(assignment);`? That would avoid the intermediate `endOffsets` variable.
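
       A minimal sketch of what the early return could look like. It assumes the `catch` branch (not shown in this hunk) falls through to the consumer. The `Function` fields are hypothetical stand-ins for `admin.endOffsets(...)` and `consumer.endOffsets(...)`, and `UnsupportedOperationException` stands in for Kafka's `UnsupportedVersionException`, so the snippet compiles without Kafka on the classpath.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical, simplified model of the method under review; not the actual KafkaBasedLog code.
class EndOffsetsSketch {
    // Stand-in for admin.endOffsets(...); may be null, as with the deprecated constructors.
    private final Function<Set<String>, Map<String, Long>> admin;
    // Stand-in for consumer.endOffsets(...).
    private final Function<Set<String>, Map<String, Long>> consumer;
    private boolean useAdminForListOffsets;

    EndOffsetsSketch(Function<Set<String>, Map<String, Long>> admin,
                     Function<Set<String>, Map<String, Long>> consumer) {
        this.admin = admin;
        this.consumer = consumer;
        this.useAdminForListOffsets = admin != null;
    }

    Map<String, Long> readEndOffsets(Set<String> assignment) {
        if (useAdminForListOffsets) {
            try {
                // Return directly; no local 'endOffsets' variable is needed.
                return admin.apply(assignment);
            } catch (UnsupportedOperationException e) {
                // Assumed fallback: stop using the admin and ask the consumer instead.
                useAdminForListOffsets = false;
            }
        }
        return consumer.apply(assignment);
    }
}
```

       With this shape, each branch returns its own result and the method ends with the consumer fallback.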

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -390,7 +410,11 @@ public void run() {
                             log.trace("Finished read to end log for topic {}", topic);
                         } catch (TimeoutException e) {
                             log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
-                                "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+                                     "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+                            continue;
+                        } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {

Review comment:
       `readToLogEnd` is also called by `start`. Should we add similar exception handling there?
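
       One possible shape for that, as a sketch only: a bounded retry around the initial read. Everything here is hypothetical (`StartupReadSketch`, `runWithRetries`, `RetriableStandIn`, the attempt limit and the backoff); `RetriableStandIn` merely stands in for the two `RetriableException` types caught in the hunk above, and whether `start` should retry at all is the open question.

```java
// Hypothetical helper; not part of KafkaBasedLog.
class StartupReadSketch {
    /** Stand-in for the retriable exception types caught in the work thread. */
    static class RetriableStandIn extends RuntimeException {
        RetriableStandIn(String message) {
            super(message);
        }
    }

    /**
     * Runs the action, retrying when it throws the stand-in retriable exception.
     * Returns the number of attempts used; rethrows the last failure once
     * maxAttempts is exhausted.
     */
    static int runWithRetries(Runnable action, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                action.run();
                return attempt;
            } catch (RetriableStandIn e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException interrupted) {
                    // Preserve the interrupt status and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to retry", interrupted);
                }
            }
        }
    }
}
```

       In `start`, the call would then look roughly like `runWithRetries(this::readToLogEnd, attempts, backoffMs)`, with both values chosen by the caller.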

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -321,29 +325,7 @@ private void readToLogEnd() {
         log.trace("Reading to end of offset log");

Review comment:
       This log message is redundant: `readEndOffsets(Set<TopicPartition>)` already logs a similar message.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -651,6 +651,10 @@ public Config describeTopicConfig(String topic) {
      * @param partitions the topic partitions
      * @return the map of offset for each topic partition, or an empty map if the supplied partitions
      *         are null or empty
+     * @throws UnsupportedVersionException if the admin client cannot read end offsets
+     * @throws TimeoutException if the offset metadata could not be fetched before the amount of time allocated
+     *         by {@code request.timeout.ms} expires, and this call can be retried
+     * @throws LeaderNotAvailableException if the leader was not available and this call can be retried
      * @throws RetriableException if a retriable error occurs, the operation takes too long, or the

Review comment:
       `the operation takes too long`
   ^^^^^^^^^^^^^^^^^^^^^^^ this wording is no longer accurate, since `TimeoutException` is no longer wrapped in a `RetriableException`.





