[GitHub] [kafka] ableegoldman commented on a change in pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread
ableegoldman commented on a change in pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#discussion_r673585765

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java ##
@@ -56,8 +60,16 @@ public QueryableStoreProvider(final List storePr
             return queryableStoreType.create(globalStoreProvider, storeName);
         }
         return queryableStoreType.create(
-            new WrappingStoreProvider(storeProviders, storeQueryParameters),
+            new WrappingStoreProvider(new ArrayList<>(storeProviders.values()), storeQueryParameters),

Review comment:
Hmm... I'm a little less sure about this one, but I think we should make sure that WrappingStoreProvider's view of the stream thread storeProviders also stays up to date when threads are added or removed. If a user calls KafkaStreams.store() and then adds or removes a bunch of threads without refreshing the store provider, any subsequent get() on that provider would only see the threads that existed at the time KafkaStreams.store() was called, because we make a copy like this. We should be able to just change the WrappingStoreProvider constructor and local field to take a Set, or even just a Collection, since all it ever does is loop over it. Then we can pass in storeProviders.values() directly and it's all good: Map#values() returns a view backed by the map, so threads added or removed later show up automatically.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
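The difference described in the review comment above, between copying `storeProviders.values()` into a new `ArrayList` and passing the `values()` view itself, can be sketched in isolation. This is a minimal sketch, not Kafka code: `ThreadStoreProvider` and `WrappingProviderSketch` are hypothetical stand-ins for `StreamThreadStateStoreProvider` and `WrappingStoreProvider`, and the use of `ConcurrentHashMap` is an assumption, since the diff does not show which `Map` implementation backs `storeProviders` (iterators over a `ConcurrentHashMap` view are weakly consistent and never throw `ConcurrentModificationException`).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LiveViewSketch {

    // Hypothetical stand-in for StreamThreadStateStoreProvider: it only records a thread name.
    static final class ThreadStoreProvider {
        final String threadName;

        ThreadStoreProvider(final String threadName) {
            this.threadName = threadName;
        }
    }

    // Hypothetical stand-in for WrappingStoreProvider: it keeps whatever Collection it is
    // given and only ever iterates over it, so it needs no List-specific operations.
    static final class WrappingProviderSketch {
        private final Collection<ThreadStoreProvider> providers;

        WrappingProviderSketch(final Collection<ThreadStoreProvider> providers) {
            this.providers = providers;
        }

        int visibleThreads() {
            int count = 0;
            for (final ThreadStoreProvider ignored : providers) {
                count++;
            }
            return count;
        }
    }

    // Returns {snapshotCount, liveViewCount} after one thread is added and two are removed.
    static int[] compare() {
        // Assumed concurrent map, since queries and thread add/remove may run on different threads.
        final Map<String, ThreadStoreProvider> storeProviders = new ConcurrentHashMap<>();
        storeProviders.put("thread-1", new ThreadStoreProvider("thread-1"));
        storeProviders.put("thread-2", new ThreadStoreProvider("thread-2"));

        // Copying values() freezes the set of threads seen at construction time.
        final WrappingProviderSketch snapshot =
            new WrappingProviderSketch(new ArrayList<>(storeProviders.values()));
        // Passing values() directly hands over a view backed by the map.
        final WrappingProviderSketch live = new WrappingProviderSketch(storeProviders.values());

        // Simulate one addStreamThread() followed by two removeStreamThread() calls.
        storeProviders.put("thread-3", new ThreadStoreProvider("thread-3"));
        storeProviders.remove("thread-1");
        storeProviders.remove("thread-2");

        return new int[] {snapshot.visibleThreads(), live.visibleThreads()};
    }

    public static void main(final String[] args) {
        final int[] counts = compare();
        System.out.println("snapshot sees " + counts[0] + ", live view sees " + counts[1]);
    }
}
```

Running `main` prints `snapshot sees 2, live view sees 1`: the copied list still holds the two removed threads and misses the new one, while the view reflects every change made to the map.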
[GitHub] [kafka] ableegoldman commented on a change in pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread
ableegoldman commented on a change in pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#discussion_r672764087

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##
@@ -1081,6 +1079,7 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
                 } else {
                     log.info("Successfully removed {} in {}ms", streamThread.getName(), time.milliseconds() - startMs);
                     threads.remove(streamThread);
+                    queryableStoreProvider.removeStoreProvider(new StreamThreadStateStoreProvider(streamThread));

Review comment:
Ah, yeah, I guess it would always have had to handle `DEAD` threads, since in the before-time (i.e., before we added the `add/removeStreamThread` APIs) it was always possible for a thread to just die when hit with an unexpected exception. That said, I feel a lot better about explicitly trimming a removed thread from the list. We don't want to build up a mass grave of mostly dead threads (well, dead-thread store providers) that can never be garbage collected over the application's lifetime.
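The concern about store providers for removed threads piling up can be illustrated with a small simulation. This is a hypothetical model, not Kafka's implementation: `FakeThread`, `ThreadStoreProvider`, `simulate` and `liveCount` are invented names. It repeatedly removes a thread and adds a replacement, with and without explicitly trimming the removed thread's provider, then compares how many providers stay registered against how many wrap a thread that is still alive.

```java
import java.util.ArrayList;
import java.util.List;

public class ProviderTrimSketch {

    // Hypothetical minimal model of a stream thread: a name plus a DEAD flag.
    static final class FakeThread {
        final String name;
        boolean dead;

        FakeThread(final String name) {
            this.name = name;
        }
    }

    // Hypothetical store provider wrapping a single thread.
    static final class ThreadStoreProvider {
        final FakeThread thread;

        ThreadStoreProvider(final FakeThread thread) {
            this.thread = thread;
        }
    }

    // Simulates `cycles` rounds of removeStreamThread() followed by addStreamThread().
    static List<ThreadStoreProvider> simulate(final int cycles, final boolean trimOnRemove) {
        final List<ThreadStoreProvider> providers = new ArrayList<>();
        FakeThread current = new FakeThread("thread-0");
        providers.add(new ThreadStoreProvider(current));

        for (int i = 1; i <= cycles; i++) {
            // The removed thread ends up DEAD either way.
            final FakeThread removed = current;
            removed.dead = true;
            if (trimOnRemove) {
                // Explicit trimming: drop the provider that wraps the removed thread.
                providers.removeIf(p -> p.thread == removed);
            }
            // Register a provider for the replacement thread.
            current = new FakeThread("thread-" + i);
            providers.add(new ThreadStoreProvider(current));
        }
        return providers;
    }

    // Readers still have to skip DEAD threads, so query results match in both cases;
    // only the number of retained (never collected) providers differs.
    static long liveCount(final List<ThreadStoreProvider> providers) {
        return providers.stream().filter(p -> !p.thread.dead).count();
    }

    public static void main(final String[] args) {
        final List<ThreadStoreProvider> untrimmed = simulate(100, false);
        final List<ThreadStoreProvider> trimmed = simulate(100, true);
        System.out.println("without trimming: " + untrimmed.size() + " registered, " + liveCount(untrimmed) + " live");
        System.out.println("with trimming: " + trimmed.size() + " registered, " + liveCount(trimmed) + " live");
    }
}
```

Running `main` prints `without trimming: 101 registered, 1 live` and `with trimming: 1 registered, 1 live`. Skipping `DEAD` threads keeps results correct either way, but only explicit trimming keeps the registry bounded.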
[GitHub] [kafka] ableegoldman commented on a change in pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread
ableegoldman commented on a change in pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#discussion_r672765942

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##
@@ -1081,6 +1079,7 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
                 } else {
                     log.info("Successfully removed {} in {}ms", streamThread.getName(), time.milliseconds() - startMs);
                     threads.remove(streamThread);
+                    queryableStoreProvider.removeStoreProvider(new StreamThreadStateStoreProvider(streamThread));

Review comment:
It seems a bit weird to have to create a `new StreamThreadStateStoreProvider(streamThread)` just to remove an existing `StreamThreadStateStoreProvider` from this. Can we maybe change `#removeStoreProvider` so that it accepts a `StreamThread` reference, or even just a `String streamThreadName`, and then store a map from thread name to `StreamThreadStateStoreProvider` or something? WDYT?
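One reason the name-keyed map suggested above is sturdier: removing a freshly constructed provider from a list only works if the provider class defines value-based `equals()`. The diff does not show whether `StreamThreadStateStoreProvider` does, so this sketch assumes the default identity-based `equals()` inherited from `Object`. `ThreadStoreProvider` and `StoreProviderRegistry` are hypothetical names, and the concurrent map is an assumption rather than something the PR specifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ProviderRemovalSketch {

    // Hypothetical provider class that keeps Object's identity-based equals().
    static final class ThreadStoreProvider {
        final String threadName;

        ThreadStoreProvider(final String threadName) {
            this.threadName = threadName;
        }
    }

    // Hypothetical registry keyed by thread name, along the lines the review suggests.
    static final class StoreProviderRegistry {
        // Assumed concurrent map, since queries and thread add/remove may run on different threads.
        private final Map<String, ThreadStoreProvider> byThreadName = new ConcurrentHashMap<>();

        void addStoreProvider(final ThreadStoreProvider provider) {
            byThreadName.put(provider.threadName, provider);
        }

        // Removal needs only the thread name; no throwaway provider is constructed.
        void removeStoreProvider(final String streamThreadName) {
            byThreadName.remove(streamThreadName);
        }

        boolean contains(final String streamThreadName) {
            return byThreadName.containsKey(streamThreadName);
        }

        int size() {
            return byThreadName.size();
        }
    }

    // List-based removal with a new instance: identity equals() matches nothing, so nothing is removed.
    static int listSizeAfterRemoveByNewInstance() {
        final List<ThreadStoreProvider> providers = new ArrayList<>();
        providers.add(new ThreadStoreProvider("thread-1"));
        providers.remove(new ThreadStoreProvider("thread-1"));
        return providers.size();
    }

    // Map-based removal by name removes exactly the intended entry.
    static StoreProviderRegistry registryAfterRemoveByName() {
        final StoreProviderRegistry registry = new StoreProviderRegistry();
        registry.addStoreProvider(new ThreadStoreProvider("thread-1"));
        registry.addStoreProvider(new ThreadStoreProvider("thread-2"));
        registry.removeStoreProvider("thread-1");
        return registry;
    }

    public static void main(final String[] args) {
        final StoreProviderRegistry registry = registryAfterRemoveByName();
        System.out.println("list after remove(new instance): " + listSizeAfterRemoveByNewInstance() + " provider(s)");
        System.out.println("map after remove(name): " + registry.size() + " provider(s), thread-2 present: " + registry.contains("thread-2"));
    }
}
```

Running `main` prints `list after remove(new instance): 1 provider(s)` and `map after remove(name): 1 provider(s), thread-2 present: true`. Keying by name also means `removeStoreProvider` can be called from the `removeStreamThread` path with only `streamThread.getName()` in hand.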