A KStream-KTable join might also work.

> stream.join(table, ...);
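
For illustration, the semantics of such an inner stream-table join can be modelled in plain Java, without the Kafka Streams API. The table side holds the latest admin entry per item id, and an item update is passed on only if its id has an entry when the update arrives. The names below (AllowListJoinSketch, onAdminUpdate, onItemUpdate) are made up for this sketch. It assumes both topics are keyed by item id and that a null value on the table side means a delete.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of an inner stream-table join; this is NOT Kafka Streams API code.
final class AllowListJoinSketch {
    // Table side: latest admin entry per item id (like test.admin.item read as a table).
    private final Map<String, String> adminItems = new HashMap<>();

    // A record on the table side: upsert, or delete when the value is null (assumption).
    void onAdminUpdate(String itemId, String adminEntry) {
        if (adminEntry == null) {
            adminItems.remove(itemId);
        } else {
            adminItems.put(itemId, adminEntry);
        }
    }

    // A record on the stream side: passed on only if its id is currently in the table.
    Optional<String> onItemUpdate(String itemId, String item) {
        if (!adminItems.containsKey(itemId)) {
            return Optional.empty(); // inner join: no match, nothing is emitted
        }
        return Optional.of(item);
    }

    public static void main(String[] args) {
        AllowListJoinSketch join = new AllowListJoinSketch();
        join.onAdminUpdate("1234", "{\"id\":\"1234\", \"name\":\"toto\"}");
        // Item 1234 was chosen by the admin, item 9999 was not.
        System.out.println(join.onItemUpdate("1234", "item-1234").isPresent());
        System.out.println(join.onItemUpdate("9999", "item-9999").isPresent());
    }
}
```

In this model only records on the item side produce output. An admin update changes which later items pass, but does not re-emit earlier ones.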


-Matthias

On 11/29/16 1:21 AM, Eno Thereska wrote:
> Hi Simon,
> 
> See if this helps:
> 
> - you can create the KTable with the state store "storeAdminItem" as you 
> mentioned
> - for each element you want to check, query that state store directly to see 
> whether the element is in it. You can do the querying via the Interactive 
> Query APIs (they basically get a handle on the state store and issue 
> read-only requests to it). 
> - There is a blog post with a detailed example and code at the end here: 
> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
> 
> Thanks
> Eno
> 
>> On 29 Nov 2016, at 05:51, Simon Teles <ste...@isi.nc> wrote:
>>
>> Hello all,
>>
>> I'm not sure how to do the following use-case, if anyone can help :)
>>
>> - I have an admin UI where the admin can choose which items are allowed in 
>> the application
>>
>> - When the admin chooses an item, it pushes the following object to a Kafka 
>> topic, test.admin.item: {"id":"1234", "name":"toto"}
>>
>> - I have a Kafka Stream which is connected to a topic, test.item, which 
>> receives all the item updates from our DB.
>>
>> - When the stream receives an item, it needs to check if it's allowed by the 
>> admin. If yes, it saves it to another DB and, if the save to the DB is OK, 
>> pushes a notification to another topic. If not, it does nothing.
>>
>> My idea is that when I start the stream, I "push" all the contents of the 
>> topic test.admin.item to a state-store, and when I receive a new item on 
>> test.item, I check its id against the state-store.
>>
>> Is this the proper way?
>>
>> My problem is:
>>
>> -> If I use the TopologyBuilder, I don't know how to load the topic into a 
>> state-store at startup so that I can use it afterwards in a Processor.
>>
>> -> With the KStreamBuilder I can use:
>>
>> KStreamBuilder builder = new KStreamBuilder();
>>
>> // We create a KTable from topic test.admin.item and load it into the store
>> // "storeAdminItem"
>> builder.table(Serdes.String(), new SerdeItem(), "test.admin.item",
>>               "storeAdminItem");
>>
>> -> But with the KStreamBuilder, I don't know how I can access the state-store 
>> when I map/filter/etc.?
>>
>> If you can help me figure it out, it would be much appreciated.
>>
>> Thanks,
>>
>> Regards,
>>
> 
> 
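
The per-item handling described in the original question (check whether the item is allowed, save it, and notify only if the save succeeded) could be sketched as follows. This is a plain-Java outline under stated assumptions, not Kafka Streams code. ItemHandlerSketch and its three collaborators are hypothetical names: the allow-check stands in for whichever lookup is used (state store query or join), and the save and notify steps stand in for the second DB and the notification topic.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of the per-item handling; none of these names come from Kafka.
final class ItemHandlerSketch {
    private final Predicate<String> isAllowed; // e.g. a lookup by id in the admin store
    private final Predicate<String> saveToDb;  // assumed to return true when the save succeeded
    private final Consumer<String> notifier;   // e.g. a producer for the notification topic

    ItemHandlerSketch(Predicate<String> isAllowed,
                      Predicate<String> saveToDb,
                      Consumer<String> notifier) {
        this.isAllowed = isAllowed;
        this.saveToDb = saveToDb;
        this.notifier = notifier;
    }

    // Returns true only when the item was allowed, saved, and a notification was sent.
    boolean handle(String itemId) {
        if (!isAllowed.test(itemId)) {
            return false; // not chosen by the admin: do nothing
        }
        if (!saveToDb.test(itemId)) {
            return false; // save failed: no notification
        }
        notifier.accept(itemId);
        return true;
    }
}
```

Keeping the three steps behind small interfaces means the allow-check can be swapped between a state store lookup and a join without touching the save and notify logic.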
