Hello,
So this week, during TinkerPop's code freeze, Kuppitz and I have been stress
testing SparkGraphComputer on a 4 Blade cluster using the Friendster dataset
(125M vertices and 2.5B edges).
This is a list of things we learned and fixed.
First, Daniel Kuppitz wrote this really helpful script that gave us a
huge boost in testing time turn arounds. It does the following:
1. Pulls the latest from git://.
2. Builds the code (being smart to delete grapes!)
3. :install hadoop/spark plugins into the Gremlin console.
4. distribute HADOOP_GREMLIN_LIBS jars to all SparkServer nodes
in the cluster.
5. restarts the Spark cluster.
6. plops you at the console ready to rock.
Summary: here are the runtimes for g.V().count() over
SparkGraphComputer's life time with Friendster.
- TinkerPop 3.0.0.MX: 2.5 hours
- TinkerPop 3.0.0: 1.5 hours
- TinkerPop 3.1.1: 23 minutes
*** Of course, this is NOT good for g.V().count() in general,
but realize we are loading the entire graph, even though we only need vertices
(no edge or properties).
https://issues.apache.org/jira/browse/TINKERPOP-962
(g.V().count() should really only take ~5 minutes)
For g.V().out().out().out().count() over Friendster:
- TinkerPop 3.0.0.MX: 12.8 hours
- TinkerPop 3.0.0: 8.6 hours
- TinkerPop 3.1.1: 2.4 hours
Answer: 215664338057221 (thats 215 trillion length 3 paths in
Friendster)
1. Its all about GC control.
- Make many workers (we have 5 per machine) each with
relatively small heaps (10 gigs each).
- The massive heap and store everything in memory model doesn't
work -- it just leads to GC stalls.
2. Make use of TinkerPop's new gremlin.spark.graphStorageLevel (default
is MEMORY_ONLY).
- I like DISK_ONLY as the whole RDD is cached in the cluster's
file system.
- No fetching back to HDFS for data (especially when
you are using ScriptInputFormat which is expensive!)
- And you definitely don't want to go back to the graph
database and stress it needlessly.
- I don't like DISK_AND_MEM unless you know your whole graph
will fit in memory. Once you have to start swapping things out of memory, GC.
- If you have lots and lots of workers (a big cluster),
then DISK_AND_MEM might be good. Be cool if someone tested it.
- DISK_ONLY is a lot like Hadoop. Streaming in records at a
time from the disk (its fast).
3. I had an insane-o bug in our Combiner implementation. < MAX_AMOUNT
should have been <= MAX_AMOUNT.
- For g.V().count(), I went from shuffling 500M of data over
the network to 6.4K.
- The job sped up by 30 minutes after that fix.
- Again, the main reason it was so slow, GC. 500mb
stream of long payloads reduced to a single machine. Moron.
- Its a really bad idea to NOT use combiners for both
VertexProgram and MapReduce.
- This is like the difference between working and not
working.
- Also, this is what scares me about path-based
traversers (match(), as(), path(), etc.). They can't be bulk'd easily.
- We will get smart here though. I have some
inklings.
4. Its an absolute must that you partition your graph once loaded.
- Once the graph is loaded (graphDB, HDFS, etc.), the Spark
partitioner "organizes" the graph around the cluster and then persists it.
- For Friendster, this takes about 15 minutes (w/
ScriptInputFormat as the read from HDFS parser).
- This is important because the loaded graphRDD is
static and just gets a view propagated through it at each iteration. You don't
want to keep shuffling this monster on each iteration.
- This is also why PersistedXXXRDD is crucial. If you
are going to keep running jobs on the same data, the RDD being reused is
already partitioned for you! (tada)
- For graph system providers, if you provide an
InputRDD and you have a partitioner for it, that is a huge savings for
SparkGraphComputer. So smart to do so.
- By partitioning upfront, I was able to reduce the shuffle
load from ~22GB to ~2GB per vertex program iteration on Friendster. Insane.
- I was a fool before. I now know how to read Spark
logs :) which is probably a good thing for me to know.
- This is so important that we now just do it automatically.
- However, if the data source and the Spark cluster are
already "pair partitioned" we don't repartition! (elegant).
-
http://tinkerpop.incubator.apache.org/docs/3.1.1-SNAPSHOT/reference/#sparkgraphcomputer
(scroll down to the "InputRDD and OutputRDD"-section).
5. The graph data message/view shuffle is a lot of data. Make use of
lots of TinkerPop workers() to reduce spills to disk.
- TinkerPop 3.1.0 introduced GraphComputer.workers(). In
SparkGraphComputer, this is the number of partitions in the RDD.
- For Friendster, ScriptInputFormat gives me 229 partitions and
g.V().count() takes 48 minutes.
- If I 5x this to 1145 using "workers(1145)",
g.V().count() takes 25 minutes.
- Thats a 2x speed up but just chopping the data into
finer slices.
- However, for 2290 workers, g.V().count() only gets
marginally better -- 23 minutes.
- This is all about not spilling to disk and not getting GC all
up in it.
- Now imagine if the graph provider's InputRDD already has a
partitioner -- you are looking at ~10 minutes to g.V().count() Friendster (or
like 1 minute if we don't load edges)!
5. I think we need to make Gryo more efficient. I don't think our
serialization is optimal :/. Data seems over sized for what it is. This is all
assumptions right now.
- I also use JavaSerializer for tinkerpop.Payload data and
given that that is a significant chunk of what is shuffled --- it might be more
that than Gryo. :|
- https://issues.apache.org/jira/browse/TINKERPOP-1110
6. There is one last area of the implementation that I think could be
improved. But besides that (and minor "use less objects"-style optimizations),
I think SparkGraphComputer *is* how it should be.
- https://issues.apache.org/jira/browse/TINKERPOP-1108
- If you are a Spark expert, please do review the code and
provide feedback. Its really not that much code.
-
https://github.com/apache/incubator-tinkerpop/blob/09a5d288c4143f2853386ce908c82d9ced3c30e7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
-
https://github.com/apache/incubator-tinkerpop/blob/09a5d288c4143f2853386ce908c82d9ced3c30e7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
Anywho…. thats that. TinkerPop 3.1.1 will be sweeeeet.
Enjoy!,
Marko.
http://markorodriguez.com