[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683902306



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -517,9 +526,9 @@ void handleRevocation(final Collection revokedPartitions) {
 }
 
 if (!remainingRevokedPartitions.isEmpty()) {
-log.warn("The following partitions {} are missing from the task partitions. It could potentially " +
+log.debug("The following revoked partitions {} are missing from the current task partitions. It could potentially " +

Review comment:
   Making this `debug` since `warn` seems too intense, and I'm not sure it's 
even worthy of `info`. Also, with named topologies you would expect to see 
this almost every time a topology is removed, since the thread will try to close 
those tasks as soon as it notices the topology's removal.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683901855



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -775,26 +806,16 @@ private void completeTaskCloseClean(final Task task) {
 void shutdown(final boolean clean) {
 final AtomicReference firstException = new AtomicReference<>(null);
 
-final Set tasksToCloseDirty = new HashSet<>();

Review comment:
   No actual changes here, just pulled the cleanup of tasks out into a 
separate new `#closeAndCleanUpTasks` method so we can call that on tasks from 
removed topologies








[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683874039



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -719,6 +723,8 @@ void runOnce() {
 
 final long pollLatency = pollPhase();
 
+topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);

Review comment:
   Ack (although note that there's no wasted work on the restore phase 
since there's by definition nothing for the thread to do yet as it won't have 
been assigned any new tasks until it polls again). 
   
   I don't think it really matters much where we put this for that reason, 
except for the case in which we start up with no topology -- then it's a waste 
to join the group in the first place, so we may as well wait until we receive 
something to work on. So yes, I'll move it back ahead of poll
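
   To illustrate the "wait until we receive something to work on" idea, here 
is a minimal sketch, assuming a lock/condition pair similar to the 
`topologyLock`/`topologyCV` fields in this PR. The class `TopologyGate` and its 
methods are hypothetical names for illustration, not the actual Kafka Streams 
code; the timeout is also an assumption added so the example terminates.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the topology bookkeeping; not the Kafka class.
class TopologyGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition nonEmpty = lock.newCondition();
    private final Set<String> topologies = new ConcurrentSkipListSet<>();

    // Called by the application when a named topology is added.
    void add(final String name) {
        lock.lock();
        try {
            topologies.add(name);
            nonEmpty.signalAll(); // wake any thread parked in awaitNonEmpty
        } finally {
            lock.unlock();
        }
    }

    // Called by the processing thread before it polls. Returns true once at
    // least one topology exists, false on timeout or interruption.
    boolean awaitNonEmpty(final long timeoutMs) {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            while (topologies.isEmpty()) {
                if (remainingNanos <= 0L) {
                    return false;
                }
                remainingNanos = nonEmpty.awaitNanos(remainingNanos);
            }
            return true;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

   Checking the condition in a `while` loop guards against spurious wakeups, 
and doing the check before the poll means a thread with nothing to run never 
needs to join the group.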








[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683872205



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -53,44 +56,162 @@
 private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
 private final StreamsConfig config;
-private final SortedMap builders; // Keep 
sorted by topology name for readability
+private final TopologyVersion version;
+
+private final ConcurrentNavigableMap 
builders; // Keep sorted by topology name for readability

Review comment:
   > I'll revisit the best way to handle the builders map in that as well
   
   I was referring to moving the `builders` map into `TopologyVersion` in the 
above, i.e. I want to save that for Pt. 4, if that's ok








[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683851998



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon
 }
 }
 
+/**
+ * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so
+ */
+void handleTopologyUpdates() {
+tasks.maybeCreateTasksFromNewTopologies();
+for (final Task task : activeTaskIterable()) {
+if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+task.freezeProcessing();

Review comment:
   Well, long story short, I was trying to avoid mucking around in the task 
state management (which has historically been the source of many critical bugs) 
-- also, we want to remove the `suspended` state soon anyway. But actually it 
seems simpler to just `close()` the tasks here and now








[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-05 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683055067



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##
@@ -84,6 +86,15 @@ void setMainConsumer(final Consumer 
mainConsumer) {
 this.mainConsumer = mainConsumer;
 }
 
+void handleNewAssignmentAndCreateTasks(final Map> activeTasksToCreate,
+   final Map> standbyTasksToCreate,
+   final Set 
assignedActiveTasks,
+   final Set 
assignedStandbyTasks) {
+activeTaskCreator.removeRevokedUnknownTasks(diff(HashSet::new, 
assignedActiveTasks, activeTasksToCreate.keySet()));

Review comment:
   Whoops, that's what happens when you push changes late at night. I'll 
fix it up; hopefully it will make more sense then

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon
 }
 }
 
+/**
+ * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so
+ */
+void handleTopologyUpdates() {
+tasks.maybeCreateTasksFromNewTopologies();
+for (final Task task : activeTaskIterable()) {
+if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+task.freezeProcessing();

Review comment:
   This is another stupid bug...it should be freezing them if they _don't_ 
appear in the current topology. Tasks from removed named topologies are frozen 
to prevent them from processing any more records, so yes, it is meant to be 
permanent.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -53,44 +56,162 @@
 private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
 private final StreamsConfig config;
-private final SortedMap builders; // Keep 
sorted by topology name for readability
+private final TopologyVersion version;
+
+private final ConcurrentNavigableMap 
builders; // Keep sorted by topology name for readability
 
 private ProcessorTopology globalTopology;
-private Map globalStateStores = new HashMap<>();
-final Set allInputTopics = new HashSet<>();
+private final Map globalStateStores = new HashMap<>();
+private final Set allInputTopics = new HashSet<>();
+
+public static class TopologyVersion {
+public AtomicLong topologyVersion = new AtomicLong(0L); // the local 
topology version
+public Set assignedNamedTopologies = new HashSet<>(); // the 
named topologies whose tasks are actively assigned
+public ReentrantLock topologyLock = new ReentrantLock();
+public Condition topologyCV = topologyLock.newCondition();
+}
 
-public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+public TopologyMetadata(final InternalTopologyBuilder builder,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
-builders = new TreeMap<>();
+builders = new ConcurrentSkipListMap<>();
 if (builder.hasNamedTopology()) {
 builders.put(builder.topologyName(), builder);
 } else {
 builders.put(UNNAMED_TOPOLOGY, builder);
 }
 }
 
-public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+public TopologyMetadata(final ConcurrentNavigableMap builders,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
+
 this.builders = builders;
 if (builders.isEmpty()) {
-log.debug("Building KafkaStreams app with no empty topology");
+log.debug("Starting up empty KafkaStreams app with no topology");
 }
 }
 
-public int getNumStreamThreads(final StreamsConfig config) {
-final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+public void updateCurrentAssignmentTopology(final Set 
assignedNamedTopologies) {
+version.assignedNamedTopologies = assignedNamedTopologies;
+}
 
-// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
-if (builders.isEmpty()) {
-if (configuredNumStreamThreads != 0) {
-log.info("Overriding number of StreamThreads to zero for empty 
topology");
+/**
+ * @return the set of named topologies that the assignor distributed tasks 
for during the last rebalance
+ */
+public Set assignmentNamedTopologies() {

[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-04 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683102378



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -53,44 +56,162 @@
 private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
 private final StreamsConfig config;
-private final SortedMap builders; // Keep 
sorted by topology name for readability
+private final TopologyVersion version;
+
+private final ConcurrentNavigableMap 
builders; // Keep sorted by topology name for readability

Review comment:
   Well, I removed the whole `assignedNamedTopologies` thing from this PR, 
but yes, that was one of the original intentions of adding it to the 
AssignmentInfo. I'm leaning towards keeping things simple for now and avoiding 
additions to the AssignmentInfo where possible. We can revisit this if the 
extra rebalances become a problem.
   
   It could be sufficient to just use the set of actually-assigned topologies 
as a proxy. It's not perfect since it could be that the leader was aware of a 
topology and simply happened to not assign any of its tasks to this client, but 
I'd bet this gets rid of most of the unnecessary rebalances.
   
   That said, I'm saving this optimization for a followup Pt. 4 PR. I'll 
revisit the best way to handle the `builders` map in that as well
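
   A rough sketch of the "actually-assigned topologies as a proxy" idea, 
under the assumption that each assigned task can be mapped to the name of its 
topology. `AssignedTopologyProxy` and both method names are hypothetical and 
do not appear in the PR. A locally known topology with no assigned tasks is 
only *possibly* unknown to the leader, which is exactly the imperfection 
described above.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper; the names are illustrative and not from the PR.
class AssignedTopologyProxy {
    // Collects the distinct topology names that appear among the assigned
    // tasks. Each task is represented here only by its topology name.
    static Set<String> assignedTopologies(final Collection<String> topologyOfEachAssignedTask) {
        return new TreeSet<>(topologyOfEachAssignedTask);
    }

    // Returns the locally known topologies for which this client received no
    // tasks. The leader may not have known about these, or it may simply have
    // assigned their tasks elsewhere, so this can over-report.
    static Set<String> possiblyUnknownToLeader(final Set<String> localTopologies,
                                               final Collection<String> topologyOfEachAssignedTask) {
        final Set<String> missing = new TreeSet<>(localTopologies);
        missing.removeAll(assignedTopologies(topologyOfEachAssignedTask));
        return missing;
    }
}
```

   If the returned set is empty, every local topology showed up in the 
assignment, so a follow-up rebalance for topology discovery would presumably 
be unnecessary.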








[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-04 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683096288



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##
@@ -67,13 +72,31 @@
 public class NamedTopologyIntegrationTest {
 public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
 
+// TODO KAFKA-12648:

Review comment:
   I'm still filling out the integration test suite, especially the 
multi-node testing, but I'll make sure this scenario has coverage. This will 
probably have to be in the followup Pt. 4 which expands 
`add/removeNamedTopology` to return a `Future`, since being able to block on 
this helps a lot with the testing.
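
   As a sketch of why a `Future`-returning API helps with testing: the 
caller can block until the update has actually been picked up. Everything 
below is an assumption for illustration. `TopologyRegistry`, 
`applyPendingUpdates` and `isActive` are hypothetical, and the real Pt. 4 API 
may look quite different.

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical registry; not the KafkaStreams named topology API.
class TopologyRegistry {
    // One queued add request plus the future that signals its completion.
    private static final class Pending {
        final String name;
        final CompletableFuture<Void> done = new CompletableFuture<>();

        Pending(final String name) {
            this.name = name;
        }
    }

    private final Queue<Pending> pending = new ConcurrentLinkedQueue<>();
    private final Set<String> active = new ConcurrentSkipListSet<>();

    // Queues the topology and returns a future that completes once the
    // processing side has applied the update.
    CompletableFuture<Void> addNamedTopology(final String name) {
        final Pending request = new Pending(name);
        pending.add(request);
        return request.done;
    }

    // Stands in for the processing thread noticing the topology change.
    void applyPendingUpdates() {
        Pending request;
        while ((request = pending.poll()) != null) {
            active.add(request.name);
            request.done.complete(null);
        }
    }

    boolean isActive(final String name) {
        return active.contains(name);
    }
}
```

   A test can then call `addNamedTopology(...).join()` (with the processing 
side running on another thread) instead of polling for the new topology's 
output.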








[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-04 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683095231



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -867,6 +873,36 @@ private void initializeAndRestorePhase() {
 log.debug("Idempotent restore call done. Thread state has not changed.");
 }
 
+private void handleTopologyUpdatesPhase() {
+// Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
+// or if this is the very first topology in which case we may need to wait for it to be non-empty
+if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || lastSeenTopologyVersion == 0) {
+try {

Review comment:
   I refactored this part of the code quite a bit; let me know if you have 
any remaining concerns with the new logic








[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-04 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683065432



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -53,44 +56,162 @@
 private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
 private final StreamsConfig config;
-private final SortedMap builders; // Keep 
sorted by topology name for readability
+private final TopologyVersion version;
+
+private final ConcurrentNavigableMap 
builders; // Keep sorted by topology name for readability
 
 private ProcessorTopology globalTopology;
-private Map globalStateStores = new HashMap<>();
-final Set allInputTopics = new HashSet<>();
+private final Map globalStateStores = new HashMap<>();
+private final Set allInputTopics = new HashSet<>();
+
+public static class TopologyVersion {
+public AtomicLong topologyVersion = new AtomicLong(0L); // the local 
topology version
+public Set assignedNamedTopologies = new HashSet<>(); // the 
named topologies whose tasks are actively assigned
+public ReentrantLock topologyLock = new ReentrantLock();
+public Condition topologyCV = topologyLock.newCondition();
+}
 
-public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+public TopologyMetadata(final InternalTopologyBuilder builder,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
-builders = new TreeMap<>();
+builders = new ConcurrentSkipListMap<>();
 if (builder.hasNamedTopology()) {
 builders.put(builder.topologyName(), builder);
 } else {
 builders.put(UNNAMED_TOPOLOGY, builder);
 }
 }
 
-public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+public TopologyMetadata(final ConcurrentNavigableMap builders,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
+
 this.builders = builders;
 if (builders.isEmpty()) {
-log.debug("Building KafkaStreams app with no empty topology");
+log.debug("Starting up empty KafkaStreams app with no topology");
 }
 }
 
-public int getNumStreamThreads(final StreamsConfig config) {
-final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+public void updateCurrentAssignmentTopology(final Set 
assignedNamedTopologies) {
+version.assignedNamedTopologies = assignedNamedTopologies;
+}
 
-// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
-if (builders.isEmpty()) {
-if (configuredNumStreamThreads != 0) {
-log.info("Overriding number of StreamThreads to zero for empty 
topology");
+/**
+ * @return the set of named topologies that the assignor distributed tasks 
for during the last rebalance
+ */
+public Set assignmentNamedTopologies() {
+return version.assignedNamedTopologies;
+}
+
+public long topologyVersion() {
+return version.topologyVersion.get();
+}
+
+public void lock() {
+version.topologyLock.lock();
+}
+
+public void unlock() {
+version.topologyLock.unlock();
+}
+
+public InternalTopologyBuilder getBuilderForTopologyName(final String 
name) {
+return builders.get(name);
+}
+
+/**
+ * @throws IllegalStateException if the thread is not already holding the 
lock via TopologyMetadata#lock
+ */
+public void maybeWaitForNonEmptyTopology() {
+if (!version.topologyLock.isHeldByCurrentThread()) {

Review comment:
   Agreed, let me clean this up








[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-04 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683057288



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon
 }
 }
 
+/**
+ * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so
+ */
+void handleTopologyUpdates() {
+tasks.maybeCreateTasksFromNewTopologies();
+for (final Task task : activeTaskIterable()) {
+if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+task.freezeProcessing();

Review comment:
   This is another stupid bug...it should be freezing them if they _don't_ 
appear in the current topology. Tasks from removed named topologies are frozen 
to prevent them from processing any more records, so yes, it is meant to be 
permanent.
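
   For clarity, a minimal sketch of the intended (negated) check. This is 
not the `TaskManager` code from the PR: `RemovedTopologyFilter` and 
`tasksToFreeze` are hypothetical, and each task is represented only by the 
name of its topology.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not the TaskManager code from the PR.
class RemovedTopologyFilter {
    // Returns the topology names of assigned tasks that should be frozen:
    // those whose topology is NOT in the current set of named topologies.
    static List<String> tasksToFreeze(final List<String> topologyOfEachAssignedTask,
                                      final Set<String> currentTopologies) {
        final List<String> toFreeze = new ArrayList<>();
        for (final String topology : topologyOfEachAssignedTask) {
            if (!currentTopologies.contains(topology)) { // note the negation
                toFreeze.add(topology);
            }
        }
        return toFreeze;
    }
}
```

   With tasks from topologies `a`, `b` and `c` assigned and only `a` and `c` 
still registered, just the task from `b` is selected for freezing.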








[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-04 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683055067



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##
@@ -84,6 +86,15 @@ void setMainConsumer(final Consumer 
mainConsumer) {
 this.mainConsumer = mainConsumer;
 }
 
+void handleNewAssignmentAndCreateTasks(final Map> activeTasksToCreate,
+   final Map> standbyTasksToCreate,
+   final Set 
assignedActiveTasks,
+   final Set 
assignedStandbyTasks) {
+activeTaskCreator.removeRevokedUnknownTasks(diff(HashSet::new, 
assignedActiveTasks, activeTasksToCreate.keySet()));

Review comment:
   Whoops, that's what happens when you push changes late at night. I'll 
fix it up; hopefully it will make more sense then








[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-03 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682238334



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -248,12 +249,29 @@ public ByteBuffer subscriptionUserData(final Set 
topics) {
 handleRebalanceStart(topics);
 uniqueField++;
 
+final Set currentNamedTopologies;
+final Map taskOffsetSums;
+try {
+taskManager.topologyMetadata().lock();

Review comment:
   The lock was less for thread-safety purposes and more about atomicity 
w.r.t. updating some bookkeeping/state. But I guess I removed a lot of that 
bookkeeping recently, so I'll do another pass and clean up this handling
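
   To make the atomicity point concrete, here is a small sketch in which a 
lock keeps two pieces of bookkeeping (a version counter and a set of names) 
consistent with each other. `TopologySnapshotHolder` and `Snapshot` are 
hypothetical names and are not part of the PR.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical holder; illustrates atomic multi-field updates only.
class TopologySnapshotHolder {
    // Immutable view of the version and names as of one point in time.
    static final class Snapshot {
        final long version;
        final Set<String> names;

        Snapshot(final long version, final Set<String> names) {
            this.version = version;
            this.names = names;
        }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Set<String> names = new TreeSet<>();
    private long version = 0L;

    // Updates both fields under the lock so no reader can observe the new
    // name together with the old version, or the reverse.
    void add(final String name) {
        lock.lock();
        try {
            if (names.add(name)) {
                version++;
            }
        } finally {
            lock.unlock();
        }
    }

    // Copies both fields under the same lock to get a consistent pair.
    Snapshot snapshot() {
        lock.lock();
        try {
            return new Snapshot(version, Collections.unmodifiableSet(new TreeSet<>(names)));
        } finally {
            lock.unlock();
        }
    }
}
```

   Each field could be made thread-safe on its own (for example with an 
`AtomicLong` and a concurrent set), but that would not by itself guarantee 
that a reader sees a matching pair, which is the atomicity concern here.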








[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-03 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682237513



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -53,44 +56,162 @@
 private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
 private final StreamsConfig config;
-private final SortedMap builders; // Keep 
sorted by topology name for readability
+private final TopologyVersion version;
+
+private final ConcurrentNavigableMap 
builders; // Keep sorted by topology name for readability
 
 private ProcessorTopology globalTopology;
-private Map globalStateStores = new HashMap<>();
-final Set allInputTopics = new HashSet<>();
+private final Map globalStateStores = new HashMap<>();
+private final Set allInputTopics = new HashSet<>();
+
+public static class TopologyVersion {
+public AtomicLong topologyVersion = new AtomicLong(0L); // the local 
topology version
+public Set assignedNamedTopologies = new HashSet<>(); // the 
named topologies whose tasks are actively assigned
+public ReentrantLock topologyLock = new ReentrantLock();
+public Condition topologyCV = topologyLock.newCondition();
+}
 
-public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+public TopologyMetadata(final InternalTopologyBuilder builder,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
-builders = new TreeMap<>();
+builders = new ConcurrentSkipListMap<>();
 if (builder.hasNamedTopology()) {
 builders.put(builder.topologyName(), builder);
 } else {
 builders.put(UNNAMED_TOPOLOGY, builder);
 }
 }
 
-public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+public TopologyMetadata(final ConcurrentNavigableMap builders,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
+
 this.builders = builders;
 if (builders.isEmpty()) {
-log.debug("Building KafkaStreams app with no empty topology");
+log.debug("Starting up empty KafkaStreams app with no topology");
 }
 }
 
-public int getNumStreamThreads(final StreamsConfig config) {
-final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+public void updateCurrentAssignmentTopology(final Set 
assignedNamedTopologies) {
+version.assignedNamedTopologies = assignedNamedTopologies;
+}
 
-// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
-if (builders.isEmpty()) {
-if (configuredNumStreamThreads != 0) {
-log.info("Overriding number of StreamThreads to zero for empty 
topology");
+/**
+ * @return the set of named topologies that the assignor distributed tasks 
for during the last rebalance
+ */
+public Set assignmentNamedTopologies() {
+return version.assignedNamedTopologies;
+}
+
+public long topologyVersion() {
+return version.topologyVersion.get();
+}
+
+public void lock() {

Review comment:
   This part of the code has been through a lot of refactoring and after 
the latest cleanup, I agree we should be able to avoid exposing it at all








[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-03 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682235008



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -935,6 +944,14 @@ void shutdown(final boolean clean) {
 return tasksToCloseDirty;
 }
 
+public void updateCurrentAssignmentTopology(final Set 
assignedNamedTopologies) {
+
topologyMetadata.updateCurrentAssignmentTopology(assignedNamedTopologies);

Review comment:
   Removed until it's needed, if it is








[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-03 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682234726



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -53,44 +56,162 @@
 private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
 private final StreamsConfig config;
-private final SortedMap builders; // Keep 
sorted by topology name for readability
+private final TopologyVersion version;
+
+private final ConcurrentNavigableMap 
builders; // Keep sorted by topology name for readability
 
 private ProcessorTopology globalTopology;
-private Map globalStateStores = new HashMap<>();
-final Set allInputTopics = new HashSet<>();
+private final Map globalStateStores = new HashMap<>();
+private final Set allInputTopics = new HashSet<>();
+
+public static class TopologyVersion {
+public AtomicLong topologyVersion = new AtomicLong(0L); // the local 
topology version
+public Set assignedNamedTopologies = new HashSet<>(); // the 
named topologies whose tasks are actively assigned
+public ReentrantLock topologyLock = new ReentrantLock();
+public Condition topologyCV = topologyLock.newCondition();
+}
 
-public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+public TopologyMetadata(final InternalTopologyBuilder builder,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
-builders = new TreeMap<>();
+builders = new ConcurrentSkipListMap<>();
 if (builder.hasNamedTopology()) {
 builders.put(builder.topologyName(), builder);
 } else {
 builders.put(UNNAMED_TOPOLOGY, builder);
 }
 }
 
-public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+public TopologyMetadata(final ConcurrentNavigableMap builders,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
+
 this.builders = builders;
 if (builders.isEmpty()) {
-log.debug("Building KafkaStreams app with no empty topology");
+log.debug("Starting up empty KafkaStreams app with no topology");
 }
 }
 
-public int getNumStreamThreads(final StreamsConfig config) {
-final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+public void updateCurrentAssignmentTopology(final Set 
assignedNamedTopologies) {
+version.assignedNamedTopologies = assignedNamedTopologies;
+}
 
-// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
-if (builders.isEmpty()) {
-if (configuredNumStreamThreads != 0) {
-log.info("Overriding number of StreamThreads to zero for empty 
topology");
+/**
+ * @return the set of named topologies that the assignor distributed tasks 
for during the last rebalance
+ */
+public Set assignmentNamedTopologies() {
+return version.assignedNamedTopologies;
+}
+
+public long topologyVersion() {
+return version.topologyVersion.get();
+}
+
+public void lock() {
+version.topologyLock.lock();
+}
+
+public void unlock() {
+version.topologyLock.unlock();
+}
+
+public InternalTopologyBuilder getBuilderForTopologyName(final String 
name) {
+return builders.get(name);
+}
+
+/**
+ * @throws IllegalStateException if the thread is not already holding the 
lock via TopologyMetadata#lock
+ */
+public void maybeWaitForNonEmptyTopology() {
+if (!version.topologyLock.isHeldByCurrentThread()) {
+throw new IllegalStateException("Must call lock() before 
attempting to wait on non-empty topology");
+}
+while (isEmpty()) {
+try {
+log.debug("Detected that the topology is currently empty, 
going to wait for something to be added");
+version.topologyCV.await();
+} catch (final InterruptedException e) {
+log.debug("StreamThread was interrupted while waiting on empty 
topology", e);
+}
+}
+}
+
+public void registerAndBuildNewTopology(final InternalTopologyBuilder 
newTopologyBuilder) {
+try {
+lock();
+version.topologyVersion.incrementAndGet();
+log.info("Adding NamedTopology {}, latest topology version is {}", 
newTopologyBuilder.topologyName(), version.topologyVersion.get());
+builders.put(newTopologyBuilder.topologyName(), 
newTopologyBuilder);
+buildAndVerifyTopology(newTopologyBuilder);
+version.topologyCV.signalAll();
+} finally {
+ 

[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-03 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682233727



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -53,44 +56,162 @@
 private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
 private final StreamsConfig config;
-private final SortedMap builders; // Keep 
sorted by topology name for readability
+private final TopologyVersion version;
+
+private final ConcurrentNavigableMap 
builders; // Keep sorted by topology name for readability
 
 private ProcessorTopology globalTopology;
-private Map globalStateStores = new HashMap<>();
-final Set allInputTopics = new HashSet<>();
+private final Map globalStateStores = new HashMap<>();
+private final Set allInputTopics = new HashSet<>();
+
+public static class TopologyVersion {
+public AtomicLong topologyVersion = new AtomicLong(0L); // the local 
topology version
+public Set assignedNamedTopologies = new HashSet<>(); // the 
named topologies whose tasks are actively assigned
+public ReentrantLock topologyLock = new ReentrantLock();
+public Condition topologyCV = topologyLock.newCondition();
+}
 
-public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+public TopologyMetadata(final InternalTopologyBuilder builder,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
-builders = new TreeMap<>();
+builders = new ConcurrentSkipListMap<>();
 if (builder.hasNamedTopology()) {
 builders.put(builder.topologyName(), builder);
 } else {
 builders.put(UNNAMED_TOPOLOGY, builder);
 }
 }
 
-public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+public TopologyMetadata(final ConcurrentNavigableMap builders,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
+
 this.builders = builders;
 if (builders.isEmpty()) {
-log.debug("Building KafkaStreams app with no empty topology");
+log.debug("Starting up empty KafkaStreams app with no topology");
 }
 }
 
-public int getNumStreamThreads(final StreamsConfig config) {
-final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+public void updateCurrentAssignmentTopology(final Set 
assignedNamedTopologies) {
+version.assignedNamedTopologies = assignedNamedTopologies;
+}
 
-// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
-if (builders.isEmpty()) {
-if (configuredNumStreamThreads != 0) {
-log.info("Overriding number of StreamThreads to zero for empty 
topology");
+/**
+ * @return the set of named topologies that the assignor distributed tasks 
for during the last rebalance
+ */
+public Set assignmentNamedTopologies() {
+return version.assignedNamedTopologies;
+}
+
+public long topologyVersion() {
+return version.topologyVersion.get();
+}
+
+public void lock() {
+version.topologyLock.lock();
+}
+
+public void unlock() {
+version.topologyLock.unlock();
+}
+
+public InternalTopologyBuilder getBuilderForTopologyName(final String 
name) {
+return builders.get(name);
+}
+
+/**
+ * @throws IllegalStateException if the thread is not already holding the 
lock via TopologyMetadata#lock
+ */
+public void maybeWaitForNonEmptyTopology() {
+if (!version.topologyLock.isHeldByCurrentThread()) {
+throw new IllegalStateException("Must call lock() before 
attempting to wait on non-empty topology");
+}
+while (isEmpty()) {
+try {
+log.debug("Detected that the topology is currently empty, 
going to wait for something to be added");
+version.topologyCV.await();
+} catch (final InterruptedException e) {
+log.debug("StreamThread was interrupted while waiting on empty 
topology", e);
+}
+}
+}
+
+public void registerAndBuildNewTopology(final InternalTopologyBuilder 
newTopologyBuilder) {
+try {
+lock();
+version.topologyVersion.incrementAndGet();
+log.info("Adding NamedTopology {}, latest topology version is {}", 
newTopologyBuilder.topologyName(), version.topologyVersion.get());
+builders.put(newTopologyBuilder.topologyName(), 
newTopologyBuilder);
+buildAndVerifyTopology(newTopologyBuilder);
+version.topologyCV.signalAll();
+} finally {
+ 

[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-03 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682231789



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -105,31 +105,76 @@ private KafkaStreamsNamedTopologyWrapper(final 
Collection topolog
 (v1, v2) -> {
 throw new IllegalArgumentException("Topology names 
must be unique");
 },
-() -> new TreeMap<>())),
+() -> new ConcurrentSkipListMap<>())),
 config),
 config,
 clientSupplier
 );
-for (final NamedTopology topology : topologies) {
-nameToTopology.put(topology.name(), topology);
-}
 }
 
-public NamedTopology getTopologyByName(final String name) {
-if (nameToTopology.containsKey(name)) {
-return nameToTopology.get(name);
-} else {
-throw new IllegalArgumentException("Unable to locate a 
NamedTopology called " + name);
+/**
+ * @return the NamedTopology for the specific name, or Optional.empty() if 
the application has no NamedTopology of that name
+ */
+public Optional getTopologyByName(final String name) {
+return 
Optional.ofNullable(topologyMetadata.getBuilderForTopologyName(name)).map(InternalTopologyBuilder::namedTopology);
+}
+
+/**
+ * Add a new NamedTopology to a running Kafka Streams app. If multiple 
instances of the application are running,
+ * you should inform all of them by calling {@link 
#addNamedTopology(NamedTopology)} on each client in order for
+ * it to begin processing the new topology.
+ *
+ * @throws IllegalArgumentException if this topology name is already in use
+ * @throws IllegalStateException if streams has not been started or has 
already shut down
+ * @throws TopologyException if this topology subscribes to any 
input topics or pattern already in use
+ */
+public void addNamedTopology(final NamedTopology newTopology) {
+if (hasStartedOrFinishedShuttingDown()) {
+throw new IllegalStateException("Cannot add a NamedTopology while 
the state is " + super.state);
+} else if (getTopologyByName(newTopology.name()).isPresent()) {
+throw new IllegalArgumentException("Unable to add the new 
NamedTopology " + newTopology.name() +
+   " as another of the same 
name already exists");
 }
+
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
 }
 
-public void addNamedTopology(final NamedTopology topology) {
-nameToTopology.put(topology.name(), topology);
-throw new UnsupportedOperationException();
+/**
+ * Remove an existing NamedTopology from a running Kafka Streams app. If 
multiple instances of the application are
+ * running, you should inform all of them by calling {@link 
#removeNamedTopology(String)} on each client to ensure
+ * it stops processing the old topology.
+ *
+ * @throws IllegalArgumentException if this topology name cannot be found
+ * @throws IllegalStateException if streams has not been started or has 
already shut down
+ * @throws TopologyException if this topology subscribes to any 
input topics or pattern already in use
+ */
+public void removeNamedTopology(final String topologyToRemove) {
+if (!isRunningOrRebalancing()) {
+throw new IllegalStateException("Cannot remove a NamedTopology 
while the state is " + super.state);
+} else if (!getTopologyByName(topologyToRemove).isPresent()) {
+throw new IllegalArgumentException("Unable to locate for removal a 
NamedTopology called " + topologyToRemove);
+}
+
+topologyMetadata.unregisterTopology(topologyToRemove);
 }
 
-public void removeNamedTopology(final String namedTopology) {
-throw new UnsupportedOperationException();
+/**
+ * Do a clean up of the local state directory for this NamedTopology by 
deleting all data with regard to the
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} in the 
({@link StreamsConfig#STATE_DIR_CONFIG})
+ * 
+ * May be called while the Streams is in any state, but only on a {@link 
NamedTopology} that has already been
+ * removed via {@link #removeNamedTopology(String)}.
+ * 
+ * Calling this method triggers a restore of local {@link StateStore}s for 
this {@link NamedTopology} if it is
+ * ever re-added via {@link #addNamedTopology(NamedTopology)}.
+ *
+ * @throws IllegalStateException if this {@code NamedTopology} hasn't been 
removed
+ * @throws StreamsException if cleanup failed
+ */
+public void cleanUpNamedTopology(final String name) {
+if 

[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-03 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682230209



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##
@@ -64,6 +65,9 @@
 private final Map taskProducers;
 private final StreamThread.ProcessingMode processingMode;
 
+// tasks may be assigned for a NamedTopology that is not yet known by this 
host, and saved for later creation
+private final Map>  unknownTasksToBeCreated = 
new HashMap<>();

Review comment:
   I think we discussed this already, but in case anyone else is wondering: 
the leader always assigns tasks based on its own view of the current named 
topologies and topics, i.e. it does not check individual subscriptions, since 
the group is assumed to be eventually consistent in this regard. (Note that this 
is actually no different from today: even if each instance of an app has a 
different input topic in its topology, they will all wind up receiving tasks 
for whichever topic the leader happened to have.)
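
   To illustrate the "saved for later creation" idea, here is a minimal sketch 
of a host that parks tasks for a topology it has not seen yet and creates them 
once the topology is added locally. Everything in it is hypothetical 
(`PendingTaskSketch`, `handleAssignment`, `addTopology`, plain strings for task 
ids and topology names); it is not the actual `ActiveTaskCreator` logic, only 
the general shape of the bookkeeping.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only: not the real ActiveTaskCreator.
public class PendingTaskSketch {
    private final Set<String> knownTopologies = new HashSet<>();
    // task id -> topology name, for tasks whose topology this host does not know yet
    private final Map<String, String> unknownTasksToBeCreated = new LinkedHashMap<>();
    private final List<String> createdTasks = new ArrayList<>();

    // Create tasks for known topologies; park the rest until their topology shows up.
    public void handleAssignment(final Map<String, String> assignedTaskToTopology) {
        for (final Map.Entry<String, String> entry : assignedTaskToTopology.entrySet()) {
            if (knownTopologies.contains(entry.getValue())) {
                createdTasks.add(entry.getKey());
            } else {
                unknownTasksToBeCreated.put(entry.getKey(), entry.getValue());
            }
        }
    }

    // Register a topology locally and create any tasks that were waiting on it.
    public void addTopology(final String name) {
        knownTopologies.add(name);
        final Iterator<Map.Entry<String, String>> it = unknownTasksToBeCreated.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, String> entry = it.next();
            if (entry.getValue().equals(name)) {
                createdTasks.add(entry.getKey());
                it.remove();
            }
        }
    }

    public List<String> createdTasks() {
        return createdTasks;
    }

    public int pendingCount() {
        return unknownTasksToBeCreated.size();
    }

    public static void main(final String[] args) {
        final PendingTaskSketch host = new PendingTaskSketch();
        host.addTopology("topology-1");

        // The leader already knows topology-2, this host does not.
        final Map<String, String> assignment = new LinkedHashMap<>();
        assignment.put("task-a", "topology-1");
        assignment.put("task-b", "topology-2");
        host.handleAssignment(assignment);
        System.out.println(host.createdTasks() + " pending=" + host.pendingCount());

        host.addTopology("topology-2");
        System.out.println(host.createdTasks() + " pending=" + host.pendingCount());
    }
}
```

   The point of the sketch is that the host never rejects an assignment it 
cannot act on yet; it only defers it, which matches the eventual-consistency 
assumption described above.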








[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-07-29 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r679531569



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java
##
@@ -112,15 +103,15 @@ public void shouldReturnTopologyByName() {
 final NamedTopology topology2 = builder2.buildNamedTopology(props);
 final NamedTopology topology3 = builder3.buildNamedTopology(props);
 streams = new KafkaStreamsNamedTopologyWrapper(asList(topology1, 
topology2, topology3), props, clientSupplier);
-assertThat(streams.getTopologyByName("topology-1"), 
equalTo(topology1));
-assertThat(streams.getTopologyByName("topology-2"), 
equalTo(topology2));
-assertThat(streams.getTopologyByName("topology-3"), 
equalTo(topology3));
+assertThat(streams.getTopologyByName("topology-1").get(), 
equalTo(topology1));
+assertThat(streams.getTopologyByName("topology-2").get(), 
equalTo(topology2));
+assertThat(streams.getTopologyByName("topology-3").get(), 
equalTo(topology3));
 }
 
 @Test
-public void 
shouldThrowIllegalArgumentWhenLookingUpNonExistentTopologyByName() {
+public void shouldReturnEmptyWhenLookingUpNonExistentTopologyByName() {

Review comment:
   We changed the behavior to return an empty Optional rather than throw, 
as users may want to use this API to determine whether the given named topology 
is known or not
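
   As a rough sketch of what this looks like from the caller's side, the 
example below uses a hypothetical stand-in registry (`TopologyLookupSketch`, 
with plain strings in place of `NamedTopology`) rather than the real 
`KafkaStreamsNamedTopologyWrapper`. It only shows the `Optional`-returning 
lookup pattern: a missing name yields an empty result instead of an exception.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical illustration only: a tiny registry with an Optional-returning lookup.
public class TopologyLookupSketch {
    // name -> description; a String stands in for the NamedTopology type here
    private final Map<String, String> topologies = new ConcurrentSkipListMap<>();

    public void add(final String name, final String description) {
        if (topologies.putIfAbsent(name, description) != null) {
            throw new IllegalArgumentException("Topology names must be unique");
        }
    }

    // Returns Optional.empty() rather than throwing when the name is unknown.
    public Optional<String> getTopologyByName(final String name) {
        return Optional.ofNullable(topologies.get(name));
    }

    public static void main(final String[] args) {
        final TopologyLookupSketch registry = new TopologyLookupSketch();
        registry.add("topology-1", "first topology");

        // Callers can probe for existence without a try/catch.
        System.out.println(registry.getTopologyByName("topology-1").isPresent());
        System.out.println(registry.getTopologyByName("topology-4").isPresent());
    }
}
```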



