[GitHub] [kafka] mjsax commented on a change in pull request #6824: KAFKA-7523: Add ConnectedStoreProvider to Processor API

2020-05-26 Thread GitBox


mjsax commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r430786602



##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3533,10 +3533,17 @@ KTable-KTable Foreign-Key
 
 Tip
 Even though we do not demonstrate it in this example, a stream processor can access any available state stores by
-calling ProcessorContext#getStateStore().  Only such state stores are available that (1) have been named in the
-corresponding KStream#process() method call (note that this is a different method than Processor#process()),
-plus (2) all global stores.  Note that global stores do not need to be attached explicitly;  however, they only
-allow for read-only access.
+calling ProcessorContext#getStateStore().
+State stores are only available if they have been connected to the processor, or if they are global stores.  While global stores do not need to be connected explicitly, they only allow for read-only access.
+There are two ways to connect state stores to a processor:
+
+By passing the name of a store that has already been added via Topology#addStateStore() to the corresponding KStream#process() method call.
+Implementing ConnectedStoreProvider#stores() on the ProcessorSupplier
+passed to KStream#process().  In this case there is no need to call Topology#addStateStore()

Review comment:
   ```suggestion
   passed to KStream#process().  In this case there is no need to call StreamsBuilder#addStateStore()
   ```

##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3533,10 +3533,17 @@ KTable-KTable Foreign-Key
 
 Tip
 Even though we do not demonstrate it in this example, a stream processor can access any available state stores by
-calling ProcessorContext#getStateStore().  Only such state stores are available that (1) have been named in the
-corresponding KStream#process() method call (note that this is a different method than Processor#process()),
-plus (2) all global stores.  Note that global stores do not need to be attached explicitly;  however, they only
-allow for read-only access.
+calling ProcessorContext#getStateStore().
+State stores are only available if they have been connected to the processor, or if they are global stores.  While global stores do not need to be connected explicitly, they only allow for read-only access.
+There are two ways to connect state stores to a processor:
+
+By passing the name of a store that has already been added via Topology#addStateStore() to the corresponding KStream#process() method call.
+Implementing ConnectedStoreProvider#stores() on the ProcessorSupplier
+passed to KStream#process().  In this case there is no need to call Topology#addStateStore()

Review comment:
   This is the DSL guide, so using `StreamsBuilder` seems appropriate.
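The connection rules in the Tip quoted above (a processor sees only the stores connected to it plus global stores, global stores are read-only, and a store is connected either by name or via `ConnectedStoreProvider#stores()`) can be illustrated with a small, stdlib-only Java model. This is a hypothetical sketch, not Kafka Streams code: `ToyTopology`, `ToyStoreProvider`, `canAccess`, and `canWrite` are invented for illustration, and stores are represented by plain names rather than real `StoreBuilder` objects.

```java
import java.util.*;

// Hypothetical stand-in for ConnectedStoreProvider: a supplier may declare the
// stores it needs (as names here, instead of real StoreBuilder objects).
interface ToyStoreProvider {
    default Set<String> stores() { return Collections.emptySet(); }
}

// Hypothetical, heavily simplified topology model (not the Kafka Streams API).
final class ToyTopology {
    private final Set<String> registeredStores = new HashSet<>();
    private final Set<String> globalStores = new HashSet<>();
    private final Map<String, Set<String>> connections = new HashMap<>();

    // Route 1 prerequisite: register a store up front (cf. addStateStore()).
    void addStateStore(String storeName) { registeredStores.add(storeName); }

    void addGlobalStore(String storeName) { globalStores.add(storeName); }

    // Connects explicitly named stores (route 1) and stores the provider declares (route 2).
    void addProcessor(String processor, ToyStoreProvider provider, String... storeNames) {
        Set<String> connected = connections.computeIfAbsent(processor, p -> new HashSet<>());
        for (String name : storeNames) {
            if (!registeredStores.contains(name)) {
                throw new IllegalArgumentException("Store " + name + " was never added");
            }
            connected.add(name);
        }
        for (String name : provider.stores()) {
            registeredStores.add(name); // route 2: no separate addStateStore() call needed
            connected.add(name);
        }
    }

    // Only connected stores and global stores are visible to a processor.
    boolean canAccess(String processor, String storeName) {
        return globalStores.contains(storeName)
            || connections.getOrDefault(processor, Collections.emptySet()).contains(storeName);
    }

    // Global stores are visible everywhere but read-only.
    boolean canWrite(String processor, String storeName) {
        return !globalStores.contains(storeName) && canAccess(processor, storeName);
    }
}
```

In the model, the provider route registers and connects in one step, which is why the docs say no separate `addStateStore()` call is needed in that case.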

##
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##
@@ -656,6 +660,14 @@ public synchronized Topology addProcessor(final String name,
   final ProcessorSupplier supplier,
   final String... parentNames) {
 internalTopologyBuilder.addProcessor(name, supplier, parentNames);
+final Set<StoreBuilder<?>> stores = supplier.stores();
+if (stores != null) {
+for (final StoreBuilder storeBuilder : stores) {
+internalTopologyBuilder.addStateStore(storeBuilder, name);
+}
+final String[] storeNames = stores.stream().map(StoreBuilder::name).toArray(String[]::new);
+internalTopologyBuilder.connectProcessorAndStateStores(name, storeNames);

Review comment:
   Why do we need this? Shouldn't calling `addStateStore(storeBuilder, name)` already connect the store to the processor?
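The reviewer's point can be checked against a toy model. This sketch assumes, as the comment suggests, that registering a store with a processor name also connects it to that processor; `ToyInternalBuilder` and `addProcessorWithStores` are hypothetical names, not Kafka's `InternalTopologyBuilder`. Under that assumption, the loop alone already produces the connections, and a second explicit connect changes nothing.

```java
import java.util.*;

// Hypothetical model of an internal topology builder (not Kafka's InternalTopologyBuilder).
final class ToyInternalBuilder {
    final Map<String, Set<String>> storesByProcessor = new HashMap<>();
    final Set<String> stores = new HashSet<>();

    // Registering a store for some processors also connects it to them.
    void addStateStore(String storeName, String... processorNames) {
        stores.add(storeName);
        for (String p : processorNames) {
            connectProcessorAndStateStores(p, storeName);
        }
    }

    // Connections are a set, so repeating a connect is a no-op.
    void connectProcessorAndStateStores(String processorName, String... storeNames) {
        storesByProcessor.computeIfAbsent(processorName, k -> new HashSet<>())
                         .addAll(Arrays.asList(storeNames));
    }

    // addProcessor variant without the extra connect call from the diff.
    void addProcessorWithStores(String name, Set<String> providedStores) {
        if (providedStores != null) {
            for (String store : providedStores) {
                addStateStore(store, name);
            }
        }
    }
}
```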

##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3533,10 +3533,17 @@ KTable-KTable Foreign-Key
 
 Tip
 Even though we do not demonstrate it in this example, a stream processor can access any available state stores by
-calling ProcessorContext#getStateStore().  Only such state stores are available that (1) have been named in the
-corresponding KStream#process() 

[GitHub] [kafka] mjsax commented on a change in pull request #6824: KAFKA-7523: Add ConnectedStoreProvider to Processor API

2020-05-19 Thread GitBox


mjsax commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r427675991



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##
@@ -700,12 +703,29 @@ public void process(final ProcessorSupplier processorSuppl
 final StatefulProcessorNode processNode = new StatefulProcessorNode<>(
 name,
 new ProcessorParameters<>(processorSupplier, name),
-stateStoreNames
+getStoreNamesAndMaybeAddStores(processorSupplier, stateStoreNames)
 );
 
 builder.addGraphNode(this.streamsGraphNode, processNode);
 }
 
+/**
+ * Provides store names that should be connected to a {@link StatefulProcessorNode}, from two sources:
+ * 1) Store names are provided as arguments to process(...), transform(...), etc.
+ * 2) {@link StoreBuilder}s are provided by the Processor/TransformerSupplier itself, by returning a set from
+ * {@link ConnectedStoreProvider#stores()}.  The {@link StoreBuilder}s will also be added to the topology.
+ */
+private String[] getStoreNamesAndMaybeAddStores(final ConnectedStoreProvider storeProvider, final String[] varargsStoreNames) {

Review comment:
   No worries. All good. :)
   
   Btw: the optimization layer was added in 2.0
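The name-collection half of the helper described in the Javadoc above (store names from `process(...)`/`transform(...)` arguments combined with names of stores the supplier declares) can be sketched as a pure function. This is a hypothetical illustration: `StoreNameCollector.storeNamesToConnect` is an invented name, stores are represented by names only, and the sketch deliberately leaves out the "maybe add stores" side effect on the builder.

```java
import java.util.*;

// Hypothetical helper, not Kafka Streams code.
final class StoreNameCollector {
    // Combines explicitly passed store names with names declared by the supplier,
    // keeping first-seen order and dropping duplicates; touches no builder.
    static String[] storeNamesToConnect(String[] varargsStoreNames, Set<String> providedStoreNames) {
        LinkedHashSet<String> names = new LinkedHashSet<>();
        if (varargsStoreNames != null) {
            names.addAll(Arrays.asList(varargsStoreNames));
        }
        if (providedStoreNames != null) {
            names.addAll(providedStoreNames);
        }
        return names.toArray(new String[0]);
    }
}
```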





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #6824: KAFKA-7523: Add ConnectedStoreProvider to Processor API

2020-05-19 Thread GitBox


mjsax commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r427675683



##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##
@@ -213,6 +215,48 @@ public void testDrivingStatefulTopology() {
 assertNull(store.get("key4"));
 }
 
+@Test
+public void testDrivingConnectedStateStoreTopology() {
+driver = new TopologyTestDriver(createConnectedStateStoreTopology("connectedStore"), props);
+driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
+driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
+driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
+driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value4"));
+assertNoOutputRecord(OUTPUT_TOPIC_1);
+
+final KeyValueStore store = driver.getKeyValueStore("connectedStore");
+assertEquals("value4", store.get("key1"));
+assertEquals("value2", store.get("key2"));
+assertEquals("value3", store.get("key3"));
+assertNull(store.get("key4"));
+}
+
+@Test
+public void testDrivingConnectedStateStoreInDifferentProcessorsTopology() {
+final String storeName = "connectedStore";
+final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String());
+topology
+.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+.addSource("source2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2)
+.addProcessor("processor1", defineWithStores(new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+.addProcessor("processor2", defineWithStores(new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source2")

Review comment:
   I think the simplest fix is to pass a Java `Supplier` as the first argument to `define()`:
   ```
   defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder))
   ```
   
   And within `define()`:
   ```
   public Processor get() {
   return supplier.get();
   }
   ```
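The suggested shape of `defineWithStores()` can be sketched with stdlib-only stand-ins. `ToyProcessor`, `ToyProcessorSupplier`, and `TestHelpers` are hypothetical types standing in for the test helper and the Kafka interfaces; only `java.util.function.Supplier` is a real API here. The key property is that `get()` delegates to the factory, so each call yields a fresh processor.

```java
import java.util.*;
import java.util.function.Supplier;

// Hypothetical processor type for the sketch.
interface ToyProcessor { }

// Hypothetical stand-in for a ProcessorSupplier that also provides stores.
interface ToyProcessorSupplier {
    ToyProcessor get();
    Set<String> stores();
}

final class TestHelpers {
    // Takes a factory instead of a ready-made instance, so every get() call
    // delegates to the factory and returns a new processor.
    static ToyProcessorSupplier defineWithStores(Supplier<ToyProcessor> supplier, Set<String> stores) {
        return new ToyProcessorSupplier() {
            @Override
            public ToyProcessor get() {
                return supplier.get();
            }

            @Override
            public Set<String> stores() {
                return stores;
            }
        };
    }
}
```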









[GitHub] [kafka] mjsax commented on a change in pull request #6824: KAFKA-7523: Add ConnectedStoreProvider to Processor API

2020-05-19 Thread GitBox


mjsax commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r427605702



##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##
@@ -213,6 +215,48 @@ public void testDrivingStatefulTopology() {
 assertNull(store.get("key4"));
 }
 
+@Test
+public void testDrivingConnectedStateStoreTopology() {
+driver = new TopologyTestDriver(createConnectedStateStoreTopology("connectedStore"), props);
+driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
+driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
+driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
+driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value4"));
+assertNoOutputRecord(OUTPUT_TOPIC_1);
+
+final KeyValueStore store = driver.getKeyValueStore("connectedStore");
+assertEquals("value4", store.get("key1"));
+assertEquals("value2", store.get("key2"));
+assertEquals("value3", store.get("key3"));
+assertNull(store.get("key4"));
+}
+
+@Test
+public void testDrivingConnectedStateStoreInDifferentProcessorsTopology() {
+final String storeName = "connectedStore";
+final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String());
+topology
+.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+.addSource("source2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2)
+.addProcessor("processor1", defineWithStores(new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+.addProcessor("processor2", defineWithStores(new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source2")

Review comment:
   Well, `get()` of the returned supplier could be called multiple times, and it would then return the same object that was passed into `defineWithStores` each time. This is incorrect, because each call to `get()` must return a new object.
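Why returning the same object breaks things can be shown with a small simulation. This sketch assumes the runtime calls `get()` once per stream task and that each processor instance keeps per-instance state; `CountingProcessor` and `SupplierContract.runTwoTasks` are hypothetical names. With a shared instance, the two "tasks" silently share state; with a factory, each task gets its own processor.

```java
import java.util.function.Supplier;

// Hypothetical processor that keeps per-instance state.
final class CountingProcessor {
    private int processed;

    void process(String record) { processed++; }

    int processed() { return processed; }
}

final class SupplierContract {
    // Simulates two tasks each obtaining a processor via get() and processing one record.
    static int[] runTwoTasks(Supplier<CountingProcessor> supplier) {
        CountingProcessor task0 = supplier.get();
        CountingProcessor task1 = supplier.get();
        task0.process("a");
        task1.process("b");
        return new int[] {task0.processed(), task1.processed()};
    }
}
```

Usage: `runTwoTasks(() -> shared)` reports 2 records for each task even though each task processed only one, while `runTwoTasks(CountingProcessor::new)` reports 1 and 1.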









[GitHub] [kafka] mjsax commented on a change in pull request #6824: KAFKA-7523: Add ConnectedStoreProvider to Processor API

2020-05-19 Thread GitBox


mjsax commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r427603579



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##
@@ -700,12 +703,29 @@ public void process(final ProcessorSupplier processorSuppl
 final StatefulProcessorNode processNode = new StatefulProcessorNode<>(
 name,
 new ProcessorParameters<>(processorSupplier, name),
-stateStoreNames
+getStoreNamesAndMaybeAddStores(processorSupplier, stateStoreNames)
 );
 
 builder.addGraphNode(this.streamsGraphNode, processNode);
 }
 
+/**
+ * Provides store names that should be connected to a {@link StatefulProcessorNode}, from two sources:
+ * 1) Store names are provided as arguments to process(...), transform(...), etc.
+ * 2) {@link StoreBuilder}s are provided by the Processor/TransformerSupplier itself, by returning a set from
+ * {@link ConnectedStoreProvider#stores()}.  The {@link StoreBuilder}s will also be added to the topology.
+ */
+private String[] getStoreNamesAndMaybeAddStores(final ConnectedStoreProvider storeProvider, final String[] varargsStoreNames) {

Review comment:
   Sorry for not being detailed enough. My bad (re-reading my comment, it does not really make sense). What I actually meant was:
   
   Within `KStreamImpl` we should not access the `builder` at all. Kafka Streams uses a 2-phase translation: in the first phase, we only build up a logical representation of the topology (what we call the "stream graph", which comprises "stream graph nodes"). If optimization is enabled, the optimizer may modify the "stream graph" before it gets translated into the actual topology.
   
   The created `StatefulProcessorNode` is one of those "stream graph nodes": in its `writeToTopology()` method it adds its state store to the `Topology` and connects the processor to other stores if necessary.
   
   Thus, your code adds the store "too early" and returns a list of store names that the processor is later connected to. Instead, we should modify `StatefulProcessorNode` and delay adding the store to the topology until `writeToTopology()` is called: in particular, `StatefulProcessorNode` should have a constructor that accepts both an array of store names it should connect to and a `ConnectedStoreProvider`.
   
   Does this clarify?
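The 2-phase translation described in this comment can be sketched as a toy model in which the graph node only remembers what it needs and touches the physical topology in `writeToTopology()`. All names here (`ToyPhysicalTopology`, `ToyStatefulNode`, `TwoPhaseBuild`) are hypothetical, and the "optimizer" is reduced to a set of nodes to skip; real optimizations rewrite the graph in other ways. Because stores are registered only in phase two, a node that does not survive optimization never registers its stores.

```java
import java.util.*;

// Hypothetical, minimal physical topology that records stores and connections.
final class ToyPhysicalTopology {
    final Set<String> stores = new LinkedHashSet<>();
    final Map<String, Set<String>> connections = new LinkedHashMap<>();

    void addStateStore(String store, String processor) {
        stores.add(store);
        connect(processor, store);
    }

    void connect(String processor, String store) {
        connections.computeIfAbsent(processor, p -> new LinkedHashSet<>()).add(store);
    }
}

// Hypothetical logical "stream graph node": it holds both the names of stores to
// connect to and the stores its supplier provides, and writes them out later.
final class ToyStatefulNode {
    final String processorName;
    final String[] storeNamesToConnect; // stores added elsewhere, referenced by name
    final Set<String> providedStores;   // stores owned by the supplier

    ToyStatefulNode(String processorName, String[] storeNamesToConnect, Set<String> providedStores) {
        this.processorName = processorName;
        this.storeNamesToConnect = storeNamesToConnect;
        this.providedStores = providedStores;
    }

    void writeToTopology(ToyPhysicalTopology topology) {
        for (String store : providedStores) {
            topology.addStateStore(store, processorName); // added only now, in phase two
        }
        for (String store : storeNamesToConnect) {
            topology.connect(processorName, store);
        }
    }
}

final class TwoPhaseBuild {
    // Phase 1 output is the node list; nodes "removed by the optimizer" are skipped;
    // phase 2 writes the surviving nodes to the physical topology.
    static ToyPhysicalTopology build(List<ToyStatefulNode> graph, Set<String> nodesRemovedByOptimizer) {
        ToyPhysicalTopology topology = new ToyPhysicalTopology();
        for (ToyStatefulNode node : graph) {
            if (!nodesRemovedByOptimizer.contains(node.processorName)) {
                node.writeToTopology(topology);
            }
        }
        return topology;
    }
}
```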









[GitHub] [kafka] mjsax commented on a change in pull request #6824: KAFKA-7523: Add ConnectedStoreProvider to Processor API

2020-05-13 Thread GitBox


mjsax commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r424758763



##
File path: docs/streams/developer-guide/processor-api.html
##
@@ -396,33 +396,52 @@ Connecting Processors and State Stores
 // and the WordCountProcessor node as its upstream processor
 .addSink(Sink, sink-topic, Process);
 
+
+Here is a quick explanation of this example:
+
+A source processor node named Source is added to the topology using the addSource method, with one Kafka topic
+source-topic fed to it.
+A processor node named Process with the pre-defined WordCountProcessor logic is then added as the downstream
+processor of the Source node using the addProcessor method.
+A predefined persistent key-value state store is added and connected to the Process node, using
+countStoreBuilder.
+A sink processor node is then added to complete the topology using the addSink method, taking the Process node
+as its upstream processor and writing to a separate sink-topic Kafka topic (note that users can also use another overloaded variant of addSink
+to dynamically determine the Kafka topic to write to for each received record from the upstream processor).
+
+
+In some cases, it may be more convenient to add and connect a state store at the same time as you add its processor to the topology.
+This can be done by implementing ConnectedStoreProvider#stores() on the ProcessorSupplier
+in place of calling Topology#addStateStore(), like this:
+
+Topology builder = new Topology();
+
+// add the source processor node that takes Kafka topic "source-topic" as input
+builder.addSource("Source", "source-topic")
+
+// add the WordCountProcessor node which takes the source processor as its upstream processor, along with its state store
+.addProcessor("Process", new ProcessorSupplier<String, String>() {
+public Processor<String, String> get() { return new WordCountProcessor(); }
+public Set<StoreBuilder<?>> stores() { return Collections.singleton(countStoreBuilder); }
+}, "Source")
+
+// add the sink processor node that takes Kafka topic "sink-topic" as output
+// and the WordCountProcessor node as its upstream processor
+.addSink("Sink", "sink-topic", "Process");
+
+This allows for a processor to "own" state stores, effectively encapsulating their usage from the user who wires the topology.
+Multiple processors that share a state store may provide the same store with this technique, as long as the StoreBuilder is the same instance.
+In these topologies, the Process stream processor node is considered a downstream processor of the Source node, and an
+upstream processor of the Sink node.  As a result, whenever the Source node forwards a newly fetched record from
+Kafka to its downstream Process node, the WordCountProcessor#process() method is triggered to process the record and
+update the associated state store. Whenever context#forward() is called in the
+WordCountProcessor#punctuate() method, the aggregate key-value pair will be sent via the Sink processor node to
+the Kafka topic sink-topic.  Note that in the WordCountProcessor implementation, you must refer to the
+same store name Counts when accessing the key-value store, otherwise an exception will be thrown at runtime,
+indicating that the state store cannot be found. If the state store is not associated with the processor
+in the Topology code, accessing it in the processor's init() method will also throw an exception at
+runtime, indicating the state store is not accessible from this processor.
+Now that you have fully defined your processor topology in your application, you can proceed to
+running the Kafka Streams application.
+

Review comment:
   Meta comment: can you also extend `streams/upgrade-guide.html`? There is a "public API changes" section for the 2.6 release.
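The rule in the proposed processor-API docs above, that multiple processors may provide the same store via `ConnectedStoreProvider#stores()` as long as the `StoreBuilder` is the same instance, can be illustrated with a toy registry. This is a hypothetical sketch: `ToyStoreBuilder` and `ToyStoreRegistry` are invented names, and the exception type and message are not Kafka's. The registry accepts a repeated builder when it is the identical object and rejects a different builder that reuses the same store name.

```java
import java.util.*;

// Hypothetical store builder: carries only a name and is compared by object identity.
final class ToyStoreBuilder {
    final String name;

    ToyStoreBuilder(String name) { this.name = name; }
}

final class ToyStoreRegistry {
    private final Map<String, ToyStoreBuilder> buildersByName = new HashMap<>();
    final Map<String, Set<String>> processorsByStore = new HashMap<>();

    // Adds a processor together with the stores its supplier provides.
    void addProcessor(String processor, Set<ToyStoreBuilder> providedStores) {
        for (ToyStoreBuilder builder : providedStores) {
            ToyStoreBuilder existing = buildersByName.putIfAbsent(builder.name, builder);
            if (existing != null && existing != builder) {
                // Same name but a different builder object: ambiguous, so reject it.
                throw new IllegalStateException("A different store builder is already registered as " + builder.name);
            }
            processorsByStore.computeIfAbsent(builder.name, n -> new TreeSet<>()).add(processor);
        }
    }
}
```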

##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3164,10 +3164,17 @@ 
 
 Tip
 Even though we do not demonstrate it in this example, a stream processor can access any available state stores by
-calling ProcessorContext#getStateStore().  Only such state stores are available that (1) have been named in the
-