mjsax commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r427675991
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##########
@@ -700,12 +703,29 @@ public void process(final ProcessorSupplier<? super K, ?
super V> processorSuppl
final StatefulProcessorNode<? super K, ? super V> processNode = new
StatefulProcessorNode<>(
name,
new ProcessorParameters<>(processorSupplier, name),
- stateStoreNames
+ getStoreNamesAndMaybeAddStores(processorSupplier,
stateStoreNames)
);
builder.addGraphNode(this.streamsGraphNode, processNode);
}
+ /**
+ * Provides store names that should be connected to a {@link
StatefulProcessorNode}, from two sources:
+ * 1) Store names are provided as arguments to process(...),
transform(...), etc.
+ * 2) {@link StoreBuilder}s are provided by the
Processor/TransformerSupplier itself, by returning a set from
+ * {@link ConnectedStoreProvider#stores()}. The {@link StoreBuilder}s
will also be added to the topology.
+ */
+ private String[] getStoreNamesAndMaybeAddStores(final
ConnectedStoreProvider storeProvider, final String[] varargsStoreNames) {
Review comment:
No worries. All good. :)
Btw: the optimization layer was added in 2.0
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]