mjsax commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r430786602



##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3533,10 +3533,17 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable 
Foreign-Key
                 <div class="admonition tip">
                     <p><b>Tip</b></p>
                     <p class="last">Even though we do not demonstrate it in 
this example, a stream processor can access any available state stores by
-                        calling <code class="docutils literal"><span 
class="pre">ProcessorContext#getStateStore()</span></code>.  Only such state 
stores are available that (1) have been named in the
-                        corresponding <code class="docutils literal"><span 
class="pre">KStream#process()</span></code> method call (note that this is a 
different method than <code class="docutils literal"><span 
class="pre">Processor#process()</span></code>),
-                        plus (2) all global stores.  Note that global stores 
do not need to be attached explicitly;  however, they only
-                        allow for read-only access.</p>
+                        calling <code class="docutils literal"><span 
class="pre">ProcessorContext#getStateStore()</span></code>.
+                        State stores are only available if they have been 
connected to the processor, or if they are global stores.  While global stores 
do not need to be connected explicitly, they only allow for read-only access.
+                        There are two ways to connect state stores to a 
processor:
+                    <ul class="simple">
+                        <li>By passing the name of a store that has already 
been added via <code class="docutils literal"><span 
class="pre">Topology#addStateStore()</span></code> to the corresponding <code 
class="docutils literal"><span class="pre">KStream#process()</span></code> 
method call.</li>
+                        <li>Implementing <code class="docutils literal"><span 
class="pre">ConnectedStoreProvider#stores()</span></code> on the <code 
class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+                            passed to <code class="docutils literal"><span 
class="pre">KStream#process()</span></code>.  In this case there is no need to 
call <code class="docutils literal"><span 
class="pre">Topology#addStateStore()</span></code>

Review comment:
       ```suggestion
                               passed to <code class="docutils literal"><span 
class="pre">KStream#process()</span></code>.  In this case there is no need to 
call <code class="docutils literal"><span 
class="pre">StreamsBuilder#addStateStore()</span></code>
   ```

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3533,10 +3533,17 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable 
Foreign-Key
                 <div class="admonition tip">
                     <p><b>Tip</b></p>
                     <p class="last">Even though we do not demonstrate it in 
this example, a stream processor can access any available state stores by
-                        calling <code class="docutils literal"><span 
class="pre">ProcessorContext#getStateStore()</span></code>.  Only such state 
stores are available that (1) have been named in the
-                        corresponding <code class="docutils literal"><span 
class="pre">KStream#process()</span></code> method call (note that this is a 
different method than <code class="docutils literal"><span 
class="pre">Processor#process()</span></code>),
-                        plus (2) all global stores.  Note that global stores 
do not need to be attached explicitly;  however, they only
-                        allow for read-only access.</p>
+                        calling <code class="docutils literal"><span 
class="pre">ProcessorContext#getStateStore()</span></code>.
+                        State stores are only available if they have been 
connected to the processor, or if they are global stores.  While global stores 
do not need to be connected explicitly, they only allow for read-only access.
+                        There are two ways to connect state stores to a 
processor:
+                    <ul class="simple">
+                        <li>By passing the name of a store that has already 
been added via <code class="docutils literal"><span 
class="pre">Topology#addStateStore()</span></code> to the corresponding <code 
class="docutils literal"><span class="pre">KStream#process()</span></code> 
method call.</li>
+                        <li>Implementing <code class="docutils literal"><span 
class="pre">ConnectedStoreProvider#stores()</span></code> on the <code 
class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+                            passed to <code class="docutils literal"><span 
class="pre">KStream#process()</span></code>.  In this case there is no need to 
call <code class="docutils literal"><span 
class="pre">Topology#addStateStore()</span></code>

Review comment:
       This is the DSL guide, so using `StreamsBuilder` seems appropriate.

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -656,6 +660,14 @@ public synchronized Topology addProcessor(final String 
name,
                                               final ProcessorSupplier supplier,
                                               final String... parentNames) {
         internalTopologyBuilder.addProcessor(name, supplier, parentNames);
+        final Set<StoreBuilder<?>> stores = supplier.stores();
+        if (stores != null) {
+            for (final StoreBuilder storeBuilder : stores) {
+                internalTopologyBuilder.addStateStore(storeBuilder, name);
+            }
+            final String[] storeNames = 
stores.stream().map(StoreBuilder::name).toArray(String[]::new);
+            internalTopologyBuilder.connectProcessorAndStateStores(name, 
storeNames);

Review comment:
       Why do we need this? Calling `addStateStore(storeBuilder, name)` should 
already connect the store to the processor, so the extra 
`connectProcessorAndStateStores()` call looks redundant.
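
       To make the question concrete, here is a minimal toy model in plain Java. 
It is not Kafka's `InternalTopologyBuilder`: the class `ToyTopologyBuilder` and 
its `storesOf` helper are hypothetical names invented for illustration. It 
assumes, as the comment suggests, that `addStateStore(store, processorNames...)` 
connects the store to every named processor. Under that assumption, a follow-up 
`connectProcessorAndStateStores` call leaves the connections unchanged.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model for illustration only; NOT Kafka's InternalTopologyBuilder.
final class ToyTopologyBuilder {
    // Names of all stores that have been added to the toy topology.
    private final Set<String> stores = new LinkedHashSet<>();
    // Processor name -> names of the stores connected to it.
    private final Map<String, Set<String>> connections = new HashMap<>();

    void addProcessor(final String name) {
        connections.putIfAbsent(name, new LinkedHashSet<>());
    }

    // Assumption under discussion: adding a store with processor names
    // also connects the store to each of those processors.
    void addStateStore(final String storeName, final String... processorNames) {
        if (!stores.add(storeName)) {
            throw new IllegalStateException("Store " + storeName + " was already added");
        }
        for (final String processorName : processorNames) {
            connectProcessorAndStateStores(processorName, storeName);
        }
    }

    void connectProcessorAndStateStores(final String processorName, final String... storeNames) {
        final Set<String> connected = connections.get(processorName);
        if (connected == null) {
            throw new IllegalArgumentException("Unknown processor " + processorName);
        }
        for (final String storeName : storeNames) {
            if (!stores.contains(storeName)) {
                throw new IllegalArgumentException("Unknown store " + storeName);
            }
            // Set#add ignores duplicates, so reconnecting is a no-op.
            connected.add(storeName);
        }
    }

    // Returns a read-only view of the stores connected to the given processor.
    Set<String> storesOf(final String processorName) {
        final Set<String> connected = connections.get(processorName);
        return connected == null
            ? Collections.<String>emptySet()
            : Collections.unmodifiableSet(connected);
    }
}
```

       In this sketch, calling `addStateStore("counts", "proc")` and then 
`connectProcessorAndStateStores("proc", "counts")` yields the same connections 
as the first call alone. Whether the real builder behaves the same way is 
exactly what the comment asks the author to confirm.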

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3533,10 +3533,17 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable 
Foreign-Key
                 <div class="admonition tip">
                     <p><b>Tip</b></p>
                     <p class="last">Even though we do not demonstrate it in 
this example, a stream processor can access any available state stores by
-                        calling <code class="docutils literal"><span 
class="pre">ProcessorContext#getStateStore()</span></code>.  Only such state 
stores are available that (1) have been named in the
-                        corresponding <code class="docutils literal"><span 
class="pre">KStream#process()</span></code> method call (note that this is a 
different method than <code class="docutils literal"><span 
class="pre">Processor#process()</span></code>),
-                        plus (2) all global stores.  Note that global stores 
do not need to be attached explicitly;  however, they only
-                        allow for read-only access.</p>
+                        calling <code class="docutils literal"><span 
class="pre">ProcessorContext#getStateStore()</span></code>.
+                        State stores are only available if they have been 
connected to the processor, or if they are global stores.  While global stores 
do not need to be connected explicitly, they only allow for read-only access.
+                        There are two ways to connect state stores to a 
processor:
+                    <ul class="simple">
+                        <li>By passing the name of a store that has already 
been added via <code class="docutils literal"><span 
class="pre">Topology#addStateStore()</span></code> to the corresponding <code 
class="docutils literal"><span class="pre">KStream#process()</span></code> 
method call.</li>
+                        <li>Implementing <code class="docutils literal"><span 
class="pre">ConnectedStoreProvider#stores()</span></code> on the <code 
class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+                            passed to <code class="docutils literal"><span 
class="pre">KStream#process()</span></code>.  In this case there is no need to 
call <code class="docutils literal"><span 
class="pre">StreamsBuilder#addStateStore()</span></code>
+                            beforehand; the store will be automatically added 
for you.
+                        </li>

Review comment:
       Should we add a sentence noting that `ConnectedStoreProvider` can also be 
used for `TransformerSupplier` and `ValueTransformerSupplier`?

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3533,10 +3533,17 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable 
Foreign-Key
                 <div class="admonition tip">
                     <p><b>Tip</b></p>
                     <p class="last">Even though we do not demonstrate it in 
this example, a stream processor can access any available state stores by
-                        calling <code class="docutils literal"><span 
class="pre">ProcessorContext#getStateStore()</span></code>.  Only such state 
stores are available that (1) have been named in the
-                        corresponding <code class="docutils literal"><span 
class="pre">KStream#process()</span></code> method call (note that this is a 
different method than <code class="docutils literal"><span 
class="pre">Processor#process()</span></code>),
-                        plus (2) all global stores.  Note that global stores 
do not need to be attached explicitly;  however, they only
-                        allow for read-only access.</p>
+                        calling <code class="docutils literal"><span 
class="pre">ProcessorContext#getStateStore()</span></code>.
+                        State stores are only available if they have been 
connected to the processor, or if they are global stores.  While global stores 
do not need to be connected explicitly, they only allow for read-only access.
+                        There are two ways to connect state stores to a 
processor:
+                    <ul class="simple">
+                        <li>By passing the name of a store that has already 
been added via <code class="docutils literal"><span 
class="pre">Topology#addStateStore()</span></code> to the corresponding <code 
class="docutils literal"><span class="pre">KStream#process()</span></code> 
method call.</li>
+                        <li>Implementing <code class="docutils literal"><span 
class="pre">ConnectedStoreProvider#stores()</span></code> on the <code 
class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+                            passed to <code class="docutils literal"><span 
class="pre">KStream#process()</span></code>.  In this case there is no need to 
call <code class="docutils literal"><span 
class="pre">StreamsBuilder#addStateStore()</span></code>
+                            beforehand; the store will be automatically added 
for you.
+                        </li>

Review comment:
       Should we add a sentence noting that `ConnectedStoreProvider` can also be 
used for `TransformerSupplier`, `ValueTransformerSupplier`, and 
`ValueTransformerWithKeySupplier`?

##########
File path: docs/streams/developer-guide/dsl-api.html
##########
@@ -3533,10 +3533,17 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable 
Foreign-Key
                 <div class="admonition tip">
                     <p><b>Tip</b></p>
                     <p class="last">Even though we do not demonstrate it in 
this example, a stream processor can access any available state stores by
-                        calling <code class="docutils literal"><span 
class="pre">ProcessorContext#getStateStore()</span></code>.  Only such state 
stores are available that (1) have been named in the
-                        corresponding <code class="docutils literal"><span 
class="pre">KStream#process()</span></code> method call (note that this is a 
different method than <code class="docutils literal"><span 
class="pre">Processor#process()</span></code>),
-                        plus (2) all global stores.  Note that global stores 
do not need to be attached explicitly;  however, they only
-                        allow for read-only access.</p>
+                        calling <code class="docutils literal"><span 
class="pre">ProcessorContext#getStateStore()</span></code>.
+                        State stores are only available if they have been 
connected to the processor, or if they are global stores.  While global stores 
do not need to be connected explicitly, they only allow for read-only access.
+                        There are two ways to connect state stores to a 
processor:
+                    <ul class="simple">
+                        <li>By passing the name of a store that has already 
been added via <code class="docutils literal"><span 
class="pre">Topology#addStateStore()</span></code> to the corresponding <code 
class="docutils literal"><span class="pre">KStream#process()</span></code> 
method call.</li>
+                        <li>Implementing <code class="docutils literal"><span 
class="pre">ConnectedStoreProvider#stores()</span></code> on the <code 
class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
+                            passed to <code class="docutils literal"><span 
class="pre">KStream#process()</span></code>.  In this case there is no need to 
call <code class="docutils literal"><span 
class="pre">StreamsBuilder#addStateStore()</span></code>
+                            beforehand; the store will be automatically added 
for you.
+                        </li>

Review comment:
       Should we add a sentence noting that `ConnectedStoreProvider` can also be 
used for `TransformerSupplier` et al.?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

