Hi Alex,

Thanks for the quick response. What I have is around 8 streams branched from a single stream, which are then joined back into one further down the pipeline. Each branched stream can contain duplicates, so when I join all this data I end up with a near-endless explosion of tuples.
So what I was thinking: if I can remove all the duplicates right at the start, the joins will be working on manageable data. I checked the example code and wanted to know how it can be inserted into my existing pipeline. Basically my current code is something like this:

    Properties props = new Properties();
    ....
    final StreamsBuilder builder = new StreamsBuilder();
    final KStream<K, V> input = builder.stream("input-topic");
    input....  // my pipeline starts
    .....
    final Topology topology = builder.build(props);
    final KafkaStreams streams = new KafkaStreams(topology, props);
    ......
    streams.start();

This would change to:

    Properties props = new Properties();
    ....
    final StoreBuilder<WindowStore<K, V>> dedupStoreBuilder = .....
    .....
    builder.addStateStore(dedupStoreBuilder);  // register the store with the builder
    final KStream<K, V> input = builder.stream("input-topic");
    final KStream<K, V> deduplicated = input.transform(
        () -> new DeduplicationTransformer<>(windowSize.toMillis(), (key, value) -> value),
        storeName);  // pass the store's name so the transformer can access it
    deduplicated....  // my existing pipeline
    .....
    // rest same as before
    streams.start();

(I added the addStateStore() call and the store name argument to transform(), since the transformer in the Confluent example looks its store up by name.)

Let me know if I got this right.

Thanks
Sachin

On Tue, Dec 10, 2019 at 6:59 PM Alex Brekken <brek...@gmail.com> wrote:

> Hi Sachin, is your goal to prevent any records with a duplicate key from
> ever getting sent downstream? The KTable you have in your example will of
> course hold the most recent record for a given key, but it will still emit
> updates. So if key "A" arrives a second time (with no change to the
> value), it will still be emitted. (Depending on how rapidly you get
> duplicate events, some might get removed by internal caching, but you will
> still likely get at least one of those duplicates sent further downstream
> through the topology.) Take a look at this example from Confluent to see
> if it would work for your case:
>
> https://github.com/confluentinc/kafka-streams-examples/blob/5.3.1-post/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
>
> Also, what is the reason for wanting to eliminate duplicates?
> Do you have downstream aggregators that you don't want to incorrectly
> count duplicated events?
>
> Alex
>
> On Tue, Dec 10, 2019 at 7:05 AM Sachin Mittal <sjmit...@gmail.com> wrote:
>
> > Hi,
> > I am using streams and I get messages like: (K, V)
> > (A, a), (B, b), (C, c), (A, a), (C, c), (A, a) .....
> > I wanted to define a topology that would filter out duplicate messages
> > from upstream. I want to know if this is possible.
> > The code I have written to do this is something like this:
> >
> >     source.groupBy((k, v) -> new Key(k, v))
> >           .reduce((av, nv) -> nv)
> >           .toStream()
> >
> > So basically I create a new key which is a combination of the existing
> > (k, v). Then I group by it and reduce it to a table that just stores
> > the final value. Finally I convert that back to a stream to be used
> > downstream.
> >
> > My question is: would this logic work? That is, if I get another
> > message (A, a), it will basically replace the existing (A, a) in the
> > table, and no new message would get appended to the resulting stream.
> >
> > Is my understanding correct?
> >
> > If not, is there any other way to achieve this?
> >
> > Thanks
> > Sachin
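For reference, the core idea behind the deduplication transformer in the linked Confluent example can be sketched without any Kafka dependencies: remember the time each event id was first seen, and drop repeats that arrive within a retention window. This is only an illustration of the logic (the class and method names below are my own, not from the example); in the real transformer the map lives in a fault-tolerant windowed state store rather than an in-memory HashMap.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of windowed deduplication: an id is forwarded the first
// time it appears, and again only after its previous sighting has aged out
// of the retention window.
class WindowedDeduplicator<E> {
    private final long maintainDurationMs;          // how long to remember an id
    private final Map<E, Long> lastSeen = new HashMap<>();

    WindowedDeduplicator(long maintainDurationMs) {
        this.maintainDurationMs = maintainDurationMs;
    }

    // Returns true if this id should be forwarded downstream (new, or its
    // earlier sighting expired); false if it is a duplicate. On a duplicate,
    // the timestamp is refreshed, mirroring what the Confluent example does
    // to prevent a frequently repeated id from ever expiring.
    boolean isFirstSeen(E id, long nowMs) {
        purgeExpired(nowMs);
        Long seenAt = lastSeen.put(id, nowMs);
        return seenAt == null;
    }

    private void purgeExpired(long nowMs) {
        lastSeen.entrySet().removeIf(e -> nowMs - e.getValue() > maintainDurationMs);
    }
}
```

With a 1-second window, (A, t=0) passes, (A, t=500) is dropped as a duplicate, and (A, t=2000) passes again because the remembered entry has expired. In Kafka Streams this state would be a WindowStore so the seen-ids map survives restarts and is bounded in size by the retention period.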