This is exactly what I am confused about. If I understand it correctly, each
of the map functions in the co-flat map would receive one tuple at a time,
so if I have a datastream of centroids, they would arrive one at a time on
the partitions, and that would defeat the purpose.

Are you proposing that I put the entire list of centroids into a single
datastream object, so that the map function gets the entire list whenever it
is called?
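
To make the question concrete, this is roughly what I picture by "a single
object": a minimal plain-Java sketch, not Flink code. The `Centroids` class and
its `nearest` method are names I made up for illustration; the only point is
that one stream element would carry the whole list, so a map function would see
every centroid in a single call.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical container: the whole centroid list travels as ONE element. */
final class Centroids {
    final List<double[]> points;

    Centroids(List<double[]> points) {
        // Copy the list so later changes to the caller's list do not leak in.
        this.points = new ArrayList<>(points);
    }

    /** Index of the centroid closest to p (squared Euclidean distance). */
    int nearest(double[] p) {
        int best = -1;
        double bestDist = Double.MAX_VALUE;
        for (int i = 0; i < points.size(); i++) {
            double[] c = points.get(i);
            double d = 0.0;
            for (int k = 0; k < p.length; k++) {
                double diff = p[k] - c[k];
                d += diff * diff;
            }
            if (d < bestDist) {
                bestDist = d;
                best = i;
            }
        }
        return best;
    }
}

class CentroidsDemo {
    public static void main(String[] args) {
        List<double[]> list = new ArrayList<>();
        list.add(new double[] {0.0, 0.0});
        list.add(new double[] {10.0, 10.0});
        Centroids all = new Centroids(list);  // one object holding the full list
        System.out.println(all.nearest(new double[] {9.0, 8.0}));  // prints 1
    }
}
```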

Would it be possible for you to give an example, a code snippet, or a
link to some use case of the co-flat map function?
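
For reference, this is how I currently imagine the two callbacks would split
the work, written as a plain-Java stand-in so it runs without Flink. Everything
here is my assumption rather than the actual API: the class `LocalKMeans`, the
`emitEvery` period, the "move halfway" update rule, and merging by averaging
index by index are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Plain-Java stand-in for a two-input operator; NOT the Flink interface.
 * flatMap1 handles points, flatMap2 handles centroid sets that are fed back.
 */
class LocalKMeans {
    private final double[][] centroids;  // local copy kept by one parallel instance
    private final int emitEvery;         // hypothetical sync period, counted in points
    private int seen = 0;

    LocalKMeans(double[][] initial, int emitEvery) {
        this.centroids = new double[initial.length][];
        for (int i = 0; i < initial.length; i++) {
            this.centroids[i] = initial[i].clone();
        }
        this.emitEvery = emitEvery;
    }

    /** Input 1: one point at a time; update locally, emit only periodically. */
    void flatMap1(double[] point, Consumer<double[][]> out) {
        int best = 0;
        double bestDist = Double.MAX_VALUE;
        for (int i = 0; i < centroids.length; i++) {
            double d = 0.0;
            for (int k = 0; k < point.length; k++) {
                double diff = point[k] - centroids[i][k];
                d += diff * diff;
            }
            if (d < bestDist) {
                bestDist = d;
                best = i;
            }
        }
        // Illustrative update rule: move the nearest centroid halfway to the point.
        for (int k = 0; k < point.length; k++) {
            centroids[best][k] = (centroids[best][k] + point[k]) / 2.0;
        }
        seen++;
        if (seen % emitEvery == 0) {
            out.accept(snapshot());
        }
    }

    /** Input 2: a full centroid set from the feedback edge; merge by averaging. */
    void flatMap2(double[][] remote) {
        // Assumes remote centroids are aligned with the local ones by index.
        for (int i = 0; i < centroids.length; i++) {
            for (int k = 0; k < centroids[i].length; k++) {
                centroids[i][k] = (centroids[i][k] + remote[i][k]) / 2.0;
            }
        }
    }

    /** Deep copy of the current local centroids. */
    double[][] snapshot() {
        double[][] copy = new double[centroids.length][];
        for (int i = 0; i < centroids.length; i++) {
            copy[i] = centroids[i].clone();
        }
        return copy;
    }
}

class LocalKMeansDemo {
    public static void main(String[] args) {
        LocalKMeans op = new LocalKMeans(new double[][] {{0.0}, {10.0}}, 2);
        List<double[][]> emitted = new ArrayList<>();
        op.flatMap1(new double[] {2.0}, emitted::add);  // no output yet
        op.flatMap1(new double[] {8.0}, emitted::add);  // second point triggers an emit
        System.out.println(emitted.size());             // prints 1
    }
}
```

If this is roughly the right shape, my remaining question is only how the real
co-flat map wires these two methods to the point stream and the feedback stream.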

Thanks a lot for your help throughout.

Regards
Biplob Biswas 


Gyula Fóra wrote
> Hi,
> 
> Iterating after every incoming point/centroid update means that you
> basically defeat the purpose of having parallelism in your Flink job.
> 
> If you only "sync" the centroids periodically by the broadcast you can make
> your program run efficiently in parallel. This should be fine for machine
> learning use-cases where the results should converge anyways.
> 
> Gyula
> 
> Biplob Biswas <revolutionisme@> wrote (on Mon, 2 May 2016, 17:02):
> 
>> Hi Gyula,
>>
>> Could you explain a bit why I wouldn't want the centroids to be collected
>> after every point?
>>
>> I mean, once I get a streamed point via the map1 function, I would want to
>> compare the distance of the point with a centroid which arrives via the map2
>> function, and I keep on comparing for every centroid which comes in
>> subsequently. Once the update of the centroid happens, shouldn't I collect
>> the entire set, thus updating a centroid and collecting it back for the
>> next point in the iteration?
>>
>> I may not be getting the concept properly here, so an example snippet would
>> help in the long run.
>>
>> Thanks & Regards
>> Biplob
>> Gyula Fóra wrote
>> > Hey,
>> >
>> > I think you got the right idea :)
>> >
>> > So your coflatmap will get all the centroids that you have sent to the
>> > stream in the closeWith call. This means that whenever you collect a new
>> > set of centroids they will be iterated back. This means you don't always
>> > want to send the centroids out on the collector, only periodically.
>> >
>> > The order in which these come is pretty much arbitrary so you need to make
>> > sure to add some logic by which you can order it if this is important.
>> >
>> > I'm not sure if this helped or not :D
>> >
>> > Gyula
>> >
>> > Biplob Biswas <revolutionisme@> wrote (on Mon, 2 May 2016, 13:13):
>> >
>> >> Hi Gyula,
>> >>
>> >> I understand more now how this thing might work and it's fascinating.
>> >> Although I still have one question about the coflatmap function.
>> >>
>> >> First, let me explain what I understand and whether it's correct or not:
>> >> 1. The connected iterative stream ensures that the coflatmap function
>> >> receives the points and the centroids which are broadcast on each
>> >> iteration defined by closeWith.
>> >>
>> >> 2. So in the coflatmap function, on one map I get the points and on the
>> >> other map function I get the centroids which are broadcast.
>> >>
>> >> Now comes the part I am assuming a bit because I don't understand it from
>> >> the theory.
>> >> 3. Assuming I can use the broadcast centroids, I calculate the nearest
>> >> centroid from the streamed point, update the centroid, and use only one
>> >> of the collectors to return the updated centroids list back.
>> >>
>> >>
>> >> The question here is: I am assuming that this operation is not done in
>> >> parallel, because if streams are sent in parallel, how would I ensure
>> >> correct updates of the centroids when multiple points can try to update
>> >> the same centroid in parallel?
>> >>
>> >> I hope I made myself clear with this.
>> >>
>> >> Thanks and Regards
>> >> Biplob
>> >> Biplob Biswas wrote
>> >> > Hi Gyula,
>> >> >
>> >> > I read your workaround and started reading about Flink iterations,
>> >> > coflatmap operators and other things. Now, I do understand a few things
>> >> > but the solution you provided is not completely clear to me.
>> >> >
>> >> > I understand the following things from your post.
>> >> > 1. You initially have a datastream of points, on which you iterate, and
>> >> > 'withFeedbackType' defines the type of the connected stream, so rather
>> >> > than "Points" the type is "Centroids" now.
>> >> >
>> >> > 2. On this connected stream (which, as I understand it, only has the
>> >> > streamed points right now), you run a flat map operator. And you mention:
>> >> >
>> >> > "MyCoFlatmap would be a CoFlatMapFunction which on 1 input receive events
>> >> > and update its local centroids (and periodically output the centroids) and
>> >> > on the other input would send centroids of other flatmaps and would merge
>> >> > them to the local."
>> >> >
>> >> > I don't understand this part completely. If I am not wrong, you are saying
>> >> > that the co-flatmap function would have 2 map functions. What I don't
>> >> > understand is what specifically I am doing in each map function.
>> >> >
>> >> > 3. Lastly, the updated centroids which came back from the coflatmap
>> >> > function are fed back to the stream again, and this is the part where I
>> >> > get lost again: how are these centroids fed back, and if they are fed
>> >> > back, what happens to the point stream? And if they somehow are fed back,
>> >> > how do I catch them in the coflatmap function?
>> >> >
>> >> >
>> >> > If I understand this a bit, then in your code the first set of centroids
>> >> > is created in the coflatmap function and you don't already have a list of
>> >> > centroids to start with? Am I assuming correctly?
>> >> >
>> >> > I went through the iteration process in the KMeans example at the
>> >> > following link:
>> >> >
>> >> > https://github.com/stratosphere/stratosphere/blob/master/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/clustering/KMeans.java
>> >> >
>> >> > and I understand how this is working, but I am still not clear how your
>> >> > example is working.
>> >> >
>> >> > Could you please explain it a bit more? with some examples maybe?
>> >> >
>> >> > Thanks a lot.
>> >> > Gyula Fóra-2 wrote
>> >> >> Hi Biplob,
>> >> >>
>> >> >> I have implemented a similar algorithm as Aljoscha mentioned.
>> >> >>
>> >> >> First things to clarify are the following:
>> >> >> There is currently no abstraction for keeping objects (in your case
>> >> >> centroids) in a centralized way that can be updated/read by all
>> >> >> operators. This would probably be very costly and is actually not
>> >> >> necessary in your case.
>> >> >>
>> >> >> Broadcasting a stream, in contrast with other partitioning methods,
>> >> >> means that the events will be replicated to all downstream operators.
>> >> >> This is not a magical operator that will make state available among
>> >> >> parallel instances.
>> >> >>
>> >> >> Now let me explain what I think you want from Flink and how to do it :)
>> >> >>
>> >> >> You have an input data stream and a set of centroids to be updated based
>> >> >> on the incoming records. As you want to do this in parallel, you have an
>> >> >> operator (let's say a flatmap) that keeps the centroids locally and
>> >> >> updates them on its inputs. Now you have a set of independently updated
>> >> >> centroids, so you want to merge them and update the centroids in each
>> >> >> flatmap.
>> >> >>
>> >> >> Let's see how to do this. Given that you have your centroids locally,
>> >> >> updating them is super easy, so I will not talk about that. The
>> >> >> problematic part is periodically merging and "broadcasting" the
>> >> >> centroids so all the flatmaps eventually see the same (they don't have
>> >> >> to always be the same for clustering probably). There is no operator for
>> >> >> sending state (centroids) between subtasks so you have to be clever
>> >> >> here. We can actually use cyclic streams to solve this problem by
>> >> >> sending the centroids as simple events to a CoFlatMap:
>> >> >>
>> >> >> DataStream<Point> input = ...
>> >> >> ConnectedIterativeStreams<Point, Centroids> inputsAndCentroids =
>> >> >>     input.iterate().withFeedbackType(Centroids.class)
>> >> >> DataStream<Centroids> updatedCentroids =
>> >> >>     inputsAndCentroids.flatMap(MyCoFlatmap)
>> >> >> inputsAndCentroids.closeWith(updatedCentroids.broadcast())
>> >> >>
>> >> >> MyCoFlatmap would be a CoFlatMapFunction which on one input receives
>> >> >> events and updates its local centroids (and periodically outputs the
>> >> >> centroids), and on the other input receives the centroids sent by the
>> >> >> other flatmaps and merges them into the local ones.
>> >> >>
>> >> >> This might be a lot to take in at first, so you might want to read up on
>> >> >> streaming iterations and connected streams before you start.
>> >> >>
>> >> >> Let me know if this makes sense.
>> >> >>
>> >> >> Cheers,
>> >> >> Gyula
>> >> >>
>> >> >>
>> >> >> Biplob Biswas <revolutionisme@> wrote (on Thu, 28 Apr 2016, 14:41):
>> >> >>
>> >> >>> That would really be great, any example would help me proceed with my
>> >> >>> work.
>> >> >>> Thanks a lot.
>> >> >>>
>> >> >>>
>> >> >>> Aljoscha Krettek wrote
>> >> >>> > Hi Biplob,
>> >> >>> > one of our developers had a stream clustering example a while back.
>> >> >>> > It was using a broadcast feedback edge with a co-operator to update
>> >> >>> > the centroids.
>> >> >>> > I'll directly include him in the email so that he will notice and can
>> >> >>> > send you the example.
>> >> >>> >
>> >> >>> > Cheers,
>> >> >>> > Aljoscha
>> >> >>> >
>> >> >>> > On Thu, 28 Apr 2016 at 13:57 Biplob Biswas <revolutionisme@> wrote:
>> >> >>> >
>> >> >>> >> I am pretty new to Flink, so can anyone at least give me an example
>> >> >>> >> of how the datastream.broadcast() method works? From the
>> >> >>> >> documentation I get the following:
>> >> >>> >>
>> >> >>> >> broadcast()
>> >> >>> >> Sets the partitioning of the DataStream so that the output elements
>> >> >>> >> are broadcasted to every parallel instance of the next operation.
>> >> >>> >>
>> >> >>> >> If the output elements are broadcast, then how are they retrieved?
>> >> >>> >> Or maybe I am looking at this method in a completely wrong way?
>> >> >>> >>
>> >> >>> >> Thanks
>> >> >>> >> Biplob Biswas
>> >> >>> >>
>> >> >>> >>
>> >> >>> >>
>> >> >>> >> --
>> >> >>> >> View this message in context:
>> >> >>> >>
>> >> >>>
>> >>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6543.html
>> >> >>> >> Sent from the Apache Flink User Mailing List archive. mailing
>> list
>> >> >>> >> archive
>> >> >>> >> at Nabble.com.
>> >> >>> >>
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>> --
>> >> >>> View this message in context:
>> >> >>>
>> >>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6548.html
>> >> >>> Sent from the Apache Flink User Mailing List archive. mailing list
>> >> >>> archive
>> >> >>> at Nabble.com.
>> >> >>>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> View this message in context:
>> >>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6612.html
>> >> Sent from the Apache Flink User Mailing List archive. mailing list
>> >> archive
>> >> at Nabble.com.
>> >>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6619.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive
>> at Nabble.com.
>>





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6707.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.
