Case closed, the behaviour is actually as expected: the source topic contains duplicated records, and these get propagated into the join just as they should. I'm now using a stream processor for deduplication.
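To illustrate the point, here is a minimal plain-Java sketch (no Kafka dependency) that replays duplicated attribute records through the add logic of GroupedAttributes from the quoted post, once as-is and once behind a simple first-seen filter. Several things here are assumptions, not taken from the thread: that the duplicates carry distinct primary keys but the same user_id and number, that equal (user_id, number) pairs may be treated as duplicates, and the names DuplicateAttributesDemo, aggregate and deduplicate, which are hypothetical. The thread does not say how the actual deduplication processor works, so the filter only shows the general idea.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Same shape as the GroupedAttributes class in the quoted post.
class GroupedAttributes {
    final List<Integer> numbers = new ArrayList<>();

    GroupedAttributes add(Integer v) {
        numbers.add(v);
        return this;
    }

    GroupedAttributes remove(Integer v) {
        // v is an Integer, so this is List.remove(Object): it drops the first
        // matching element rather than the element at index v.
        numbers.remove(v);
        return this;
    }
}

// Hypothetical driver for illustration only; this is not Kafka Streams API.
class DuplicateAttributesDemo {
    // Minimal stand-in for the Attribute class in the quoted post.
    static final class Attribute {
        final long id;      // PK
        final int number;
        final long userId;  // FK

        Attribute(long id, int number, long userId) {
            this.id = id;
            this.number = number;
            this.userId = userId;
        }
    }

    // Applies the adder once per record, as the aggregation does for inserts.
    // All records are assumed to belong to the same user (same grouped key).
    static GroupedAttributes aggregate(List<Attribute> records) {
        GroupedAttributes agg = new GroupedAttributes();
        for (Attribute a : records) {
            agg.add(a.number);
        }
        return agg;
    }

    // Assumption: records with equal (userId, number) are duplicates. The
    // HashSet stands in for whatever state a real processor would keep.
    static List<Attribute> deduplicate(List<Attribute> records) {
        Set<String> seen = new HashSet<>();
        List<Attribute> out = new ArrayList<>();
        for (Attribute a : records) {
            if (seen.add(a.userId + ":" + a.number)) {
                out.add(a);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Attribute> source = new ArrayList<>();
        for (long id = 1; id <= 4; id++) {
            source.add(new Attribute(id, 1, 100L)); // same content, new PK
        }
        System.out.println(aggregate(source).numbers);              // [1, 1, 1, 1]
        System.out.println(aggregate(deduplicate(source)).numbers); // [1]
    }
}
```

Without the filter, every duplicate reaches the adder and the list grows by one entry per record, which matches the changelog excerpt in the quoted post. An unbounded in-memory set is only acceptable in a sketch like this; a real processor would need to bound or expire its state.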
Best wishes
Karsten

Vikram Singh <vikram.si...@clouzersolutions.com.invalid> wrote on Fri, 23 Feb 2024, 12:13:

> +Ajit Kharpude <ajit.kharp...@clouzersolutions.com>
>
> On Fri, Feb 23, 2024 at 1:14 PM Karsten Stöckmann <
> karsten.stoeckm...@gmail.com> wrote:
>
> > Hi,
> >
> > I am observing somewhat unexpected (from my point of view) behaviour
> > during re-key / re-partitioning operations performed to prepare a
> > KTable-KTable join.
> >
> > Assume two (simplified) source data structures from two respective
> > topics:
> >
> > class User {
> >     Long id; // PK
> >     String name;
> > }
> >
> > class Attribute {
> >     Long id; // PK
> >     Integer number;
> >     Long user_id; // FK
> > }
> >
> > Now in order to build an aggregate user containing all of its
> > attributes (0-n), the 'attributes' topic needs to be re-keyed to its
> > FK ('native' FK join is not possible as there's no right join
> > operation) using a collection object.
> >
> > class GroupedAttributes {
> >     List<Integer> numbers = new ArrayList<>();
> >     public GroupedAttributes add(Integer v) {
> >         numbers.add(v);
> >         return this;
> >     }
> >     public GroupedAttributes remove(Integer v) {
> >         numbers.remove(v);
> >         return this;
> >     }
> > }
> >
> > Re-key operation:
> >
> > KTable<Long, GroupedAttributes> groupedAttributes = attributes // this is a KTable<Long, Attribute>
> >     .groupBy(
> >         (k, v) -> KeyValue.pair(v.userId(), v.number()),
> >         Grouped.with(
> >             "attributes-grouped",
> >             Serdes.Long(),
> >             Serdes.Integer()))
> >     .aggregate(
> >         GroupedAttributes::new,
> >         (k, v, a) -> a.add(v),
> >         (k, v, a) -> a.remove(v),
> >         Named.as("attributes-grouped-aggregated"),
> >         Materialized.with(Serdes.Long(), groupedAttributesSerde));
> >
> > This internally creates a state store and associated topic
> > 'attributes-grouped-aggregated-changelog' containing the aggregated
> > 'number' attributes re-keyed to their FK (user_id).
> >
> > Now for a User associated with exactly one Attribute, I'd have
> > expected the topic to contain exactly one record with the user's key
> > and a GroupedAttributes object with one item. But in fact that topic
> > contains thousands of records for that particular user, with an
> > ever-growing list of always the same attribute 'number', which is
> > eventually reduced to the (expected) final object with one attribute
> > 'number'.
> >
> > E.g.:
> >
> > offset: 1, key: 100, { "numbers": [1] }
> > offset: 3, key: 100, { "numbers": [1, 1] }
> > offset: 6, key: 100, { "numbers": [1, 1, 1] }
> > offset: 9, key: 100, { "numbers": [1, 1, 1, 1] }
> > ...
> > offset: 262211, key: 100, { "numbers": [1, 1] }
> > offset: 262213, key: 100, { "numbers": [1] }
> >
> > Can anyone please shed some light on the internal workings and explain
> > whether this is expected behaviour?
> >
> > Best wishes,
> > Karsten
> >
>
> --
> Thanks & Regards
> *VIKRAM S SINGH*