guozhangwang commented on a change in pull request #11703: URL: https://github.com/apache/kafka/pull/11703#discussion_r790371892
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java ########## @@ -56,18 +56,26 @@ public static StreamsMetricsImpl getMetricsImpl(final StateStoreContext context) return (StreamsMetricsImpl) context.metrics(); } - public static String changelogFor(final ProcessorContext context, final String storeName) { + public static String changelogFor(final ProcessorContext context, final String storeName, final Boolean newChangelogTopic) { final String prefix = getPrefix(context.appConfigs(), context.applicationId()); - return context instanceof InternalProcessorContext - ? ((InternalProcessorContext) context).changelogFor(storeName) - : ProcessorStateManager.storeChangelogTopic(prefix, storeName, context.taskId().topologyName()); + if (context instanceof InternalProcessorContext && !newChangelogTopic) { + final String changelogTopic = ((InternalProcessorContext) context).changelogFor(storeName); + if (changelogTopic != null) + return changelogTopic; + + } + return ProcessorStateManager.storeChangelogTopic(prefix, storeName, context.taskId().topologyName()); } - public static String changelogFor(final StateStoreContext context, final String storeName) { + public static String changelogFor(final StateStoreContext context, final String storeName, final Boolean newChangelogTopic) { final String prefix = getPrefix(context.appConfigs(), context.applicationId()); - return context instanceof InternalProcessorContext - ? ((InternalProcessorContext) context).changelogFor(storeName) - : ProcessorStateManager.storeChangelogTopic(prefix, storeName, context.taskId().topologyName()); + if (context instanceof InternalProcessorContext && !newChangelogTopic) { + final String changelogTopic = ((InternalProcessorContext) context).changelogFor(storeName); Review comment: I checked the code in `InternalTopologyBuilder` and I think that at the time `ProcessorStateManager` is constructed all store->changelog pairs should be inside the `Map<String, String> storeToChangelogTopic` already, and assuming that for all mock/tests, we let the mock of InternalProcessorContext to return non-null strings we should never have `null` returned from `changelogFor`. @cadonna could you help me double check here, since in the original PR your intention (https://github.com/apache/kafka/pull/8902/files#r452148874) is to still need null-check. If that's the case, we can further simply the logic here as: 1) we would never need to fallback to `ProcessorStateManager.storeChangelogTopic` since the changelog string should be constructed via the function and added into the map in `InternalTopologyBuilder` as always. 2) hence we do not need the third param since we would never need to create new changelog. ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ########## @@ -171,42 +169,24 @@ private void registerMetrics() { @Deprecated private void initStoreSerde(final ProcessorContext context) { final String storeName = name(); - final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName); - final String prefix = getPrefix(context.appConfigs(), context.applicationId()); + final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); Review comment: Only three MeteredXYZStore classes' `initStoreSerde` have the third param to false, but it seems that if the returned value is `null` we would still fallback to `ProcessorStateManager.storeChangelogTopic(prefix, storeName, taskId.topologyName())` to create the new topic, so it should be true as well?? BTW if my other comment is valid, then we can remove this param anyways. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org