ableegoldman commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r438495057



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -763,18 +778,36 @@ private boolean populateClientStatesMap(final Map<UUID, ClientState> clientState
                     .flatMap(Collection::stream)
                     .collect(Collectors.toList());
 
-            final Collection<TopicPartition> allPreexistingChangelogPartitions = new ArrayList<>(allChangelogPartitions);
-            allPreexistingChangelogPartitions.removeIf(partition -> newlyCreatedChangelogs.contains(partition.topic()));
+            final Set<TopicPartition> preexistingChangelogPartitions = new HashSet<>();
+            final Set<TopicPartition> preexistingSourceChangelogPartitions = new HashSet<>();
+            final Set<TopicPartition> newlyCreatedChangelogPartitions = new HashSet<>();
+            for (final TopicPartition changelog : allChangelogPartitions) {
+                if (newlyCreatedChangelogs.contains(changelog.topic())) {
+                    newlyCreatedChangelogPartitions.add(changelog);
+                } else if (optimizedSourceChangelogs.contains(changelog.topic())) {
+                    preexistingSourceChangelogPartitions.add(changelog);
+                } else {
+                    preexistingChangelogPartitions.add(changelog);
+                }
+            }
+
+            // Make the listOffsets request first so it can  fetch the offsets for non-source changelogs
+            // asynchronously while we use the blocking Consumer#committed call to fetch source-changelog offsets
+            final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture =
+                fetchEndOffsetsFuture(preexistingChangelogPartitions, adminClient);
 
-            final Collection<TopicPartition> allNewlyCreatedChangelogPartitions = new ArrayList<>(allChangelogPartitions);
-            allNewlyCreatedChangelogPartitions.removeAll(allPreexistingChangelogPartitions);
+            final Map<TopicPartition, Long> sourceChangelogEndOffsets =
+                fetchCommittedOffsets(preexistingSourceChangelogPartitions, taskManager.mainConsumer());
 
-            final Map<TopicPartition, ListOffsetsResultInfo> endOffsets =
-                fetchEndOffsets(allPreexistingChangelogPartitions, adminClient);
+            final Map<TopicPartition, ListOffsetsResultInfo> endOffsets = ClientUtils.getEndOffsets(endOffsetsFuture);
 
-            allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask, allNewlyCreatedChangelogPartitions);
+            allTaskEndOffsetSums = computeEndOffsetSumsByTask(
+                changelogsByStatefulTask,
+                endOffsets,
+                sourceChangelogEndOffsets,
+                newlyCreatedChangelogPartitions);
             fetchEndOffsetsSuccessful = true;
-        } catch (final StreamsException e) {
+        } catch (final StreamsException | TimeoutException e) {

Review comment:
> if you throw an exception in the assignor, it just calls the assignor again in a tight loop

Wouldn't the leader thread just die? I'm not saying that's ideal either, but it is at least consistent with how exceptions thrown by other admin client requests during assignment are currently handled.
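
For reference, here is a minimal, Kafka-free sketch of the pattern in this hunk: start the asynchronous request first, make the blocking call while it is in flight, then collect the future and report failure to the caller instead of throwing. It is an illustration under assumptions, not the PR's code. `AsyncFirstSketch`, `fetchAllOffsets`, and the two suppliers are hypothetical names, `CompletableFuture` stands in for `KafkaFuture`, and the exceptions are the `java.util.concurrent` ones rather than Kafka's `StreamsException`/`TimeoutException`. It also assumes the catch block falls back rather than rethrowing, which the hunk above does not show.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class AsyncFirstSketch {

    /**
     * Hypothetical helper: returns the combined offsets, or Optional.empty()
     * if either fetch fails or the asynchronous one times out.
     */
    public static Optional<Map<String, Long>> fetchAllOffsets(
            final Supplier<Map<String, Long>> asyncEndOffsets,
            final Supplier<Map<String, Long>> blockingCommitted,
            final long timeoutMs) {
        // Start the asynchronous request first so it runs in the background
        // (stand-in for the admin client's listOffsets request).
        final CompletableFuture<Map<String, Long>> endOffsetsFuture =
            CompletableFuture.supplyAsync(asyncEndOffsets);
        try {
            // Blocking call (stand-in for Consumer#committed) overlaps with the async one.
            final Map<String, Long> committed = blockingCommitted.get();
            // Only now wait for the asynchronous result, with a bounded timeout.
            final Map<String, Long> combined =
                new HashMap<>(endOffsetsFuture.get(timeoutMs, TimeUnit.MILLISECONDS));
            combined.putAll(committed);
            return Optional.of(combined);
        } catch (final TimeoutException | ExecutionException | RuntimeException e) {
            // Signal failure to the caller rather than letting the exception propagate.
            endOffsetsFuture.cancel(true);
            return Optional.empty();
        } catch (final InterruptedException e) {
            // Restore the interrupt flag before reporting failure.
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }
}
```

A caller would treat `Optional.empty()` the way the hunk treats `fetchEndOffsetsSuccessful == false`. Whether that is preferable to letting the exception escape and kill the leader thread is exactly the open question here.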



