Jozsef,

Your question is a little unclear to me.

> To detect lost messages 

For what topology?

>> KTable<String, InboundMsg> inputTable = builder.table("inputTopic",
>> Consumed.with(...).filter(...));

The code you show contains a `filter()` that can remove record? Could
this be the issue?

It's also unclear to me, if the original "inputTopic" is record stream
or a changelog stream? Thus, I am not sure if you join idea is the right
approach. It would be important to understand the actual topology that
produced the output topic.

Also note, that some DSL operators (like aggregate() and join()) might
drop some records if they have null-key or null-value. Maybe this is the
reason for missing data in the output?


-Matthias


On 6/18/18 2:18 AM, József Molnár wrote:
> Hi!
> 
> I have an application which uses an input and output topic, and every
> message from the input topic should have a corresponding message (with the
> same key) in the output topic.
> 
> To detect lost messages (=no output after a certain amount of time,
> ~10days) I tried to use a KTable - KTable left join and check where the
> output values are null in the result KTable's state store.
> 
> Sample code:
> // Stream setup
> StreamsBuilder builder = new StreamsBuilder();
> KTable<String, InboundMsg> inputTable = builder.table("inputTopic",
> Consumed.with(...).filter(...));
> KTable<String, OutboundMsg> outputTable = builder.table("outputTopic",
> Consumed.with(...));
> 
> Materialized<String, InboundMsg, KeyValueStore<Bytes, byte[]>> store =
>         Materialized.<String, InboundMsg, KeyValueStore<Bytes,
> byte[]>>as("Store")..;
> KTable<String, InboundMsg> joinedTable = inputTable.leftJoin(outputTable,
> ValueMapper, store);
> 
> // Read from store
> ReadOnlyKeyValueStore<String, InboundMsg> keyValueStore =
> streams.store("Store", QueryableStoreTypes.keyValueStore());
> KeyValueIterator<String, InboundMsg> allMsg = keyValueStore.all();
> 
> Is there any other way to read from the state store and possibly stream it
> to a topic? As there can be a couple of million messages int he topics,
> reading all of them with an iterator will be not performant enough.
> 
> Thanks,
> Jozsef
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to