Hello,

We're building a JSON decorator using Kafka Streams' processing API.

In brief, a piece of JSON is consumed from an input 
topic (keys are null, value is the JSON). The JSON contains a field (e.g. 
"thisField") with a value (e.g. "someLink"). This value (and a timestamp) is 
used to look up another piece of JSON from a key-value topic (keys are all the 
different values of "thisField", values are JSON). This key-value topic is 
created by another service in Kafka. The additional piece of JSON then gets 
appended to the input JSON and the result gets written to an output topic (keys 
are null, value is now the original JSON + lookup JSON).
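
To make the intended lookup-and-append step concrete, here is a minimal plain-Java sketch with no Kafka involved. The class name, the "lookup" output field, and the rule "use the latest lookup JSON at or before the record's timestamp" are assumptions for illustration only, and the string splice stands in for a proper JSON library.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the lookup store (not a Kafka Streams API).
final class DecoratorSketch {
    // Value of "thisField" -> (timestamp in ms -> lookup JSON).
    private final Map<String, TreeMap<Long, String>> store = new HashMap<>();

    void put(String key, long timestampMs, String lookupJson) {
        store.computeIfAbsent(key, k -> new TreeMap<>()).put(timestampMs, lookupJson);
    }

    // Assumption: return the latest lookup JSON at or before the given timestamp, or null.
    String lookup(String key, long timestampMs) {
        TreeMap<Long, String> versions = store.get(key);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, String> entry = versions.floorEntry(timestampMs);
        return entry == null ? null : entry.getValue();
    }

    // Appends the lookup JSON under the hypothetical field "lookup".
    // Naive splice: assumes inputJson is a JSON object ending in '}'.
    static String decorate(String inputJson, String lookupJson) {
        if (lookupJson == null) {
            return inputJson; // nothing found, pass the input through unchanged
        }
        String trimmed = inputJson.trim();
        int close = trimmed.lastIndexOf('}');
        if (close < 0) {
            throw new IllegalArgumentException("not a JSON object: " + inputJson);
        }
        String head = trimmed.substring(0, close).trim();
        String separator = head.endsWith("{") ? "" : ","; // no comma for an empty object
        return head + separator + "\"lookup\":" + lookupJson + "}";
    }
}
```

In the real topology the put() calls would come from the key-value topic, and decorate() would run once per record from the input topic.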

To do the query against a key-value store, ideally I want Kafka Streams to 
directly create and update a windowed key-value store in memory (or on disk) from my 
key-value topic in Kafka, but I am unable to find a way to specify this through 
the StoreBuilder interface. Does anybody know how to do this?
Here is my current StoreBuilder code snippet:

// Retention period and window size are both 14600 days (about 40 years);
// the final argument (retainDuplicates) is false.
StoreBuilder<WindowStore<String, String>> storeBuilder =
    Stores.windowStoreBuilder(
        Stores.persistentWindowStore("loopkupStore",
            Duration.ofDays(14600), Duration.ofDays(14600), false),
        Serdes.String(),
        Serdes.String());
storeBuilder.build();


Currently my workaround is to have a sink for the key-value store and then 
create/update this key-value store using a node in the processing topology, but 
this has issues when restarting the service: when the service is 
restarted, the key-value store topic needs to be consumed from the start to 
rebuild the store in memory, but the sink will have committed offsets, 
which prevents the topic from being consumed from the start. I also cannot use 
streams.cleanUp(), as this will reset all the sinks in my topology (my other sink 
ingests records from the input topic).
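
The restart problem can be illustrated with a small plain-Java simulation. No Kafka is involved: the list stands in for the key-value topic, startOffset stands in for the position consumption resumes from, and all names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of rebuilding an in-memory store after a restart.
final class RestartSketch {
    // Replays {key, value} records from startOffset onwards into a fresh store.
    static Map<String, String> rebuild(List<String[]> topic, int startOffset) {
        Map<String, String> store = new HashMap<>();
        for (int i = startOffset; i < topic.size(); i++) {
            String[] record = topic.get(i);
            store.put(record[0], record[1]);
        }
        return store;
    }

    public static void main(String[] args) {
        List<String[]> topic = Arrays.asList(
            new String[] {"linkA", "{\"v\":1}"},
            new String[] {"linkB", "{\"v\":2}"},
            new String[] {"linkC", "{\"v\":3}"});

        // Resuming from a committed offset of 2 only sees the last record.
        System.out.println(rebuild(topic, 2).keySet());
        // Replaying from offset 0 restores every key.
        System.out.println(rebuild(topic, 0).keySet());
    }
}
```

After a restart, the store built from the committed offset is missing linkA and linkB, which is the behaviour I see with the workaround.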

Thanks

Pirow Engelbrecht
System Engineer

E. pirow.engelbre...@etion.co.za
T. +27 12 678 9740 (ext. 9879)
M. +27 63 148 3376

76 Regency Drive | Irene | Centurion | 0157<https://goo.gl/maps/v9ZbwjqpPyL2>
www.etion.co.za<https://www.parsec.co.za/>



