+1 for the renaming!

On Wed, Jan 6, 2016 at 8:01 PM, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:

> issue created: https://issues.apache.org/jira/browse/FLINK-3207
>
> If anyone has any other suggestion about the renaming, let me know :)
>
> -V.
>
> On 5 January 2016 at 11:52, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> > Nice to hear. :D
> >
> > I think you can go ahead and add the Jira. About the renaming: I also
> > think that it would make sense to do it.
> > > On 04 Jan 2016, at 19:48, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:
> > >
> > > Hello squirrels and happy new year!
> > >
> > > I'm reviving this thread to share some results and discuss next steps.
> > >
> > > Using the Either type I was able to get rid of redundant messages and
> > > vertex state. During the past few weeks, I have been running
> > > experiments, which show that the performance of this "Pregel" model
> > > has improved a lot :)
> > > In [1], you can see the speedup of GSA and Pregel over Spargel, for
> > > SSSP and Connected Components (CC), for the Livejournal (68m edges),
> > > Orkut (117m edges) and Wikipedia (340m edges) datasets.
> > >
> > > Regarding next steps, if there are no objections, I will open a Jira
> > > for adding a Pregel iteration abstraction to Gelly. The Gelly guide has
> > > to be updated to reflect the spectrum of iteration abstractions that we
> > > have discussed in this thread, i.e. Pregel -> Spargel (Scatter-Gather) -> GSA.
> > >
> > > I think it might also be a good idea to do some renaming. Currently, we
> > > call the Spargel iteration "vertex-centric", which fits the Pregel
> > > abstraction better. I propose we rename the Spargel iteration to
> > > "scatter-gather" or "signal-collect" (the name under which it was first
> > > introduced [2]).
> > > Any other ideas?
> > >
> > > Thanks,
> > > -Vasia.
> > >
> > > [1]: https://drive.google.com/file/d/0BzQJrI2eGlyYRTRjMkp1d3R6eVE/view?usp=sharing
> > > [2]: http://link.springer.com/chapter/10.1007/978-3-642-17746-0_48
> > >
> > > On 11 November 2015 at 11:05, Stephan Ewen <se...@apache.org> wrote:
> > >
> > >> See: https://issues.apache.org/jira/browse/FLINK-3002
> > >>
> > >> On Wed, Nov 11, 2015 at 10:54 AM, Stephan Ewen <se...@apache.org> wrote:
> > >>
> > >>> "Either" an "Optional" types are quite useful.
> > >>>
> > >>> Let's add them to the core Java API.
> > >>>
> > >>> On Wed, Nov 11, 2015 at 10:00 AM, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:
> > >>>
> > >>>> Thanks Fabian! I'll try that :)
> > >>>>
> > >>>> On 10 November 2015 at 22:31, Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>>
> > >>>>> You could implement a Java Either type (similar to Scala's Either)
> > >>>>> that holds either a Message or the VertexState, plus a corresponding
> > >>>>> TypeInformation and TypeSerializer that serialize a byte flag to
> > >>>>> indicate which of the two types is used.
> > >>>>> It might actually make sense to add a generic Either type to the
> > >>>>> Java API in general (similar to the Java Tuples, which resemble the
> > >>>>> Scala Tuples).
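
[A minimal sketch of such a Java Either type, for illustration only: the
shape follows Fabian's description, but the names are assumptions, not the
API that was eventually added under FLINK-3002. A matching TypeSerializer
would write a byte flag (0 = Left, 1 = Right) followed by the corresponding
element's serialization.

    public abstract class Either<L, R> {

        public static <L, R> Either<L, R> left(L value) { return new Left<>(value); }
        public static <L, R> Either<L, R> right(R value) { return new Right<>(value); }

        public abstract boolean isLeft();
        public abstract L left();
        public abstract R right();

        private static final class Left<L, R> extends Either<L, R> {
            private final L value;
            Left(L value) { this.value = value; }
            public boolean isLeft() { return true; }
            public L left() { return value; }
            public R right() { throw new IllegalStateException("right() on a Left"); }
        }

        private static final class Right<L, R> extends Either<L, R> {
            private final R value;
            Right(R value) { this.value = value; }
            public boolean isLeft() { return false; }
            public L left() { throw new IllegalStateException("left() on a Right"); }
            public R right() { return value; }
        }
    }

With this, the coGroup could emit Either<VertexState, Message> instead of a
padded tuple with dummy fields.]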
> > >>>>>
> > >>>>> Cheers, Fabian
> > >>>>>
> > >>>>> 2015-11-10 22:16 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:
> > >>>>>
> > >>>>>> Hi,
> > >>>>>>
> > >>>>>> after running a few experiments, I can confirm that putting the
> > >>>>>> combiner after the flatMap is indeed more efficient.
> > >>>>>>
> > >>>>>> I ran SSSP and Connected Components with Spargel, GSA, and the
> > >>>>>> Pregel model, and the results are the following:
> > >>>>>>
> > >>>>>> - for SSSP, Spargel is always the slowest; GSA is ~1.2x faster, and
> > >>>>>> Pregel is ~1.1x faster without a combiner, ~1.3x faster with one.
> > >>>>>> - for Connected Components, Spargel and GSA perform similarly,
> > >>>>>> while Pregel is 1.4-1.6x slower.
> > >>>>>>
> > >>>>>> To start with, this is much better than I expected :)
> > >>>>>> However, there is a main shortcoming in my current implementation
> > >>>>>> that negatively impacts performance:
> > >>>>>> Since the compute-function coGroup needs to output both new vertex
> > >>>>>> values and new messages, I emit a wrapping tuple that contains both
> > >>>>>> vertex state and messages, and then filter them out based on a
> > >>>>>> boolean field. The problem is that since I cannot emit null fields,
> > >>>>>> I emit a dummy message for each new vertex state and a dummy vertex
> > >>>>>> state for each new message. That essentially means that the
> > >>>>>> intermediate messages result doubles in size if, say, the vertex
> > >>>>>> values are of the same type as the messages (it can be worse if the
> > >>>>>> vertex values are more complex).
> > >>>>>> So my question is: is there a way to avoid this redundancy, by
> > >>>>>> either emitting null fields or by creating an operator that could
> > >>>>>> emit 2 different types of tuples?
> > >>>>>> Thanks!
> > >>>>>> -Vasia.
> > >>>>>>
> > >>>>>> On 9 November 2015 at 15:20, Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>>>>
> > >>>>>>> Hi Vasia,
> > >>>>>>>
> > >>>>>>> sorry for the late reply.
> > >>>>>>> I don't think there is a big difference. In both cases, the
> > >>>>>>> partitioning and sorting happen at the end of the iteration.
> > >>>>>>> If the groupReduce is applied before the workset is returned, the
> > >>>>>>> sorting happens on the filtered result (after the flatMap), which
> > >>>>>>> might be a little more efficient (depending on the ratio of
> > >>>>>>> messages and solution set updates). Also, it does not require that
> > >>>>>>> the initial workset is sorted for the first groupReduce.
> > >>>>>>>
> > >>>>>>> I would put it at the end.
> > >>>>>>>
> > >>>>>>> Cheers, Fabian
> > >>>>>>>
> > >>>>>>> 2015-11-05 17:19 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:
> > >>>>>>>
> > >>>>>>>> @Fabian
> > >>>>>>>>
> > >>>>>>>> Is there any advantage in putting the reducer-combiner before
> > >>>>>>>> updating the workset vs. after (i.e. right before the join with
> > >>>>>>>> the solution set)?
> > >>>>>>>>
> > >>>>>>>> If it helps, here are the plans of these 2 alternatives:
> > >>>>>>>>
> > >>>>>>>> https://drive.google.com/file/d/0BzQJrI2eGlyYcFV2RFo5dUFNXzg/view?usp=sharing
> > >>>>>>>> https://drive.google.com/file/d/0BzQJrI2eGlyYN014NXp6OEZUdGs/view?usp=sharing
> > >>>>>>>>
> > >>>>>>>> Thanks a lot for the help!
> > >>>>>>>>
> > >>>>>>>> -Vasia.
> > >>>>>>>>
> > >>>>>>>> On 30 October 2015 at 21:28, Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>>>>>>
> > >>>>>>>>> We can of course inject an optional ReduceFunction (or
> > >>>>>>>>> GroupReduce, or combinable GroupReduce) to reduce the size of
> > >>>>>>>>> the work set.
> > >>>>>>>>> I suggested removing the GroupReduce function because it only
> > >>>>>>>>> collected all messages into a single record by emitting the
> > >>>>>>>>> input iterator, which is quite dangerous. Applying a combinable
> > >>>>>>>>> reduce function could improve the performance considerably.
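
[For illustration, a minimal sketch of such a combinable reduce for SSSP,
assuming messages are Tuple2<targetId, candidateDistance>; the class name
and surrounding wiring are assumptions, not the thread's actual code:

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Combines all messages sent to the same vertex into the single
    // smallest candidate distance, shrinking the workset before the join.
    public class MinDistanceCombiner implements ReduceFunction<Tuple2<Long, Double>> {
        @Override
        public Tuple2<Long, Double> reduce(Tuple2<Long, Double> a, Tuple2<Long, Double> b) {
            return (a.f1 <= b.f1) ? a : b; // both tuples share the same target id
        }
    }

    // usage: messages.groupBy(0).reduce(new MinDistanceCombiner())
]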
> > >>>>>>>>>
> > >>>>>>>>> The good news is that it would come "for free" because the
> > >>>>>>>>> necessary partitioning and sorting can be reused (given the
> > >>>>>>>>> forwardField annotations are correctly set):
> > >>>>>>>>> - The partitioning of the reduce can be reused for the join
> > >>>>>>>>> with the solution set.
> > >>>>>>>>> - The sort of the reduce is preserved by the join with the
> > >>>>>>>>> in-memory hash-table of the solution set and can be reused for
> > >>>>>>>>> the coGroup.
> > >>>>>>>>>
> > >>>>>>>>> Best,
> > >>>>>>>>> Fabian
> > >>>>>>>>>
> > >>>>>>>>> 2015-10-30 18:38 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:
> > >>>>>>>>>
> > >>>>>>>>>> Hi Fabian,
> > >>>>>>>>>>
> > >>>>>>>>>> thanks so much for looking into this so quickly :-)
> > >>>>>>>>>>
> > >>>>>>>>>> One update I have to make is that I tried running a few
> > >>>>>>>>>> experiments with this on a 6-node cluster. The current
> > >>>>>>>>>> implementation gets stuck at "Rebuilding Workset Properties"
> > >>>>>>>>>> and never finishes a single iteration. Running the plan of one
> > >>>>>>>>>> superstep without a delta iteration terminates fine. I didn't
> > >>>>>>>>>> have access to the cluster today, so I couldn't debug this
> > >>>>>>>>>> further, but I will do so as soon as I have access again.
> > >>>>>>>>>>
> > >>>>>>>>>> The rest of my comments are inline:
> > >>>>>>>>>>
> > >>>>>>>>>> On 30 October 2015 at 17:53, Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi Vasia,
> > >>>>>>>>>>>
> > >>>>>>>>>>> I had a look at your new implementation and have a few ideas
> > >>>>>>>>>>> for improvements.
> > >>>>>>>>>>> 1) Sending out the input iterator as you do in the last
> > >>>>>>>>>>> GroupReduce is quite dangerous and does not give a benefit
> > >>>>>>>>>>> compared to collecting all elements. Even though it is an
> > >>>>>>>>>>> iterator, it needs to be completely materialized in-memory
> > >>>>>>>>>>> whenever the record is touched by Flink or user code.
> > >>>>>>>>>>> I would propose to skip the reduce step completely, handle all
> > >>>>>>>>>>> messages separately, and only collect them in the CoGroup
> > >>>>>>>>>>> function before giving them to the VertexComputeFunction. Be
> > >>>>>>>>>>> careful to only do that with objectReuse disabled, or take
> > >>>>>>>>>>> care to properly copy the messages. If you collect the
> > >>>>>>>>>>> messages in the CoGroup, you don't need the GroupReduce, you
> > >>>>>>>>>>> have smaller records, and you can remove the MessageIterator
> > >>>>>>>>>>> class completely.
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> I see. The idea was to expose a message combiner that the user
> > >>>>>>>>>> could implement if the messages are combinable, e.g. min, sum.
> > >>>>>>>>>> This is a common case and reduces the message load
> > >>>>>>>>>> significantly. Is there a way I could do something similar
> > >>>>>>>>>> before the coGroup?
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>> 2) Add this annotation to the AppendVertexState function:
> > >>>>>>>>>>> @ForwardedFieldsFirst("*->f0"). This indicates that the
> > >>>>>>>>>>> complete element of the first input becomes the first field of
> > >>>>>>>>>>> the output. Since the input is partitioned on "f0" (it comes
> > >>>>>>>>>>> out of the partitioned solution set), the result of
> > >>>>>>>>>>> AppendVertexState will be partitioned on "f0.f0", which is
> > >>>>>>>>>>> (accidentally :-D) the join key of the following coGroup
> > >>>>>>>>>>> function -> no partitioning :-)
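
[A hypothetical sketch of that annotation in use, with concrete types
assumed for readability (vertices as Tuple2<id, value>, messages as
Tuple2<id, payload>); not the prototype's actual code:

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
    import org.apache.flink.api.java.tuple.Tuple2;

    // The whole first input is forwarded unchanged into field 0 of the
    // output, which is exactly what "*->f0" declares to the optimizer.
    @ForwardedFieldsFirst("*->f0")
    class AppendVertexState implements
            JoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>,
                         Tuple2<Tuple2<Long, Double>, Double>> {
        @Override
        public Tuple2<Tuple2<Long, Double>, Double> join(
                Tuple2<Long, Double> vertex, Tuple2<Long, Double> message) {
            // vertex passes through untouched -> partitioning survives as "f0.f0"
            return new Tuple2<>(vertex, message.f1);
        }
    }
]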
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Great! I totally missed that ;)
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>> 3) Adding the two flatMap functions behind the CoGroup
> > >>>>>>>>>>> prevents chaining and therefore causes some serialization
> > >>>>>>>>>>> overhead, but it shouldn't be too bad.
> > >>>>>>>>>>>
> > >>>>>>>>>>> So in total I would make this program as follows:
> > >>>>>>>>>>>
> > >>>>>>>>>>> iVertices<K, VV>
> > >>>>>>>>>>> iMessages<K, Message> = iVertices.map(new InitWorkSet());
> > >>>>>>>>>>>
> > >>>>>>>>>>> iteration = iVertices.iterateDelta(iMessages, maxIt, 0)
> > >>>>>>>>>>> verticesWithMessage<Vertex, Message> = iteration.getSolutionSet()
> > >>>>>>>>>>>   .join(iteration.getWorkset())
> > >>>>>>>>>>>   .where(0)   // solution set is local and build side
> > >>>>>>>>>>>   .equalTo(0) // workset is shuffled and probe side of hash join
> > >>>>>>>>>>> superstepComp<Vertex, Tuple2<K, Message>, Bool> =
> > >>>>>>>>>>>   verticesWithMessage.coGroup(edgesWithValue)
> > >>>>>>>>>>>   .where("f0.f0") // vwm is locally forwarded and sorted
> > >>>>>>>>>>>   .equalTo(0)     // edges are already partitioned and sorted (if cached correctly)
> > >>>>>>>>>>>   .with(...)      // the coGroup collects all messages in a collection and gives it to the ComputeFunction
> > >>>>>>>>>>> delta<Vertex> = superstepComp.flatMap(...)       // partitioned when merged into solution set
> > >>>>>>>>>>> workSet<K, Message> = superstepComp.flatMap(...) // partitioned for join
> > >>>>>>>>>>> iteration.closeWith(delta, workSet)
> > >>>>>>>>>>>
> > >>>>>>>>>>> So, if I am correct, the program will
> > >>>>>>>>>>> - partition the workset
> > >>>>>>>>>>> - sort the vertices with messages
> > >>>>>>>>>>> - partition the delta
> > >>>>>>>>>>>
> > >>>>>>>>>>> One observation I have is that this program requires that all
> > >>>>>>>>>>> messages fit into memory. Was that also the case before?
> > >>>>>>>>>>
> > >>>>>>>>>> I believe not. The plan has one coGroup that produces the
> > >>>>>>>>>> messages and a following coGroup that groups by the messages'
> > >>>>>>>>>> "target ID" and consumes them in an iterator. That doesn't
> > >>>>>>>>>> require them to fit in memory, right?
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> I'm also working on a version where the graph is represented
> > >>>>>>>>>> as an adjacency list, instead of two separate datasets of
> > >>>>>>>>>> vertices and edges. The disadvantage is that the graph has to
> > >>>>>>>>>> fit in memory, but I think the advantages are many. We'll be
> > >>>>>>>>>> able to support edge value updates, edge mutations and
> > >>>>>>>>>> different edge access order guarantees. I'll get back to this
> > >>>>>>>>>> thread when I have a working prototype.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Cheers,
> > >>>>>>>>>>> Fabian
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks again!
> > >>>>>>>>>>
> > >>>>>>>>>> Cheers,
> > >>>>>>>>>> -Vasia.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> 2015-10-27 19:10 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> @Martin: thanks for your input! If you ran into any other
> > >>>>>>>>>>>> issues that I didn't mention, please let us know. Obviously,
> > >>>>>>>>>>>> even with my proposal, there are still features we cannot
> > >>>>>>>>>>>> support, e.g. updating edge values and graph mutations. We'll
> > >>>>>>>>>>>> need to re-think the underlying iteration and/or graph
> > >>>>>>>>>>>> representation for those.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> @Fabian: thanks a lot, no rush :)
> > >>>>>>>>>>>> Let me give you some more information that might make it
> > >>>>>>>>>>>> easier to reason about performance:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Currently, in Spargel the SolutionSet (SS) keeps the vertex
> > >>>>>>>>>>>> state and the workset (WS) keeps the active vertices. The
> > >>>>>>>>>>>> iteration is composed of 2 coGroups. The first one takes the
> > >>>>>>>>>>>> WS and the edges and produces messages. The second one takes
> > >>>>>>>>>>>> the messages and the SS and produces the new WS and the
> > >>>>>>>>>>>> SS-delta.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> In my proposal, the SS has the vertex state and the WS has
> > >>>>>>>>>>>> <vertexId, MessageIterator> pairs, i.e. the inbox of each
> > >>>>>>>>>>>> vertex. The plan is more complicated because compute() needs
> > >>>>>>>>>>>> to have two iterators: over the edges and over the messages.
> > >>>>>>>>>>>> First, I join SS and WS to get the active vertices (those
> > >>>>>>>>>>>> that have received a msg) and their current state. Then I
> > >>>>>>>>>>>> coGroup the result with the edges to access the neighbors.
> > >>>>>>>>>>>> Now the main problem is that this coGroup needs to have 2
> > >>>>>>>>>>>> outputs: the new messages and the new vertex value. I
> > >>>>>>>>>>>> couldn't really find a nice way to do this, so I'm emitting a
> > >>>>>>>>>>>> Tuple that contains both types and I have a flag to separate
> > >>>>>>>>>>>> them later with 2 flatMaps. From the vertex flatMap, I create
> > >>>>>>>>>>>> the SS-delta, and from the messages flatMap I apply a reduce
> > >>>>>>>>>>>> to group the messages by vertex and send them to the new WS.
> > >>>>>>>>>>>> One optimization would be to expose a combiner here to reduce
> > >>>>>>>>>>>> message size.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> tl;dr:
> > >>>>>>>>>>>> 1. 2 coGroups vs. Join + coGroup + flatMap + reduce
> > >>>>>>>>>>>> 2. how can we efficiently emit 2 different types of records
> > >>>>>>>>>>>> from a coGroup?
> > >>>>>>>>>>>> 3. does it make any difference if we group/combine the
> > >>>>>>>>>>>> messages before updating the workset or after?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>> -Vasia.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On 27 October 2015 at 18:39, Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> I'll try to have a look at the proposal from a performance
> > >>>>>>>>>>>>> point of view in the next days.
> > >>>>>>>>>>>>> Please ping me if I don't follow up on this thread.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Cheers, Fabian
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 2015-10-27 18:28 GMT+01:00 Martin Junghanns <m.jungha...@mailbox.org>:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> At our group, we also moved several algorithms from Giraph
> > >>>>>>>>>>>>>> to Gelly and ran into some confusing issues (first in
> > >>>>>>>>>>>>>> understanding, second during implementation) caused by the
> > >>>>>>>>>>>>>> conceptual differences you described.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> If there are no concrete advantages (performance mainly) in
> > >>>>>>>>>>>>>> the Spargel implementation, we would be very happy to see
> > >>>>>>>>>>>>>> the Gelly API be aligned with Pregel-like systems.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Your SSSP example speaks for itself. Straightforward, if
> > >>>>>>>>>>>>>> the reader is familiar with Pregel/Giraph/...
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>> Martin
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On 27.10.2015 17:40, Vasiliki Kalavri wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hello squirrels,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I want to discuss with you a few concerns I have about
> > >>>>>>>>>>>>>>> our current vertex-centric model implementation, Spargel,
> > >>>>>>>>>>>>>>> now fully subsumed by Gelly.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Spargel is our implementation of Pregel [1], but it
> > >>>>>>>>>>>>>>> violates some fundamental properties of the model, as
> > >>>>>>>>>>>>>>> described in the paper and as implemented in e.g. Giraph,
> > >>>>>>>>>>>>>>> GPS, Hama. I often find myself confused both when trying
> > >>>>>>>>>>>>>>> to explain it to current Giraph users and when porting my
> > >>>>>>>>>>>>>>> Giraph algorithms to it.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> More specifically:
> > >>>>>>>>>>>>>>> - in the Pregel model, messages produced in superstep n
> > >>>>>>>>>>>>>>> are received in superstep n+1. In Spargel, they are
> > >>>>>>>>>>>>>>> produced and consumed in the same iteration.
> > >>>>>>>>>>>>>>> - in Pregel, vertices are active during a superstep if
> > >>>>>>>>>>>>>>> they have received a message in the previous superstep. In
> > >>>>>>>>>>>>>>> Spargel, a vertex is active during a superstep if it has
> > >>>>>>>>>>>>>>> changed its value.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> These two differences require a lot of rethinking when
> > >>>>>>>>>>>>>>> porting applications and can easily cause bugs.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> The most important problem, however, is that we require
> > >>>>>>>>>>>>>>> the user to split the computation in 2 phases (2 UDFs):
> > >>>>>>>>>>>>>>> - messaging: has access to the vertex state and can
> > >>>>>>>>>>>>>>> produce messages
> > >>>>>>>>>>>>>>> - update: has access to incoming messages and can update
> > >>>>>>>>>>>>>>> the vertex value
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Pregel/Giraph only expose one UDF to the user:
> > >>>>>>>>>>>>>>> - compute: has access to both the vertex state and the
> > >>>>>>>>>>>>>>> incoming messages, can produce messages and update the
> > >>>>>>>>>>>>>>> vertex value.
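
[To make the contrast concrete, a hypothetical sketch of the two API
shapes, with signatures simplified and loosely modeled on the description
above; these are not the actual Spargel or Giraph interfaces:

    // Spargel-style: two UDFs, each seeing only half of the picture.
    abstract class MessagingFunction<K, VV, M> {
        // sees only the vertex state; emits messages via the framework
        public abstract void sendMessages(K id, VV state);
        protected void sendMessageTo(K target, M message) { /* framework hook */ }
    }
    abstract class VertexUpdateFunction<K, VV, M> {
        // sees only the incoming messages; may overwrite the vertex value
        public abstract void updateVertex(K id, VV state, Iterable<M> messages);
        protected void setNewVertexValue(VV newValue) { /* framework hook */ }
    }

    // Pregel/Giraph-style: one UDF that sees state and messages together,
    // and can both update the value and send new messages.
    abstract class ComputeFunction<K, VV, M> {
        public abstract void compute(K id, VV state, Iterable<M> messages);
        protected void setNewVertexValue(VV newValue) { /* framework hook */ }
        protected void sendMessageTo(K target, M message) { /* framework hook */ }
    }
]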
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> This might not seem like a big deal, but apart from
> > >>>>>>>>>>>>>>> forcing the user to split their program logic into 2
> > >>>>>>>>>>>>>>> phases, Spargel also makes some common computation
> > >>>>>>>>>>>>>>> patterns non-intuitive or impossible to write. A very
> > >>>>>>>>>>>>>>> simple example is propagating a message based on its value
> > >>>>>>>>>>>>>>> or sender ID. To do this with Spargel, one has to store
> > >>>>>>>>>>>>>>> all the incoming messages in the vertex value (which might
> > >>>>>>>>>>>>>>> be of a different type, btw) during the messaging phase,
> > >>>>>>>>>>>>>>> so that they can be accessed during the update phase.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> So, my first question is: when implementing Spargel, were
> > >>>>>>>>>>>>>>> other alternatives considered and maybe rejected in favor
> > >>>>>>>>>>>>>>> of performance or because of some other reason? If someone
> > >>>>>>>>>>>>>>> knows, I would love to hear about them!
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Second, I wrote a prototype implementation [2] that only
> > >>>>>>>>>>>>>>> exposes one UDF, compute(), by keeping the vertex state in
> > >>>>>>>>>>>>>>> the solution set and the messages in the workset. This way
> > >>>>>>>>>>>>>>> all previously mentioned limitations go away and the API
> > >>>>>>>>>>>>>>> (see "SSSPComputeFunction" in the example [3]) looks a lot
> > >>>>>>>>>>>>>>> more like Giraph (see [4]).
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I have not run any experiments yet and the prototype has
> > >>>>>>>>>>>>>>> some ugly hacks, but if you think any of this makes sense,
> > >>>>>>>>>>>>>>> then I'd be willing to follow up and try to optimize it.
> > >>>>>>>>>>>>>>> If we see that it performs well, we can consider either
> > >>>>>>>>>>>>>>> replacing Spargel or adding it as an alternative.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks for reading this long e-mail and looking forward
> > >>>>>>>>>>>>>>> to your input!
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>> -Vasia.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> [1]: https://kowshik.github.io/JPregel/pregel_paper.pdf
> > >>>>>>>>>>>>>>> [2]: https://github.com/vasia/flink/tree/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew
> > >>>>>>>>>>>>>>> [3]: https://github.com/vasia/flink/blob/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew/example/SSSPCompute.java
> > >>>>>>>>>>>>>>> [4]: https://github.com/grafos-ml/okapi/blob/master/src/main/java/ml/grafos/okapi/graphs/SingleSourceShortestPaths.java
