[GitHub] [kafka] ableegoldman commented on a change in pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread

2021-07-20 Thread GitBox


ableegoldman commented on a change in pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#discussion_r673585765



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
##
@@ -56,8 +60,16 @@ public QueryableStoreProvider(final List storePr
 return queryableStoreType.create(globalStoreProvider, storeName);
 }
 return queryableStoreType.create(
-new WrappingStoreProvider(storeProviders, storeQueryParameters),
+new WrappingStoreProvider(new ArrayList<>(storeProviders.values()), storeQueryParameters),

Review comment:
   Hmm... I'm a little less sure about this, but I think we should make sure that WrappingStoreProvider's view of the stream thread storeProviders also stays up to date when threads are added or removed. If a user calls KafkaStreams.store() and then adds or removes a bunch of threads without refreshing the store provider, any subsequent get() on that provider would only see the threads that existed at the time KafkaStreams.store() was called, because we make a copy here.
   
   We should be able to change the WrappingStoreProvider constructor and local field to take a Set, or even just a Collection, since all it ever does is loop over it. Then we can pass in storeProviders.values() directly and the view stays current.
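A minimal sketch of the difference between copying `storeProviders.values()` and passing the live view. `WrappingProvider` is a hypothetical stand-in for `WrappingStoreProvider`, and providers are modeled as plain strings keyed by thread name; backing the map with a `ConcurrentHashMap` is an assumption, chosen so the view can be iterated while threads are added or removed.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LiveViewDemo {
    // Hypothetical stand-in for WrappingStoreProvider: it only ever loops over its providers.
    static final class WrappingProvider {
        private final Collection<String> providers;

        WrappingProvider(final Collection<String> providers) {
            this.providers = providers;
        }

        // Counts the providers visible at query time.
        int visibleProviders() {
            int count = 0;
            for (final String ignored : providers) {
                count++;
            }
            return count;
        }
    }

    public static void main(final String[] args) {
        // Thread name -> provider (assumed concurrent so the view is safe to iterate during changes).
        final Map<String, String> storeProviders = new ConcurrentHashMap<>();
        storeProviders.put("StreamThread-1", "provider-1");

        // Snapshot copy vs. live view, both taken before the new thread is added.
        final WrappingProvider copied = new WrappingProvider(new ArrayList<>(storeProviders.values()));
        final WrappingProvider live = new WrappingProvider(storeProviders.values());

        storeProviders.put("StreamThread-2", "provider-2"); // e.g. after addStreamThread()

        System.out.println("copied: " + copied.visibleProviders()); // copied: 1
        System.out.println("live: " + live.visibleProviders());     // live: 2
    }
}
```

With a plain `HashMap` behind the view, iterating it while another thread adds or removes a provider could throw `ConcurrentModificationException`, so the backing map's thread-safety matters as much as passing the view.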




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread

2021-07-20 Thread GitBox


ableegoldman commented on a change in pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#discussion_r672764087



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1081,6 +1079,7 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
 } else {
 log.info("Successfully removed {} in {}ms", streamThread.getName(), time.milliseconds() - startMs);
 threads.remove(streamThread);
+queryableStoreProvider.removeStoreProvider(new StreamThreadStateStoreProvider(streamThread));

Review comment:
   Ah, yeah, I guess it would always have had to handle `DEAD` threads, since in the before-time (i.e. before we added the `add/removeStreamThread` APIs) it was always possible for a thread to just die when hit with an unexpected exception.
   
   That said, I feel a lot better about explicitly trimming a removed thread from the list. We don't want to build up a mass grave of mostly dead threads (well, dead thread store providers) that can never be garbage collected over the application's lifetime.
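A hypothetical sketch of why skipping `DEAD` threads at query time is not enough on its own. `ThreadProvider` and `churn` are illustrative names, not Kafka Streams classes, and a thread is simply marked `DEAD` to simulate `removeStreamThread()`. Both registries hide dead providers from queries, but only the one that trims on removal drops its references so they can be garbage collected.

```java
import java.util.ArrayList;
import java.util.List;

public class DeadProviderGrowthDemo {
    enum ThreadState { RUNNING, DEAD }

    // Hypothetical stand-in for a stream thread together with its store provider.
    static final class ThreadProvider {
        final String name;
        volatile ThreadState state = ThreadState.RUNNING;

        ThreadProvider(final String name) {
            this.name = name;
        }
    }

    final List<ThreadProvider> providers = new ArrayList<>();

    // Query-time filtering: DEAD providers are skipped but remain referenced by the list.
    long queryableCount() {
        return providers.stream().filter(p -> p.state != ThreadState.DEAD).count();
    }

    // Simulates repeated add/remove thread cycles, optionally trimming on removal.
    static DeadProviderGrowthDemo churn(final int cycles, final boolean trimOnRemove) {
        final DeadProviderGrowthDemo demo = new DeadProviderGrowthDemo();
        for (int i = 0; i < cycles; i++) {
            final ThreadProvider p = new ThreadProvider("StreamThread-" + i);
            demo.providers.add(p);
            p.state = ThreadState.DEAD; // thread shut down by the simulated removal
            if (trimOnRemove) {
                demo.providers.remove(p); // explicit trim: the list no longer holds the reference
            }
        }
        return demo;
    }

    public static void main(final String[] args) {
        final DeadProviderGrowthDemo untrimmed = churn(1000, false);
        final DeadProviderGrowthDemo trimmed = churn(1000, true);
        System.out.println(untrimmed.queryableCount() + " " + untrimmed.providers.size()); // 0 1000
        System.out.println(trimmed.queryableCount() + " " + trimmed.providers.size());     // 0 0
    }
}
```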

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1081,6 +1079,7 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
 } else {
 log.info("Successfully removed {} in {}ms", streamThread.getName(), time.milliseconds() - startMs);
 threads.remove(streamThread);
+queryableStoreProvider.removeStoreProvider(new StreamThreadStateStoreProvider(streamThread));

Review comment:
   It seems a bit weird to have to create a `new StreamThreadStateStoreProvider(streamThread)` just to remove an existing `StreamThreadStateStoreProvider` from this. Could we change `#removeStoreProvider` to accept a `StreamThread` reference, or even just a `String streamThreadName`? Then we could store a map from thread name to `StreamThreadStateStoreProvider` or something similar. WDYT?
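A sketch of the suggested map keyed by thread name. `ThreadStoreProvider` and `addStoreProvider` are hypothetical names for illustration (only `removeStoreProvider` appears in the diff), and the concurrent map is an assumption. Removal then needs only the thread name, and `providers()` hands out a read-only live view that a wrapping provider could iterate.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StoreProviderRegistry {
    // Hypothetical stand-in for StreamThreadStateStoreProvider.
    static final class ThreadStoreProvider {
        final String threadName;

        ThreadStoreProvider(final String threadName) {
            this.threadName = threadName;
        }
    }

    // Keyed by stream thread name, so removal does not require constructing a provider.
    private final Map<String, ThreadStoreProvider> storeProviders = new ConcurrentHashMap<>();

    void addStoreProvider(final ThreadStoreProvider provider) {
        storeProviders.put(provider.threadName, provider);
    }

    // Takes the thread name instead of a freshly constructed provider.
    void removeStoreProvider(final String streamThreadName) {
        storeProviders.remove(streamThreadName);
    }

    // Read-only live view: later adds/removes are visible through it.
    Collection<ThreadStoreProvider> providers() {
        return Collections.unmodifiableCollection(storeProviders.values());
    }

    public static void main(final String[] args) {
        final StoreProviderRegistry registry = new StoreProviderRegistry();
        registry.addStoreProvider(new ThreadStoreProvider("StreamThread-1"));
        registry.addStoreProvider(new ThreadStoreProvider("StreamThread-2"));

        final Collection<ThreadStoreProvider> view = registry.providers();
        registry.removeStoreProvider("StreamThread-1"); // e.g. after removeStreamThread()
        System.out.println(view.size()); // 1
    }
}
```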








[GitHub] [kafka] ableegoldman commented on a change in pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread

2021-07-19 Thread GitBox


ableegoldman commented on a change in pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#discussion_r672765942



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1081,6 +1079,7 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
 } else {
 log.info("Successfully removed {} in {}ms", streamThread.getName(), time.milliseconds() - startMs);
 threads.remove(streamThread);
+queryableStoreProvider.removeStoreProvider(new StreamThreadStateStoreProvider(streamThread));

Review comment:
   It seems a bit weird to have to create a `new StreamThreadStateStoreProvider(streamThread)` just to remove an existing `StreamThreadStateStoreProvider` from this. Could we change `#removeStoreProvider` to accept a `StreamThread` reference, or even just a `String streamThreadName`? Then we could store a map from thread name to `StreamThreadStateStoreProvider` or something similar. WDYT?








[GitHub] [kafka] ableegoldman commented on a change in pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread

2021-07-19 Thread GitBox


ableegoldman commented on a change in pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#discussion_r672764087



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1081,6 +1079,7 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
 } else {
 log.info("Successfully removed {} in {}ms", streamThread.getName(), time.milliseconds() - startMs);
 threads.remove(streamThread);
+queryableStoreProvider.removeStoreProvider(new StreamThreadStateStoreProvider(streamThread));

Review comment:
   Ah, yeah, I guess it would always have had to handle `DEAD` threads, since in the before-time (i.e. before we added the `add/removeStreamThread` APIs) it was always possible for a thread to just die when hit with an unexpected exception.
   
   That said, I feel a lot better about explicitly trimming a removed thread from the list. We don't want to build up a mass grave of mostly dead threads (well, dead thread store providers) that can never be garbage collected over the application's lifetime.



