Hi,
It is actually a KTable-KTable join.

I have a stream (K1, A) which is aggregated as (Key, List<A>) hence it
creates a KTable.
I have another stream (K2, B) which is aggregated as (Key, List<B>) hence
it creates another KTable.

Then I have
KTable(Key, List<A>).leftJoin(KTable(Key, List<B>), ValueJoiner {
    // the joiner sees only the two values; the join carries the key over
    List<AB> merged = merge(List<A>, List<B>)
    return merged
}).to("new-result-topic")

So what I understand is that every time the ValueJoiner is called, it picks
the latest List<A> and List<B> for a key, merges them, and then writes the
new merged list to new-result-topic under the same key.
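
To make that assumption concrete, here is a minimal sketch in plain Java. It
is not Kafka Streams code: the class name, the String element type, and the
merge() helper are all hypothetical, and it ignores deletes (tombstones) and
any buffering. It only models the behaviour described above: every update to
either table re-runs the joiner with the latest values for that key, and a
right-side update with no left-side entry emits nothing.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified in-memory model of a table-table left join (assumption, not the Kafka Streams API).
final class TableLeftJoinSketch {
    private final Map<String, List<String>> left = new HashMap<>();   // latest List<A> per key
    private final Map<String, List<String>> right = new HashMap<>();  // latest List<B> per key

    // Every emitted join result in order; stands in for "new-result-topic".
    final List<Map.Entry<String, List<String>>> output = new ArrayList<>();

    // Hypothetical merge: all A values followed by all B values (B may be missing in a left join).
    static List<String> merge(List<String> as, List<String> bs) {
        List<String> merged = new ArrayList<>(as);
        if (bs != null) {
            merged.addAll(bs);
        }
        return merged;
    }

    // A left-side update always produces a result, even without a right-side match.
    void updateLeft(String key, List<String> as) {
        left.put(key, as);
        emit(key);
    }

    // A right-side update produces a result only if the left side has the key.
    void updateRight(String key, List<String> bs) {
        right.put(key, bs);
        if (left.containsKey(key)) {
            emit(key);
        }
    }

    private void emit(String key) {
        List<String> merged = merge(left.get(key), right.get(key));
        Map.Entry<String, List<String>> record =
                new AbstractMap.SimpleImmutableEntry<>(key, merged);
        output.add(record);
    }

    public static void main(String[] args) {
        TableLeftJoinSketch join = new TableLeftJoinSketch();
        join.updateLeft("key1", Arrays.asList("A01"));
        join.updateRight("key1", Arrays.asList("B01"));
        join.updateLeft("key1", Arrays.asList("A01", "A05"));
        join.updateRight("key2", Arrays.asList("B11")); // no left entry: nothing emitted
        for (Map.Entry<String, List<String>> e : join.output) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

In this model key1 shows up three times in the output, each time with the
full, latest merged list rather than only the newly added element.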

So then when I do KStream("new-result-topic").foreach((key, List<AB>) -> {
    // this callback is called multiple times for the same key kx, and each
    // time it contains the newly modified List<AB> (as and when it gets
    // modified by the above process)
});

So please let me know if my understanding is correct. Is the callback called
every time a key is modified, or are the changes buffered and the callback
called once in a given time span?

Thanks
Sachin



On Sun, Oct 9, 2016 at 3:07 PM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Sachin,
>
> Some comments inline:
>
> > On 9 Oct 2016, at 08:19, Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > Hi,
> > I needed some light on how joins actually work on a continuous stream of
> > data.
> >
> > Say I have 2 topics which I need to join (left join).
> > Data record in each topic is aggregated like (key, value) <=> (string,
> > list)
> >
> > Topic 1
> > key1: [A01, A02, A03, A04 ..]
> > Key2: [A11, A12, A13, A14 ..]
> > ....
> >
> > Topic 2
> > key1: [B01, B02, B03, B04 ..]
> > Key2: [B11, B12, B13, B14 ..]
> > ....
> >
> > Joined topic
> > Key1: [A01, B01...]
> > Key2: [A11, B11 ...]
> >
> > Now let us say I get 2 records [Key1: A05] & [Key1: B05]
> > So as per aggregation they are appended to the Topic 1 and Topic 2.
> >
> > I assume this will again call the join operation and the records would
> > get appended to Key1 data? Let me know if my understanding is correct here.
>
> Yes, that is correct. The join operation is continuously called each time
> there are new records consumed from the topic. The consuming happens
> continuously too.
>
> >
> > If I am reading the joined topic using foreach will I again get record
> > for key1 with new appended data in the original list so now my record is
> > Key1: [A01, B01..., A05, B05 ... ]
>
> Correct.
>
>
> >
> > What I wanted to ask was in case of reading each record from a topic, if
> > the value against that key is modified will it be read again (if it was
> > read before also)?
> > Or the record is read only once via that stream program?
>
> So this depends on how the value for a key is modified. I'm assuming a new
> record with the new value is produced to the topic. There will be two broad
> options here:
>
> - if you are doing a KStream-KStream join, the "time" when the new value
> is updated will matter (these kinds of joins are done with a time boundary,
> e.g., join everything within a time difference of 10 minutes). E.g., say
> the join result so far is Key1: [A01, B01..., A05, B05 ... ]. If the value
> for Key1 is now [B06] then the output will depend on the time of the join.
> - if you are doing a KStream-KTable join, it depends on whether the value
> change happens on the KStream or KTable.
>
> Before going further, could you clarify if you'll have a KStream-KStream
> join or a KStream-KTable join?
>
> Thanks
> Eno
>
> >
> > Please let me know how such a scenario works.
> >
> > Thanks
> > Sachin
>
>
