Hi Sachin,

Yes, it will be called each time a key is modified, and it will keep doing this
continuously until you stop the app.
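
Below is a minimal plain-Java sketch (no Kafka dependency) of the behaviour
described above. It assumes both KTables hold List<String> values and uses a
hypothetical merge helper that simply concatenates the two lists. Every update
to the left table emits a joined value for its key. An update to the right
table emits only when the left table already has that key. Deletions are not
modeled. The sketch also ignores record caching (the cache.max.bytes.buffering
setting in Kafka Streams 0.10.1 and later), which can coalesce consecutive
updates to the same key before they reach the output topic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java simulation of KTable-KTable left-join semantics (not Kafka code).
public class TableLeftJoinSketch {
    private final Map<String, List<String>> left = new HashMap<>();
    private final Map<String, List<String>> right = new HashMap<>();
    // Every emitted (key, joined value), in order, like records in "new-result-topic".
    final List<Map.Entry<String, List<String>>> output = new ArrayList<>();
    private final BiFunction<List<String>, List<String>, List<String>> joiner;

    TableLeftJoinSketch(BiFunction<List<String>, List<String>, List<String>> joiner) {
        this.joiner = joiner;
    }

    // A left-table update always produces a result; the right value may be null.
    void updateLeft(String key, List<String> value) {
        left.put(key, value);
        output.add(Map.entry(key, joiner.apply(value, right.get(key))));
    }

    // A right-table update produces a result only if the left table has the key.
    void updateRight(String key, List<String> value) {
        right.put(key, value);
        List<String> leftValue = left.get(key);
        if (leftValue != null) {
            output.add(Map.entry(key, joiner.apply(leftValue, value)));
        }
    }

    // Hypothetical merge helper: left elements followed by right elements.
    static List<String> merge(List<String> a, List<String> b) {
        List<String> out = new ArrayList<>(a);
        if (b != null) {
            out.addAll(b);
        }
        return out;
    }

    public static void main(String[] args) {
        TableLeftJoinSketch join = new TableLeftJoinSketch(TableLeftJoinSketch::merge);
        join.updateLeft("key1", List.of("A01"));
        join.updateRight("key1", List.of("B01"));
        join.updateLeft("key1", List.of("A01", "A05"));
        join.updateRight("key1", List.of("B01", "B05"));
        // One output record per update, each carrying the latest merged list.
        join.output.forEach(e -> System.out.println(e.getKey() + ": " + e.getValue()));
    }
}
```

Each of the four updates yields its own output record for key1, which is why a
downstream foreach sees the same key several times.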

Eno
> On 9 Oct 2016, at 16:50, Sachin Mittal <sjmit...@gmail.com> wrote:
> 
> Hi,
> It is actually a KTable-KTable join.
> 
> I have a stream (K1, A) which is aggregated as (Key, List<A>), hence it
> creates a KTable.
> I have another stream (K2, B) which is aggregated as (Key, List<B>), hence
> it creates another KTable.
> 
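> Then I have:
> // tableA: KTable<String, List<A>>, tableB: KTable<String, List<B>>
> tableA.leftJoin(tableB, (List<A> listA, List<B> listB) -> {
>     // ValueJoiner receives only the two values; the key is carried over.
>     // In a left join, listB is null when tableB has no entry for the key.
>     return merge(listA, listB);   // new List<AB>
> }).to("new-result-topic");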
> 
> So my understanding is that every time the ValueJoiner is called, it takes
> the latest List<A> and List<B> for a key, merges them, and then writes the
> new merged list for that same key to new-result-topic.
> 
> So then when I do:
> builder.stream("new-result-topic").foreach((key, listAB) -> {
>     // this callback is called multiple times for the same key kx, and each
>     // time it receives the newly modified List<AB> (as and when it is
>     // modified by the join above)
> });
> 
> Please let me know if my understanding is correct. Will it be called every
> time a key is modified, or does it buffer the changes and call it once in a
> given time span?
> 
> Thanks
> Sachin
> 
> On Sun, Oct 9, 2016 at 3:07 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> 
>> Hi Sachin,
>> 
>> Some comments inline:
>> 
>>> On 9 Oct 2016, at 08:19, Sachin Mittal <sjmit...@gmail.com> wrote:
>>> 
>>> Hi,
>>> I need some light on how joins actually work on a continuous stream of
>>> data.
>>> 
>>> Say I have 2 topics which I need to join (left join).
>>> Each data record in each topic is aggregated as (key, value) <=> (string,
>>> list).
>>> 
>>> Topic 1
>>> key1: [A01, A02, A03, A04 ..]
>>> Key2: [A11, A12, A13, A14 ..]
>>> ....
>>> 
>>> Topic 2
>>> key1: [B01, B02, B03, B04 ..]
>>> Key2: [B11, B12, B13, B14 ..]
>>> ....
>>> 
>>> Joined topic
>>> Key1: [A01, B01...]
>>> Key2: [A11, B11 ...]
>>> 
>>> Now let us say I get 2 records, [Key1: A05] and [Key1: B05].
>>> As per the aggregation, they are appended to Topic 1 and Topic 2 respectively.
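
The aggregation step described above can be sketched in plain Java (no Kafka
dependency) as follows. This is an illustration under the assumption that
values are strings and each key's table value is the full list seen so far;
the class and method names are hypothetical. Each incoming record appends to
its key's list and the whole updated list is emitted, the way a KTable
changelog carries the complete new value rather than just the delta.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of aggregating (key, value) records into (key, List<value>).
public class ListAggregationSketch {
    private final Map<String, List<String>> table = new HashMap<>();

    // Appends the value and returns the whole updated list (the new table value).
    // A fresh copy is stored so lists already emitted are never mutated later.
    List<String> onRecord(String key, String value) {
        List<String> updated = new ArrayList<>(table.getOrDefault(key, List.of()));
        updated.add(value);
        table.put(key, updated);
        return updated;
    }

    public static void main(String[] args) {
        ListAggregationSketch topic1 = new ListAggregationSketch();
        for (String a : List.of("A01", "A02", "A03", "A04")) {
            topic1.onRecord("key1", a);
        }
        // A new record for key1 arrives: the full list is re-emitted with A05 appended.
        System.out.println("key1: " + topic1.onRecord("key1", "A05"));
    }
}
```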
>>> 
>>> I assume this will call the join operation again and the records will get
>>> appended to the Key1 data? Let me know if my understanding is correct here.
>> 
>> Yes, that is correct. The join operation is continuously called each time
>> there are new records consumed from the topic. The consuming happens
>> continuously too.
>> 
>>> 
>>> If I am reading the joined topic using foreach, will I again get a record
>>> for key1 with the new data appended to the original list, so that my
>>> record is now Key1: [A01, B01..., A05, B05 ... ]?
>> 
>> Correct.
>> 
>> 
>>> 
>>> What I wanted to ask is: when reading records from a topic, if the value
>>> for a key is modified, will that key be read again (even if it was read
>>> before)? Or is each record read only once by the stream program?
>> 
>> So this depends on how the value for a key is modified. I'm assuming a new
>> record with the new value is produced to the topic. There are two broad
>> options here:
>> 
>> - if you are doing a KStream-KStream join, the "time" when the new value
>> is updated will matter (these kinds of joins are done with a time boundary,
>> e.g., join everything within a time difference of 10 minutes). For
>> instance, say the join result so far is Key1: [A01, B01..., A05, B05 ... ].
>> If the new value for Key1 is [B06], then the output will depend on the time
>> of the join.
>> - if you are doing a KStream-KTable join, it depends on whether the value
>> change happens on the KStream or on the KTable.
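
The two cases above can be contrasted with a small plain-Java sketch (not
Kafka code). It assumes string values, millisecond timestamps, and a
symmetric 10-minute window matching the example; all names are hypothetical.
For the stream-stream case, two records join only if they share a key and
their timestamps are at most the window apart. For the stream-table case, a
table update only changes the stored value, and only a new stream record
triggers a lookup against the table's current value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch contrasting windowed stream-stream and stream-table joins.
public class JoinTypesSketch {
    static final long TEN_MINUTES_MS = 10 * 60 * 1000L;

    static final class Event {
        final String key;
        final String value;
        final long timestampMs;

        Event(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // Stream-stream: join same-key records whose timestamps differ by at most windowMs.
    static List<String> windowedJoin(List<Event> left, List<Event> right, long windowMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                if (l.key.equals(r.key) && Math.abs(l.timestampMs - r.timestampMs) <= windowMs) {
                    out.add(l.key + ": " + l.value + "+" + r.value);
                }
            }
        }
        return out;
    }

    // Stream-table left join: only stream records produce output.
    static final class StreamTableLeftJoin {
        private final Map<String, String> table = new HashMap<>();
        final List<String> output = new ArrayList<>();

        void onTableUpdate(String key, String value) {
            table.put(key, value); // updates the table state, emits nothing
        }

        void onStreamRecord(String key, String value) {
            // Left join: the table side is null if the key is not in the table yet.
            output.add(key + ": " + value + "+" + table.get(key));
        }
    }

    public static void main(String[] args) {
        List<Event> a = List.of(new Event("key1", "A05", 0L));
        List<Event> b = List.of(
                new Event("key1", "B05", 5 * 60 * 1000L),   // 5 minutes later: inside the window
                new Event("key1", "B06", 11 * 60 * 1000L)); // 11 minutes later: outside it
        System.out.println(windowedJoin(a, b, TEN_MINUTES_MS)); // [key1: A05+B05]

        StreamTableLeftJoin j = new StreamTableLeftJoin();
        j.onStreamRecord("key1", "A05"); // table is still empty
        j.onTableUpdate("key1", "B06");  // no output
        j.onStreamRecord("key1", "A06");
        System.out.println(j.output);    // [key1: A05+null, key1: A06+B06]
    }
}
```

The nested loop is only there to make the window condition explicit; it is
not how a streaming engine buffers and matches records.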
>> 
>> Before going further, could you clarify if you'll have a KStream-KStream
>> join or a KStream-KTable join?
>> 
>> Thanks
>> Eno
>> 
>>> 
>>> Please let me know how such a scenario works.
>>> 
>>> Thanks
>>> Sachin
>> 
>> 
