Hi Gyula,

Could you explain a bit why i wouldn't want the centroids to be collected
after every point? 

I mean, once I get a streamed point via map1 function .. i would want to
compare the distance of the point with a centroid which arrives via map2
function and i keep on comparing for every centroid which comes in
subsequently, once the update of the centroid happens shouldn't i collect
the entire set? Thus, updating a centroid and collecting it back for the
next point in the iteration.

I may not be getting the concept properly here, so an example snippet would
help in a long run. 

Thanks & Regards
Biplob
Gyula Fóra wrote
> Hey,
> 
> I think you got the good idea :)
> 
> So your coflatmap will get all the centroids that you have sent to the
> stream in the closeWith call. This means that whenever you collect a new
> set of centroids they will be iterated back. This means you don't always
> want to send the centroids out on the collector, only periodically.
> 
> The order in which these come is pretty much arbitrary so you need to make
> sure to add some logic by which you can order it if this is important.
> 
> Im not sure if this helped or not :D
> 
> Gyula
> 
> Biplob Biswas <

> revolutionisme@

> > ezt írta (időpont: 2016. máj. 2.,
> H, 13:13):
> 
>> Hi Gyula,
>>
>> I understand more now how this thing might work and its fascinating.
>> Although I still have one question with the coflatmap function.
>>
>> First, let me explain what I understand and whether its correct or not:
>> 1. The connected iterative stream ensures that the coflatmap function
>> receive the points and the centroids which are broadcasted on each
>> iteration
>> defined by closewith.
>>
>> 2. So in the coflatmap function, on one map I get the points and on the
>> other map function i get the centroids which are broadcasted.
>>
>> Now comes the part I am assuming a bit because I dont understand from the
>> theory.
>> 3. Assuming I can use the broadcasted centroids, I calculate the nearest
>> centroid from the streamed point and I update the centroid and only use
>> one
>> of the collectors to return the updated centroids list back.
>>
>>
>> The question here is, I am assuming that this operation is not done in
>> parallel as if streams are sent in parallel how would I ensure correct
>> update of the centroids as multiple points can try to update the same
>> centroid in parallel .
>>
>> I hope I made myself clear with this.
>>
>> Thanks and Regards
>> Biplob
>> Biplob Biswas wrote
>> > Hi Gyula,
>> >
>> > I read your workaround and started reading about flink iterations,
>> > coflatmap operators and other things. Now, I do understand a few things
>> > but the solution you provided is not completely clear to me.
>> >
>> > I understand the following things from your post.
>> > 1. You initially have a datastream of points, on which you iterate and
>> the
>> > 'withFeedbackType' defines the type of the connected stream so rather
>> than
>> > "Points" the type is  "Centroids" now.
>> >
>> > 2.On this connected stream (which I understand, only have the streamed
>> > points right now), you run a flat map operator. And you mention
>> /
>> > "MyCoFlatmap would be a CoFlatMapFunction which on 1 input receive
>> events
>> > and update its local centroids (and periodically output the centroids)
>> and
>> > on the other input would send centroids of other flatmaps and would
>> merge
>> > them to the local."
>> /
>> > I dont understand this part completely, if i am not wrong, you are
>> saying
>> > that the co flatmap function would have 2 map functions. Now i dont
>> > understand this part .. as to what specifically am i doing in each map
>> > function?
>> >
>> > 3. lastly, the updated centroids which came back from the coflatmap
>> > function is fed back to the stream again and this is the part i get
>> lost
>> > again ... how is this centroid fed back and if this is fed back what
>> > happens to the point stream? and if it does somehow is fed back, how do
>> i
>> > catch it in the coflatmap function?
>> >
>> >
>> > If I understand this a bit, then in your code the first set of
>> centroids
>> > are created in the coflatmap function and you dont already have a list
>> of
>> > centroids to start with? Am i assuming it correct?
>> >
>> > I underwent the process of iteration in the Kmeans example from this
>> > following link:
>> >
>> https://github.com/stratosphere/stratosphere/blob/master/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/clustering/KMeans.java
>> >
>> > and I understand how this is working .. but i am stil not clear how ur
>> > example is working.
>> >
>> > Could you please explain it a bit more? with some examples maybe?
>> >
>> > Thanks a lot.
>> > Gyula Fóra-2 wrote
>> >> Hi Biplob,
>> >>
>> >> I have implemented a similar algorithm as Aljoscha mentioned.
>> >>
>> >> First things to clarify are the following:
>> >> There is currently no abstraction for keeping objects (in you case
>> >> centroids) in a centralized way that can be updated/read by all
>> >> operators.
>> >> This would probably be very costly and is actually not necessary in
>> your
>> >> case.
>> >>
>> >> Broadcast a stream in contrast with other partitioning methods mean
>> that
>> >> the events will be replicated to all downstream operators. This not a
>> >> magical operator that will make state available among parallel
>> instances.
>> >>
>> >> Now let me explain what I think you want from Flink and how to do it
>> :)
>> >>
>> >> You have input data stream and a set of centroids to be updated based
>> on
>> >> the incoming records. As you want to do this in parallel you have an
>> >> operator (let's say a flatmap) that keeps the centroids locally and
>> >> updates
>> >> it on it's inputs. Now you have a set of independently updated
>> centroids,
>> >> so you want to merge them and update the centroids in each flatmap.
>> >>
>> >> Let's see how to do this. Given that you have your centroids locally,
>> >> updating them is super easy, so I will not talk about that. The
>> >> problematic
>> >> part is periodically merging end "broadcasting" the centroids so all
>> the
>> >> flatmaps eventually see the same (they don't have to always be the
>> same
>> >> for
>> >> clustering probably). There is no operator for sending state
>> (centroids)
>> >> between subtasks so you have to be clever here. We can actually use
>> >> cyclic
>> >> streams to solve this problem by sending the centroids as simple
>> events
>> >> to
>> >> a CoFlatMap:
>> >>
>> >> DataStream
>> >> 
> <Point>
>> >>  input = ...
>> >> ConnectedIterativeStreams&lt;Point, Centroids&gt; inputsAndCentroids =
>> >> input.iterate().withFeedbackType(Centroids.class)
>> >> DataStream
>> >> 
> <Centroids>
>> >>  updatedCentroids =
>> >> inputsAndCentroids.flatMap(MyCoFlatmap)
>> >> inputsAndCentroids.closeWith(updatedCentroids.broadcast())
>> >>
>> >> MyCoFlatmap would be a CoFlatMapFunction which on 1 input receive
>> events
>> >> and update its local centroids (and periodically output the centroids)
>> >> and
>> >> on the other input would send centroids of other flatmaps and would
>> merge
>> >> them to the local.
>> >>
>> >> This might be a lot to take in at first, so you might want to read up
>> on
>> >> streaming iterations and connected streams before you start.
>> >>
>> >> Let me know if this makes sense.
>> >>
>> >> Cheers,
>> >> Gyula
>> >>
>> >>
>> >> Biplob Biswas &lt;
>>
>> >> revolutionisme@
>>
>> >> &gt; ezt írta (időpont: 2016. ápr. 28.,
>> >> Cs, 14:41):
>> >>
>> >>> That would really be great, any example would help me proceed with my
>> >>> work.
>> >>> Thanks a lot.
>> >>>
>> >>>
>> >>> Aljoscha Krettek wrote
>> >>> > Hi Biplob,
>> >>> > one of our developers had a stream clustering example a while back.
>> It
>> >>> was
>> >>> > using a broadcast feedback edge with a co-operator to update the
>> >>> > centroids.
>> >>> > I'll directly include him in the email so that he will notice and
>> can
>> >>> send
>> >>> > you the example.
>> >>> >
>> >>> > Cheers,
>> >>> > Aljoscha
>> >>> >
>> >>> > On Thu, 28 Apr 2016 at 13:57 Biplob Biswas &lt;
>> >>>
>> >>> > revolutionisme@
>> >>>
>> >>> > &gt; wrote:
>> >>> >
>> >>> >> I am pretty new to flink systems, thus can anyone atleast give me
>> an
>> >>> >> example
>> >>> >> of how datastream.broadcast() method works? From the documentation
>> i
>> >>> get
>> >>> >> the
>> >>> >> following:
>> >>> >>
>> >>> >> broadcast()
>> >>> >> Sets the partitioning of the DataStream so that the output
>> elements
>> >>> are
>> >>> >> broadcasted to every parallel instance of the next operation.
>> >>> >>
>> >>> >> If the output elements are broadcasted, then how are they
>> retrieved?
>> >>> Or
>> >>> >> maybe I am looking at this method in a completely wrong way?
>> >>> >>
>> >>> >> Thanks
>> >>> >> Biplob Biswas
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >> --
>> >>> >> View this message in context:
>> >>> >>
>> >>>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6543.html
>> >>> >> Sent from the Apache Flink User Mailing List archive. mailing list
>> >>> >> archive
>> >>> >> at Nabble.com.
>> >>> >>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> View this message in context:
>> >>>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6548.html
>> >>> Sent from the Apache Flink User Mailing List archive. mailing list
>> >>> archive
>> >>> at Nabble.com.
>> >>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6612.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive
>> at Nabble.com.
>>





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6619.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to