Hi,

It is actually a KTable-KTable join. I have a stream (K1, A) which is aggregated as (Key, List<A>), hence it creates a KTable. I have another stream (K2, B) which is aggregated as (Key, List<B>), hence it creates another KTable.

Then I have:

    KTable(Key, List<A>).leftJoin(
        KTable(Key, List<B>),
        ValueJoiner {
            new List<AB> = merge(List<A>, List<B>)
            return (Key, new List<AB>)
        }
    ).to("new-result-topic")

So what I understand is that every time the ValueJoiner is called, it picks the latest modified List<A> and List<B> for a key, merges them, and then updates new-result-topic with the new modified list for the same key.

So then when I do:

    KStream("new-result-topic").forEach((key, List<AB>) {
        // this callback is called multiple times for the same key kx, and each
        // time it contains the new modified List<AB> (as and when it gets
        // modified by the above process)
    });

So please let me know if my understanding is correct. I suppose it will be called every time a key is modified, or does it buffer the changes and call it once in a given time span?

Thanks
Sachin

On Sun, Oct 9, 2016 at 3:07 PM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Sachin,
>
> Some comments inline:
>
> > On 9 Oct 2016, at 08:19, Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > Hi,
> > I needed some light on how joins actually work on continuous stream of
> > data.
> >
> > Say I have 2 topics which I need to join (left join).
> > Data record in each topic is aggregated like (key, value) <=> (string,
> > list)
> >
> > Topic 1
> > key1: [A01, A02, A03, A04 ..]
> > Key2: [A11, A12, A13, A14 ..]
> > ....
> >
> > Topic 2
> > key1: [B01, B02, B03, B04 ..]
> > Key2: [B11, B12, B13, B14 ..]
> > ....
> >
> > Joined topic
> > Key1: [A01, B01...]
> > Key2: [A11, B11 ...]
> >
> > Now let us say I get 2 records [Key1: A05] & [Key1: B05]
> > So as per aggregation they are appended to the Topic 1 and Topic 2.
> >
> > I assume this will again call the join operation and the records would get
> > appended to Key1 data? Let me know if my understanding is correct here.
>
> Yes, that is correct. The join operation is continuously called each time
> there are new records consumed from the topic. The consuming happens
> continuously too.
>
> >
> > If I am reading the joined topic using foreach will I again get record for
> > key1 with new appended data in the original list so now my record is
> > Key1: [A01, B01..., A05, B05 ... ]
>
> Correct.
>
> >
> > What I wanted to ask was in case of reading each record from a topic, if
> > the value against that key is modified will it be read again (if it was
> > read before also)?
> > Or the record is read only once via that stream program?
>
> So this depends on how the value for a key is modified. I'm assuming a new
> record with the new value is produced to the topic. There will be two broad
> options here:
>
> - if you are doing a KStream-KStream join, the "time" when the new value
>   is updated will matter (these kinds of joins are done with a time boundary,
>   e.g., join everything within a time difference of 10 minutes). E.g., say
>   the join result so far is Key1: [A01, B01..., A05, B05 ... ]. If the value
>   for Key1 is now [B06] then the output will depend on the time of the join.
> - if you are doing a KStream-KTable join, it depends on whether the value
>   change happens on the KStream or KTable.
>
> Before going further, could you clarify if you'll have a KStream-KStream
> join or a KStream-KTable join?
>
> Thanks
> Eno
>
> >
> > Please let me know how such a scenario works.
> >
> > Thanks
> > Sachin
> >
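
To make the update-by-update behaviour discussed in this thread concrete, here is a minimal sketch in plain Java. It is a simulation only and does not use the Kafka Streams library: the class name TableJoinSketch, the methods updateLeft/updateRight, and the concatenating merge are all hypothetical names chosen for illustration. It assumes that each change to either table re-runs the joiner for that key with the latest value from both sides, and that a left join emits nothing while the left side has no value for the key. It does not model any buffering or batching of updates, which the thread leaves as an open question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a table-table left join; this is NOT the Kafka Streams API.
class TableJoinSketch {
    // Latest value per key on each side, standing in for the two aggregated KTables.
    private final Map<String, List<String>> left = new HashMap<>();
    private final Map<String, List<String>> right = new HashMap<>();

    // Every joined update in emission order, standing in for "new-result-topic".
    final List<Map.Entry<String, List<String>>> emitted = new ArrayList<>();

    // Hypothetical merge step: left values followed by right values (right may be absent).
    static List<String> merge(List<String> a, List<String> b) {
        List<String> out = new ArrayList<>(a);
        if (b != null) {
            out.addAll(b);
        }
        return out;
    }

    // A new aggregate arrives for the left table.
    void updateLeft(String key, List<String> values) {
        left.put(key, values);
        join(key);
    }

    // A new aggregate arrives for the right table.
    void updateRight(String key, List<String> values) {
        right.put(key, values);
        join(key);
    }

    // Re-run the joiner for one key using the latest value on each side.
    // Left-join assumption: emit only when the left side has a value.
    private void join(String key) {
        List<String> a = left.get(key);
        if (a == null) {
            return;
        }
        emitted.add(Map.entry(key, merge(a, right.get(key))));
    }

    public static void main(String[] args) {
        TableJoinSketch t = new TableJoinSketch();
        t.updateLeft("key1", List.of("A01"));
        t.updateRight("key1", List.of("B01"));
        t.updateLeft("key1", List.of("A01", "A05"));
        t.updateRight("key1", List.of("B01", "B05"));
        // One line per emitted update; the same key appears once per change.
        for (Map.Entry<String, List<String>> e : t.emitted) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

Under these assumptions, a reader of the result topic sees one record per change for key1, each carrying the full merged list as of that change, which matches the "called multiple times for the same key" reading above. The ordering inside the merged list depends entirely on the merge function, which the thread does not specify.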