Thanks a lot Bruno, much clearer now. It's only my opinion, but since the Topology is a concept of the API, as is the repartitioning logic, I think this mechanism should also be a bit more transparent. But it may also be that I'm just plain wrong here :)
Thanks!

On Thu, May 14, 2020 at 9:24 PM Bruno Cadonna <br...@confluent.io> wrote:
> Hi Raffaele,
>
> Change is an internal class in Streams, and its SerDes are also internal. To consume the repartition topic you mention from outside of Streams, you would need to use those internal classes (note: I've never tried this). Those classes can change at any time, so consuming from repartition topics for anything other than educational purposes is not a good idea.
>
> toStream() only emits the new value of the Change.
>
> Regarding docs, since these are internals, the code is the best source. For example:
>
> The Change class:
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
>
> Here the Change class is used to first remove the old value from the aggregate and then add the new value to the aggregate:
>
> https://github.com/apache/kafka/blob/873e9446ef8426061e2f1b6cd21815b270e27f03/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L90
>
> Best,
> Bruno
>
> On Thu, May 14, 2020 at 8:50 PM Raffaele Esposito <rafaelral...@gmail.com> wrote:
> >
> > Hi Bruno,
> > Also, regarding what you mention:
> >
> > The record structure key, (oldValue, newValue) is called Change in Kafka Streams and it is used where updates are emitted downstream
> >
> > does the same happen when we convert a KTable to a KStream?
> > Do you know of any docs or articles about this topic?
> >
> > Thanks again,
> > Raffaele
> >
> > On Thu, May 14, 2020 at 8:39 PM Raffaele Esposito <rafaelral...@gmail.com> wrote:
> >
> > > Hi Bruno,
> > > Thanks.
> > > One more thing: as I told you, I was consuming the repartitioning topic created by groupBy and I just saw the old and new values. As you are telling me now, they are indeed marked as old and new. Is this mark visible somehow when consuming the repartitioning topic?
> > > Raffaele
> > >
> > > On Thu, May 14, 2020 at 7:48 PM Bruno Cadonna <br...@confluent.io> wrote:
> > >
> > > > Hi Raffaele,
> > > >
> > > > In your example, Kafka Streams would send the new and the old value downstream. More specifically, groupBy() would send (as you also observed):
> > > >
> > > > London, (old value: London, new value: null)
> > > > Berlin, (old value: null, new value: Berlin)
> > > >
> > > > At count(), the record London, (old value: London, new value: null) would subtract 1 from key London, and the record Berlin, (old value: null, new value: Berlin) would add 1 to key Berlin.
> > > >
> > > > The record structure key, (oldValue, newValue) is called Change in Kafka Streams and it is used where updates are emitted downstream.
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > > On Thu, May 14, 2020 at 12:17 PM Raffaele Esposito <rafaelral...@gmail.com> wrote:
> > > > >
> > > > > I'm trying to better understand KTable and I have encountered a behaviour I cannot wrap my mind around.
> > > > >
> > > > > So *groupByKey()* can only be applied to KStream and not to KTable; that's because of the nature of KTable and its UPSERT logic.
> > > > > What I don't understand, and would like your help with, is how *groupBy()* can actually be applied to a KTable. The documentation says that:
> > > > >
> > > > > groupBy() is a shorthand for selectKey(...).groupByKey()
> > > > >
> > > > > But both these operations can only be applied to KStreams.
> > > > >
> > > > > The documentation also says:
> > > > >
> > > > > Because a new key is selected, an internal repartitioning topic will be created in Kafka ...
> > > > > All data of this KTable will be redistributed through the repartitioning topic by writing all update records to and rereading all update records from it, such that the resulting KGroupedTable is partitioned on the new key.
> > > > >
> > > > > Now assume we want to count the favourite cities of users.
> > > > >
> > > > > We have a table keyed by user like:
> > > > >
> > > > > Mike,London
> > > > > Alice,Paris
> > > > > Fred,Rome
> > > > > Mike,Berlin (changed his mind)
> > > > >
> > > > > I would use:
> > > > >
> > > > > KTable<String, String> usersAndCitiesTable =
> > > > >     builder.table("user-keys-and-cities");
> > > > >
> > > > > KTable<String, Long> favouriteCities = usersAndCitiesTable
> > > > >     .groupBy((user, city) -> new KeyValue<>(city, city))
> > > > >     .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsByCity"));
> > > > >
> > > > > I took a look at the repartitioning topic created because of the groupBy, and can see the records mapped using the provided KeyValueMapper.
> > > > >
> > > > > I noticed that the table generates two entries when Mike changes his mind, one for London (the old city) and one for Berlin (the new city).
> > > > >
> > > > > Are these entries marked somehow? If yes, how?
> > > > >
> > > > > How does Kafka make sure that a -1 is applied to the London count and a +1 to the Berlin count when the new record with Mike's new favourite city arrives?
> > > > >
> > > > > Any help or suggestion is highly appreciated!
> > > > >
> > > > > Thanks
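The subtract-then-add behaviour Bruno describes for the favourite-cities example can be sketched without Kafka at all. This is a minimal, stdlib-only Java simulation, not the Kafka Streams implementation: `SimpleChange`, `GroupByCountSketch`, `upsert` and `counts` are hypothetical names, the source table and the count store are plain in-memory maps, and the "repartition topic" is just a list of (city, change) pairs. As in the thread, the mapper keys each record by city, Mike's update emits an old-value record under London and a new-value record under Berlin, and the aggregation subtracts 1 for an old value and adds 1 for a new value.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class GroupByCountSketch {
    // Hypothetical stand-in for Kafka Streams' internal Change class (NOT the real API).
    // In this sketch exactly one field is set on each re-keyed record.
    static final class SimpleChange {
        final String oldValue;
        final String newValue;

        SimpleChange(String oldValue, String newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    private final Map<String, String> usersToCities = new HashMap<>(); // upstream table: user -> city
    private final Map<String, Long> countsByCity = new TreeMap<>();    // count store: city -> count

    // Upserts (user, city) into the source table; a null city deletes the user.
    // Returns the records that would travel through the "repartition topic", keyed by city.
    List<SimpleEntry<String, SimpleChange>> upsert(String user, String city) {
        String oldCity = (city == null) ? usersToCities.remove(user) : usersToCities.put(user, city);
        List<SimpleEntry<String, SimpleChange>> repartitioned = new ArrayList<>();
        if (oldCity != null) {
            // Old value re-keyed by the OLD city: tells the aggregation to subtract.
            repartitioned.add(new SimpleEntry<>(oldCity, new SimpleChange(oldCity, null)));
        }
        if (city != null) {
            // New value re-keyed by the NEW city: tells the aggregation to add.
            repartitioned.add(new SimpleEntry<>(city, new SimpleChange(null, city)));
        }
        for (SimpleEntry<String, SimpleChange> record : repartitioned) {
            aggregate(record.getKey(), record.getValue());
        }
        return repartitioned;
    }

    // Count aggregation: a subtractor for the old value, an adder for the new value.
    private void aggregate(String city, SimpleChange change) {
        long count = countsByCity.getOrDefault(city, 0L);
        if (change.oldValue != null) {
            count -= 1;
        }
        if (change.newValue != null) {
            count += 1;
        }
        countsByCity.put(city, count);
    }

    Map<String, Long> counts() {
        return countsByCity;
    }

    public static void main(String[] args) {
        GroupByCountSketch sketch = new GroupByCountSketch();
        sketch.upsert("Mike", "London");
        sketch.upsert("Alice", "Paris");
        sketch.upsert("Fred", "Rome");
        sketch.upsert("Mike", "Berlin"); // emits (London, old=London) and (Berlin, new=Berlin)
        System.out.println(sketch.counts()); // {Berlin=1, London=0, Paris=1, Rome=1}
    }
}
```

The old and new city travel as two separate records keyed by different cities, so they may land in different partitions and each still reaches the right count. How the real internal SerDes mark a record as old or new on the wire is, as Bruno says, internal and is not modelled here.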
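Bruno's remark that toStream() only emits the new value of the Change can be illustrated the same way. Again this is a hypothetical, stdlib-only sketch (Java 16+ for records): `Update`, `StreamRecord` and `toStream` are illustrative names, not Kafka classes, and the two updates in `main` are assumed values for what a count table might emit after Mike moves, not captured output.

```java
import java.util.ArrayList;
import java.util.List;

final class ToStreamSketch {
    // Hypothetical table update: a key plus its old and new value (either may be null).
    record Update(String key, Long oldValue, Long newValue) {}

    // Hypothetical stream record: a key plus a single value.
    record StreamRecord(String key, Long value) {}

    // Forwards each update as (key, newValue); the old value is simply dropped.
    // An update whose new value is null (a deletion) is forwarded with a null value.
    static List<StreamRecord> toStream(List<Update> updates) {
        List<StreamRecord> out = new ArrayList<>();
        for (Update update : updates) {
            out.add(new StreamRecord(update.key(), update.newValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumed updates from a count table after Mike moves from London to Berlin.
        List<Update> updates = List.of(
                new Update("London", 1L, 0L),
                new Update("Berlin", null, 1L));
        System.out.println(toStream(updates));
    }
}
```

Downstream of the conversion, a consumer sees London with 0 and Berlin with 1, with no trace of the previous counts.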