ableegoldman commented on a change in pull request #11857: URL: https://github.com/apache/kafka/pull/11857#discussion_r820595568
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -1129,13 +1129,25 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon * added NamedTopology and create them if so, then close any tasks whose named topology no longer exists */ void handleTopologyUpdates() { - tasks.maybeCreateTasksFromNewTopologies(); + final Set<String> currentNamedTopologies = topologyMetadata.updateThreadTopologyVersion(Thread.currentThread().getName()); Review comment: This isn't the main fix, but we were playing a little fast and loose with the topology version we were reporting having ack'ed -- tightened this up by first atomically updating the topology version and saving the set of current named topologies, then doing the actual update handling, and _then_ checking the listeners and completing any finished add/remove topology requests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org