mjsax commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r427675991



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##########
@@ -700,12 +703,29 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl
         final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>(
                 name,
                 new ProcessorParameters<>(processorSupplier, name),
-                stateStoreNames
+                getStoreNamesAndMaybeAddStores(processorSupplier, stateStoreNames)
         );
 
         builder.addGraphNode(this.streamsGraphNode, processNode);
     }
 
+    /**
+     * Provides store names that should be connected to a {@link StatefulProcessorNode}, from two sources:
+     * 1) Store names are provided as arguments to process(...), transform(...), etc.
+     * 2) {@link StoreBuilder}s are provided by the Processor/TransformerSupplier itself, by returning a set from
+     * {@link ConnectedStoreProvider#stores()}.  The {@link StoreBuilder}s will also be added to the topology.
+     */
+    private String[] getStoreNamesAndMaybeAddStores(final ConnectedStoreProvider storeProvider, final String[] varargsStoreNames) {

Review comment:
   No worries. All good. :)

   Btw: the optimization layer was added in 2.0
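
For readers skimming the hunk, the Javadoc above describes merging store names from two sources: the varargs names and the stores the supplier itself provides. The following is only a rough sketch of that idea, not the PR's actual implementation. `NamedStore`, `StoreProvider`, `mergeStoreNames`, and the `topologyStores` list are hypothetical stand-ins for `StoreBuilder`, `ConnectedStoreProvider`, the private helper, and the topology; the sketch also assumes a supplier returns `null` when it provides no stores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class StoreNameMergeSketch {

    // Hypothetical stand-in for StoreBuilder; only the store name matters here.
    interface NamedStore {
        String name();
    }

    // Hypothetical stand-in for ConnectedStoreProvider.
    // Assumption: null means "this supplier provides no stores of its own".
    interface StoreProvider {
        default Set<NamedStore> stores() {
            return null;
        }
    }

    // Collects names from both sources and registers supplier-provided stores.
    static String[] mergeStoreNames(final StoreProvider provider,
                                    final String[] varargsStoreNames,
                                    final List<NamedStore> topologyStores) {
        // Source 1: names passed as arguments (insertion order kept, duplicates dropped).
        final Set<String> names = new LinkedHashSet<>(Arrays.asList(varargsStoreNames));
        // Source 2: stores returned by the supplier, which are also added to the "topology".
        final Set<NamedStore> provided = provider.stores();
        if (provided != null) {
            for (final NamedStore store : provided) {
                topologyStores.add(store);
                names.add(store.name());
            }
        }
        return names.toArray(new String[0]);
    }

    public static void main(final String[] args) {
        final NamedStore counts = () -> "counts";
        final StoreProvider provider = new StoreProvider() {
            @Override
            public Set<NamedStore> stores() {
                return new LinkedHashSet<>(Arrays.asList(counts));
            }
        };
        final List<NamedStore> topologyStores = new ArrayList<>();
        final String[] names = mergeStoreNames(provider, new String[] {"lookup"}, topologyStores);
        System.out.println(Arrays.toString(names)); // prints [lookup, counts]
    }
}
```

Using a set for the merged names is a design choice of this sketch: a store named both in the varargs and by the supplier is then connected only once.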




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

