chia7712 commented on a change in pull request #10158:
URL: https://github.com/apache/kafka/pull/10158#discussion_r578907382
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -366,6 +348,44 @@ private void readToLogEnd() {
         }
     }
 
+    // Visible for testing
+    Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment) {
+        log.trace("Reading to end of offset log");
+
+        Map<TopicPartition, Long> endOffsets;
+        // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
+        // That is because it's possible that the consumer is already blocked waiting for new records to appear, when
+        // the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least
+        // one more record becomes available, meaning we can't even check whether we're at the end offset.
+        // Since all we're trying to do here is get the end offset, we should use the supplied admin client
+        // (if available)
+        // (which prevents 'consumer.endOffsets(...)'
+        // from
+
+        // Deprecated constructors do not provide an admin supplier, so the admin is potentially null.
+        if (useAdminForListOffsets) {
+            // Use the admin client to immediately find the end offsets for the assigned topic partitions.
+            // Unlike using the consumer
+            try {
+                endOffsets = admin.endOffsets(assignment);

Review comment:
       How about `return admin.endOffsets(assignment);`?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -390,7 +410,11 @@ public void run() {
                         log.trace("Finished read to end log for topic {}", topic);
                     } catch (TimeoutException e) {
                         log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
-                                "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+                                "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+                        continue;
+                    } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {

Review comment:
       `readToLogEnd` is called by `start`. Should we add similar exception handling there?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -321,29 +325,7 @@ private void readToLogEnd() {
         log.trace("Reading to end of offset log");

Review comment:
       Redundant log message: `readEndOffsets(Set<TopicPartition>)` has a similar log.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -651,6 +651,10 @@ public Config describeTopicConfig(String topic) {
      * @param partitions the topic partitions
      * @return the map of offset for each topic partition, or an empty map if the supplied partitions
      *         are null or empty
+     * @throws UnsupportedVersionException if the admin client cannot read end offsets
+     * @throws TimeoutException if the offset metadata could not be fetched before the amount of time allocated
+     *         by {@code request.timeout.ms} expires, and this call can be retried
+     * @throws LeaderNotAvailableException if the leader was not available and this call can be retried
      * @throws RetriableException if a retriable error occurs, the operation takes too long, or the

Review comment:
       `the operation takes too long`
       This part of the message is no longer valid, since `TimeoutException` is not wrapped in `RetriableException` anymore.
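To make the first comment above concrete, here is a minimal, hypothetical sketch (not the PR's actual `KafkaBasedLog` code) of the shape the reviewer is suggesting: each branch of `readEndOffsets` returns directly instead of threading an `endOffsets` local through the method. The `OffsetSource` interface, field names, and fallback branch are stand-ins for the real admin/consumer collaborators.

```java
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

class EndOffsetReaderSketch {
    // Hypothetical stand-in for the admin client and consumer held by KafkaBasedLog.
    interface OffsetSource {
        Map<TopicPartition, Long> endOffsets(Set<TopicPartition> partitions);
    }

    private final OffsetSource admin;
    private final OffsetSource consumer;
    private final boolean useAdminForListOffsets;

    EndOffsetReaderSketch(OffsetSource admin, OffsetSource consumer, boolean useAdminForListOffsets) {
        this.admin = admin;
        this.consumer = consumer;
        this.useAdminForListOffsets = useAdminForListOffsets;
    }

    Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment) {
        if (useAdminForListOffsets) {
            // Returning directly, as suggested, avoids the half-initialized local variable.
            return admin.endOffsets(assignment);
        }
        // Deprecated constructors do not supply an admin client, so fall back to the consumer.
        return consumer.endOffsets(assignment);
    }
}
```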
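For the second comment, a hedged sketch of what "similar exception handling" around the `readToLogEnd` call in `start()` might look like; the class, retry loop, and back-off value are illustrative assumptions, not the PR's actual implementation.

```java
import org.apache.kafka.common.errors.RetriableException;

class StartRetrySketch {
    // Stand-in for KafkaBasedLog.readToLogEnd(); the real method reads the backing topic to its end.
    private final Runnable readToLogEnd;

    StartRetrySketch(Runnable readToLogEnd) {
        this.readToLogEnd = readToLogEnd;
    }

    // Retry readToLogEnd on retriable errors instead of letting start() fail,
    // mirroring the retry behaviour the work thread gets in the diff above.
    void start() throws InterruptedException {
        while (true) {
            try {
                readToLogEnd.run();
                return;
            } catch (RetriableException e) {
                // Assumed fixed back-off; the real policy (and whether retries should be bounded)
                // is a design choice the reviewer leaves open.
                Thread.sleep(100);
            }
        }
    }
}
```

Whether `start()` should retry indefinitely or give up after a bound is a separate decision; the sketch only illustrates that the same class of exceptions could be handled on that path as well.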
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org