Hi Gyula,

Could you explain a bit why I wouldn't want the centroids to be
collected after every point?
I mean, once I get a streamed point via the map1 function, I would want
to compare the distance of the point with a centroid which arrives via
the map2 function, and I keep comparing with every centroid that comes
in subsequently. Once the update of the centroid happens, shouldn't I
collect the entire set? That is, update a centroid and collect it back
for the next point in the iteration. I may not be getting the concept
properly here, so an example snippet would help in the long run.

Thanks & Regards,
Biplob

Gyula Fóra wrote:
> Hey,
>
> I think you got the right idea :)
>
> So your coflatmap will get all the centroids that you have sent to the
> stream in the closeWith call. This means that whenever you collect a
> new set of centroids they will be iterated back. This means you don't
> always want to send the centroids out on the collector, only
> periodically.
>
> The order in which these come is pretty much arbitrary, so you need to
> make sure to add some logic by which you can order them if this is
> important.
>
> I'm not sure if this helped or not :D
>
> Gyula
>
> Biplob Biswas <revolutionisme@> wrote (on 2 May 2016, Mon, 13:13):
>
>> Hi Gyula,
>>
>> I understand more now how this thing might work, and it's
>> fascinating. Although I still have one question about the coflatmap
>> function.
>>
>> First, let me explain what I understand and whether it's correct or
>> not:
>> 1. The connected iterative stream ensures that the coflatmap function
>> receives the points and the centroids which are broadcast on each
>> iteration defined by closeWith.
>>
>> 2. So in the coflatmap function, on one map I get the points and on
>> the other map function I get the centroids which are broadcast.
>>
>> Now comes the part I am assuming a bit, because I don't understand it
>> from the theory.
>> 3.
>> Assuming I can use the broadcast centroids, I calculate the nearest
>> centroid from the streamed point, update that centroid, and use only
>> one of the collectors to return the updated centroid list.
>>
>> The question here is: I am assuming that this operation is not done
>> in parallel, because if the streams are processed in parallel, how
>> would I ensure correct updates of the centroids when multiple points
>> can try to update the same centroid at the same time?
>>
>> I hope I made myself clear with this.
>>
>> Thanks and Regards,
>> Biplob
>>
>> Biplob Biswas wrote:
>> > Hi Gyula,
>> >
>> > I read your workaround and started reading about Flink iterations,
>> > coflatmap operators and other things. Now I understand a few
>> > things, but the solution you provided is not completely clear to
>> > me.
>> >
>> > I understand the following things from your post:
>> > 1. You initially have a datastream of points, on which you iterate,
>> > and 'withFeedbackType' defines the type of the connected stream, so
>> > rather than "Point" the type is now "Centroids".
>> >
>> > 2. On this connected stream (which, as I understand it, only
>> > carries the streamed points right now) you run a flatmap operator.
>> > And you mention:
>> > "MyCoFlatmap would be a CoFlatMapFunction which on one input
>> > receives events and updates its local centroids (and periodically
>> > outputs the centroids), and on the other input receives the
>> > centroids of other flatmaps and merges them into the local ones."
>> > I don't understand this part completely. If I am not wrong, you are
>> > saying that the co-flatmap function would have two map functions.
>> > What specifically am I doing in each map function?
>> >
>> > 3. Lastly, the updated centroids which came back from the coflatmap
>> > function are fed back to the stream again, and this is the part
>> > where I get lost again: how are these centroids fed back, and if
>> > they are, what happens to the point stream? And if they are somehow
>> > fed back, how do I catch them in the coflatmap function?
>> >
>> > If I understand this correctly, in your code the first set of
>> > centroids is created in the coflatmap function, and you don't
>> > already have a list of centroids to start with? Am I assuming that
>> > correctly?
>> >
>> > I went through the iteration process in the KMeans example from the
>> > following link:
>> > https://github.com/stratosphere/stratosphere/blob/master/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/clustering/KMeans.java
>> > and I understand how that works, but I am still not clear on how
>> > your example works.
>> >
>> > Could you please explain it a bit more, with some examples maybe?
>> >
>> > Thanks a lot.
>> > Gyula Fóra-2 wrote:
>> >> Hi Biplob,
>> >>
>> >> I have implemented a similar algorithm, as Aljoscha mentioned.
>> >>
>> >> First things to clarify are the following:
>> >> There is currently no abstraction for keeping objects (in your
>> >> case centroids) in a centralized way that can be updated/read by
>> >> all operators. This would probably be very costly and is actually
>> >> not necessary in your case.
>> >>
>> >> Broadcasting a stream, in contrast with other partitioning
>> >> methods, means that the events will be replicated to all
>> >> downstream operators. This is not a magical operator that will
>> >> make state available among parallel instances.
>> >>
>> >> Now let me explain what I think you want from Flink and how to do
>> >> it :)
>> >>
>> >> You have an input data stream and a set of centroids to be updated
>> >> based on the incoming records. As you want to do this in parallel,
>> >> you have an operator (let's say a flatmap) that keeps the
>> >> centroids locally and updates them on its inputs.
>> >> Now you have a set of independently updated centroids, so you
>> >> want to merge them and update the centroids in each flatmap.
>> >>
>> >> Let's see how to do this. Given that you have your centroids
>> >> locally, updating them is super easy, so I will not talk about
>> >> that. The problematic part is periodically merging and
>> >> "broadcasting" the centroids so that all the flatmaps eventually
>> >> see the same set (they probably don't always have to be the same
>> >> for clustering). There is no operator for sending state
>> >> (centroids) between subtasks, so you have to be clever here. We
>> >> can actually use cyclic streams to solve this problem by sending
>> >> the centroids as simple events to a CoFlatMap:
>> >>
>> >> DataStream<Point> input = ...
>> >> ConnectedIterativeStreams<Point, Centroids> inputsAndCentroids =
>> >>     input.iterate().withFeedbackType(Centroids.class)
>> >> DataStream<Centroids> updatedCentroids =
>> >>     inputsAndCentroids.flatMap(MyCoFlatmap)
>> >> inputsAndCentroids.closeWith(updatedCentroids.broadcast())
>> >>
>> >> MyCoFlatmap would be a CoFlatMapFunction which on one input
>> >> receives events and updates its local centroids (and periodically
>> >> outputs the centroids), and on the other input receives the
>> >> centroids of other flatmaps and merges them into the local ones.
>> >>
>> >> This might be a lot to take in at first, so you might want to read
>> >> up on streaming iterations and connected streams before you start.
>> >>
>> >> Let me know if this makes sense.
>> >>
>> >> Cheers,
>> >> Gyula
>> >>
>> >> Biplob Biswas <revolutionisme@> wrote (on 28 Apr 2016, Thu,
>> >> 14:41):
>> >>
>> >>> That would really be great, any example would help me proceed
>> >>> with my work.
>> >>> Thanks a lot.
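To make the MyCoFlatmap description above concrete, here is a minimal
plain-Java sketch of the logic it could carry, with the Flink plumbing
stripped away: onPoint plays the role of flatMap1 (points in, centroids
out only periodically) and onCentroids the role of flatMap2 (feedback
centroids merged into the local ones). The LocalClusterer name, the
double[]-based point/centroid representation, the emit period and the
averaging merge are all illustrative assumptions, not part of the
original code:

```java
// Sketch of MyCoFlatmap's logic without the Flink dependency.
// Assumptions: points and centroids are double[] / double[][],
// centroids move as a running mean, and merging is a pairwise average.
class LocalClusterer {
    private final double[][] centroids; // local centroids, one per row
    private final long[] counts;        // points assigned per centroid
    private final int emitEveryN;       // emit period, not per point
    private long pointsSeen = 0;

    LocalClusterer(double[][] initial, int emitEveryN) {
        this.centroids = initial;
        this.counts = new long[initial.length];
        this.emitEveryN = emitEveryN;
    }

    // flatMap1 analogue: move the nearest centroid towards the point,
    // and return the centroid set only every emitEveryN points
    // (null stands in for "collect nothing this time").
    double[][] onPoint(double[] p) {
        int i = nearest(p);
        counts[i]++;
        double step = 1.0 / counts[i];
        for (int d = 0; d < p.length; d++) {
            centroids[i][d] += step * (p[d] - centroids[i][d]);
        }
        pointsSeen++;
        return pointsSeen % emitEveryN == 0 ? centroids : null;
    }

    // flatMap2 analogue: merge centroids fed back from the other
    // parallel instances into the local ones (simple average here).
    void onCentroids(double[][] others) {
        for (int i = 0; i < centroids.length; i++) {
            for (int d = 0; d < centroids[i].length; d++) {
                centroids[i][d] = (centroids[i][d] + others[i][d]) / 2.0;
            }
        }
    }

    double[][] centroids() {
        return centroids;
    }

    private int nearest(double[] p) {
        int best = 0;
        double bestDist = Double.POSITIVE_INFINITY;
        for (int i = 0; i < centroids.length; i++) {
            double dist = 0;
            for (int d = 0; d < p.length; d++) {
                double diff = p[d] - centroids[i][d];
                dist += diff * diff;
            }
            if (dist < bestDist) { bestDist = dist; best = i; }
        }
        return best;
    }
}
```

Note that nothing here is shared between parallel instances; each
instance only ever touches its own copy, and agreement comes from the
feedback events handled in onCentroids.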
>> >>> Aljoscha Krettek wrote:
>> >>> > Hi Biplob,
>> >>> > one of our developers had a stream clustering example a while
>> >>> > back. It was using a broadcast feedback edge with a co-operator
>> >>> > to update the centroids. I'll directly include him in the email
>> >>> > so that he will notice and can send you the example.
>> >>> >
>> >>> > Cheers,
>> >>> > Aljoscha
>> >>> >
>> >>> > On Thu, 28 Apr 2016 at 13:57 Biplob Biswas <revolutionisme@>
>> >>> > wrote:
>> >>> >
>> >>> >> I am pretty new to Flink, thus can anyone at least give me an
>> >>> >> example of how the datastream.broadcast() method works? From
>> >>> >> the documentation I get the following:
>> >>> >>
>> >>> >> broadcast()
>> >>> >> Sets the partitioning of the DataStream so that the output
>> >>> >> elements are broadcasted to every parallel instance of the
>> >>> >> next operation.
>> >>> >>
>> >>> >> If the output elements are broadcasted, then how are they
>> >>> >> retrieved? Or maybe I am looking at this method in a
>> >>> >> completely wrong way?
>> >>> >>
>> >>> >> Thanks
>> >>> >> Biplob Biswas
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6619.html
Sent from the Apache Flink User Mailing List archive mailing list archive at Nabble.com.
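As a rough sanity check on Gyula's "only periodically" advice: every
centroid set collected on the feedback edge is broadcast back to every
parallel instance of the co-flatmap (that is what
closeWith(updatedCentroids.broadcast()) does), so emitting after every
point multiplies the traffic circulating in the cycle. A small sketch of
the arithmetic (the class name and the figures below are made up for
illustration):

```java
// Back-of-the-envelope count of events travelling around the feedback
// edge. Each emission is broadcast to every parallel instance of the
// co-flatmap, so it costs `parallelism` feedback events, each of which
// is processed again by flatMap2.
class FeedbackCost {
    static long feedbackEvents(long points, long parallelism, long emitEveryN) {
        return (points / emitEveryN) * parallelism;
    }
}
```

With 1,000,000 input points and parallelism 4, emitting after every
point puts 4,000,000 centroid sets back into the loop; emitting every
1,000 points puts back only 4,000, while each flatmap still keeps its
own local centroids fully up to date between emissions.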