Hi Simon,

See if this helps:
- You can create the KTable with the state store "storeAdminItem", as you mentioned.
- For each element you want to check, query that state store directly to see whether the element is in it. You can do the querying via the Interactive Query APIs (they basically get a handle on the state store and issue read-only requests to it).
- There is a blog post with a detailed example, with code at the end, here: https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/

Thanks
Eno

> On 29 Nov 2016, at 05:51, Simon Teles <ste...@isi.nc> wrote:
>
> Hello all,
>
> I'm not sure how to do the following use-case, if anyone can help :)
>
> - I have an admin UI where the admin can choose which items are allowed on
> the application.
>
> - When the admin chooses an item, it pushes an object to a Kafka topic,
> test.admin.item, with the following object: {"id":"1234", "name":"toto"}
>
> - I have a Kafka Stream which is connected to a topic test.item, which
> receives all the item updates from our DB.
>
> - When the stream receives an item, it needs to check if it's allowed by the
> admin. If yes, it saves it in another DB and pushes a notification to
> another topic if the save to the DB is ok. If not, it does nothing.
>
> My idea is that when I start the stream, I "push" all the contents from the
> topic test.admin.item to a state-store, and when I receive a new item on
> test.item, I check its id against the state-store.
>
> Is this the proper way?
>
> My problem is:
>
> -> If I use the TopologyBuilder, I don't know how I can load the topic into a
> state-store at start, to use it afterwards in a Processor.
>
> -> With the KStreamBuilder I can use:
>
> KStreamBuilder builder = new KStreamBuilder();
>
> // We create a KTable from topic admin.item and load it on the Store "AdminItem"
> builder.table(Serdes.String(), new SerdeItem(), "test.admin.item", "storeAdminItem");
>
> -> But with the KStreamBuilder, I don't know how I can access the state-store
> when I map/filter/etc.
>
> If you can help me figure it out, it would be much appreciated.
>
> Thanks,
>
> Regards,
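
To make the suggested check concrete, here is a minimal sketch of the "is this item id in the admin store?" test. It is not taken from the thread: the class name `AdminItemFilter` and the use of a plain `Function` as a stand-in for the store are assumptions made so that the example runs without a Kafka cluster. In a real application (Kafka Streams 0.10.1 or later) the lookup would presumably be backed by the read-only handle returned by `KafkaStreams#store`, as noted in the comments; the value type `Item` mentioned there is also assumed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: decides whether an incoming item is allowed by the admin.
class AdminItemFilter {

    // Stand-in for a read-only view of the "storeAdminItem" state store.
    // With Interactive Queries the handle would be obtained roughly like this
    // (Item is an assumed value type matching the SerdeItem serde):
    //   ReadOnlyKeyValueStore<String, Item> store =
    //       streams.store("storeAdminItem",
    //                     QueryableStoreTypes.<String, Item>keyValueStore());
    // and the lookup passed to the constructor would then be store::get.
    private final Function<String, String> lookup;

    AdminItemFilter(Function<String, String> lookup) {
        this.lookup = lookup;
    }

    // An item is allowed when its id is present in the admin store.
    boolean isAllowed(String itemId) {
        return itemId != null && lookup.apply(itemId) != null;
    }

    public static void main(String[] args) {
        // In-memory map simulating the contents of test.admin.item.
        Map<String, String> adminItems = new HashMap<>();
        adminItems.put("1234", "toto");

        AdminItemFilter filter = new AdminItemFilter(adminItems::get);
        System.out.println(filter.isAllowed("1234")); // id present in the store
        System.out.println(filter.isAllowed("9999")); // id absent from the store
    }
}
```

Keeping the lookup behind a function means the same check can be unit-tested with a map and later wired to the real store. Note that a state store can only be queried once the `KafkaStreams` instance is running, so the handle should be fetched after `start()`.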