Thanks a lot Bruno, much clearer now.
It's only my opinion but since the Topology is a concept of the API as
well as the repartitioning logic, for me also this mechanism should be a
bit more transparent, but it aslo maybe that I'm plain wrong here :)

Thanks !

On Thu, May 14, 2020 at 9:24 PM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Raffaele,
>
> Change is an internal class in Streams and also its SerDes are
> internal. To consume the repartition topic you mention outside of
> Streams you would need to use those internal classes (note: I've never
> tried this). Those classes can change at any time. So consuming from
> repartition topics for other than educational purposes is not a good
> idea.
>
> toStream() only emits the new value of the Change.
>
> Regarding docs, since these are internals, the code is the best
> source. For example:
>
> The Change class:
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
>
> Here the Change class is used to first remove the old value from the
> aggregate and then to add the new value to the aggregate:
>
> https://github.com/apache/kafka/blob/873e9446ef8426061e2f1b6cd21815b270e27f03/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L90
>
> Best,
> Bruno
>
> On Thu, May 14, 2020 at 8:50 PM Raffaele Esposito
> <rafaelral...@gmail.com> wrote:
> >
> > Hi Bruno,
> > Also when you mention:
> >
> > The record structure key, (oldValue, newValue) is called Change in
> > Kafka Streams and it is used where updates are emitted downstream
> >
> > Does it also mean the same happen when we convert a KTable to a KStream ?
> > Do you know any docs or article about this topics?
> >
> > Thanks again,
> > Raffaele
> >
> >
> >
> > On Thu, May 14, 2020 at 8:39 PM Raffaele Esposito <
> rafaelral...@gmail.com>
> > wrote:
> >
> > > Hi Bruno,
> > > Thanks,
> > > One more thing, As I told you I was consuming the repartitioning topic
> > > created by group by
> > > and I just saw the old and new value, as you are telling me now they
> are
> > > indeed marked as old and new,
> > > is this mark visible somehow consuming the repartitioning topic ?
> > > Raffaele
> > >
> > > On Thu, May 14, 2020 at 7:48 PM Bruno Cadonna <br...@confluent.io>
> wrote:
> > >
> > >> Hi Raffaele,
> > >>
> > >> In your example, Kafka Streams would send the new and the old value
> > >> downstream. More specifically, the groupBy() would send (as you also
> > >> observed)
> > >>
> > >> London, (old value: London, new value: null)
> > >> Berlin, (old value: null, new value: Berlin)
> > >>
> > >> At the count() record London, (old value: London, new value: null)
> > >> would detract 1 from key London and record Berlin, (old value: null,
> > >> new value: Berlin) would add 1 to Berlin.
> > >>
> > >> The record structure key, (oldValue, newValue) is called Change in
> > >> Kafka Streams and it is used where updates are emitted downstream.
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >> On Thu, May 14, 2020 at 12:17 PM Raffaele Esposito
> > >> <rafaelral...@gmail.com> wrote:
> > >> >
> > >> > I m trying to better understand KTable and I have encountered a
> > >> behaviour I
> > >> > cannot wrap my mind around it.
> > >> >
> > >> > So* groupByKey()* can only be applied to KStream and not to KTable,
> > >> that's
> > >> > because of the nature of KTable that and its UPSERT logic.
> > >> > What I don't understand correctly and therefore ask your help for
> that
> > >> is
> > >> > how *groupBy()* can actually be applied on KTable, the documentation
> > >> says
> > >> > that:
> > >> >
> > >> > groupBy() is a shorthand for selectKey(...).groupByKey()
> > >> >
> > >> > But both these operations can only be applied to KStreams.
> > >> >
> > >> > The documentation also says:
> > >> >
> > >> > Because a new key is selected, an internal repartitioning topic
> will be
> > >> > created in Kafka ... All data of this KTable will be redistributed
> > >> through
> > >> > the repartitioning topic by writing all update records to and
> rereading
> > >> all
> > >> > update records from it, such that the resulting KGroupedTable is
> > >> > partitioned on the new key.
> > >> >
> > >> > Now assume we want to count the favourite cities of users:
> > >> >
> > >> > We have a key table like:
> > >> >
> > >> > Mike,LondonAlice,Paris Fred,RomeMike,Berlin (changed his mind)
> > >> >
> > >> > I would use:
> > >> >
> > >> > KTable<String, String> usersAndCitiesTable =
> > >> > builder.table("user-keys-and-cities");
> > >> >
> > >> >  KTable<String, Long> favouriteCities =
> > >> > usersAndCitiesTable.groupBy((user,city)->new KeyValue<>(city, city))
> > >> >      .count(Materialized.<String, Long, KeyValueStore<Bytes,
> > >> > byte[]>>as("CountsByCity")
> > >> >
> > >> > I took a look at the repartitioning topic created because of the
> > >> groupBy,
> > >> > and can see the record mapped using the KeyValueMapper provided
> > >> >
> > >> > I noticed that table generates two entries when Mike changes his
> mind,
> > >> one
> > >> > for London (the old city) and one for Berlin (the new city)
> > >> >
> > >> > Are this entries marked somehow?  if yes, how ?
> > >> >
> > >> > How does Kafka make sure that on London count is applied a -1 and
> the
> > >> > Berlin count a +1 when the new record with Mike's new favorite city
> > >> arrives.
> > >> >
> > >> >
> > >> > Any help or suggestion is highly appreciated !
> > >> >
> > >> > Thanks
> > >>
> > >
>

Reply via email to