Jozsef,

Your question is a little unclear to me.
> To detect lost messages

For what topology?

>> KTable<String, InboundMsg> inputTable = builder.table("inputTopic",
>> Consumed.with(...).filter(...));

The code you show contains a `filter()` that can remove records. Could this be the issue? It's also unclear to me whether the original "inputTopic" is a record stream or a changelog stream. Thus, I am not sure if your join idea is the right approach.

It would be important to understand the actual topology that produced the output topic. Also note that some DSL operators (like aggregate() and join()) might drop records that have a null key or a null value. Maybe this is the reason for the missing data in the output?

-Matthias

On 6/18/18 2:18 AM, József Molnár wrote:
> Hi!
>
> I have an application which uses an input and output topic, and every
> message from the input topic should have a corresponding message (with the
> same key) in the output topic.
>
> To detect lost messages (=no output after a certain amount of time,
> ~10 days) I tried to use a KTable - KTable left join and check where the
> output values are null in the result KTable's state store.
>
> Sample code:
> // Stream setup
> StreamsBuilder builder = new StreamsBuilder();
> KTable<String, InboundMsg> inputTable = builder.table("inputTopic",
> Consumed.with(...).filter(...));
> KTable<String, OutboundMsg> outputTable = builder.table("outputTopic",
> Consumed.with(...));
>
> Materialized<String, InboundMsg, KeyValueStore<Bytes, byte[]>> store =
> Materialized.<String, InboundMsg, KeyValueStore<Bytes,
> byte[]>>as("Store")..;
> KTable<String, InboundMsg> joinedTable = inputTable.leftJoin(outputTable,
> ValueMapper, store);
>
> // Read from store
> ReadOnlyKeyValueStore<String, InboundMsg> keyValueStore =
> streams.store("Store", QueryableStoreTypes.keyValueStore());
> KeyValueIterator<String, InboundMsg> allMsg = keyValueStore.all();
>
> Is there any other way to read from the state store and possibly stream it
> to a topic? As there can be a couple of million messages in the topics,
> reading all of them with an iterator will not be performant enough.
>
> Thanks,
> Jozsef
>
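
For reference, the check the quoted left join is meant to express (input keys older than a cutoff that have no matching output key) can be sketched in plain Java without any Kafka dependency. This only illustrates the intended semantics and is not a Kafka Streams topology. The class `LostMessageCheck`, the method `findLostKeys`, and the per-key "seen at" timestamps are hypothetical names and assumptions introduced for this sketch; they do not appear in the original code.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class LostMessageCheck {

    /**
     * Returns, in sorted order, the keys that were seen on the input side
     * strictly before `cutoff` and have no entry on the output side.
     * (Hypothetical helper; models the "left join with null right side" check.)
     */
    static List<String> findLostKeys(Map<String, Instant> inputSeenAt,
                                     Set<String> outputKeys,
                                     Instant cutoff) {
        List<String> lost = new ArrayList<>();
        // Copy into a TreeMap so the result order is deterministic.
        for (Map.Entry<String, Instant> e : new TreeMap<>(inputSeenAt).entrySet()) {
            boolean oldEnough = e.getValue().isBefore(cutoff);
            boolean hasOutput = outputKeys.contains(e.getKey());
            if (oldEnough && !hasOutput) {
                lost.add(e.getKey());
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2018-06-18T00:00:00Z");
        Instant cutoff = now.minus(Duration.ofDays(10)); // assumed ~10 day threshold

        Map<String, Instant> input = new HashMap<>();
        input.put("a", now.minus(Duration.ofDays(20))); // old, has output
        input.put("b", now.minus(Duration.ofDays(15))); // old, no output: lost
        input.put("c", now.minus(Duration.ofDays(1)));  // recent, may still arrive

        Set<String> output = Collections.singleton("a");

        System.out.println(findLostKeys(input, output, cutoff)); // prints [b]
    }
}
```

Note that this sketch treats the input as one entry per key. Whether that matches the real data depends on the open question above of whether "inputTopic" is a record stream or a changelog stream.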