guozhangwang commented on a change in pull request #8681:
URL: https://github.com/apache/kafka/pull/8681#discussion_r426975369



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
         }
 
         if (stores.containsKey(storeName)) {
-            throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName));
+            log.warn("State Store {} has already been registered, which could be due to a half-way registration" +

Review comment:
       +1, we can rely on `storeManager#getStore` inside `StateManagerUtil` to 
check if the store is already registered.
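
A minimal sketch of that caller-side check, for illustration only. `RegistrationSketch`, `Manager`, `Store`, and `registerIfAbsent` are hypothetical stand-ins rather than the real Kafka Streams classes, and the sketch assumes `getStore` returns `null` when nothing is registered under the given name:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins; not the real Kafka Streams classes.
public class RegistrationSketch {
    public interface Store {
        String name();
    }

    public static class Manager {
        private final Map<String, Store> stores = new HashMap<>();

        // Assumption: returns null when no store is registered under this name.
        public Store getStore(final String name) {
            return stores.get(name);
        }

        // Keeps the strict behavior: a duplicate registration is an error.
        public void registerStore(final Store store) {
            if (stores.containsKey(store.name())) {
                throw new IllegalArgumentException("Store " + store.name() + " has already been registered.");
            }
            stores.put(store.name(), store);
        }
    }

    // Caller-side guard: skip stores that an earlier, partially completed
    // round already registered. Returns true only if it registered the store.
    public static boolean registerIfAbsent(final Manager manager, final Store store) {
        if (manager.getStore(store.name()) != null) {
            return false;
        }
        manager.registerStore(store);
        return true;
    }
}
```

With the guard on the caller side, `registerStore` can keep throwing on a duplicate, so a genuine double registration still surfaces as an error.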

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
         }
 
         if (stores.containsKey(storeName)) {
-            throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName));
+            log.warn("State Store {} has already been registered, which could be due to a half-way registration" +
+                "in the previous round", storeName);

Review comment:
       nit: we could make the warn log entry clearer that we did not 
override the registered store, e.g. "Skipped registering state store {} 
since it already exists in the state manager, ..."
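
To illustrate the skip-without-override behavior the message should convey, here is a rough sketch. `SkipDuplicateSketch` is a hypothetical stand-in, not `ProcessorStateManager`; the warning text is adapted from the suggestion above with its elided tail left out, and warnings are collected in a list instead of going through a logger so the example has no dependencies:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a state manager; not the real ProcessorStateManager.
public class SkipDuplicateSketch {
    private final Map<String, Object> stores = new HashMap<>();

    // Stand-in for a logger: warnings are recorded here so they can be inspected.
    public final List<String> warnings = new ArrayList<>();

    // Returns true if the store was registered, false if registration was skipped.
    // An already registered store is never overridden.
    public boolean registerStore(final String storeName, final Object store) {
        if (stores.containsKey(storeName)) {
            warnings.add(String.format(
                "Skipped registering state store %s since it already exists in the state manager",
                storeName));
            return false;
        }
        stores.put(storeName, store);
        return true;
    }

    public Object getStore(final String storeName) {
        return stores.get(storeName);
    }
}
```

The second call for the same name leaves the first instance in place and records one warning, which is the outcome the reworded log entry is meant to make explicit.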



