ableegoldman commented on a change in pull request #10788: URL: https://github.com/apache/kafka/pull/10788#discussion_r683055067
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java ########## @@ -84,6 +86,15 @@ void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) { this.mainConsumer = mainConsumer; } + void handleNewAssignmentAndCreateTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate, + final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate, + final Set<TaskId> assignedActiveTasks, + final Set<TaskId> assignedStandbyTasks) { + activeTaskCreator.removeRevokedUnknownTasks(diff(HashSet::new, assignedActiveTasks, activeTasksToCreate.keySet())); Review comment: Whoops, that's what happens when you push changes late at night 🙃 I'll fix it up... hopefully it will make more sense then ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon } } + /** + * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so + */ + void handleTopologyUpdates() { + tasks.maybeCreateTasksFromNewTopologies(); + for (final Task task : activeTaskIterable()) { + if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) { + task.freezeProcessing(); Review comment: This is another stupid bug...it should be freezing them if they _don't_ appear in the current topology. Tasks from removed named topologies are frozen to prevent them from processing any more records, so yes, it is meant to be permanent. 
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ########## @@ -53,44 +56,162 @@ private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile(""); private final StreamsConfig config; - private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability + private final TopologyVersion version; + + private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability private ProcessorTopology globalTopology; - private Map<String, StateStore> globalStateStores = new HashMap<>(); - final Set<String> allInputTopics = new HashSet<>(); + private final Map<String, StateStore> globalStateStores = new HashMap<>(); + private final Set<String> allInputTopics = new HashSet<>(); + + public static class TopologyVersion { + public AtomicLong topologyVersion = new AtomicLong(0L); // the local topology version + public Set<String> assignedNamedTopologies = new HashSet<>(); // the named topologies whose tasks are actively assigned + public ReentrantLock topologyLock = new ReentrantLock(); + public Condition topologyCV = topologyLock.newCondition(); + } - public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) { + public TopologyMetadata(final InternalTopologyBuilder builder, + final StreamsConfig config) { + version = new TopologyVersion(); this.config = config; - builders = new TreeMap<>(); + builders = new ConcurrentSkipListMap<>(); if (builder.hasNamedTopology()) { builders.put(builder.topologyName(), builder); } else { builders.put(UNNAMED_TOPOLOGY, builder); } } - public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> builders, final StreamsConfig config) { + public TopologyMetadata(final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders, + final StreamsConfig config) { + version = new TopologyVersion(); this.config = config; + 
this.builders = builders; if (builders.isEmpty()) { - log.debug("Building KafkaStreams app with no empty topology"); + log.debug("Starting up empty KafkaStreams app with no topology"); } } - public int getNumStreamThreads(final StreamsConfig config) { - final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); + public void updateCurrentAssignmentTopology(final Set<String> assignedNamedTopologies) { + version.assignedNamedTopologies = assignedNamedTopologies; + } - // If the application uses named topologies, it's possible to start up with no topologies at all and only add them later - if (builders.isEmpty()) { - if (configuredNumStreamThreads != 0) { - log.info("Overriding number of StreamThreads to zero for empty topology"); + /** + * @return the set of named topologies that the assignor distributed tasks for during the last rebalance + */ + public Set<String> assignmentNamedTopologies() { + return version.assignedNamedTopologies; + } + + public long topologyVersion() { + return version.topologyVersion.get(); + } + + public void lock() { + version.topologyLock.lock(); + } + + public void unlock() { + version.topologyLock.unlock(); + } + + public InternalTopologyBuilder getBuilderForTopologyName(final String name) { + return builders.get(name); + } + + /** + * @throws IllegalStateException if the thread is not already holding the lock via TopologyMetadata#lock + */ + public void maybeWaitForNonEmptyTopology() { + if (!version.topologyLock.isHeldByCurrentThread()) { Review comment: Agreed, let me clean this up ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ########## @@ -867,6 +873,36 @@ private void initializeAndRestorePhase() { log.debug("Idempotent restore call done. 
Thread state has not changed."); } + private void handleTopologyUpdatesPhase() { + // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology + // or if this is the very first topology in which case we may need to wait for it to be non-empty + if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || lastSeenTopologyVersion == 0) { + try { Review comment: I refactored this part of the code quite a bit, let me know if you have any remaining concerns with the new logic ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java ########## @@ -67,13 +72,31 @@ public class NamedTopologyIntegrationTest { public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + // TODO KAFKA-12648: Review comment: I'm still filling out the integration test suite, especially the multi-node testing, but I'll make sure this scenario has coverage. This will probably have to be in the followup Pt. 4 which expands `add/removeNamedTopology` to return a `Future`, since being able to block on this helps a lot with the testing. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ########## @@ -53,44 +56,162 @@ private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile(""); private final StreamsConfig config; - private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability + private final TopologyVersion version; + + private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability Review comment: Well, I removed the whole `assignedNamedTopologies` thing from this PR, but yes, that was one of the original intentions of adding it to the AssignmentInfo. I'm leaning towards keeping things simple for now and avoiding adding on to the AssignmentInfo where possible. 
We can revisit this if the extra rebalances become a problem. It could be sufficient to just use the set of actually-assigned topologies as a proxy. It's not perfect since it could be that the leader was aware of a topology and simply happened to not assign any of its tasks to this client, but I'd bet this gets rid of most of the unnecessary rebalances. That said, I'm saving this optimization for a followup Pt. 4 PR. I'll revisit the best way to handle the `builders` map in that as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org