+1 for the renaming!

On Wed, Jan 6, 2016 at 8:01 PM, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:
issue created: https://issues.apache.org/jira/browse/FLINK-3207

If anyone has any other suggestion about the renaming, let me know :)

-V.

On 5 January 2016 at 11:52, Aljoscha Krettek <aljos...@apache.org> wrote:

Nice to hear. :D

I think you can go ahead and add the Jira. About the renaming: I also think that it would make sense to do it.

On 04 Jan 2016, at 19:48, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:

Hello squirrels and happy new year!

I'm reviving this thread to share some results and discuss next steps.

Using the Either type I was able to get rid of redundant messages and vertex state. During the past few weeks, I have been running experiments, which show that the performance of this "Pregel" model has improved a lot :) In [1], you can see the speedup of GSA and Pregel over Spargel, for SSSP and Connected Components (CC), on the LiveJournal (68m edges), Orkut (117m edges) and Wikipedia (340m edges) datasets.

Regarding next steps, if there are no objections, I will open a Jira for adding a Pregel iteration abstraction to Gelly. The Gelly guide has to be updated to reflect the spectrum of iteration abstractions that we have discussed in this thread, i.e. Pregel -> Spargel (Scatter-Gather) -> GSA.

I think it might also be a good idea to do some renaming. Currently, we call the Spargel iteration "vertex-centric", which fits the Pregel abstraction better. I propose we rename the Spargel iteration to "scatter-gather" or "signal-collect" (where it was first introduced [2]). Any other ideas?

Thanks,
-Vasia.

[1]: https://drive.google.com/file/d/0BzQJrI2eGlyYRTRjMkp1d3R6eVE/view?usp=sharing
[2]: http://link.springer.com/chapter/10.1007/978-3-642-17746-0_48

On 11 November 2015 at 11:05, Stephan Ewen <se...@apache.org> wrote:

See: https://issues.apache.org/jira/browse/FLINK-3002

On Wed, Nov 11, 2015 at 10:54 AM, Stephan Ewen <se...@apache.org> wrote:

"Either" and "Optional" types are quite useful.

Let's add them to the core Java API.

On Wed, Nov 11, 2015 at 10:00 AM, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:

Thanks Fabian! I'll try that :)

On 10 November 2015 at 22:31, Fabian Hueske <fhue...@gmail.com> wrote:

You could implement a Java Either type (similar to Scala's Either) that holds either a Message or the VertexState, with a corresponding TypeInformation and TypeSerializer that serializes a byte flag to indicate which of the two types is used.
It might actually make sense to add a generic Either type to the Java API in general (similar to the Java Tuples, which resemble the Scala Tuples).

Cheers, Fabian
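A minimal sketch of what such an Either type could look like (illustrative only; Flink's actual type was added later under FLINK-3002, linked above). A custom TypeSerializer would write one byte to tag which alternative is present, then delegate to the left or right serializer:

    // Minimal tagged-union type: holds either an L or an R, never both.
    public abstract class Either<L, R> {

        public static <L, R> Either<L, R> left(L value) { return new Left<>(value); }

        public static <L, R> Either<L, R> right(R value) { return new Right<>(value); }

        public abstract boolean isLeft();

        public abstract L left();

        public abstract R right();

        static final class Left<L, R> extends Either<L, R> {
            private final L value;
            Left(L value) { this.value = value; }
            @Override public boolean isLeft() { return true; }
            @Override public L left() { return value; }
            @Override public R right() { throw new IllegalStateException("not a Right"); }
        }

        static final class Right<L, R> extends Either<L, R> {
            private final R value;
            Right(R value) { this.value = value; }
            @Override public boolean isLeft() { return false; }
            @Override public L left() { throw new IllegalStateException("not a Left"); }
            @Override public R right() { return value; }
        }
    }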
2015-11-10 22:16 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

Hi,

after running a few experiments, I can confirm that putting the combiner after the flatMap is indeed more efficient.

I ran SSSP and Connected Components with Spargel, GSA, and the Pregel model, and the results are the following:

- for SSSP, Spargel is always the slowest, GSA is ~1.2x faster, and Pregel is ~1.1x faster without a combiner and ~1.3x faster with a combiner.
- for Connected Components, Spargel and GSA perform similarly, while Pregel is 1.4-1.6x slower.

To start with, this is much better than I expected :)
However, there is a main shortcoming in my current implementation that negatively impacts performance: since the compute-function coGroup needs to output both new vertex values and new messages, I emit a wrapping tuple that contains both vertex state and messages, and then filter them out based on a boolean field. The problem is that, since I cannot emit null fields, I emit a dummy message for each new vertex state and a dummy vertex state for each new message. That essentially means that the intermediate messages result doubles in size, if, say, the vertex values are of the same type as the messages (it can be worse if the vertex values are more complex).
So my question is: is there a way to avoid this redundancy, by either emitting null fields or by creating an operator that could emit 2 different types of tuples?

Thanks!
-Vasia.

On 9 November 2015 at 15:20, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Vasia,

sorry for the late reply.
I don't think there is a big difference. In both cases, the partitioning and sorting happen at the end of the iteration.
If the groupReduce is applied before the workset is returned, the sorting happens on the filtered result (after the flatMap), which might be a little more efficient (depending on the ratio of messages and solution set updates). Also, it does not require that the initial workset be sorted for the first groupReduce.

I would put it at the end.

Cheers, Fabian

2015-11-05 17:19 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

@Fabian

Is there any advantage in putting the reducer-combiner before updating the workset vs. after (i.e. right before the join with the solution set)?

If it helps, here are the plans of these 2 alternatives:

https://drive.google.com/file/d/0BzQJrI2eGlyYcFV2RFo5dUFNXzg/view?usp=sharing
https://drive.google.com/file/d/0BzQJrI2eGlyYN014NXp6OEZUdGs/view?usp=sharing

Thanks a lot for the help!

-Vasia.
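For illustration, the two-output workaround described above (one wrapper tuple per record, a boolean flag telling vertex updates and messages apart, and a dummy field for the unused half) could be sketched as follows. The concrete types are assumptions: Double vertex values and <targetId, distance> messages.

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.graph.Vertex;
    import org.apache.flink.util.Collector;

    // Split the combined coGroup output into its two record kinds:
    // f0 == true  -> f1 holds a vertex-state update (f2 is a dummy message)
    // f0 == false -> f2 holds a message (f1 is a dummy vertex)
    public class SuperstepSplitter {

        public static DataSet<Vertex<Long, Double>> extractDelta(
                DataSet<Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>>> superstep) {
            return superstep.flatMap(
                new FlatMapFunction<Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>>,
                                    Vertex<Long, Double>>() {
                    @Override
                    public void flatMap(Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>> value,
                                        Collector<Vertex<Long, Double>> out) {
                        if (value.f0) {
                            out.collect(value.f1);
                        }
                    }
                });
        }

        public static DataSet<Tuple2<Long, Double>> extractMessages(
                DataSet<Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>>> superstep) {
            return superstep.flatMap(
                new FlatMapFunction<Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>>,
                                    Tuple2<Long, Double>>() {
                    @Override
                    public void flatMap(Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>> value,
                                        Collector<Tuple2<Long, Double>> out) {
                        if (!value.f0) {
                            out.collect(value.f2);
                        }
                    }
                });
        }
    }

The dummy fields in f1/f2 are exactly the size overhead discussed above.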
On 30 October 2015 at 21:28, Fabian Hueske <fhue...@gmail.com> wrote:

We can of course inject an optional ReduceFunction (or GroupReduce, or combinable GroupReduce) to reduce the size of the work set.
I suggested removing the GroupReduce function because it only collected all messages into a single record by emitting the input iterator, which is quite dangerous. Applying a combinable reduce function could improve the performance considerably.

The good news is that it would come "for free", because the necessary partitioning and sorting can be reused (given the forwardField annotations are correctly set):
- The partitioning of the reduce can be reused for the join with the solution set.
- The sort of the reduce is preserved by the join with the in-memory hash-table of the solution set and can be reused for the coGroup.

Best,
Fabian

2015-10-30 18:38 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

Hi Fabian,

thanks so much for looking into this so quickly :-)

One update I have to make is that I tried running a few experiments with this on a 6-node cluster. The current implementation gets stuck at "Rebuilding Workset Properties" and never finishes a single iteration. Running the plan of one superstep without a delta iteration terminates fine. I didn't have access to the cluster today, so I couldn't debug this further, but I will do so as soon as I have access again.

The rest of my comments are inline:

On 30 October 2015 at 17:53, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Vasia,

I had a look at your new implementation and have a few ideas for improvements.
1) Sending out the input iterator as you do in the last GroupReduce is quite dangerous and does not give a benefit compared to collecting all elements. Even though it is an iterator, it needs to be completely materialized in-memory whenever the record is touched by Flink or user code.
I would propose to skip the reduce step completely, handle all messages separately, and only collect them in the CoGroup function before giving them to the VertexComputeFunction. Be careful to only do that with objectReuse disabled, or take care to properly copy the messages. If you collect the messages in the CoGroup, you don't need the GroupReduce, you have smaller records, and you can remove the MessageIterator class completely.

I see. The idea was to expose a message combiner that users could implement if the messages are combinable, e.g. min, sum. This is a common case and reduces the message load significantly. Is there a way I could do something similar before the coGroup?
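A combiner of the kind being discussed could be a plain ReduceFunction, which Flink can always apply incrementally on the sending side of the shuffle. A minimal sketch, assuming SSSP-style messages of type Tuple2<targetId, distance>:

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class MessageCombiner {
        // Keep only the minimum distance per target vertex. Because
        // ReduceFunctions are combinable, Flink pre-aggregates the messages
        // before the shuffle, shrinking the workset.
        public static DataSet<Tuple2<Long, Double>> combineByMin(
                DataSet<Tuple2<Long, Double>> messages) {
            return messages
                .groupBy(0) // group messages by target vertex id
                .reduce(new ReduceFunction<Tuple2<Long, Double>>() {
                    @Override
                    public Tuple2<Long, Double> reduce(Tuple2<Long, Double> m1,
                                                       Tuple2<Long, Double> m2) {
                        return (m1.f1 <= m2.f1) ? m1 : m2;
                    }
                });
        }
    }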
2) Add this annotation to the AppendVertexState function: @ForwardedFieldsFirst("*->f0"). This indicates that the complete element of the first input becomes the first field of the output. Since the input is partitioned on "f0" (it comes out of the partitioned solution set), the result of AppendVertexState will be partitioned on "f0.f0", which is (accidentally :-D) the join key of the following coGroup function -> no partitioning :-)

Great! I totally missed that ;)
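Concretely, the annotation from point 2 on a hypothetical AppendVertexState join function (the signature is assumed here, mirroring the description above) would look like:

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.graph.Vertex;

    // Declares that the entire first input (the vertex from the solution set)
    // is forwarded unmodified into field f0 of the output, so the partitioning
    // on "f0" carries over to "f0.f0" for the downstream coGroup.
    @ForwardedFieldsFirst("*->f0")
    public class AppendVertexState<K, VV, Message> implements
            JoinFunction<Vertex<K, VV>, Tuple2<K, Message>,
                         Tuple2<Vertex<K, VV>, Tuple2<K, Message>>> {

        @Override
        public Tuple2<Vertex<K, VV>, Tuple2<K, Message>> join(
                Vertex<K, VV> vertex, Tuple2<K, Message> message) {
            return new Tuple2<>(vertex, message);
        }
    }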
3) Adding the two flatMap functions behind the CoGroup prevents chaining and therefore causes some serialization overhead, but it shouldn't be too bad.

So, in total, I would make this program as follows:

    iVertices<K, VV>
    iMessages<K, Message> = iVertices.map(new InitWorkSet());

    iteration = iVertices.iterateDelta(iMessages, maxIt, 0)
    verticesWithMessage<Vertex, Message> = iteration.getSolutionSet()
      .join(iteration.workSet())
      .where(0)   // solution set is local and build side
      .equalTo(0) // workset is shuffled and probe side of hashjoin
    superstepComp<Vertex, Tuple2<K, Message>, Bool> =
      verticesWithMessage.coGroup(edgesWithValue)
      .where("f0.f0") // vwm is locally forwarded and sorted
      .equalTo(0)     // edges are already partitioned and sorted (if cached correctly)
      .with(...)      // the coGroup collects all messages in a collection
                      // and gives it to the ComputeFunction
    delta<Vertex> = superstepComp.flatMap(...)       // partitioned when merged into solution set
    workSet<K, Message> = superstepComp.flatMap(...) // partitioned for join
    iteration.closeWith(delta, workSet)

So, if I am correct, the program will
- partition the workset
- sort the vertices with messages
- partition the delta

One observation I have is that this program requires that all messages fit into memory. Was that also the case before?

I believe not. The plan has one coGroup that produces the messages and a following coGroup that groups by the messages' target ID and consumes them in an iterator. That doesn't require them to fit in memory, right?

I'm also working on a version where the graph is represented as an adjacency list, instead of two separate datasets of vertices and edges. The disadvantage is that the graph has to fit in memory, but I think the advantages are many. We'll be able to support edge value updates, edge mutations, and different edge access order guarantees. I'll get back to this thread when I have a working prototype.

Cheers,
Fabian

Thanks again!

Cheers,
-Vasia.

2015-10-27 19:10 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

@Martin: thanks for your input! If you ran into any other issues that I didn't mention, please let us know. Obviously, even with my proposal, there are still features we cannot support, e.g. updating edge values and graph mutations. We'll need to re-think the underlying iteration and/or graph representation for those.

@Fabian: thanks a lot, no rush :)
Let me give you some more information that might make it easier to reason about performance:

Currently, in Spargel, the solution set (SS) keeps the vertex state and the workset (WS) keeps the active vertices. The iteration is composed of 2 coGroups. The first one takes the WS and the edges and produces messages. The second one takes the messages and the SS and produces the new WS and the SS-delta.

In my proposal, the SS has the vertex state and the WS has <vertexId, MessageIterator> pairs, i.e. the inbox of each vertex. The plan is more complicated because compute() needs to have two iterators: over the edges and over the messages.
First, I join SS and WS to get the active vertices (those that have received a msg) and their current state. Then I coGroup the result with the edges to access the neighbors. Now, the main problem is that this coGroup needs to have 2 outputs: the new messages and the new vertex value. I couldn't really find a nice way to do this, so I'm emitting a Tuple that contains both types, and I have a flag to separate them later with 2 flatMaps. From the vertex flatMap, I create the SS-delta, and from the messages flatMap, I apply a reduce to group the messages by vertex and send them to the new WS. One optimization would be to expose a combiner here to reduce message size.

tl;dr:
1. 2 coGroups vs. Join + coGroup + flatMap + reduce
2. how can we efficiently emit 2 different types of records from a coGroup?
3. does it make any difference if we group/combine the messages before updating the workset or after?

Cheers,
-Vasia.
On 27 October 2015 at 18:39, Fabian Hueske <fhue...@gmail.com> wrote:

I'll try to have a look at the proposal from a performance point of view in the next days.
Please ping me if I don't follow up on this thread.

Cheers, Fabian

2015-10-27 18:28 GMT+01:00 Martin Junghanns <m.jungha...@mailbox.org>:

Hi,

At our group, we also moved several algorithms from Giraph to Gelly and ran into some confusing issues (first in understanding, second during implementation) caused by the conceptual differences you described.

If there are no concrete advantages (performance, mainly) in the Spargel implementation, we would be very happy to see the Gelly API aligned to Pregel-like systems.

Your SSSP example speaks for itself. Straightforward, if the reader is familiar with Pregel/Giraph/...

Best,
Martin
On 27.10.2015 17:40, Vasiliki Kalavri wrote:

Hello squirrels,

I want to discuss with you a few concerns I have about our current vertex-centric model implementation, Spargel, now fully subsumed by Gelly.

Spargel is our implementation of Pregel [1], but it violates some fundamental properties of the model, as described in the paper and as implemented in e.g. Giraph, GPS, and Hama. I often find myself confused both when trying to explain it to current Giraph users and when porting my Giraph algorithms to it.

More specifically:
- in the Pregel model, messages produced in superstep n are received in superstep n+1. In Spargel, they are produced and consumed in the same iteration.
- in Pregel, vertices are active during a superstep if they have received a message in the previous superstep. In Spargel, a vertex is active during a superstep if it has changed its value.

These two differences require a lot of rethinking when porting applications and can easily cause bugs.

The most important problem, however, is that we require the user to split the computation into 2 phases (2 UDFs):
- messaging: has access to the vertex state and can produce messages
- update: has access to incoming messages and can update the vertex value

Pregel/Giraph only expose one UDF to the user:
- compute: has access to both the vertex state and the incoming messages, can produce messages, and can update the vertex value.

This might not seem like a big deal, but apart from forcing the user to split their program logic into 2 phases, Spargel also makes some common computation patterns non-intuitive or impossible to write. A very simple example is propagating a message based on its value or sender ID. To do this with Spargel, one has to store all the incoming messages in the vertex value (which might be of a different type, btw) during the messaging phase, so that they can be accessed during the update phase.
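For reference, the two-phase split looks roughly like this in Spargel's API of that era, shown for SSSP (signatures are approximate for the Flink version of the time):

    import org.apache.flink.graph.Edge;
    import org.apache.flink.graph.Vertex;
    import org.apache.flink.graph.spargel.MessageIterator;
    import org.apache.flink.graph.spargel.MessagingFunction;
    import org.apache.flink.graph.spargel.VertexUpdateFunction;

    // Phase 1: sees the vertex state and the edges, and produces messages.
    public class MinDistanceMessenger
            extends MessagingFunction<Long, Double, Double, Double> {
        @Override
        public void sendMessages(Vertex<Long, Double> vertex) throws Exception {
            for (Edge<Long, Double> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
            }
        }
    }

    // Phase 2: sees the incoming messages and updates the vertex value,
    // but no longer has access to the edges or the message senders.
    public class VertexDistanceUpdater
            extends VertexUpdateFunction<Long, Double, Double> {
        @Override
        public void updateVertex(Vertex<Long, Double> vertex,
                                 MessageIterator<Double> inMessages) throws Exception {
            double minDistance = Double.MAX_VALUE;
            for (double msg : inMessages) {
                minDistance = Math.min(minDistance, msg);
            }
            if (minDistance < vertex.getValue()) {
                setNewVertexValue(minDistance);
            }
        }
    }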
So, my first question is: when implementing Spargel, were other alternatives considered and maybe rejected in favor of performance or for some other reason? If someone knows, I would love to hear about them!

Second, I wrote a prototype implementation [2] that only exposes one UDF, compute(), by keeping the vertex state in the solution set and the messages in the workset. This way, all the previously mentioned limitations go away, and the API (see "SSSPComputeFunction" in the example [3]) looks a lot more like Giraph (see [4]).

I have not run any experiments yet and the prototype has some ugly hacks, but if you think any of this makes sense, then I'd be willing to follow up and try to optimize it. If we see that it performs well, we can consider either replacing Spargel or adding it as an alternative.

Thanks for reading this long e-mail and looking forward to your input!

Cheers,
-Vasia.
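Modeled on the SSSPComputeFunction in [3], the single-UDF version could look roughly like this; the ComputeFunction base class and its helper methods are hypothetical here, mirroring the prototype's Giraph-like shape rather than a released Flink API:

    import org.apache.flink.graph.Edge;
    import org.apache.flink.graph.Vertex;

    // Hypothetical Pregel-style UDF: one compute() that sees the vertex state,
    // the incoming messages, and the out-edges, and can both update the value
    // and send messages.
    public class SSSPComputeFunction
            extends ComputeFunction<Long, Double, Double, Double> {

        @Override
        public void compute(Vertex<Long, Double> vertex,
                            MessageIterator<Double> messages) {
            double minDistance = Double.MAX_VALUE;
            for (double msg : messages) {
                minDistance = Math.min(minDistance, msg);
            }
            if (minDistance < vertex.getValue()) {
                setNewVertexValue(minDistance);
                for (Edge<Long, Double> edge : getEdges()) {
                    sendMessageTo(edge.getTarget(), minDistance + edge.getValue());
                }
            }
        }
    }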
[1]: https://kowshik.github.io/JPregel/pregel_paper.pdf
[2]: https://github.com/vasia/flink/tree/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew
[3]: https://github.com/vasia/flink/blob/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew/example/SSSPCompute.java
[4]: https://github.com/grafos-ml/okapi/blob/master/src/main/java/ml/grafos/okapi/graphs/SingleSourceShortestPaths.java