[GitHub] [kafka] mjsax commented on a change in pull request #6824: KAFKA-7523: Add ConnectedStoreProvider to Processor API
mjsax commented on a change in pull request #6824: URL: https://github.com/apache/kafka/pull/6824#discussion_r430786602

## File path: docs/streams/developer-guide/dsl-api.html ##
@@ -3533,10 +3533,17 @@ KTable-KTable Foreign-Key
Tip
Even though we do not demonstrate it in this example, a stream processor can access any available state stores by
-calling ProcessorContext#getStateStore(). Only such state stores are available that (1) have been named in the
-corresponding KStream#process() method call (note that this is a different method than Processor#process()),
-plus (2) all global stores. Note that global stores do not need to be attached explicitly; however, they only
-allow for read-only access.
+calling ProcessorContext#getStateStore().
+State stores are only available if they have been connected to the processor, or if they are global stores. While global stores do not need to be connected explicitly, they only allow for read-only access.
+There are two ways to connect state stores to a processor:
+
+By passing the name of a store that has already been added via Topology#addStateStore() to the corresponding KStream#process() method call.
+Implementing ConnectedStoreProvider#stores() on the ProcessorSupplier
+passed to KStream#process(). In this case there is no need to call Topology#addStateStore()

Review comment:
```suggestion
passed to KStream#process(). In this case there is no need to call StreamsBuilder#addStateStore()
```

Review comment:
This is the DSL guide, so using `StreamsBuilder` seems appropriate.

## File path: streams/src/main/java/org/apache/kafka/streams/Topology.java ##
@@ -656,6 +660,14 @@ public synchronized Topology addProcessor(final String name,
                                           final ProcessorSupplier supplier,
                                           final String... parentNames) {
     internalTopologyBuilder.addProcessor(name, supplier, parentNames);
+    final Set<StoreBuilder<?>> stores = supplier.stores();
+    if (stores != null) {
+        for (final StoreBuilder storeBuilder : stores) {
+            internalTopologyBuilder.addStateStore(storeBuilder, name);
+        }
+        final String[] storeNames = stores.stream().map(StoreBuilder::name).toArray(String[]::new);
+        internalTopologyBuilder.connectProcessorAndStateStores(name, storeNames);

Review comment:
Why do we need this? Calling `addStateStore(storeBuilder, name)` should connect the store to the processor already?
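The review question above can be illustrated with a toy model. `MiniTopologyBuilder` is hypothetical and is not Kafka's `InternalTopologyBuilder`; it only assumes what the reviewer suggests, namely that adding a store together with a processor name already records the connection. Under that assumption, a follow-up `connectProcessorAndStateStores(...)` call leaves the state unchanged.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model, NOT Kafka's InternalTopologyBuilder.
class MiniTopologyBuilder {
    // store name -> processors connected to that store
    private final Map<String, Set<String>> storeToProcessors = new LinkedHashMap<>();

    // Registers the store (if new) and connects it to every given processor.
    void addStateStore(String storeName, String... processorNames) {
        storeToProcessors
            .computeIfAbsent(storeName, k -> new LinkedHashSet<>())
            .addAll(Arrays.asList(processorNames));
    }

    // Connects a processor to stores that must already be registered.
    void connectProcessorAndStateStores(String processorName, String... storeNames) {
        for (String store : storeNames) {
            Set<String> connected = storeToProcessors.get(store);
            if (connected == null) {
                throw new IllegalStateException("Store " + store + " has not been added");
            }
            connected.add(processorName); // a Set: re-adding an existing connection is a no-op
        }
    }

    // Read-only view of the processors connected to a store.
    Set<String> processorsFor(String storeName) {
        return Collections.unmodifiableSet(
            storeToProcessors.getOrDefault(storeName, Collections.emptySet()));
    }
}
```

In this model the second call is redundant because the connection is stored in a set keyed by store name; whether the real builder behaves identically is the assumption the reviewer is asking about.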
mjsax commented on a change in pull request #6824: URL: https://github.com/apache/kafka/pull/6824#discussion_r427675991

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ##
@@ -700,12 +703,29 @@ public void process(final ProcessorSupplier processorSuppl
     final StatefulProcessorNode processNode = new StatefulProcessorNode<>(
         name,
         new ProcessorParameters<>(processorSupplier, name),
-        stateStoreNames
+        getStoreNamesAndMaybeAddStores(processorSupplier, stateStoreNames)
     );

     builder.addGraphNode(this.streamsGraphNode, processNode);
 }

+/**
+ * Provides store names that should be connected to a {@link StatefulProcessorNode}, from two sources:
+ * 1) Store names are provided as arguments to process(...), transform(...), etc.
+ * 2) {@link StoreBuilder}s are provided by the Processor/TransformerSupplier itself, by returning a set from
+ * {@link ConnectedStoreProvider#stores()}. The {@link StoreBuilder}s will also be added to the topology.
+ */
+private String[] getStoreNamesAndMaybeAddStores(final ConnectedStoreProvider storeProvider, final String[] varargsStoreNames) {

Review comment:
No worries. All good. :)

Btw: the optimization layer was added in 2.0.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
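The two sources listed in the `getStoreNamesAndMaybeAddStores` Javadoc can be sketched as a pure name-merging step. `NamedStore`, `StoreOwningSupplier`, and `StoreNameMerger` are hypothetical stand-ins for `StoreBuilder`, `ConnectedStoreProvider`, and the helper; unlike the helper in the diff, this sketch does not add anything to a topology and only computes the de-duplicated list of store names to connect.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for a StoreBuilder; only the name matters here.
final class NamedStore {
    final String name;
    NamedStore(String name) { this.name = name; }
}

// Hypothetical stand-in for ConnectedStoreProvider: null means "no stores provided".
interface StoreOwningSupplier {
    default Set<NamedStore> stores() { return null; }
}

final class StoreNameMerger {
    private StoreNameMerger() { }

    // Source 1: names passed explicitly (e.g. the varargs of process(...)).
    // Source 2: names of the stores the supplier provides itself.
    // Keeps first-seen order and drops duplicates.
    static String[] storeNames(StoreOwningSupplier supplier, String... varargsStoreNames) {
        Set<String> names = new LinkedHashSet<>();
        if (varargsStoreNames != null) {
            names.addAll(Arrays.asList(varargsStoreNames));
        }
        Set<NamedStore> provided = supplier.stores();
        if (provided != null) {
            for (NamedStore store : provided) {
                names.add(store.name);
            }
        }
        return names.toArray(new String[0]);
    }
}
```

De-duplicating matters in this sketch because a caller may both pass a store name explicitly and have the supplier provide the same store.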
mjsax commented on a change in pull request #6824: URL: https://github.com/apache/kafka/pull/6824#discussion_r427675683

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ##
@@ -213,6 +215,48 @@ public void testDrivingStatefulTopology() {
     assertNull(store.get("key4"));
 }

+@Test
+public void testDrivingConnectedStateStoreTopology() {
+    driver = new TopologyTestDriver(createConnectedStateStoreTopology("connectedStore"), props);
+    driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
+    driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
+    driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
+    driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value4"));
+    assertNoOutputRecord(OUTPUT_TOPIC_1);
+
+    final KeyValueStore<String, String> store = driver.getKeyValueStore("connectedStore");
+    assertEquals("value4", store.get("key1"));
+    assertEquals("value2", store.get("key2"));
+    assertEquals("value3", store.get("key3"));
+    assertNull(store.get("key4"));
+}
+
+@Test
+public void testDrivingConnectedStateStoreInDifferentProcessorsTopology() {
+    final String storeName = "connectedStore";
+    final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String());
+    topology
+        .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+        .addSource("source2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2)
+        .addProcessor("processor1", defineWithStores(new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
+        .addProcessor("processor2", defineWithStores(new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source2")

Review comment:
I think the simplest fix is to pass a Java `Supplier` as the first argument to `define()`:

```
defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder))
```

And within `define()`:

```
public Processor get() {
    return supplier.get();
}
```
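A minimal sketch of the suggested change using only JDK types: `defineWithStores` accepts a `java.util.function.Supplier`, so every `get()` delegates to it and builds a new processor. `ToyProcessor`, `ToyProcessorSupplier`, `LatestValueProcessor`, and `TestSuppliers` are hypothetical; the real test helper works with Kafka's `Processor`, `ProcessorSupplier`, and `StoreBuilder` types instead, and store builders are reduced to plain names here.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical minimal processor abstraction (not Kafka's Processor).
interface ToyProcessor {
    void process(String key, String value);
}

// Hypothetical supplier that also names the stores it owns.
interface ToyProcessorSupplier {
    ToyProcessor get();
    Set<String> stores();
}

// Hypothetical processor keeping the latest value per key (assumed to mirror the test's StatefulProcessor).
final class LatestValueProcessor implements ToyProcessor {
    final Map<String, String> store = new HashMap<>();
    @Override public void process(String key, String value) { store.put(key, value); }
}

final class TestSuppliers {
    private TestSuppliers() { }

    // Takes a factory instead of a ready-made processor: each get() calls
    // processorFactory.get(), so a factory that calls `new` yields a fresh object per call.
    static ToyProcessorSupplier defineWithStores(Supplier<? extends ToyProcessor> processorFactory,
                                                 Set<String> storeNames) {
        final Set<String> stores = Collections.unmodifiableSet(storeNames);
        return new ToyProcessorSupplier() {
            @Override public ToyProcessor get() { return processorFactory.get(); }
            @Override public Set<String> stores() { return stores; }
        };
    }
}
```

Passing a constructor reference such as `LatestValueProcessor::new` (or a lambda that calls `new`) is what guarantees a fresh object; a lambda returning an object captured from the enclosing scope would reintroduce the original problem.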
mjsax commented on a change in pull request #6824: URL: https://github.com/apache/kafka/pull/6824#discussion_r427605702

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ##

Review comment:
Well, `get()` of the returned supplier could be called multiple times, and each time it would return the same object that was passed into `defineWithStores`. This is incorrect, because every call to `get()` must return a new object.
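Kafka Streams may call `get()` on a supplier more than once (for example, once per task), and each task needs its own processor instance. A small guard like the hypothetical `requireFreshInstances` below, written against `java.util.function.Supplier` rather than any Kafka API, can catch the shared-instance mistake in a test. It invokes `get()` twice, so it only suits suppliers whose `get()` has no side effects beyond creating the object.

```java
import java.util.function.Supplier;

final class SupplierChecks {
    private SupplierChecks() { }

    // Hypothetical guard: calls get() twice and rejects a supplier that hands back
    // the same object, i.e. one shared instance instead of a new one per call.
    static <T> Supplier<T> requireFreshInstances(Supplier<T> supplier) {
        T first = supplier.get();
        T second = supplier.get();
        if (first == second) {
            throw new IllegalArgumentException(
                "get() returned the same instance twice; it must create a new object on each call");
        }
        return supplier;
    }
}
```

The check compares references (`==`), not `equals()`, because the concern is sharing one mutable processor object, not whether two processors look alike.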
mjsax commented on a change in pull request #6824: URL: https://github.com/apache/kafka/pull/6824#discussion_r427603579

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ##

Review comment:
Sorry for not being detailed enough. My bad (re-reading my comment, it does not really make sense). What I actually meant was: within `KStreamImpl` we should not access the `builder` at all. Kafka Streams uses a 2-phase translation: in the first phase, we only build up a logical representation of the topology (what we call the "stream graph", which comprises "stream graph nodes"). If optimization is enabled, the optimizer may modify the "stream graph" before it gets translated into the actual topology. The created `StatefulProcessorNode` is one of those "stream graph nodes": in its `writeToTopology()` method, it adds its state store to the `Topology` and connects the processor to other stores if necessary. Thus, your code adds the store "too early" and returns a list of store names that the processor later connects to.

Instead, we should modify `StatefulProcessorNode` and delay adding the store to the topology until `writeToTopology()` is called: in particular, `StatefulProcessorNode` should have a constructor that accepts both an array of store names it should connect to and a `ConnectedStoreProvider`. Does this clarify?
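The two-phase idea can be sketched with hypothetical stand-ins: `DeferredStatefulNode` plays the role of a logical graph node that holds both explicit store names and supplier-provided stores, and `PhysicalTopology` plays the role of the `Topology`. Nothing reaches the topology until `writeToTopology(...)` runs, which is the point of the suggested change. This is an illustration under those assumptions, not the actual `StatefulProcessorNode` code; store builders are reduced to names.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the physical Topology: store name -> connected processors.
final class PhysicalTopology {
    final Map<String, Set<String>> storesToProcessors = new LinkedHashMap<>();

    // Adds a store and connects it to the given processors (possibly none).
    void addStateStore(String storeName, String... processorNames) {
        Set<String> procs = storesToProcessors.computeIfAbsent(storeName, k -> new LinkedHashSet<>());
        Collections.addAll(procs, processorNames);
    }

    // Connects a processor to a store that must already exist.
    void connect(String processorName, String storeName) {
        Set<String> procs = storesToProcessors.get(storeName);
        if (procs == null) {
            throw new IllegalStateException("Unknown store: " + storeName);
        }
        procs.add(processorName);
    }
}

// Hypothetical logical graph node: phase 1 only records what to do,
// phase 2 (writeToTopology) touches the physical topology.
final class DeferredStatefulNode {
    private final String processorName;
    private final String[] storeNamesToConnect;   // e.g. names passed to process(...)
    private final Set<String> providedStoreNames; // stand-in for ConnectedStoreProvider#stores()

    DeferredStatefulNode(String processorName, String[] storeNamesToConnect, Set<String> providedStoreNames) {
        this.processorName = processorName;
        this.storeNamesToConnect = storeNamesToConnect.clone();
        this.providedStoreNames = providedStoreNames == null ? Collections.<String>emptySet() : providedStoreNames;
    }

    void writeToTopology(PhysicalTopology topology) {
        for (String store : providedStoreNames) {
            topology.addStateStore(store, processorName); // add and connect in one step
        }
        for (String store : storeNamesToConnect) {
            topology.connect(processorName, store);       // store was added elsewhere
        }
    }
}
```

Because the node only stores intent until the second phase, an optimizer in this model could still rewrite or drop the node before any store is registered.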
mjsax commented on a change in pull request #6824: URL: https://github.com/apache/kafka/pull/6824#discussion_r424758763

## File path: docs/streams/developer-guide/processor-api.html ##
@@ -396,33 +396,52 @@ Connecting Processors and State Stores
// and the WordCountProcessor node as its upstream processor
.addSink("Sink", "sink-topic", "Process");
+
+Here is a quick explanation of this example:
+
+A source processor node named Source is added to the topology using the addSource method, with one Kafka topic
+source-topic fed to it.
+A processor node named Process with the pre-defined WordCountProcessor logic is then added as the downstream
+processor of the Source node using the addProcessor method.
+A predefined persistent key-value state store is added and connected to the Process node, using
+countStoreBuilder.
+A sink processor node is then added to complete the topology using the addSink method, taking the Process node
+as its upstream processor and writing to a separate sink-topic Kafka topic (note that users can also use another overloaded variant of addSink
+to dynamically determine the Kafka topic to write to for each received record from the upstream processor).
+
+
+In some cases, it may be more convenient to add and connect a state store at the same time as you add the processor it is connected to to the topology.
+This can be done by implementing ConnectedStoreProvider#stores() on the ProcessorSupplier
+in place of calling Topology#addStateStore(), like this:
+
+Topology builder = new Topology();
+
+// add the source processor node that takes Kafka topic "source-topic" as input
+builder.addSource("Source", "source-topic")
+
+// add the WordCountProcessor node which takes the source processor as its upstream processor, along with its state store
+builder.addProcessor("Process", new ProcessorSupplier() { public Processor<String, String> get() { return new WordCountProcessor(); } public Set<StoreBuilder<?>> stores() { return countStoreBuilder; } });
+
+// add the sink processor node that takes Kafka topic "sink-topic" as output
+// and the WordCountProcessor node as its upstream processor
+.addSink("Sink", "sink-topic", "Process");
+
+This allows for a processor to "own" state stores, effectively encapsulating its usage from the user wiring the topology.
+Multiple processors that share a state store may provide the same store with this technique, as long as the StoreBuilder is the same instance.
+In these topologies, the Process stream processor node is considered a downstream processor of the Source node, and an
+upstream processor of the Sink node. As a result, whenever the Source node forwards a newly fetched record from
+Kafka to its downstream Process node, the WordCountProcessor#process() method is triggered to process the record and
+update the associated state store. Whenever context#forward() is called in the
+WordCountProcessor#punctuate() method, the aggregate key-value pair will be sent via the Sink processor node to
+the Kafka topic sink-topic. Note that in the WordCountProcessor implementation, you must refer to the
+same store name Counts when accessing the key-value store, otherwise an exception will be thrown at runtime,
+indicating that the state store cannot be found. If the state store is not associated with the processor
+in the Topology code, accessing it in the processor's init() method will also throw an exception at
+runtime, indicating the state store is not accessible from this processor.
+Now that you have fully defined your processor topology in your application, you can proceed to
+running the Kafka Streams application.
+

Review comment:
Meta comment: can you also extend `streams/upgrade-guide.html`? There is a "public API changes" section for the 2.6 release.

## File path: docs/streams/developer-guide/dsl-api.html ##
@@ -3164,10 +3164,17 @@
Tip
Even though we do not demonstrate it in this example, a stream processor can access any available state stores by
-calling ProcessorContext#getStateStore(). Only such state stores are available that (1) have been named in the
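Two rules from the processor-api.html text in this diff can be modeled in a few lines: processors may share a store only by providing the same `StoreBuilder` instance, and a processor may only access stores it is connected to. `ToyStoreBuilder` and `ToyStoreRegistry` are hypothetical; the exception types and messages are assumptions and do not mirror the exceptions Kafka Streams actually throws.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical store builder: identity matters, not just the name.
final class ToyStoreBuilder {
    final String name;
    ToyStoreBuilder(String name) { this.name = name; }
}

// Hypothetical registry modeling the two documented rules.
final class ToyStoreRegistry {
    private final Map<String, ToyStoreBuilder> buildersByName = new HashMap<>();
    private final Map<String, Set<String>> storesByProcessor = new HashMap<>();

    // Rule 1: a store name may be provided again only with the very same builder instance.
    void addProcessor(String processorName, Set<ToyStoreBuilder> providedStores) {
        for (ToyStoreBuilder builder : providedStores) { // validate before mutating anything
            ToyStoreBuilder existing = buildersByName.get(builder.name);
            if (existing != null && existing != builder) {
                throw new IllegalArgumentException(
                    "A different StoreBuilder is already registered as " + builder.name);
            }
        }
        Set<String> connected = storesByProcessor.computeIfAbsent(processorName, k -> new HashSet<>());
        for (ToyStoreBuilder builder : providedStores) {
            buildersByName.putIfAbsent(builder.name, builder);
            connected.add(builder.name);
        }
    }

    // Rule 2: a processor can only look up stores it is connected to.
    ToyStoreBuilder getStateStore(String processorName, String storeName) {
        Set<String> connected = storesByProcessor.getOrDefault(processorName, Collections.emptySet());
        if (!connected.contains(storeName)) {
            throw new IllegalStateException(
                "Processor " + processorName + " is not connected to store " + storeName);
        }
        return buildersByName.get(storeName);
    }
}
```

Comparing builders by reference (`!=`) is what makes "the same instance" the sharing criterion in this model; two builders with equal names but separate identities are treated as a conflict.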