guozhangwang commented on a change in pull request #11703:
URL: https://github.com/apache/kafka/pull/11703#discussion_r790371892



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
##########
@@ -56,18 +56,26 @@ public static StreamsMetricsImpl getMetricsImpl(final StateStoreContext context)
         return (StreamsMetricsImpl) context.metrics();
     }
 
-    public static String changelogFor(final ProcessorContext context, final String storeName) {
+    public static String changelogFor(final ProcessorContext context, final String storeName, final Boolean newChangelogTopic) {
         final String prefix = getPrefix(context.appConfigs(), context.applicationId());
-        return context instanceof InternalProcessorContext
-            ? ((InternalProcessorContext) context).changelogFor(storeName)
-            : ProcessorStateManager.storeChangelogTopic(prefix, storeName, context.taskId().topologyName());
+        if (context instanceof InternalProcessorContext && !newChangelogTopic) {
+            final String changelogTopic = ((InternalProcessorContext) context).changelogFor(storeName);
+            if (changelogTopic != null)
+                return changelogTopic;
+
+        }
+        return ProcessorStateManager.storeChangelogTopic(prefix, storeName, context.taskId().topologyName());
     }
 
-    public static String changelogFor(final StateStoreContext context, final String storeName) {
+    public static String changelogFor(final StateStoreContext context, final String storeName, final Boolean newChangelogTopic) {
         final String prefix = getPrefix(context.appConfigs(), context.applicationId());
-        return context instanceof InternalProcessorContext
-            ? ((InternalProcessorContext) context).changelogFor(storeName)
-            : ProcessorStateManager.storeChangelogTopic(prefix, storeName, context.taskId().topologyName());
+        if (context instanceof InternalProcessorContext && !newChangelogTopic) {
+            final String changelogTopic = ((InternalProcessorContext) context).changelogFor(storeName);

Review comment:
       I checked the code in `InternalTopologyBuilder`, and I think that by the
time `ProcessorStateManager` is constructed, all store->changelog pairs should
already be in the `Map<String, String> storeToChangelogTopic`. Assuming that in
all mocks/tests we let the mock of `InternalProcessorContext` return non-null
strings, `changelogFor` should never return `null`.
@cadonna could you help me double-check this, since in the original PR
(https://github.com/apache/kafka/pull/8902/files#r452148874) your intention was
to keep the null check?
   
   If that's the case, we can further simplify the logic here:
   
   1) we would never need to fall back to
`ProcessorStateManager.storeChangelogTopic`, since the changelog string should
always be constructed via that function and added to the map in
`InternalTopologyBuilder`.
   2) hence we do not need the third param, since we would never need to create
a new changelog.
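
   To make the proposed simplification concrete, here is a minimal sketch of what
`changelogFor` could reduce to if the premise above holds (every store is already
in the map, so the lookup never returns `null`). That premise is still
unverified. `ChangelogLookup` and `SimplifiedChangelogFor` are hypothetical
stand-ins introduced only for illustration; they are not Kafka Streams classes,
and the topic name in `main` is made up.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the one method of InternalProcessorContext used here.
interface ChangelogLookup {
    String changelogFor(String storeName);
}

final class SimplifiedChangelogFor {
    // Assumption: every store was registered up front, so a missing entry is
    // treated as a bug instead of a cue to build a new topic name.
    static String changelogFor(final ChangelogLookup context, final String storeName) {
        return Objects.requireNonNull(
            context.changelogFor(storeName),
            "no changelog topic registered for store " + storeName);
    }

    public static void main(final String[] args) {
        // Illustrative map contents; the real map is filled by InternalTopologyBuilder.
        final Map<String, String> storeToChangelogTopic = Map.of("counts", "my-app-counts-changelog");
        final ChangelogLookup context = storeToChangelogTopic::get;
        System.out.println(changelogFor(context, "counts"));
    }
}
```

   With this shape there is no fallback branch and no third parameter; failing
fast on `null` is one possible choice, not necessarily what the PR should do.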

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -171,42 +169,24 @@ private void registerMetrics() {
     @Deprecated
     private void initStoreSerde(final ProcessorContext context) {
         final String storeName = name();
-        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
-        final String prefix = getPrefix(context.appConfigs(), context.applicationId());
+        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);

Review comment:
       Only three MeteredXYZStore classes' `initStoreSerde` set the third
param to false, but it seems that if the returned value is `null` we would
still fall back to `ProcessorStateManager.storeChangelogTopic(prefix, storeName,
taskId.topologyName())` to create the new topic, so shouldn't it be true there
as well?
   
   BTW, if my other comment is valid, then we can remove this param anyway.
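
   A small sketch of the behaviour described above: when the lookup returns
`null`, passing `false` or `true` for the third param ends up in the same
fallback. The control flow mirrors the patched `changelogFor`, minus the
`instanceof` check. `ChangelogFlagDemo`, `fallbackTopic`, and the topic-name
format are hypothetical placeholders, not the real `ProcessorStateManager` code.

```java
import java.util.function.Function;

final class ChangelogFlagDemo {
    // Placeholder for ProcessorStateManager.storeChangelogTopic; the name
    // format here is illustrative only.
    static String fallbackTopic(final String prefix, final String storeName) {
        return prefix + "-" + storeName + "-changelog";
    }

    // Same branching as the patched changelogFor: try the lookup only when
    // newChangelogTopic is false, and fall through to the fallback on null.
    static String changelogFor(final Function<String, String> lookup,
                               final String prefix,
                               final String storeName,
                               final boolean newChangelogTopic) {
        if (!newChangelogTopic) {
            final String changelogTopic = lookup.apply(storeName);
            if (changelogTopic != null) {
                return changelogTopic;
            }
        }
        return fallbackTopic(prefix, storeName);
    }

    public static void main(final String[] args) {
        // A lookup that knows no stores, i.e. the null case in question.
        final Function<String, String> emptyLookup = name -> null;
        System.out.println(changelogFor(emptyLookup, "app", "store", false));
        System.out.println(changelogFor(emptyLookup, "app", "store", true));
    }
}
```

   Both calls take the fallback path, so the flag only matters when the lookup
returns a non-null topic.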



