ableegoldman commented on a change in pull request #11857:
URL: https://github.com/apache/kafka/pull/11857#discussion_r820595568



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1129,13 +1129,25 @@ public void updateTaskEndMetadata(final TopicPartition 
topicPartition, final Lon
      * added NamedTopology and create them if so, then close any tasks whose 
named topology no longer exists
      */
     void handleTopologyUpdates() {
-        tasks.maybeCreateTasksFromNewTopologies();
+        final Set<String> currentNamedTopologies = 
topologyMetadata.updateThreadTopologyVersion(Thread.currentThread().getName());

Review comment:
       This isn't the main fix, but we were playing a little fast and loose 
with the topology version we were reporting having ack'ed -- tightened this up 
by first atomically updating the topology version and saving the set of current 
named topologies, then doing the actual update handling, and _then_ checking 
the listeners and completing any finished add/remove topology requests




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to