ableegoldman commented on a change in pull request #8787: URL: https://github.com/apache/kafka/pull/8787#discussion_r438495057
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ########## @@ -763,18 +778,36 @@ private boolean populateClientStatesMap(final Map<UUID, ClientState> clientState .flatMap(Collection::stream) .collect(Collectors.toList()); - final Collection<TopicPartition> allPreexistingChangelogPartitions = new ArrayList<>(allChangelogPartitions); - allPreexistingChangelogPartitions.removeIf(partition -> newlyCreatedChangelogs.contains(partition.topic())); + final Set<TopicPartition> preexistingChangelogPartitions = new HashSet<>(); + final Set<TopicPartition> preexistingSourceChangelogPartitions = new HashSet<>(); + final Set<TopicPartition> newlyCreatedChangelogPartitions = new HashSet<>(); + for (final TopicPartition changelog : allChangelogPartitions) { + if (newlyCreatedChangelogs.contains(changelog.topic())) { + newlyCreatedChangelogPartitions.add(changelog); + } else if (optimizedSourceChangelogs.contains(changelog.topic())) { + preexistingSourceChangelogPartitions.add(changelog); + } else { + preexistingChangelogPartitions.add(changelog); + } + } + + // Make the listOffsets request first so it can fetch the offsets for non-source changelogs + // asynchronously while we use the blocking Consumer#committed call to fetch source-changelog offsets + final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture = + fetchEndOffsetsFuture(preexistingChangelogPartitions, adminClient); - final Collection<TopicPartition> allNewlyCreatedChangelogPartitions = new ArrayList<>(allChangelogPartitions); - allNewlyCreatedChangelogPartitions.removeAll(allPreexistingChangelogPartitions); + final Map<TopicPartition, Long> sourceChangelogEndOffsets = + fetchCommittedOffsets(preexistingSourceChangelogPartitions, taskManager.mainConsumer()); - final Map<TopicPartition, ListOffsetsResultInfo> endOffsets = - fetchEndOffsets(allPreexistingChangelogPartitions, adminClient); + final Map<TopicPartition, 
ListOffsetsResultInfo> endOffsets = ClientUtils.getEndOffsets(endOffsetsFuture); - allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask, allNewlyCreatedChangelogPartitions); + allTaskEndOffsetSums = computeEndOffsetSumsByTask( + changelogsByStatefulTask, + endOffsets, + sourceChangelogEndOffsets, + newlyCreatedChangelogPartitions); fetchEndOffsetsSuccessful = true; - } catch (final StreamsException e) { + } catch (final StreamsException | TimeoutException e) {

Review comment:

> if you throw an exception in the assignor, it just calls the assignor again in a tight loop

Wouldn't the leader thread just die? Not saying that that's ideal, either. But it's at least in line with how exceptions thrown by other admin client requests in the assignment are currently handled.

----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org