[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683938654

## File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java ##
@@ -67,13 +72,31 @@ public class NamedTopologyIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+// TODO KAFKA-12648:

Review comment:
SG.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683938587

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ##
@@ -53,44 +56,162 @@ private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
private final StreamsConfig config;
-private final SortedMap builders; // Keep sorted by topology name for readability
+private final TopologyVersion version;
+
+private final ConcurrentNavigableMap builders; // Keep sorted by topology name for readability

Review comment:
> I was referring to moving the `builders` map to the `TopologyVersion` in the above, ie I want to save that for Pt. 4 if that's ok

Yup, totally.
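A minimal sketch of the consolidation being deferred to Pt. 4, assuming the `builders` map moves into the version holder and every `register`/`deregister` bumps the version under the same lock. `VersionedTopologies` is a hypothetical name and `String` stands in for `InternalTopologyBuilder`. Because one lock guards both the map and the counter, a plain `TreeMap` is enough here instead of a `ConcurrentNavigableMap`.

```java
import java.util.Collections;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical holder: the builders map and the version live together, so each
// map mutation and its version bump happen atomically under one lock.
// String is a placeholder for InternalTopologyBuilder.
final class VersionedTopologies {
    private final ReentrantLock lock = new ReentrantLock();
    private final TreeMap<String, String> builders = new TreeMap<>(); // sorted by topology name
    private long version = 0L;

    void register(final String name, final String builder) {
        lock.lock();
        try {
            builders.put(name, builder);
            version++; // bumped together with the map change
        } finally {
            lock.unlock();
        }
    }

    void deregister(final String name) {
        lock.lock();
        try {
            if (builders.remove(name) != null) {
                version++; // only bump when a topology was actually removed
            }
        } finally {
            lock.unlock();
        }
    }

    long version() {
        lock.lock();
        try {
            return version;
        } finally {
            lock.unlock();
        }
    }

    // Readers get an immutable copy, so they never observe a half-applied update
    NavigableMap<String, String> snapshot() {
        lock.lock();
        try {
            return Collections.unmodifiableNavigableMap(new TreeMap<>(builders));
        } finally {
            lock.unlock();
        }
    }
}
```

The trade-off in this sketch is that readers pay for a copy, whereas the PR's `ConcurrentSkipListMap` plus `AtomicLong` avoids copying but updates the map and the version as two separate steps.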
guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683823753

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ##
@@ -53,44 +56,162 @@ private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
private final StreamsConfig config;
-private final SortedMap builders; // Keep sorted by topology name for readability
+private final TopologyVersion version;
+
+private final ConcurrentNavigableMap builders; // Keep sorted by topology name for readability

Review comment:
SGTM. What about the other comment, i.e. moving the `Map builders` into the `TopologyVersion` itself? Besides the constructors, the only modifiers of `builders` seem to be `register/deregister`, and in both we would always `getAndIncrement` the version. So what about consolidating the modification of `builders` with the version bump? Then we would not need a `ConcurrentNavigableMap`.

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ##
@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon
}
}
+/**
+ * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so
+ */
+void handleTopologyUpdates() {
+tasks.maybeCreateTasksFromNewTopologies();
+for (final Task task : activeTaskIterable()) {
+if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+task.freezeProcessing();

Review comment:
Just another thought: if this is only for the very short transitional phase between when the topology is removed and when the rebalance is finally triggered to remove the tasks (which should usually happen in the next poll), could we just call `task.suspend` instead of adding the new `freeze` logic? When we finally close the tasks, we can still transition from suspended to closed.
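A simplified, hypothetical task lifecycle illustrating the `suspend` suggestion: if a suspended task may later be closed, no separate frozen state is needed, and unlike a one-way freeze a suspended task can also be resumed. `SketchTask` and its transition table are assumptions for illustration only, not Kafka Streams' actual `Task.State` rules.

```java
// Hypothetical, simplified task lifecycle used only to illustrate the suggestion;
// the transitions are assumptions, not Kafka Streams' actual Task.State rules.
final class SketchTask {
    enum State {
        CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED;

        boolean canTransitionTo(final State next) {
            switch (this) {
                case CREATED:
                    return next == RESTORING || next == SUSPENDED;
                case RESTORING:
                    return next == RUNNING || next == SUSPENDED;
                case RUNNING:
                    return next == SUSPENDED;
                case SUSPENDED:
                    // a suspended task can be resumed or closed, so no extra "frozen" state is needed
                    return next == RESTORING || next == CLOSED;
                default:
                    return false; // CLOSED is terminal
            }
        }
    }

    private State state = State.CREATED;

    State state() {
        return state;
    }

    void transitionTo(final State next) {
        if (!state.canTransitionTo(next)) {
            throw new IllegalStateException("Invalid transition " + state + " -> " + next);
        }
        state = next;
    }
}
```

In this model, removing a topology would move its running tasks to `SUSPENDED`, and the following rebalance would take them to `CLOSED` through an already valid transition.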
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ##
@@ -719,6 +723,8 @@ void runOnce() {
final long pollLatency = pollPhase();
+topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);

Review comment:
How about moving this ahead of `pollPhase()`? We are likely to be kicked out of the group while blocked waiting here, so it's better to become aware of that and re-join the group immediately, rather than still doing the restoration etc., which may all be wasted work.

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ##
@@ -867,7 +873,25 @@ private void initializeAndRestorePhase() {
log.debug("Idempotent restore call done. Thread state has not changed.");
}
+// Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
+private void checkForTopologyUpdates() {
+if (lastSeenTopologyVersion < topologyMetadata.topologyVersion()) {
+lastSeenTopologyVersion = topologyMetadata.topologyVersion();
+taskManager.handleTopologyUpdates();
+
+log.info("StreamThread has detected an update to the topology, triggering a rebalance to refresh the assignment");

Review comment:
Very nit: could we just add a TODO noting that we can improve this case to not always enforce a rebalance when the version is bumped, in case we forget in future PRs?
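A minimal sketch of a wait that acquires the lock itself, in line with the lock-asymmetry concern raised on `TopologyMetadata#maybeWaitForNonEmptyTopology`, and that keeps re-checking whether the thread should still run, loosely mirroring the `() -> state` argument in the `StreamThread` hunk. `TopologyWaiter`, `addNamedTopology`, and the `BooleanSupplier` parameter are hypothetical; only the `ReentrantLock`/`Condition` pairing mirrors the `topologyLock`/`topologyCV` fields in the PR.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

// Hypothetical sketch: every method takes the lock itself, so no caller has to
// hold it beforehand.
final class TopologyWaiter {
    private final ReentrantLock topologyLock = new ReentrantLock();
    private final Condition topologyCV = topologyLock.newCondition();
    private final Set<String> namedTopologies = new HashSet<>();

    void addNamedTopology(final String name) {
        topologyLock.lock();
        try {
            namedTopologies.add(name);
            topologyCV.signalAll(); // wake any thread blocked on an empty topology
        } finally {
            topologyLock.unlock();
        }
    }

    // Blocks until at least one topology exists and returns true. Returns false
    // instead if isRunning reports false (e.g. shutdown) or the thread is interrupted.
    boolean maybeWaitForNonEmptyTopology(final BooleanSupplier isRunning) {
        topologyLock.lock();
        try {
            while (namedTopologies.isEmpty()) {
                if (!isRunning.getAsBoolean()) {
                    return false;
                }
                try {
                    // Timed wait so isRunning is re-checked even if no signal arrives
                    topologyCV.await(100, TimeUnit.MILLISECONDS);
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    return false;
                }
            }
            return true;
        } finally {
            topologyLock.unlock();
        }
    }
}
```

Because the method manages the lock internally, a loop such as `runOnce()` could call it before `pollPhase()`, as suggested in the review, without pairing it with explicit `lock()`/`unlock()` calls.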
guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682861544

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java ##
@@ -64,6 +65,9 @@ private final Map taskProducers;
private final StreamThread.ProcessingMode processingMode;
+// tasks may be assigned for a NamedTopology that is not yet known by this host, and saved for later creation
+private final Map> unknownTasksToBeCreated = new HashMap<>();

Review comment:
Yeah, I left this comment before syncing with @ableegoldman offline, so it's better to clarify for others reading this PR :)

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ##
@@ -172,7 +172,7 @@ public void removeNamedTopology(final String topologyToRemove) {
*/
public void cleanUpNamedTopology(final String name) {
if (getTopologyByName(name).isPresent()) {
-throw new IllegalStateException("Can't clean up local state for an active NamedTopology");
+throw new IllegalStateException("Can't clean up local state for an active NamedTopology: ");

Review comment:
The topology name was not included? :)

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ##
@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon
}
}
+/**
+ * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so
+ */
+void handleTopologyUpdates() {
+tasks.maybeCreateTasksFromNewTopologies();
+for (final Task task : activeTaskIterable()) {
+if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+task.freezeProcessing();

Review comment:
Not sure I understand this logic. Could you explain a bit? Also, there's no unfreeze function, so once a task is frozen, would it stay in that state forever?
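On the `ActiveTaskCreator` change: tasks assigned for a NamedTopology this host does not know yet are buffered and picked up once that topology is added, which is what `uncreatedTasksForTopologies` filters for in the `StandbyTaskCreator` hunk. A minimal sketch of that buffer, assuming a hypothetical `NamedTaskId` (topology name plus partition index) and `String` partition names in place of `TaskId` and `TopicPartition`; `PendingTaskBuffer` and its methods are likewise illustrative names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for TaskId: a NamedTopology name plus a partition index.
final class NamedTaskId {
    final String namedTopology;
    final int partition;

    NamedTaskId(final String namedTopology, final int partition) {
        this.namedTopology = namedTopology;
        this.partition = partition;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof NamedTaskId)) return false;
        final NamedTaskId other = (NamedTaskId) o;
        return partition == other.partition && namedTopology.equals(other.namedTopology);
    }

    @Override
    public int hashCode() {
        return Objects.hash(namedTopology, partition);
    }
}

// Buffers tasks whose NamedTopology is not yet known on this host.
final class PendingTaskBuffer {
    private final Map<NamedTaskId, Set<String>> unknownTasksToBeCreated = new HashMap<>();

    void saveForLater(final NamedTaskId id, final Set<String> partitions) {
        unknownTasksToBeCreated.put(id, partitions);
    }

    // Returns the buffered tasks whose topology is now known; the buffer itself is unchanged.
    Map<NamedTaskId, Set<String>> uncreatedTasksForTopologies(final Set<String> currentTopologies) {
        return unknownTasksToBeCreated.entrySet().stream()
            .filter(e -> currentTopologies.contains(e.getKey().namedTopology))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    // Drops tasks once they have been created or revoked.
    void remove(final Set<NamedTaskId> ids) {
        unknownTasksToBeCreated.keySet().removeAll(ids);
    }
}
```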
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java ##
@@ -84,6 +86,15 @@ void setMainConsumer(final Consumer mainConsumer) {
this.mainConsumer = mainConsumer;
}
+void handleNewAssignmentAndCreateTasks(final Map> activeTasksToCreate,
+ final Map> standbyTasksToCreate,
+ final Set assignedActiveTasks,
+ final Set assignedStandbyTasks) {
+activeTaskCreator.removeRevokedUnknownTasks(diff(HashSet::new, assignedActiveTasks, activeTasksToCreate.keySet()));

Review comment:
The diff between `assignedActiveTasks` and `activeTasksToCreate` consists of the tasks that are already owned and hence do not need to be assigned, am I reading that right (ditto for standbys)? Why should these be passed to `removeRevokedUnknownTasks` as revoked?
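To make the reviewer's reading concrete: assuming `diff(constructor, left, right)` returns the elements of `left` that are absent from `right`, the set passed to `removeRevokedUnknownTasks` is the assigned tasks that are not scheduled for creation, which on the reviewer's reading are tasks this thread already owns. The helper below is a hypothetical re-implementation of that assumed behavior, not the helper the PR actually calls.

```java
import java.util.Set;
import java.util.function.Supplier;

final class SetDiff {
    // Hypothetical re-implementation: returns left \ right in a fresh set built by
    // the supplier, leaving both input sets untouched.
    static <T> Set<T> diff(final Supplier<Set<T>> constructor, final Set<T> left, final Set<T> right) {
        final Set<T> result = constructor.get();
        result.addAll(left);
        result.removeAll(right);
        return result;
    }
}
```

By contrast, tasks revoked in a rebalance would usually be computed the other way around, as the previously owned tasks minus the new assignment.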
guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682090255

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java ##
@@ -66,14 +72,26 @@
);
}
+Map> uncreatedTasksForTopologies(final Set currentTopologies) {
+return unknownTasksToBeCreated.entrySet().stream().filter(t -> currentTopologies.contains(t.getKey().namedTopology())).collect(Collectors.toMap(Entry::getKey, Entry::getValue));

Review comment:
Ditto here.

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ##
@@ -53,44 +56,162 @@ private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
private final StreamsConfig config;
-private final SortedMap builders; // Keep sorted by topology name for readability
+private final TopologyVersion version;
+
+private final ConcurrentNavigableMap builders; // Keep sorted by topology name for readability
private ProcessorTopology globalTopology;
-private Map globalStateStores = new HashMap<>();
-final Set allInputTopics = new HashSet<>();
+private final Map globalStateStores = new HashMap<>();
+private final Set allInputTopics = new HashSet<>();
+
+public static class TopologyVersion {
+public AtomicLong topologyVersion = new AtomicLong(0L); // the local topology version
+public Set assignedNamedTopologies = new HashSet<>(); // the named topologies whose tasks are actively assigned
+public ReentrantLock topologyLock = new ReentrantLock();
+public Condition topologyCV = topologyLock.newCondition();
+}
-public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) {
+public TopologyMetadata(final InternalTopologyBuilder builder,
+final StreamsConfig config) {
+version = new TopologyVersion();
this.config = config;
-builders = new TreeMap<>();
+builders = new ConcurrentSkipListMap<>();
if (builder.hasNamedTopology()) {
builders.put(builder.topologyName(), builder);
} else {
builders.put(UNNAMED_TOPOLOGY, builder);
}
}
-public TopologyMetadata(final SortedMap builders, final StreamsConfig config) {
+public TopologyMetadata(final ConcurrentNavigableMap builders,
+final StreamsConfig config) {
+version = new TopologyVersion();
this.config = config;
+ this.builders = builders;
if (builders.isEmpty()) {
-log.debug("Building KafkaStreams app with no empty topology");
+log.debug("Starting up empty KafkaStreams app with no topology");
}
}
-public int getNumStreamThreads(final StreamsConfig config) {
-final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+public void updateCurrentAssignmentTopology(final Set assignedNamedTopologies) {
+version.assignedNamedTopologies = assignedNamedTopologies;
+}
-// If the application uses named topologies, it's possible to start up with no topologies at all and only add them later
-if (builders.isEmpty()) {
-if (configuredNumStreamThreads != 0) {
-log.info("Overriding number of StreamThreads to zero for empty topology");
+/**
+ * @return the set of named topologies that the assignor distributed tasks for during the last rebalance
+ */
+public Set assignmentNamedTopologies() {
+return version.assignedNamedTopologies;
+}
+
+public long topologyVersion() {
+return version.topologyVersion.get();
+}
+
+public void lock() {
+version.topologyLock.lock();
+}
+
+public void unlock() {
+version.topologyLock.unlock();
+}
+
+public InternalTopologyBuilder getBuilderForTopologyName(final String name) {
+return builders.get(name);
+}
+
+/**
+ * @throws IllegalStateException if the thread is not already holding the lock via TopologyMetadata#lock
+ */
+public void maybeWaitForNonEmptyTopology() {
+if (!version.topologyLock.isHeldByCurrentThread()) {

Review comment:
I feel a bit concerned about the "asymmetry" of this function: all the other functions acquire the lock internally, while this one expects its caller (i.e. `handleTopologyUpdatesPhase`) to already hold it. That is quite vulnerable to bugs as the code gets edited further. I'm wondering if we can move this logic out of `handleTopologyUpdatesPhase` instead: i.e. we first update the named topology, and then, based on the new version, we either wait or re-subscribe and trigger a rebalance. WDYT?

## File path: