`Substractor` in only needed for KTable aggregations.

For `KStream` aggregations it's not needed: for this case, each window
is materialized into one row of the result KTable and just updated with
the `Aggregator` for each incoming record (ie, corresponding to key --
and timestamp, for windowed-aggregation).

Note, that for `KStream` each record is a *fact*.

For KTable aggregations it works differently, because a KTable does not
store immutable facts, but can be update over time. This implies, that
if record in the "base" KTable is updated, it's old value must be
removed from the result KTable while the new value must be added to the
result KTable. The `Subtractor` is used for the former.

Example: you count some keys:

<A,a>
<B,b>
<A,a>
<C,c>

This result is a KTable

<A,2>
<B,1>
<C,1>

Next, you apply a groupBy to the KTable, to build a histogram over the
count (ie, you count how many different keys have the same count). For
this, you set the existing count as new key getting

<2,dummy>
<1,dummy>
<1,dummy>

resulting in KTable

<2,1> // only one key, namely A, is counted twice
<1,2> // two keys, namely B and C, have count one

Not let's assume the original input stream get another <B,b>. This
updates the first/base KTable to

<A,2>
<B,2>
<C,1>

Not, B is counted twice and thus, there are two keys (A and B) with a
count of 2 and only one key (C) with a count of 1. Hence, to update the
second KTable, the old <B,1> must be subtracted and the new <B,2> need
to be added, updating the second KTable to

<2,2> // we add the new <B,2> here
<1,1> // we subtract the old <B,1> here


Does this make sense?

-Matthias



On 9/24/18 8:10 PM, Michael Eugene wrote:
> Thanks for your response again.  As far as your feedback on what I am trying 
> to accomplish - I need to use the aggregate method because I need the 
> aggregator to be very dynamic.  Multiple, heterogenous types need to be 
> handled in one aggregation.  
> 
> So I can’t really use a reducer for that.  
> 
> I am mapping multiple rows of data into one output row.  I didn’t want to add 
> all of my test data and output data.  
> 
> I’m not saying your explanation is incorrect.  On the contrary, I wanted to 
> summarize it in my own terms. Can you say that my saying “subtractor in the 
> aggregate method actually kicks in when the key changes during an immediately 
> preceding groupBy” is a correct understanding of what you are saying? 
> 
> 
> Sent from my iPhone
> 
>> On Sep 25, 2018, at 4:31 AM, Vasily Sulatskov <vas...@sulatskov.net> wrote:
>>
>> Hi,
>>
>> As far as I understand your code it's not a very good example of table
>> aggregation. The way I see it, your code does the following:
>>
>> 1. You take some stream
>> 2. Do some map, changing keys and values
>> 3. groupByKey
>> 4. Reduce - keeping the last value for every key, here you get a KTable
>> 5. GroupBy - artifically translate KTable into KGroupedTable, but no
>> real grouping has been done the transformation is (k, v) => (k, v)
>> 6. aggregate()
>>
>> And now assuming that addition is actual addition, and subtraction is
>> real subtraction, you would get exactly the same as on step 4. I think
>> if you try to observe how the data flows through your app, that
>> wouldn't show much about real aggregation, as no real aggregation
>> actually takes place.
>>
>> I can't really add much to this. I think the explanation I've given is
>> mostly correct, and what really helped me to understand why a
>> subtractor is necessary is coming up with a simple example (as I've
>> shown) and working through it manually step by step. And I recommend
>> doing just that.
>>
>> Try doing some real aggregation when some keys in the source table are
>> mapped to one key in the aggregated table. I.e. say your table
>> represents number of view per page, so try to compute number of views
>> per page category.
>>
>> Just like when you use aggregation in SQL using some aggregation
>> function, in order for this to make sense you must map several rows in
>> the source into one row in the output.
>>> On Mon, Sep 24, 2018 at 6:48 PM Michael Eugene <far...@hotmail.com> wrote:
>>>
>>> Ok thanks for taking the time again to respond. So you are saying that the 
>>> subtractor actually handles when the key changes for the earlier groupBy?  
>>> (Maybe not 100% but that is sort of what it is handling).
>>> Also my code is below (I took out some of it to avoid clutter) - I am 
>>> grouping twice and maybe that is why I am getting some unpredictable 
>>> results.
>>>
>>> val result = kstream
>>>  .map[String, Array[Byte]]((key1: String, value: Array[Byte]) => {
>>>
>>>      val newKey = reKey(byteArray, groupByColumnList, List.empty[String])
>>>      val valueNew = getColumnValue(byteArray, aggregateColumnList)
>>>
>>>    (newKey, serialise(valueNew))})
>>>
>>>  .groupByKey(Serialized.`with`(Serdes.String, Serdes.ByteArray))
>>>  .reduce((oldVal: Array[Byte], newVal: Array[Byte]) => 
>>> newVal)(Materialized.`with`(Serdes.String, Serdes.ByteArray))
>>>  .groupBy((k,v) => (k,v))(Serialized.`with`(Serdes.String, 
>>> Serdes.ByteArray))
>>>  .aggregate[Array[Byte]](serialise(0L) )(
>>>    (key: String
>>>      ,newValue: Array[Byte]
>>>      , agg: Array[Byte]) => {
>>>
>>>
>>>      type typeI = String
>>>      type typeL = Long
>>>      val nVS = deserialise[String](newValue)
>>>      val nVSI = nVS.toInt
>>>      val aVL = deserialise[Long](agg)
>>>      val agg1 = nVSI + aVL
>>>      println(agg1.toString)
>>>      serialise(agg1)
>>>    }
>>>  ,
>>>  (key: String
>>>   , oldValue: Array[Byte]
>>>   , agg: Array[Byte]) => {
>>>
>>>
>>> // Just doing the same thing as the adder.
>>>
>>>    type typeI = String
>>>    type typeL = Long
>>>    val nVS = deserialise[String](oldValue)
>>>    val nVSI = nVS.toInt
>>>    val aVL = deserialise[Long](agg)
>>>    val agg1 = nVSI + aVL
>>>    println(agg1.toString)
>>>    serialise(agg1)
>>>  }
>>>      )(Materialized.`with`(Serdes.String, Serdes.ByteArray))
>>>  .mapValues(v => { val s = deserialise[Long](v).toString; println(s); s})
>>>  .toStream
>>>  .to(outputTopic)
>>>
>>>
>>> ________________________________
>>> From: Vasily Sulatskov <vas...@sulatskov.net>
>>> Sent: Monday, September 24, 2018 12:12 PM
>>> To: users@kafka.apache.org
>>> Subject: Re: Subtractor
>>>
>>> Hi,
>>>
>>> Given that you need a subtractor you are probably calling
>>> KGroupedTable.aggregate(). In order to get a KGroupedTable you called
>>> (in a general case) KTable.groupBy().
>>>
>>> I.e you have an original (pre-groupBy) table stream (changelog): where
>>> a message key is say pageId, and value is say number of hits in the
>>> current time window (or something like that):
>>>
>>> key(pageId)=1, value=1
>>> key(pageId)=2, value=2
>>> key(pageId)=3, value=10
>>> key(pageId)=1, value=3
>>> key(pageId)=2, value=4
>>> key(pageId)=3, value=11
>>>
>>> Now you want to build a table that contains number of hits in the
>>> current time window per page category, so you group your values by
>>> well, categoryId, let's say pageId=1 and pageId=2 belong to
>>> categoryId=1 and pageId=3 belongs to some other categoryId.
>>>
>>> Essentially .groupBy() will transform your changelog to (and dropping
>>> messages from different categories):
>>>
>>> key(categoryId=1), value=1 (old key=1)
>>> key(categoryId=1), value=2 (old key=2)
>>> key(categoryId=1), value=3 (old key=1)
>>> key(categoryId=1), value=4 (old key=2)
>>>
>>> Which is how the example I've given in the previous email came to be.
>>>
>>> And the final aggregation result will be:
>>>
>>> categoryId=1, sum(value) = 4
>>>
>>> And under the hood, kafka will represent this messages as such:
>>>
>>> key(categoryId=1), newValue=1, oldValue=null (oldKey=1)
>>> key(categoryId=1), newValue=2, oldValue=null (oldKey=2)
>>> key(categoryId=1), newValue=3, oldValue=1 (oldKey=1)
>>> key(categoryId=1), newValue=4, oldValue=2 (oldKey=2)
>>>
>>> So you see, as you compute your aggregation, new values for an old key
>>> (pre groupBy) arrive and essentially replace old values for the same
>>> old (pre groupBy) keys. And to do this "replacement" you need a
>>> subtraction operation.
>>>
>>> The only thing is that kafka doesn't carry around oldKey as I've shown
>>> above just for the demonstration, it's not necessary, it just calls
>>> adder on newValue and subtractor on oldValue. But in order to
>>> understand I like to think about it using "old keys".
>>>
>>> Hope that clears it up.
>>>
>>>> On Mon, Sep 24, 2018 at 12:03 PM Michael Eugene <far...@hotmail.com> wrote:
>>>>
>>>> First off thanks or taking the time out of your schedule to respond.
>>>>
>>>> You lost me at almost the beginning, specifically at mapping to a 
>>>> different key.  If those records come in...
>>>>
>>>> key=1, value=1
>>>> key=2, value=2
>>>> key=1, value=3
>>>> key=2, value=4
>>>>
>>>> Here is all that should happen in my application
>>>> 1. You start with aggregated value zero.
>>>> 2. You handle (key=1, value=1) -> agg=1
>>>> 3. You handle (key=2, value=2) -> agg=2
>>>> 4. You handle (key=1, value=3) -> why not just add 3 to the earlier 1 so 
>>>> it is agg 4?
>>>> 5. You handle (key=2, value=4) -> why not just add 4 to the earlier 2 so 
>>>> it is agg 6?
>>>>
>>>> I have no interest in mapping to different keys.  That's kind of making 
>>>> this exercise more complex.
>>>>
>>>> Also one of the confusing points is why in older versions of Kafka did you 
>>>> not need a subtractor?  Only in 2.0 am I required to give a subtractor. 
>>>> 1.1 I didn't need one.
>>>>
>>>> ________________________________
>>>> From: Vasily Sulatskov <vas...@sulatskov.net>
>>>> Sent: Monday, September 24, 2018 9:46 AM
>>>> To: users@kafka.apache.org
>>>> Subject: Re: Subtractor
>>>>
>>>> Hi,
>>>>
>>>> If I am not mistaken it works like this.
>>>>
>>>> Remember that kafka is a streaming system, i.e. there's no way for
>>>> kafka streams to look at all the current value for a given key, and
>>>> compute the aggregation by repeatedly calling your adder (starting
>>>> with zero value). Values arrive at different times, with value for
>>>> different keys in between them, and you expect kafka streams to always
>>>> give you the up to date aggregated value.
>>>>
>>>> Put yourself in the place of kafka-streams application, how would you
>>>> compute say a sum of all keys that get mapped to a single key after
>>>> with a pen and a paper? I bet you would keep track of last arrived
>>>> value for each key, and the total aggregated value.
>>>>
>>>> So let's say here's a stream of values that all had originally
>>>> different keys, but you mapped them via groupBy() to a different key,
>>>> and they arrive to you like this:
>>>>
>>>> key=1, value=1
>>>> key=2, value=2
>>>> key=1, value=3
>>>> key=2, value=4
>>>>
>>>> 1. You start with aggregated value zero.
>>>> 2. You handle (key=1, value=1) -> agg=1
>>>> 3. You handle (key=2, value=2) -> agg=3
>>>> 4. You handle (key=1, value=3), now you can't just add 3 to your
>>>> aggregated value, you must add new value for key=1, and subtract old
>>>> value for key=1: newAgg = oldAgg + newValueForKey1 - newValueForKey1:
>>>> agg = 3 + 3 - 1 -> agg = 5
>>>> 5. You handle (key=2, value=4), again you must look up a previous
>>>> value for key=2 and subtract it from the aggregated value: agg = 5 + 4
>>>> - 2 -> agg = 7
>>>>
>>>> And this is basically how it works.
>>>>
>>>> If you look into more details there are some complications though,
>>>> such as kakfa-streams transforming a sequence of values into a
>>>> sequence of changes of values, so your KStream[T] becomes more like
>>>> KStream[Change[T]] where change carries both new and old value, and
>>>> over the wire this change gets transmitted as two separate kafka
>>>> messages.
>>>>> On Mon, Sep 24, 2018 at 10:56 AM Michael Eugene <far...@hotmail.com> 
>>>>> wrote:
>>>>>
>>>>> Can someone explain to me the point of the Subtractor in an aggregator?  
>>>>> I have to have one, because there is no concrete default implentation of 
>>>>> it, but I am just trying to get a "normal" aggregation working and I 
>>>>> don't see why I need a subtractor.  Other than of course I need to make 
>>>>> the program compile.
>>>>>
>>>>> I'm using Kafka Streams DSL 2.0
>>>>
>>>>
>>>>
>>>> --
>>>> Best regards,
>>>> Vasily Sulatskov
>>>
>>>
>>>
>>> --
>>> Best regards,
>>> Vasily Sulatskov
>>
>>
>>
>> -- 
>> Best regards,
>> Vasily Sulatskov

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to