guozhangwang commented on code in PR #13318: URL: https://github.com/apache/kafka/pull/13318#discussion_r1123632054
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java: ########## @@ -143,6 +145,33 @@ public static KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> fetchEndOf ).all(); } + public static ListOffsetsResult fetchEndOffsetsResult(final Collection<TopicPartition> partitions, Review Comment: This is 2) in the description. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ########## @@ -685,14 +685,17 @@ private boolean populateClientStatesMap(final Map<UUID, ClientState> clientState Map<TaskId, Long> allTaskEndOffsetSums; try { // Make the listOffsets request first so it can fetch the offsets for non-source changelogs - // asynchronously while we use the blocking Consumer#committed call to fetch source-changelog offsets - final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture = - fetchEndOffsetsFuture(changelogTopics.preExistingNonSourceTopicBasedPartitions(), adminClient); + // asynchronously while we use the blocking Consumer#committed call to fetch source-changelog offsets; Review Comment: This is 2) in the description. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org