I prepared a small example that outlines how something like this could be implemented: https://gist.github.com/aljoscha/36afedce40abf8ae92b92d4355809ff1
It doesn't include all your requirements, such as count per wall, etc. But this should get you started on the right path. I hope this helps!

On Fri, 13 May 2016 at 20:18 nsengupta <[email protected]> wrote:

> Hello Flinksters
>
> Alright. So, I had a fruitful exchange of messages with Balaji earlier
> today, on this topic. I moved ahead with the understanding derived from the
> exchange (thanks, Balaji) at the time. But, now I am back because I think my
> approach is unclean, if not incorrect. There probably is a smarter way to
> achieve the same but I can't figure it out.
>
> Here's the problem:
>
> A building has 4 walls (0,1,2,3). On each wall, a number of devices has been
> planted to capture some physical attribute: let's say temperature at that
> spot. Every device has a unique ID.
>
> A typical tuple looks like this (Reading ==> Temperature as an Integer):
> (TupleType,Time,WallID,DeviceID,Reading)
>
> The system works on the basis of records arriving in a time-window of 60
> seconds. We can consider this to be a Tumbling Window. The time (and Window
> assignment etc.) is not the issue here. The 'Time' field increases
> monotonically.
>
> If TupleType == 0, I need to compute and update my data structures from the
> stream
>
> If TupleType == 1, I need to emit the maximum temperature recorded by the
> DeviceID out of last 5 readings.
>
> If TupleType == 2, I need to emit the number of readings so far arrived from
> the particular wall. Obviously, in this case, we will ignore the value of
> fields 'DeviceID' and 'Reading' in the tuple.
>
> The Application generates output for TupleType 1 and TupleType 2.
>
> The TupleTypes can arrive in any order. For example, TupleType 1 may arrive
> with a DeviceID which the application hasn't seen before (no corresponding
> TupleType 0 has arrived earlier with that DeviceID). Let us assume that we
> have a fallback value to be emitted for such cases, to keep things simple.
>
> In my mind, the implementation should be along this line:
>
> - Split the incoming Stream in three separate substreams using SplitStream,
>   based upon TupleType
> - For StreamOFTupleType0,
>     - KeyBy(DeviceID)
>     - Apply a Mapper
>         - Update a Map [DeviceID, [Tuple2(MaxReadingSoFar,
>           FixedSizeList[Reading])] somewhere
>     - Apply (next) Mapper
>         - Calculate the total count of reading the Wall so far
>         - Update a Map [WallID, Count]
>
> - For StreamOFTupleType1
>     - Access the Map created/updated through the first Mapper above
>     - Emit
>
> - For StreamOFTupleType2
>     - Access the Map created/updated through the second Mapper above.
>     - Emit
>
> I have hit a wall to decide how the live data structures should be created,
> updated and accessed, correctly and efficiently in a situation like above.
> More importantly, how will they be shared between operators, across
> partitions (nodes).
>
> I can't broadcast the Maps because they are not READONLY (/aka/ LookUp
> only).
>
> I can't create RichMapFunction local data structures because they are not
> shared between partitions (my understanding). They will be blind to the
> effect of accumulation. Each will begin with an empty Map.
>
> I have done a bit of exploration and I have found this thread
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/mutable-hashmap-outside-of-stream-does-it-get-snapshotted-td6002.html#a6013>
> in the forum. I have understood what Stephano
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=user_nodes&user=266>
> is suggesting ('..State is moved along pipeline ..') but then, failed to
> figure out how to apply in my case, if at all possible.
>
> I have been thinking about using an external DB-like datastore but I want to
> be sure about the inevitability of that decision. If I use a DB, then the
> focus may go to the INSERT/SELECT like queries.
> My application then becomes more of a distributed DB application rather than
> a lean Streaming application. That thought doesn't make me happy! :-)
>
> Please make me wiser (by pointing out gaps in understanding where they
> exist). If any more specific information helps you, please ask me.
>
> My primary aim is to have a clarity of the recipe of a UseCase like this.
>
> -- Nirmalya
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Sharing-State-between-Operators-tp6911.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
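
For readers who cannot open the gist, here is a rough sketch of the bookkeeping the quoted problem statement asks for. It is not the gist's code and it does not use Flink at all: two ordinary HashMaps stand in for state that, in a Flink job, would presumably be kept per key after a keyBy, so that all tuples for one DeviceID (or one WallID) reach the same parallel instance. The class name WallSensorSketch, the FALLBACK value and the process signature are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Plain-Java stand-in for per-key state; every name here is hypothetical.
class WallSensorSketch {
    // Assumed fallback for a DeviceID that has not been seen yet.
    static final int FALLBACK = Integer.MIN_VALUE;
    // "Last 5 readings" from the problem statement.
    static final int HISTORY = 5;

    // Stand-in for state keyed by DeviceID: the most recent readings per device.
    private final Map<String, Deque<Integer>> lastReadings = new HashMap<>();
    // Stand-in for state keyed by WallID: number of type-0 readings per wall.
    private final Map<Integer, Long> wallCounts = new HashMap<>();

    /**
     * Processes one tuple. Returns a value for TupleType 1 and 2 and an empty
     * result for TupleType 0. The time field is unused because windowing is
     * out of scope for this sketch.
     */
    OptionalLong process(int tupleType, long time, int wallId, String deviceId, int reading) {
        switch (tupleType) {
            case 0: {
                Deque<Integer> recent =
                        lastReadings.computeIfAbsent(deviceId, k -> new ArrayDeque<>());
                if (recent.size() == HISTORY) {
                    recent.removeFirst(); // drop the oldest of the five
                }
                recent.addLast(reading);
                wallCounts.merge(wallId, 1L, Long::sum);
                return OptionalLong.empty();
            }
            case 1: {
                Deque<Integer> recent = lastReadings.get(deviceId);
                if (recent == null) {
                    return OptionalLong.of(FALLBACK);
                }
                int max = Integer.MIN_VALUE;
                for (int r : recent) {
                    max = Math.max(max, r);
                }
                return OptionalLong.of(max);
            }
            case 2:
                return OptionalLong.of(wallCounts.getOrDefault(wallId, 0L));
            default:
                throw new IllegalArgumentException("unknown TupleType " + tupleType);
        }
    }

    public static void main(String[] args) {
        WallSensorSketch sketch = new WallSensorSketch();
        sketch.process(0, 1L, 0, "d1", 20);
        sketch.process(0, 2L, 0, "d1", 25);
        sketch.process(0, 3L, 1, "d2", 30);
        System.out.println(sketch.process(1, 4L, 0, "d1", 0).getAsLong());      // prints 25
        System.out.println(sketch.process(2, 5L, 0, "", 0).getAsLong());        // prints 2
        System.out.println(sketch.process(1, 6L, 0, "unknown", 0).getAsLong()); // prints -2147483648
    }
}
```

The sketch keeps both maps in one object only to stay short. In a real job the per-device history and the per-wall count are keyed differently, so they would most likely live in two separate keyed operators, each fed by the tuple types that need that key.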
