cadonna commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r445457706



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -578,4 +577,10 @@ private StateStoreMetadata findStore(final TopicPartition changelogPartition) {
 
         return found.isEmpty() ? null : found.get(0);
     }
+
+    @Override
+    public TopicPartition changelogTopicPartitionFor(final String storeName) {
+        final StateStoreMetadata storeMetadata = stores.get(storeName);
+        return storeMetadata == null ? null : storeMetadata.changelogPartition;

Review comment:
       I would say it indicates a bug only if the state store is supposed to
be changelogged. My point is that retrieving a changelog partition for a state
store does not necessarily mean that the state store needs to be registered.
However, since `registeredChangelogPartitionFor()` is currently called only for
changelogging, I will add two `IllegalStateException`s to this method: one for
unregistered stores and one for stores with logging disabled.
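
A minimal sketch of what those two checks could look like. It does not use the
real Kafka Streams classes: `ChangelogLookupSketch`, its nested `TopicPartition`
and `StateStoreMetadata` types, the `register()` helper, and the exception
messages are all hypothetical stand-ins. It also assumes that a store with
logging disabled is represented by a `null` changelog partition in its metadata.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, self-contained stand-in for the state manager; not the real
// ProcessorStateManager.
final class ChangelogLookupSketch {

    // Simplified stand-in for org.apache.kafka.common.TopicPartition.
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Simplified stand-in for the store metadata. Assumption: a null
    // changelogPartition means logging is disabled for the store.
    static final class StateStoreMetadata {
        final TopicPartition changelogPartition;

        StateStoreMetadata(final TopicPartition changelogPartition) {
            this.changelogPartition = changelogPartition;
        }
    }

    private final Map<String, StateStoreMetadata> stores = new HashMap<>();

    // Hypothetical helper that registers a store for this sketch.
    void register(final String storeName, final TopicPartition changelogPartition) {
        stores.put(storeName, new StateStoreMetadata(changelogPartition));
    }

    // Throws instead of returning null, so both misuse cases surface as bugs.
    TopicPartition registeredChangelogPartitionFor(final String storeName) {
        final StateStoreMetadata storeMetadata = stores.get(storeName);
        if (storeMetadata == null) {
            // Case 1: the store was never registered.
            throw new IllegalStateException(
                "State store " + storeName + " has not been registered");
        }
        if (storeMetadata.changelogPartition == null) {
            // Case 2: the store is registered, but logging is disabled.
            throw new IllegalStateException(
                "State store " + storeName + " has no changelog partition; "
                    + "logging may be disabled for this store");
        }
        return storeMetadata.changelogPartition;
    }
}
```

With this shape, callers on the changelogging path never have to handle a
`null` return: a lookup for an unknown store or for a store without a changelog
fails fast with an `IllegalStateException`.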





