A KStream-KTable join might also work:

    stream.join(table, ...);
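
To make the join idea concrete, here is a minimal sketch of what an inner stream-table join does, modeled with plain Java collections rather than the Kafka Streams API. The class and method names are hypothetical. The admin table is a Map keyed by item id, and only stream records whose key is present in the table pass through, which is the "allowed by the admin" check. In Kafka Streams the join is key-based as well, so this assumes both topics are keyed by the item id.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of an inner stream-table join; NOT the Kafka Streams API.
// All names here are hypothetical and only illustrate the semantics.
class AdminItemJoinSketch {

    // Keeps only the stream records whose key exists in the table,
    // mirroring how an inner join drops records without a match.
    static List<Map.Entry<String, String>> join(
            List<Map.Entry<String, String>> stream,
            Map<String, String> table) {
        List<Map.Entry<String, String>> joined = new ArrayList<>();
        for (Map.Entry<String, String> record : stream) {
            if (table.containsKey(record.getKey())) {
                joined.add(record);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Stands in for the KTable built from test.admin.item (id -> name).
        Map<String, String> adminItems = new HashMap<>();
        adminItems.put("1234", "toto");

        // Stands in for records arriving on test.item, keyed by item id.
        List<Map.Entry<String, String>> items = new ArrayList<>();
        items.add(new SimpleEntry<>("1234", "item update A"));
        items.add(new SimpleEntry<>("9999", "item update B"));

        // Only the record with id 1234 survives; 9999 is not admin-approved.
        for (Map.Entry<String, String> allowed : join(items, adminItems)) {
            System.out.println(allowed.getKey() + " -> " + allowed.getValue());
        }
    }
}
```

With the real DSL, the records that come out of the join would then go on to the DB save and the notification topic; items with no match in the table are simply dropped.
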
-Matthias

On 11/29/16 1:21 AM, Eno Thereska wrote:
> Hi Simon,
>
> See if this helps:
>
> - You can create the KTable with the state store "storeAdminItem", as you
>   mentioned.
> - For each element you want to check, query that state store directly to
>   see whether the element is in it. You can do the querying via the
>   Interactive Query APIs (they basically get a handle on the state store
>   and issue read-only requests to it).
> - There is a blog post with a detailed example and code at the end here:
>   https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
>
> Thanks
> Eno
>
>> On 29 Nov 2016, at 05:51, Simon Teles <ste...@isi.nc> wrote:
>>
>> Hello all,
>>
>> I'm not sure how to do the following use case, if anyone can help :)
>>
>> - I have an admin UI where the admin can choose which items are allowed
>>   in the application.
>>
>> - When the admin chooses an item, it pushes an object to a Kafka topic,
>>   test.admin.item, with the following object: {"id":"1234", "name":"toto"}
>>
>> - I have a Kafka Stream connected to a topic test.item, which receives
>>   all the item updates from our DB.
>>
>> - When the stream receives an item, it needs to check whether it's
>>   allowed by the admin. If yes, it saves it in another DB and pushes a
>>   notification to another topic if the save to the DB succeeds. If not,
>>   it does nothing.
>>
>> My idea is that when I start the stream, I "push" all the contents of the
>> topic test.admin.item to a state store, and when I receive a new item on
>> test.item, I check its id against the state store.
>>
>> Is this the proper way?
>>
>> My problem is:
>>
>> -> If I use the TopologyBuilder, I don't know how to load the topic into
>>    a state store at startup so that I can then use it in a Processor.
>>
>> -> With the KStreamBuilder I can use:
>>
>>    KStreamBuilder builder = new KStreamBuilder();
>>
>>    // We create a KTable from topic admin.item and load it on the Store "AdminItem"
>>    builder.table(Serdes.String(), new SerdeItem(), "test.admin.item", "storeAdminItem");
>>
>> -> But with the KStreamBuilder, I don't know how I can access the state
>>    store when I map/filter/etc.
>>
>> If you can help me figure it out, it would be much appreciated.
>>
>> Thanks,
>>
>> Regards,