This is exactly what I am confused about. If I understand correctly, each map function of the co-flatmap receives one tuple at a time, so if I have a datastream of centroids, they would arrive one at a time on the partitions, and that would defeat the purpose.
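To make the per-element delivery concrete, here is a plain-Java sketch with no Flink dependency (the class and method names are only modeled on the CoFlatMapFunction interface, and the Collector argument is dropped for brevity): the first input does see one point per call, but nothing stops a single element on the second input from carrying the whole centroid list, so the centroids never have to arrive one by one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a CoFlatMapFunction whose second input carries
// a whole centroid list in one element. Flink would call flatMap1 once per
// point and flatMap2 once per feedback element.
public class CentroidCoFlatMap {
    private List<double[]> centroids = new ArrayList<>();

    // Input 1: one point per call. Returns the index of the nearest
    // centroid, or -1 if no centroid list has been received yet.
    public int flatMap1(double[] point) {
        int best = -1;
        double bestDist = Double.MAX_VALUE;
        for (int i = 0; i < centroids.size(); i++) {
            double[] c = centroids.get(i);
            double d = 0;
            for (int j = 0; j < point.length; j++) {
                d += (point[j] - c[j]) * (point[j] - c[j]);
            }
            if (d < bestDist) { bestDist = d; best = i; }
        }
        return best;
    }

    // Input 2: an entire centroid list in a single element.
    public void flatMap2(List<double[]> newCentroids) {
        centroids = newCentroids;
    }

    public static void main(String[] args) {
        CentroidCoFlatMap op = new CentroidCoFlatMap();
        op.flatMap2(Arrays.asList(new double[]{0, 0}, new double[]{10, 10}));
        System.out.println(op.flatMap1(new double[]{1, 1})); // 0 (nearest is (0,0))
        System.out.println(op.flatMap1(new double[]{9, 8})); // 1 (nearest is (10,10))
    }
}
```

Whether one feedback element holds one centroid or the full list is purely a choice of the element type you send into `closeWith`.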
Are you proposing that I put the entire list of centroids into a single datastream element, so that the map function gets the whole list whenever it is called? Would it be possible for you to give an example, a code snippet, or a link to some use case of the co-flatmap function? Thanks a lot for your help throughout.

Regards
Biplob Biswas

Gyula Fóra wrote:
> Hi,
>
> Iterating after every incoming point/centroid update means that you
> basically defeat the purpose of having parallelism in your Flink job.
>
> If you only "sync" the centroids periodically by the broadcast, you can
> make your program run efficiently in parallel. This should be fine for
> machine learning use cases where the results should converge anyway.
>
> Gyula
>
> Biplob Biswas <revolutionisme@> wrote (on Mon, 2 May 2016, 17:02):
>
>> Hi Gyula,
>>
>> Could you explain a bit why I wouldn't want the centroids to be
>> collected after every point?
>>
>> I mean, once I get a streamed point via map1, I would want to compare
>> the distance of the point with a centroid which arrives via map2, and I
>> keep comparing for every centroid which comes in subsequently. Once the
>> update of the centroid happens, shouldn't I collect the entire set?
>> Thus, updating a centroid and collecting it back for the next point in
>> the iteration.
>>
>> I may not be getting the concept properly here, so an example snippet
>> would help in the long run.
>>
>> Thanks & Regards
>> Biplob
>>
>> Gyula Fóra wrote:
>> > Hey,
>> >
>> > I think you got the right idea :)
>> >
>> > So your coflatmap will get all the centroids that you have sent to
>> > the stream in the closeWith call. This means that whenever you
>> > collect a new set of centroids, they will be iterated back. This
>> > means you don't always want to send the centroids out on the
>> > collector, only periodically.
>> > The order in which these come is pretty much arbitrary, so you need
>> > to make sure to add some logic by which you can order them if this is
>> > important.
>> >
>> > I'm not sure if this helped or not :D
>> >
>> > Gyula
>> >
>> > Biplob Biswas <revolutionisme@> wrote (on Mon, 2 May 2016, 13:13):
>> >
>> >> Hi Gyula,
>> >>
>> >> I understand more now how this thing might work, and it's
>> >> fascinating. Although I still have one question about the coflatmap
>> >> function.
>> >>
>> >> First, let me explain what I understand and whether it's correct or
>> >> not:
>> >> 1. The connected iterative stream ensures that the coflatmap
>> >> function receives the points and the centroids which are broadcast
>> >> on each iteration defined by closeWith.
>> >>
>> >> 2. So in the coflatmap function, on one map I get the points, and on
>> >> the other map function I get the centroids which are broadcast.
>> >>
>> >> Now comes the part I am assuming a bit, because I don't understand
>> >> it from the theory.
>> >> 3. Assuming I can use the broadcast centroids, I calculate the
>> >> nearest centroid from the streamed point, I update that centroid,
>> >> and I only use one of the collectors to return the updated centroid
>> >> list back.
>> >>
>> >> The question here is: I am assuming that this operation is not done
>> >> in parallel, because if streams are sent in parallel, how would I
>> >> ensure correct update of the centroids, since multiple points can
>> >> try to update the same centroid in parallel?
>> >>
>> >> I hope I made myself clear with this.
>> >>
>> >> Thanks and Regards
>> >> Biplob
>> >>
>> >> Biplob Biswas wrote:
>> >> > Hi Gyula,
>> >> >
>> >> > I read your workaround and started reading about Flink iterations,
>> >> > coflatmap operators and other things. Now, I do understand a few
>> >> > things, but the solution you provided is not completely clear to
>> >> > me.
>> >> >
>> >> > I understand the following things from your post.
>> >> > 1. You initially have a datastream of points, on which you
>> >> > iterate, and 'withFeedbackType' defines the type of the connected
>> >> > stream, so rather than "Points" the type is "Centroids" now.
>> >> >
>> >> > 2. On this connected stream (which, as I understand, only has the
>> >> > streamed points right now), you run a flatmap operator. And you
>> >> > mention:
>> >> > "MyCoFlatmap would be a CoFlatMapFunction which on 1 input receive
>> >> > events and update its local centroids (and periodically output the
>> >> > centroids) and on the other input would send centroids of other
>> >> > flatmaps and would merge them to the local."
>> >> > I don't understand this part completely. If I am not wrong, you
>> >> > are saying that the co-flatmap function would have 2 map
>> >> > functions. What specifically am I doing in each map function?
>> >> >
>> >> > 3. Lastly, the updated centroids which come back from the
>> >> > coflatmap function are fed back to the stream again, and this is
>> >> > the part where I get lost again: how is this centroid fed back,
>> >> > and if it is fed back, what happens to the point stream? And if it
>> >> > somehow is fed back, how do I catch it in the coflatmap function?
>> >> >
>> >> > If I understand this a bit, then in your code the first set of
>> >> > centroids is created in the coflatmap function, and you don't
>> >> > already have a list of centroids to start with? Am I assuming that
>> >> > correctly?
>> >> >
>> >> > I went through the process of iteration in the KMeans example at
>> >> > the following link:
>> >> > https://github.com/stratosphere/stratosphere/blob/master/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/clustering/KMeans.java
>> >> >
>> >> > and I understand how this is working,
>> >> > but I am still not clear how your example is working.
>> >> >
>> >> > Could you please explain it a bit more? With some examples, maybe?
>> >> >
>> >> > Thanks a lot.
>> >> >
>> >> > Gyula Fóra wrote:
>> >> >> Hi Biplob,
>> >> >>
>> >> >> I have implemented a similar algorithm, as Aljoscha mentioned.
>> >> >>
>> >> >> First things to clarify are the following:
>> >> >> There is currently no abstraction for keeping objects (in your
>> >> >> case centroids) in a centralized way that can be updated/read by
>> >> >> all operators. This would probably be very costly and is actually
>> >> >> not necessary in your case.
>> >> >>
>> >> >> Broadcasting a stream, in contrast with other partitioning
>> >> >> methods, means that the events will be replicated to all
>> >> >> downstream operators. It is not a magical operator that will make
>> >> >> state available among parallel instances.
>> >> >>
>> >> >> Now let me explain what I think you want from Flink and how to do
>> >> >> it :)
>> >> >>
>> >> >> You have an input data stream and a set of centroids to be
>> >> >> updated based on the incoming records. As you want to do this in
>> >> >> parallel, you have an operator (let's say a flatmap) that keeps
>> >> >> the centroids locally and updates them on its inputs. Now you
>> >> >> have a set of independently updated centroids, so you want to
>> >> >> merge them and update the centroids in each flatmap.
>> >> >>
>> >> >> Let's see how to do this. Given that you have your centroids
>> >> >> locally, updating them is super easy, so I will not talk about
>> >> >> that. The problematic part is periodically merging and
>> >> >> "broadcasting" the centroids so all the flatmaps eventually see
>> >> >> the same ones (they don't have to always be the same for
>> >> >> clustering, probably). There is no operator for sending state
>> >> >> (centroids) between subtasks, so you have to be clever here.
>> >> >> We can actually use cyclic streams to solve this problem by
>> >> >> sending the centroids as simple events to a CoFlatMap:
>> >> >>
>> >> >> DataStream<Point> input = ...
>> >> >> ConnectedIterativeStreams<Point, Centroids> inputsAndCentroids =
>> >> >>     input.iterate().withFeedbackType(Centroids.class)
>> >> >> DataStream<Centroids> updatedCentroids =
>> >> >>     inputsAndCentroids.flatMap(MyCoFlatmap)
>> >> >> inputsAndCentroids.closeWith(updatedCentroids.broadcast())
>> >> >>
>> >> >> MyCoFlatmap would be a CoFlatMapFunction which on one input would
>> >> >> receive events and update its local centroids (and periodically
>> >> >> output the centroids), and on the other input would receive the
>> >> >> centroids of other flatmaps and merge them into the local ones.
>> >> >>
>> >> >> This might be a lot to take in at first, so you might want to
>> >> >> read up on streaming iterations and connected streams before you
>> >> >> start.
>> >> >>
>> >> >> Let me know if this makes sense.
>> >> >>
>> >> >> Cheers,
>> >> >> Gyula
>> >> >>
>> >> >> Biplob Biswas <revolutionisme@> wrote (on Thu, 28 Apr 2016,
>> >> >> 14:41):
>> >> >>
>> >> >>> That would really be great; any example would help me proceed
>> >> >>> with my work. Thanks a lot.
>> >> >>>
>> >> >>> Aljoscha Krettek wrote:
>> >> >>> > Hi Biplob,
>> >> >>> > one of our developers had a stream clustering example a while
>> >> >>> > back. It was using a broadcast feedback edge with a
>> >> >>> > co-operator to update the centroids. I'll directly include him
>> >> >>> > in the email so that he will notice and can send you the
>> >> >>> > example.
>> >> >>> > >> >> >>> > Cheers, >> >> >>> > Aljoscha >> >> >>> > >> >> >>> > On Thu, 28 Apr 2016 at 13:57 Biplob Biswas < >> >> >>> >> >> >>> > revolutionisme@ >> >> >>> >> >> >>> > > wrote: >> >> >>> > >> >> >>> >> I am pretty new to flink systems, thus can anyone atleast give >> me >> >> an >> >> >>> >> example >> >> >>> >> of how datastream.broadcast() method works? From the >> documentation >> >> i >> >> >>> get >> >> >>> >> the >> >> >>> >> following: >> >> >>> >> >> >> >>> >> broadcast() >> >> >>> >> Sets the partitioning of the DataStream so that the output >> >> elements >> >> >>> are >> >> >>> >> broadcasted to every parallel instance of the next operation. >> >> >>> >> >> >> >>> >> If the output elements are broadcasted, then how are they >> >> retrieved? >> >> >>> Or >> >> >>> >> maybe I am looking at this method in a completely wrong way? >> >> >>> >> >> >> >>> >> Thanks >> >> >>> >> Biplob Biswas >> >> >>> >> >> >> >>> >> >> >> >>> >> >> >> >>> >> -- >> >> >>> >> View this message in context: >> >> >>> >> >> >> >>> >> >> >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6543.html >> >> >>> >> Sent from the Apache Flink User Mailing List archive. mailing >> list >> >> >>> >> archive >> >> >>> >> at Nabble.com. >> >> >>> >> >> >> >>> >> >> >>> >> >> >>> >> >> >>> >> >> >>> >> >> >>> -- >> >> >>> View this message in context: >> >> >>> >> >> >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6548.html >> >> >>> Sent from the Apache Flink User Mailing List archive. mailing list >> >> >>> archive >> >> >>> at Nabble.com. 
>> >> >>> >> >> >> >> >> >> >> >> >> >> >> >> -- >> >> View this message in context: >> >> >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6612.html >> >> Sent from the Apache Flink User Mailing List archive. mailing list >> >> archive >> >> at Nabble.com. >> >> >> >> >> >> >> >> -- >> View this message in context: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6619.html >> Sent from the Apache Flink User Mailing List archive. mailing list >> archive >> at Nabble.com. >> -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6707.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.