mjsax commented on a change in pull request #6824: URL: https://github.com/apache/kafka/pull/6824#discussion_r430786602
########## File path: docs/streams/developer-guide/dsl-api.html ########## @@ -3533,10 +3533,17 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key <div class="admonition tip"> <p><b>Tip</b></p> <p class="last">Even though we do not demonstrate it in this example, a stream processor can access any available state stores by - calling <code class="docutils literal"><span class="pre">ProcessorContext#getStateStore()</span></code>. Only such state stores are available that (1) have been named in the - corresponding <code class="docutils literal"><span class="pre">KStream#process()</span></code> method call (note that this is a different method than <code class="docutils literal"><span class="pre">Processor#process()</span></code>), - plus (2) all global stores. Note that global stores do not need to be attached explicitly; however, they only - allow for read-only access.</p> + calling <code class="docutils literal"><span class="pre">ProcessorContext#getStateStore()</span></code>. + State stores are only available if they have been connected to the processor, or if they are global stores. While global stores do not need to be connected explicitly, they only allow for read-only access. + There are two ways to connect state stores to a processor: + <ul class="simple"> + <li>By passing the name of a store that has already been added via <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code> to the corresponding <code class="docutils literal"><span class="pre">KStream#process()</span></code> method call.</li> + <li>Implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code> + passed to <code class="docutils literal"><span class="pre">KStream#process()</span></code>. In this case there is no need to call <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code> Review comment: ```suggestion passed to <code class="docutils literal"><span class="pre">KStream#process()</span></code>. In this case there is no need to call <code class="docutils literal"><span class="pre">StreamsBuilder#addStateStore()</span></code> ``` ########## File path: docs/streams/developer-guide/dsl-api.html ########## @@ -3533,10 +3533,17 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key <div class="admonition tip"> <p><b>Tip</b></p> <p class="last">Even though we do not demonstrate it in this example, a stream processor can access any available state stores by - calling <code class="docutils literal"><span class="pre">ProcessorContext#getStateStore()</span></code>. Only such state stores are available that (1) have been named in the - corresponding <code class="docutils literal"><span class="pre">KStream#process()</span></code> method call (note that this is a different method than <code class="docutils literal"><span class="pre">Processor#process()</span></code>), - plus (2) all global stores. Note that global stores do not need to be attached explicitly; however, they only - allow for read-only access.</p> + calling <code class="docutils literal"><span class="pre">ProcessorContext#getStateStore()</span></code>. + State stores are only available if they have been connected to the processor, or if they are global stores. While global stores do not need to be connected explicitly, they only allow for read-only access. + There are two ways to connect state stores to a processor: + <ul class="simple"> + <li>By passing the name of a store that has already been added via <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code> to the corresponding <code class="docutils literal"><span class="pre">KStream#process()</span></code> method call.</li> + <li>Implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code> + passed to <code class="docutils literal"><span class="pre">KStream#process()</span></code>. In this case there is no need to call <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code> Review comment: This is the DSL guide, so using `StreamsBuilder` seems appropriate. ########## File path: streams/src/main/java/org/apache/kafka/streams/Topology.java ########## @@ -656,6 +660,14 @@ public synchronized Topology addProcessor(final String name, final ProcessorSupplier supplier, final String... parentNames) { internalTopologyBuilder.addProcessor(name, supplier, parentNames); + final Set<StoreBuilder<?>> stores = supplier.stores(); + if (stores != null) { + for (final StoreBuilder storeBuilder : stores) { + internalTopologyBuilder.addStateStore(storeBuilder, name); + } + final String[] storeNames = stores.stream().map(StoreBuilder::name).toArray(String[]::new); + internalTopologyBuilder.connectProcessorAndStateStores(name, storeNames); Review comment: Why do we need this? Calling `addStateStore(storeBuilder, name)` should connect the store to the processor already? ########## File path: docs/streams/developer-guide/dsl-api.html ########## @@ -3533,10 +3533,17 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key <div class="admonition tip"> <p><b>Tip</b></p> <p class="last">Even though we do not demonstrate it in this example, a stream processor can access any available state stores by - calling <code class="docutils literal"><span class="pre">ProcessorContext#getStateStore()</span></code>. Only such state stores are available that (1) have been named in the - corresponding <code class="docutils literal"><span class="pre">KStream#process()</span></code> method call (note that this is a different method than <code class="docutils literal"><span class="pre">Processor#process()</span></code>), - plus (2) all global stores. Note that global stores do not need to be attached explicitly; however, they only - allow for read-only access.</p> + calling <code class="docutils literal"><span class="pre">ProcessorContext#getStateStore()</span></code>. + State stores are only available if they have been connected to the processor, or if they are global stores. While global stores do not need to be connected explicitly, they only allow for read-only access. + There are two ways to connect state stores to a processor: + <ul class="simple"> + <li>By passing the name of a store that has already been added via <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code> to the corresponding <code class="docutils literal"><span class="pre">KStream#process()</span></code> method call.</li> + <li>Implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code> + passed to <code class="docutils literal"><span class="pre">KStream#process()</span></code>. In this case there is no need to call <code class="docutils literal"><span class="pre">StreamsBuilder#addStateStore()</span></code> + beforehand; the store will be automatically added for you. + </li> Review comment: Should we add a sentence, that `ConnectedStoreProvided` can also be used for `TransformerSupplier` and `ValueTransformerSupplier`? ########## File path: docs/streams/developer-guide/dsl-api.html ########## @@ -3533,10 +3533,17 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key <div class="admonition tip"> <p><b>Tip</b></p> <p class="last">Even though we do not demonstrate it in this example, a stream processor can access any available state stores by - calling <code class="docutils literal"><span class="pre">ProcessorContext#getStateStore()</span></code>. Only such state stores are available that (1) have been named in the - corresponding <code class="docutils literal"><span class="pre">KStream#process()</span></code> method call (note that this is a different method than <code class="docutils literal"><span class="pre">Processor#process()</span></code>), - plus (2) all global stores. Note that global stores do not need to be attached explicitly; however, they only - allow for read-only access.</p> + calling <code class="docutils literal"><span class="pre">ProcessorContext#getStateStore()</span></code>. + State stores are only available if they have been connected to the processor, or if they are global stores. While global stores do not need to be connected explicitly, they only allow for read-only access. + There are two ways to connect state stores to a processor: + <ul class="simple"> + <li>By passing the name of a store that has already been added via <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code> to the corresponding <code class="docutils literal"><span class="pre">KStream#process()</span></code> method call.</li> + <li>Implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code> + passed to <code class="docutils literal"><span class="pre">KStream#process()</span></code>. In this case there is no need to call <code class="docutils literal"><span class="pre">StreamsBuilder#addStateStore()</span></code> + beforehand; the store will be automatically added for you. + </li> Review comment: Should we add a sentence, that `ConnectedStoreProvided` can also be used for `TransformerSupplier` and `ValueTransformerSupplier` and `ValueTransformerWithKeySupplier`? ########## File path: docs/streams/developer-guide/dsl-api.html ########## @@ -3533,10 +3533,17 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key <div class="admonition tip"> <p><b>Tip</b></p> <p class="last">Even though we do not demonstrate it in this example, a stream processor can access any available state stores by - calling <code class="docutils literal"><span class="pre">ProcessorContext#getStateStore()</span></code>. Only such state stores are available that (1) have been named in the - corresponding <code class="docutils literal"><span class="pre">KStream#process()</span></code> method call (note that this is a different method than <code class="docutils literal"><span class="pre">Processor#process()</span></code>), - plus (2) all global stores. Note that global stores do not need to be attached explicitly; however, they only - allow for read-only access.</p> + calling <code class="docutils literal"><span class="pre">ProcessorContext#getStateStore()</span></code>. + State stores are only available if they have been connected to the processor, or if they are global stores. While global stores do not need to be connected explicitly, they only allow for read-only access. + There are two ways to connect state stores to a processor: + <ul class="simple"> + <li>By passing the name of a store that has already been added via <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code> to the corresponding <code class="docutils literal"><span class="pre">KStream#process()</span></code> method call.</li> + <li>Implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code> + passed to <code class="docutils literal"><span class="pre">KStream#process()</span></code>. In this case there is no need to call <code class="docutils literal"><span class="pre">StreamsBuilder#addStateStore()</span></code> + beforehand; the store will be automatically added for you. + </li> Review comment: Should we add a sentence, that `ConnectedStoreProvided` can also be used for `TransformerSupplier` et al. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org