Case closed: the behaviour is actually as expected. The source topic contains
duplicated data that gets propagated into the join, just as it should. I'm
now using a stream processor for deduplication.
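
Karsten doesn't show his deduplication processor. As a rough idea of what such a step does, here is a plain-Java model. It assumes that "duplicate" means "same value as the last value seen for the same key". A `HashMap` stands in for the state store a real Kafka Streams processor would use, and the class name `LastValueDeduplicator` is hypothetical:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical plain-Java model of a deduplication step: a record is
 * forwarded only if its value differs from the last value seen for its key.
 * In a Kafka Streams processor the map would be a persistent state store.
 */
class LastValueDeduplicator<K, V> {
    private final Map<K, V> lastSeen = new HashMap<>();

    /** Returns true if the record should be forwarded, false if it is a duplicate. */
    boolean accept(K key, V value) {
        // containsKey distinguishes "never seen" from "last value was null"
        if (lastSeen.containsKey(key) && Objects.equals(lastSeen.get(key), value)) {
            return false; // identical to the previous value for this key: drop it
        }
        lastSeen.put(key, value);
        return true;
    }

    public static void main(String[] args) {
        LastValueDeduplicator<Long, Integer> dedup = new LastValueDeduplicator<>();
        System.out.println(dedup.accept(100L, 1)); // true: first record for key 100
        System.out.println(dedup.accept(100L, 1)); // false: exact duplicate, dropped
        System.out.println(dedup.accept(100L, 2)); // true: value changed
    }
}
```

Keeping only the last value per key keeps the store bounded by the number of keys. Deduplicating on a record ID or a time window would be alternative designs.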

Best wishes
Karsten

Vikram Singh <vikram.si...@clouzersolutions.com.invalid> wrote on Fri,
23 Feb 2024, 12:13:

> +Ajit Kharpude <ajit.kharp...@clouzersolutions.com>
>
> On Fri, Feb 23, 2024 at 1:14 PM Karsten Stöckmann <
> karsten.stoeckm...@gmail.com> wrote:
>
> > Hi,
> >
> > I am observing somewhat unexpected (from my point of view) behaviour
> > during re-key / re-partitioning operations performed in order to prepare
> > a KTable-KTable join.
> >
> > Assume two (simplified) source data structures from two respective
> > topics:
> >
> > record User(
> >     Long id,        // PK
> >     String name) {}
> >
> > record Attribute(
> >     Long id,        // PK
> >     Integer number,
> >     Long userId) {} // FK (user_id); accessed as v.userId() below
> >
> > Now, in order to build an aggregate user containing all of its
> > attributes (0..n), the 'attributes' topic needs to be re-keyed to its
> > FK using a collection object (a 'native' FK join is not possible, as
> > there is no right join operation).
> >
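> > class GroupedAttributes {
> >   List<Integer> numbers = new ArrayList<>();
> >
> >   // adder: called when an attribute is added to this user
> >   public GroupedAttributes add(Integer v) {
> >     numbers.add(v);
> >     return this;
> >   }
> >
> >   // subtractor: removes one occurrence of v (List.remove(Object))
> >   public GroupedAttributes remove(Integer v) {
> >     numbers.remove(v);
> >     return this;
> >   }
> > }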
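
A side note on `remove(Integer v)`: because `v` is declared as `Integer`, `numbers.remove(v)` resolves to `List.remove(Object)` and removes the first element equal to `v`. Passing a primitive `int` would instead select `List.remove(int index)` and remove by position. The check below is plain Java with no Kafka involved, and the helper names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;

class RemoveOverloadDemo {
    /** Calls List.remove(Object): removes the first element equal to value. */
    static List<Integer> removeByValue(List<Integer> source, Integer value) {
        List<Integer> copy = new ArrayList<>(source);
        copy.remove(value);
        return copy;
    }

    /** Calls List.remove(int): removes the element at the given position. */
    static List<Integer> removeByIndex(List<Integer> source, int index) {
        List<Integer> copy = new ArrayList<>(source);
        copy.remove(index);
        return copy;
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 5, 7);
        System.out.println(removeByValue(numbers, 1)); // [5, 7]
        System.out.println(removeByIndex(numbers, 1)); // [1, 7]
    }
}
```

So the subtractor as written removes by value, which is what an aggregate of attribute numbers needs.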
> >
> > Re-Key operation:
> >
> > // attributes is a KTable<Long, Attribute> keyed by attribute id
> > KTable<Long, GroupedAttributes> groupedAttributes = attributes
> >     .groupBy(
> >         // re-key each attribute to its FK and keep only its number
> >         (k, v) -> KeyValue.pair(v.userId(), v.number()),
> >         Grouped.with(
> >             "attributes-grouped",
> >             Serdes.Long(),
> >             Serdes.Integer()))
> >     .aggregate(
> >         GroupedAttributes::new,   // initializer
> >         (k, v, a) -> a.add(v),    // adder
> >         (k, v, a) -> a.remove(v), // subtractor
> >         Named.as("attributes-grouped-aggregated"),
> >         Materialized.with(Serdes.Long(), groupedAttributesSerde));
> >
> > This internally creates a state store and associated topic
> > 'attributes-grouped-aggregated-changelog' containing the aggregated
> > 'number' attributes re-keyed to their FK (user_id).
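
Conceptually, the `groupBy` step maps every attribute (keyed by its own `id`) to a `(user_id, number)` pair, and the aggregation then collects all numbers per `user_id`. The sketch below models only that end state in plain Java with `Collectors.groupingBy`. The nested `Attribute` record mirrors the one in the question, and the sample values are made up. It says nothing about how many intermediate changelog records Kafka Streams writes along the way:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class RekeyModel {
    // Mirrors the Attribute data structure from the question (illustrative only).
    record Attribute(Long id, Integer number, Long userId) {}

    /** Re-keys attributes by their FK and collects their numbers per user. */
    static Map<Long, List<Integer>> groupByUser(List<Attribute> attributes) {
        return attributes.stream()
                .collect(Collectors.groupingBy(
                        Attribute::userId,   // new key: the FK
                        TreeMap::new,        // sorted keys for stable output
                        Collectors.mapping(Attribute::number, Collectors.toList())));
    }

    public static void main(String[] args) {
        List<Attribute> attributes = List.of(
                new Attribute(1L, 1, 100L),  // user 100 has exactly one attribute
                new Attribute(2L, 7, 200L),
                new Attribute(3L, 9, 200L));
        System.out.println(groupByUser(attributes)); // {100=[1], 200=[7, 9]}
    }
}
```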
> >
> > Now, for a User associated with exactly one Attribute, I'd have expected
> > the topic to contain exactly one record with the user's key and a
> > GroupedAttributes object with one item. But in fact that topic contains
> > thousands of records for that particular user, with an ever-growing
> > list repeating the same attribute 'number', which is eventually reduced
> > to the (expected) final object with one attribute 'number'.
> >
> > E.g.:
> >
> > offset: 1, key: 100, { "numbers": [1] }
> > offset: 3, key: 100, { "numbers": [1, 1] }
> > offset: 6, key: 100, { "numbers": [1, 1, 1] }
> > offset: 9, key: 100, { "numbers": [1, 1, 1, 1] }
> > ...
> > offset: 262211, key: 100, { "numbers": [1, 1] }
> > offset: 262213, key: 100, { "numbers": [1] }
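
A changelog topic records every intermediate aggregate, not just the final one. Kafka Streams creates key-value store changelogs as compacted topics, so older records per key are only removed over time, and record caching (when enabled) can collapse some consecutive updates before they are written. The deliberately simplified model below ignores both effects and writes one snapshot per adder or subtractor call. Replaying a made-up sequence of three additions followed by two subtractions of the same number yields a log with the same grow-then-shrink shape as the excerpt above. This is an illustration, not Kafka Streams' internal code:

```java
import java.util.ArrayList;
import java.util.List;

class ChangelogReplay {
    /** One input event for the aggregate: add or subtract a number. */
    record Event(boolean adding, int number) {}

    /**
     * Applies each event to the running aggregate and records a snapshot
     * after every step, mimicking one changelog record per update
     * (simplification: no caching, no compaction).
     */
    static List<List<Integer>> replay(List<Event> events) {
        List<Integer> aggregate = new ArrayList<>();
        List<List<Integer>> changelog = new ArrayList<>();
        for (Event e : events) {
            if (e.adding()) {
                aggregate.add(e.number());                      // adder
            } else {
                aggregate.remove(Integer.valueOf(e.number()));  // subtractor, by value
            }
            changelog.add(List.copyOf(aggregate));              // snapshot written to the log
        }
        return changelog;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(true, 1), new Event(true, 1), new Event(true, 1),
                new Event(false, 1), new Event(false, 1));
        replay(events).forEach(System.out::println);
        // [1]
        // [1, 1]
        // [1, 1, 1]
        // [1, 1]
        // [1]
    }
}
```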
> >
> > Can anyone please shed some light on the internal workings and explain
> > if this is expected behaviour?
> >
> > Best wishes,
> > Karsten
> >
>
>
> --
> Thanks & Regards
> *VIKRAM S SINGH*
>
