[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683938654



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##
@@ -67,13 +72,31 @@
 public class NamedTopologyIntegrationTest {
 public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
 
+// TODO KAFKA-12648:

Review comment:
   SG.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683938587



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -53,44 +56,162 @@
 private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
 
 private final StreamsConfig config;
-private final SortedMap builders; // Keep sorted by topology name for readability
+private final TopologyVersion version;
+
+private final ConcurrentNavigableMap builders; // Keep sorted by topology name for readability

Review comment:
   > I was referring to moving the `builders` map to the `TopologyVersion` 
in the above, ie I want to save that for Pt. 4 if that's ok
   
   Yup, totally.








[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683823753



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -53,44 +56,162 @@
 private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
 
 private final StreamsConfig config;
-private final SortedMap builders; // Keep sorted by topology name for readability
+private final TopologyVersion version;
+
+private final ConcurrentNavigableMap builders; // Keep sorted by topology name for readability

Review comment:
   SGTM.
   
   What about the other comment, i.e. moving the `Map builders` into the `TopologyVersion` itself? Besides the constructors, the only methods that modify `builders` seem to be `register`/`deregister`, and both always `getAndIncrement` the version. So what about consolidating the modification of `builders` with the version bump, so that we would not need a `ConcurrentNavigableMap`?
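
   A minimal sketch of that consolidation, assuming a single lock guards both the map and the counter. `VersionedBuilders`, its method names, and the generic builder type `B` are hypothetical and not part of the PR; unlike `getAndIncrement`, these methods return the new version:

```java
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical holder: the builders map and the version counter only change
// together, under one lock, so a plain TreeMap is sufficient.
final class VersionedBuilders<B> {
    private final ReentrantLock lock = new ReentrantLock();
    private final SortedMap<String, B> builders = new TreeMap<>(); // sorted by topology name
    private long version = 0L;

    // Adds (or replaces) a builder and bumps the version; returns the new version.
    long register(final String name, final B builder) {
        lock.lock();
        try {
            builders.put(name, builder);
            return ++version;
        } finally {
            lock.unlock();
        }
    }

    // Removes a builder if present and bumps the version; returns the new version.
    long deregister(final String name) {
        lock.lock();
        try {
            builders.remove(name);
            return ++version;
        } finally {
            lock.unlock();
        }
    }

    // Reads the current version under the same lock.
    long version() {
        lock.lock();
        try {
            return version;
        } finally {
            lock.unlock();
        }
    }

    // Returns a copy so callers never iterate the live map outside the lock.
    SortedMap<String, B> snapshot() {
        lock.lock();
        try {
            return new TreeMap<>(builders);
        } finally {
            lock.unlock();
        }
    }
}
```

   Readers pay for a copy in `snapshot()`, which seems acceptable if topology changes are rare compared to reads; that trade-off is an assumption of this sketch.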

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon
 }
 }
 
+/**
+ * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so
+ */
+void handleTopologyUpdates() {
+tasks.maybeCreateTasksFromNewTopologies();
+for (final Task task : activeTaskIterable()) {
+if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+task.freezeProcessing();
Review comment:
   Just another thought: if this is only for the very short temporary phase between when the topology is removed and when the rebalance is finally triggered to remove the tasks (which should usually be in the next poll), could we just call `task.suspend` instead of adding the new `freeze` logic? When we finally close the tasks, we can still transition from suspended to closed.
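
   To illustrate the idea, here is a deliberately simplified lifecycle sketch. `SketchState` and `SketchTask` are hypothetical; the real task states and transitions in Kafka Streams are richer than this:

```java
// Hypothetical, simplified task lifecycle; not the actual Kafka Streams Task API.
enum SketchState { RUNNING, SUSPENDED, CLOSED }

final class SketchTask {
    private SketchState state = SketchState.RUNNING;

    SketchState state() {
        return state;
    }

    // A suspended task is simply skipped by the processing loop.
    boolean isProcessable() {
        return state == SketchState.RUNNING;
    }

    // Called when the task's topology is removed; a no-op if already suspended.
    void suspend() {
        if (state == SketchState.CLOSED) {
            throw new IllegalStateException("Cannot suspend a closed task");
        }
        state = SketchState.SUSPENDED;
    }

    // Called once the rebalance revokes the task; only legal from SUSPENDED.
    void close() {
        if (state != SketchState.SUSPENDED) {
            throw new IllegalStateException("Task must be suspended before closing, state=" + state);
        }
        state = SketchState.CLOSED;
    }
}
```

   In this model no separate "frozen" flag is needed: processing stops as soon as the task is suspended, and the later close is an ordinary transition.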

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -719,6 +723,8 @@ void runOnce() {
 
 final long pollLatency = pollPhase();
 
+topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);

Review comment:
   How about moving this ahead of `pollPhase()`? We are likely to be kicked out of the group while blocked waiting here, so it's better to be aware of that and re-join the group immediately, rather than still doing the restore etc., which may all be wasted work.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -867,7 +873,25 @@ private void initializeAndRestorePhase() {
 log.debug("Idempotent restore call done. Thread state has not changed.");
 }
 
+// Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
+private void checkForTopologyUpdates() {
+if (lastSeenTopologyVersion < topologyMetadata.topologyVersion()) {
+lastSeenTopologyVersion = topologyMetadata.topologyVersion();
+taskManager.handleTopologyUpdates();
+
+log.info("StreamThread has detected an update to the topology, triggering a rebalance to refresh the assignment");

Review comment:
   Very minor nit: could we add a TODO noting that we can improve this case to not always enforce a rebalance when the version is bumped, in case we forget in future PRs?








[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682861544



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##
@@ -64,6 +65,9 @@
 private final Map taskProducers;
 private final StreamThread.ProcessingMode processingMode;
 
+// tasks may be assigned for a NamedTopology that is not yet known by this host, and saved for later creation
+private final Map>  unknownTasksToBeCreated = new HashMap<>();

Review comment:
   Yeah I left this comment before syncing with @ableegoldman offline, so 
it's better to clarify for others reading this PR :)

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -172,7 +172,7 @@ public void removeNamedTopology(final String topologyToRemove) {
  */
 public void cleanUpNamedTopology(final String name) {
 if (getTopologyByName(name).isPresent()) {
-throw new IllegalStateException("Can't clean up local state for an active NamedTopology");
+throw new IllegalStateException("Can't clean up local state for an active NamedTopology: ");

Review comment:
   The topology name was not included in the message? :)
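
   A minimal sketch of what the message presumably intends, with the name appended after the trailing `": "`. `CleanUpSketch` and the `activeTopologies` parameter are hypothetical stand-ins for the real class and its `getTopologyByName` lookup:

```java
import java.util.Set;

final class CleanUpSketch {
    // Hypothetical stand-in for cleanUpNamedTopology: `activeTopologies`
    // replaces the real lookup of currently registered topologies.
    static void cleanUpNamedTopology(final String name, final Set<String> activeTopologies) {
        if (activeTopologies.contains(name)) {
            // Append the name so the trailing ": " is followed by the topology.
            throw new IllegalStateException(
                "Can't clean up local state for an active NamedTopology: " + name);
        }
        // ... local state cleanup for an inactive topology would happen here ...
    }
}
```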

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon
 }
 }
 
+/**
+ * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so
+ */
+void handleTopologyUpdates() {
+tasks.maybeCreateTasksFromNewTopologies();
+for (final Task task : activeTaskIterable()) {
+if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+task.freezeProcessing();

Review comment:
   Not sure I understand this logic; could you explain a bit? Also, there's no unfreeze function, so once a task is frozen, would it stay in that state forever?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##
@@ -84,6 +86,15 @@ void setMainConsumer(final Consumer mainConsumer) {
 this.mainConsumer = mainConsumer;
 }
 
+void handleNewAssignmentAndCreateTasks(final Map> activeTasksToCreate,
+   final Map> standbyTasksToCreate,
+   final Set assignedActiveTasks,
+   final Set assignedStandbyTasks) {
+activeTaskCreator.removeRevokedUnknownTasks(diff(HashSet::new, assignedActiveTasks, activeTasksToCreate.keySet()));

Review comment:
   The diff between `assignedActiveTasks` and `activeTasksToCreate` is the set of tasks that are already owned and hence do not need to be assigned, am I reading that right (ditto for standbys)? Why should these be treated as revoked tasks to remove?
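
   To make the question concrete, here is a sketch that assumes `diff(constructor, left, right)` returns the elements of `left` that are absent from `right`. The helper is re-implemented locally under that assumption, and the task ids are made-up strings rather than real `TaskId` objects:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

final class SetDiffSketch {
    // Returns the elements of `left` that are not in `right` (left minus right).
    static <E> Set<E> diff(final Supplier<Set<E>> constructor, final Set<E> left, final Set<E> right) {
        final Set<E> result = constructor.get();
        for (final E element : left) {
            if (!right.contains(element)) {
                result.add(element);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        // Everything the assignor gave this thread in the new assignment.
        final Set<String> assignedActiveTasks = new HashSet<>(Arrays.asList("0_0", "0_1", "1_0"));
        // The subset that still has to be created locally.
        final Set<String> activeTasksToCreate = new HashSet<>(Arrays.asList("1_0"));

        // Assigned but not to be created, i.e. tasks this thread already owns.
        final Set<String> alreadyOwned = diff(HashSet::new, assignedActiveTasks, activeTasksToCreate);
        System.out.println(alreadyOwned);
    }
}
```

   Under that reading the argument passed to `removeRevokedUnknownTasks` contains `0_0` and `0_1`, which are still assigned, hence the question.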








[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-04 Thread GitBox


guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682871868



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -172,7 +172,7 @@ public void removeNamedTopology(final String topologyToRemove) {
  */
 public void cleanUpNamedTopology(final String name) {
 if (getTopologyByName(name).isPresent()) {
-throw new IllegalStateException("Can't clean up local state for an active NamedTopology");
+throw new IllegalStateException("Can't clean up local state for an active NamedTopology: ");

Review comment:
   The topology name was not included in the message? :)

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon
 }
 }
 
+/**
+ * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so
+ */
+void handleTopologyUpdates() {
+tasks.maybeCreateTasksFromNewTopologies();
+for (final Task task : activeTaskIterable()) {
+if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+task.freezeProcessing();

Review comment:
   Not sure I understand this logic; could you explain a bit? Also, there's no unfreeze function, so once a task is frozen, would it stay in that state forever?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##
@@ -84,6 +86,15 @@ void setMainConsumer(final Consumer mainConsumer) {
 this.mainConsumer = mainConsumer;
 }
 
+void handleNewAssignmentAndCreateTasks(final Map> activeTasksToCreate,
+   final Map> standbyTasksToCreate,
+   final Set assignedActiveTasks,
+   final Set assignedStandbyTasks) {
+activeTaskCreator.removeRevokedUnknownTasks(diff(HashSet::new, assignedActiveTasks, activeTasksToCreate.keySet()));

Review comment:
   The diff between `assignedActiveTasks` and `activeTasksToCreate` is the set of tasks that are already owned and hence do not need to be assigned, am I reading that right (ditto for standbys)? Why should these be treated as revoked tasks to remove?








[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-04 Thread GitBox


guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682861544



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##
@@ -64,6 +65,9 @@
 private final Map taskProducers;
 private final StreamThread.ProcessingMode processingMode;
 
+// tasks may be assigned for a NamedTopology that is not yet known by this host, and saved for later creation
+private final Map>  unknownTasksToBeCreated = new HashMap<>();

Review comment:
   Yeah I left this comment before syncing with @ableegoldman offline, so 
it's better to clarify for others reading this PR :)








[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-03 Thread GitBox


guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682090255



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
##
@@ -66,14 +72,26 @@
 );
 }
 
+Map> uncreatedTasksForTopologies(final Set currentTopologies) {
+return unknownTasksToBeCreated.entrySet().stream().filter(t -> currentTopologies.contains(t.getKey().namedTopology())).collect(Collectors.toMap(Entry::getKey, Entry::getValue));

Review comment:
   Ditto here.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -53,44 +56,162 @@
 private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
 
 private final StreamsConfig config;
-private final SortedMap builders; // Keep sorted by topology name for readability
+private final TopologyVersion version;
+
+private final ConcurrentNavigableMap builders; // Keep sorted by topology name for readability
 
 private ProcessorTopology globalTopology;
-private Map globalStateStores = new HashMap<>();
-final Set allInputTopics = new HashSet<>();
+private final Map globalStateStores = new HashMap<>();
+private final Set allInputTopics = new HashSet<>();
+
+public static class TopologyVersion {
+public AtomicLong topologyVersion = new AtomicLong(0L); // the local topology version
+public Set assignedNamedTopologies = new HashSet<>(); // the named topologies whose tasks are actively assigned
+public ReentrantLock topologyLock = new ReentrantLock();
+public Condition topologyCV = topologyLock.newCondition();
+}
 
-public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) {
+public TopologyMetadata(final InternalTopologyBuilder builder,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
-builders = new TreeMap<>();
+builders = new ConcurrentSkipListMap<>();
 if (builder.hasNamedTopology()) {
 builders.put(builder.topologyName(), builder);
 } else {
 builders.put(UNNAMED_TOPOLOGY, builder);
 }
 }
 
-public TopologyMetadata(final SortedMap builders, final StreamsConfig config) {
+public TopologyMetadata(final ConcurrentNavigableMap builders,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
+
 this.builders = builders;
 if (builders.isEmpty()) {
-log.debug("Building KafkaStreams app with no empty topology");
+log.debug("Starting up empty KafkaStreams app with no topology");
 }
 }
 
-public int getNumStreamThreads(final StreamsConfig config) {
-final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+public void updateCurrentAssignmentTopology(final Set assignedNamedTopologies) {
+version.assignedNamedTopologies = assignedNamedTopologies;
+}
 
-// If the application uses named topologies, it's possible to start up with no topologies at all and only add them later
-if (builders.isEmpty()) {
-if (configuredNumStreamThreads != 0) {
-log.info("Overriding number of StreamThreads to zero for empty topology");
+/**
+ * @return the set of named topologies that the assignor distributed tasks for during the last rebalance
+ */
+public Set assignmentNamedTopologies() {
+return version.assignedNamedTopologies;
+}
+
+public long topologyVersion() {
+return version.topologyVersion.get();
+}
+
+public void lock() {
+version.topologyLock.lock();
+}
+
+public void unlock() {
+version.topologyLock.unlock();
+}
+
+public InternalTopologyBuilder getBuilderForTopologyName(final String name) {
+return builders.get(name);
+}
+
+/**
+ * @throws IllegalStateException if the thread is not already holding the lock via TopologyMetadata#lock
+ */
+public void maybeWaitForNonEmptyTopology() {
+if (!version.topologyLock.isHeldByCurrentThread()) {

Review comment:
   I feel a bit concerned about the "asymmetry" of this function: all other functions acquire the lock internally, while this one expects its caller (i.e. `handleTopologyUpdatesPhase`) to already hold it. That makes it quite vulnerable to bugs in future edits.
   
   I'm wondering if we can move this logic out of `handleTopologyUpdatesPhase` instead, i.e. we first update the named topology, and then based on the new version we can either wait or re-subscribe and trigger a rebalance. WDYT?
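
   A sketch of the symmetric alternative, in which the waiting method acquires the lock itself like every other method. `TopologyGateSketch`, its fields, and the timeout parameter are hypothetical and only illustrate the locking pattern, not the PR's actual classes:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class TopologyGateSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition nonEmpty = lock.newCondition();
    private int numTopologies = 0;
    private long version = 0L;

    // Registers a topology, wakes any waiting threads, and returns the new version.
    long addTopology() {
        lock.lock();
        try {
            numTopologies++;
            nonEmpty.signalAll();
            return ++version;
        } finally {
            lock.unlock();
        }
    }

    // Acquires the lock internally, so callers have no locking precondition.
    // Returns true once at least one topology exists, false on timeout or interrupt.
    boolean awaitNonEmpty(final long timeoutMs) {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            while (numTopologies == 0) {
                if (remainingNanos <= 0L) {
                    return false;
                }
                // awaitNanos releases the lock while waiting and re-acquires it before returning.
                remainingNanos = nonEmpty.awaitNanos(remainingNanos);
            }
            return true;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

   With this shape there is no `isHeldByCurrentThread()` check to forget, and the caller can decide what to do (wait again, re-subscribe, trigger a rebalance) from the returned result.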
   
   

##
File path: