Hi Sachin,

Yes, it will be called each time a key is modified, and it will keep doing this continuously until you stop the app.
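To make this concrete, here is a minimal in-memory sketch in plain Java of the KTable-KTable left join you describe. It does not use the Kafka Streams API. The names (`KTableLeftJoinSketch`, `appendLeft`, `appendRight`, `merge`) are hypothetical, your `merge` is assumed to be simple concatenation of List<A> and List<B>, and the left-join case is simplified so that nothing is emitted until the left table has a value for the key. It shows the behaviour described above: every update to either side re-runs the joiner and produces a new record for that key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the topology in this thread: two aggregated
// tables (key -> list of values) left-joined on key. It does NOT use the Kafka
// Streams API; it only mimics "one joined output per update" for illustration.
class KTableLeftJoinSketch {
    private final Map<String, List<String>> left = new HashMap<>();   // KTable(Key, List<A>)
    private final Map<String, List<String>> right = new HashMap<>();  // KTable(Key, List<B>)
    // Stands in for "new-result-topic": every (key, joined value) emitted, in order.
    private final List<Map.Entry<String, List<String>>> output = new ArrayList<>();

    // Assumed "merge": concatenate List<A> and List<B>; right may be absent in a left join.
    static List<String> merge(List<String> a, List<String> b) {
        List<String> merged = new ArrayList<>(a); // copy, so each output is a snapshot
        if (b != null) {
            merged.addAll(b);
        }
        return merged;
    }

    // A new A record for this key: aggregate it, then re-run the join for the key.
    void appendLeft(String key, String value) {
        left.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        emit(key);
    }

    // A new B record for this key: aggregate it, then re-run the join for the key.
    void appendRight(String key, String value) {
        right.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        emit(key);
    }

    // Simplification: emit nothing while the left table has no value for the key.
    private void emit(String key) {
        List<String> a = left.get(key);
        if (a != null) {
            output.add(Map.entry(key, merge(a, right.get(key))));
        }
    }

    List<Map.Entry<String, List<String>>> output() {
        return output;
    }

    public static void main(String[] args) {
        KTableLeftJoinSketch join = new KTableLeftJoinSketch();
        join.appendLeft("Key1", "A01");
        join.appendRight("Key1", "B01");
        join.appendLeft("Key1", "A05");
        join.appendRight("Key1", "B05");
        // Like foreach on "new-result-topic": one callback per update to Key1.
        for (Map.Entry<String, List<String>> e : join.output()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

Running `main` prints four records for Key1, one per update, ending with `Key1 -> [A01, A05, B01, B05]`.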
Eno

> On 9 Oct 2016, at 16:50, Sachin Mittal <sjmit...@gmail.com> wrote:
>
> Hi,
> It is actually a KTable-KTable join.
>
> I have a stream (K1, A) which is aggregated as (Key, List<A>), hence it
> creates a KTable.
> I have another stream (K2, B) which is aggregated as (Key, List<B>), hence
> it creates another KTable.
>
> Then I have
> KTable (Key, List<A>).leftJoin( KTable(Key, List<B>), ValueJoiner {
> new List<AB> = merge (List<A>, List<B>)
> return (Key, new List<AB>)
> }).to("new-result-topic")
>
> So what I understand is that every time ValueJoiner is called, it picks the
> latest modified List<A> and List<B> for a key, merges them and then
> updates the new-result-topic with the new modified list for the same key.
>
> So then when I do KStream("new-result-topic").foreach((key, List<AB>) -> {
> //this callback is called multiple times for same key kx and each time
> it contains new modified List<AB> (as and when it gets modified by above
> process)
> });
>
> So please let me know if my understanding is correct. I suppose it will be
> called every time a key is modified, or does it buffer the changes and call it
> once in a given time span?
>
> Thanks
> Sachin
>
>
>
> On Sun, Oct 9, 2016 at 3:07 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>
>> Hi Sachin,
>>
>> Some comments inline:
>>
>>> On 9 Oct 2016, at 08:19, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>
>>> Hi,
>>> I needed some light on how joins actually work on a continuous stream of data.
>>>
>>> Say I have 2 topics which I need to join (left join).
>>> Data record in each topic is aggregated like (key, value) <=> (string, list)
>>>
>>> Topic 1
>>> key1: [A01, A02, A03, A04 ..]
>>> Key2: [A11, A12, A13, A14 ..]
>>> ....
>>>
>>> Topic 2
>>> key1: [B01, B02, B03, B04 ..]
>>> Key2: [B11, B12, B13, B14 ..]
>>> ....
>>>
>>> Joined topic
>>> Key1: [A01, B01...]
>>> Key2: [A11, B11 ...]
>>>
>>> Now let us say I get 2 records [Key1: A05] & [Key1: B05]
>>> So as per aggregation they are appended to Topic 1 and Topic 2.
>>>
>>> I assume this will again call the join operation and the records would get
>>> appended to Key1 data? Let me know if my understanding is correct here.
>>
>> Yes, that is correct. The join operation is continuously called each time
>> there are new records consumed from the topic. The consuming happens
>> continuously too.
>>
>>>
>>> If I am reading the joined topic using foreach, will I again get a record for
>>> key1 with new appended data in the original list, so now my record is
>>> Key1: [A01, B01..., A05, B05 ... ]
>>
>> Correct.
>>
>>
>>>
>>> What I wanted to ask was in case of reading each record from a topic, if
>>> the value against that key is modified will it be read again (if it was
>>> read before also)?
>>> Or is the record read only once via that stream program?
>>
>> So this depends on how the value for a key is modified. I'm assuming a new
>> record with the new value is produced to the topic. There will be two broad
>> options here:
>>
>> - if you are doing a KStream-KStream join, the "time" when the new value
>> is updated will matter (these kinds of joins are done with a time boundary,
>> e.g., join everything within a time difference of 10 minutes). E.g., say
>> the join result so far is Key1: [A01, B01..., A05, B05 ... ]. If the value
>> for Key1 is now [B06], then the output will depend on the time of the join.
>> - if you are doing a KStream-KTable join, it depends on whether the value
>> change happens on the KStream or the KTable.
>>
>> Before going further, could you clarify if you'll have a KStream-KStream
>> join or a KStream-KTable join?
>>
>> Thanks
>> Eno
>>
>>>
>>> Please let me know how such a scenario works.
>>>
>>> Thanks
>>> Sachin
>>
>>
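The two options Eno lists in the quoted reply can be illustrated with a small plain-Java sketch. This is not the Kafka Streams API: `JoinSemanticsSketch`, `Event`, `windowedJoin`, `updateTable` and `onStreamRecord` are hypothetical names, timestamps are plain milliseconds, and the windowed join is computed over two finished lists rather than incrementally as records arrive. The 10-minute window follows the example in the reply; in the real library such a bound is configured with `JoinWindows`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory models of the two join options in the quoted reply.
// They do not use the Kafka Streams API; timestamps are plain milliseconds.
class JoinSemanticsSketch {

    static final class Event {
        final String key;
        final String value;
        final long timestampMs;

        Event(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // KStream-KStream (inner, windowed): a pair joins only if the keys match and
    // the two timestamps are at most windowMs apart, in either direction.
    static List<String> windowedJoin(List<Event> left, List<Event> right, long windowMs) {
        List<String> joined = new ArrayList<>();
        for (Event a : left) {
            for (Event b : right) {
                if (a.key.equals(b.key) && Math.abs(a.timestampMs - b.timestampMs) <= windowMs) {
                    joined.add(a.key + ":" + a.value + "+" + b.value);
                }
            }
        }
        return joined;
    }

    // KStream-KTable (left join): table updates only change state and emit nothing;
    // each stream record triggers one lookup of the table's current value.
    private final Map<String, String> table = new HashMap<>();

    void updateTable(String key, String value) {
        table.put(key, value);
    }

    String onStreamRecord(String key, String value) {
        return key + ":" + value + "+" + table.get(key); // "null" if the key is not in the table
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        List<Event> a = List.of(new Event("Key1", "A05", 0L));
        // B06 arriving 5 minutes after A05 joins; 15 minutes after does not.
        System.out.println(windowedJoin(a, List.of(new Event("Key1", "B06", 5 * 60 * 1000L)), tenMinutes));
        System.out.println(windowedJoin(a, List.of(new Event("Key1", "B06", 15 * 60 * 1000L)), tenMinutes));

        JoinSemanticsSketch sketch = new JoinSemanticsSketch();
        sketch.updateTable("Key1", "B05");                        // no output
        System.out.println(sketch.onStreamRecord("Key1", "A05")); // uses B05
        sketch.updateTable("Key1", "B06");                        // still no output
        System.out.println(sketch.onStreamRecord("Key1", "A06")); // uses B06
    }
}
```

With the 10-minute window, B06 joins A05 when it is 5 minutes later and produces nothing when it is 15 minutes later. On the stream-table side, `updateTable` never emits anything by itself; each stream record picks up whatever table value is current at that moment.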