You’ll get this issue if you just take the first 2000 lines of that file. The 
problem is that triangleCount() expects srcId < dstId, which is not the case in the 
file (e.g. vertex 28). You can get around this by calling 
graph.convertToCanonicalEdges(), which removes bi-directional edges and ensures 
srcId < dstId. Which version of Spark are you on? I can’t remember which version 
that method was introduced in.
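
If it’s available in your version, something like this should work (just a sketch; the path is a placeholder):

    import org.apache.spark.graphx._

    // Drop bi-directional duplicates and force srcId < dstId before counting triangles.
    val graph = GraphLoader.edgeListFile(sc, "/path/to/edges.txt")
      .convertToCanonicalEdges()
      .partitionBy(PartitionStrategy.RandomVertexCut)

    val triangles = graph.triangleCount().vertices.map(_._2).reduce(_ + _) / 3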

Robin
> On 26 Jun 2015, at 09:44, Roman Sokolov <ole...@gmail.com> wrote:
> 
> Ok, but what does it mean? I did not change the core files of Spark, so is 
> it a bug there?
> PS: on small datasets (<500 MB) I have no problem.
> 
> On 25.06.2015 at 18:02, "Ted Yu" <yuzhih...@gmail.com> wrote:
> The assertion failure from TriangleCount.scala corresponds with the following 
> lines:
> 
>     g.outerJoinVertices(counters) {
>       (vid, _, optCounter: Option[Int]) =>
>         val dblCount = optCounter.getOrElse(0)
>         // double count should be even (divisible by two)
>         assert((dblCount & 1) == 0)
> 
> Cheers
> 
> On Thu, Jun 25, 2015 at 6:20 AM, Roman Sokolov <ole...@gmail.com> wrote:
> Hello!
> I am trying to compute the number of triangles with GraphX, but I get an 
> out-of-memory (heap size) error even though the dataset is very small (1 GB). 
> I run the code in spark-shell on a machine with 16 GB RAM (I also tried with 
> 2 workers on separate machines, 8 GB RAM each). So I have 15x more memory 
> than the dataset size, but it is not enough. What should I do with 
> terabyte-sized datasets? How do people process them? I have read a lot of 
> documentation and 2 Spark books, and still have no clue :(
> 
> I tried running with these options, with no effect:
> ./bin/spark-shell --executor-memory 6g --driver-memory 9g --total-executor-cores 100
> 
> The code is simple:
> 
> import org.apache.spark.graphx._
> import org.apache.spark.storage.StorageLevel
> 
> // load the edge list, spilling to disk if needed, and partition the graph
> val graph = GraphLoader.edgeListFile(sc, "/home/ubuntu/data/soc-LiveJournal1/lj.stdout",
>               edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
>               vertexStorageLevel = StorageLevel.MEMORY_AND_DISK_SER)
>             .partitionBy(PartitionStrategy.RandomVertexCut)
> 
> println(graph.numEdges)
> println(graph.numVertices)
> 
> val triangleNum = graph.triangleCount().vertices.map(x => x._2).reduce(_ + _) / 3
> 
> (the dataset is from here: 
> http://konect.uni-koblenz.de/downloads/tsv/soc-LiveJournal1.tar.bz2; the first 
> two lines contain % characters, so they have to be removed).
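> 
> One way to drop those lines without editing the file is to build the graph from 
> an RDD of edge tuples instead of GraphLoader (a rough sketch; the input path is 
> just a placeholder):
> 
>     import org.apache.spark.graphx._
> 
>     // Skip the '%' comment lines from the raw TSV and parse "src dst" pairs.
>     val rawEdges = sc.textFile("/path/to/out.soc-LiveJournal1")
>       .filter(line => !line.startsWith("%"))
>       .map { line =>
>         val fields = line.split("\\s+")
>         (fields(0).toLong, fields(1).toLong)
>       }
> 
>     // Build a graph with a dummy vertex attribute of 1.
>     val graph = Graph.fromEdgeTuples(rawEdges, 1)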
> 
> 
> UPD: today I tried on a 32 GB machine (from spark-shell again) and now got 
> another error:
> 
> [Stage 8:>                                                         (0 + 4) / 32]
> 15/06/25 13:03:05 WARN ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.
> 15/06/25 13:03:05 ERROR Executor: Exception in task 3.0 in stage 8.0 (TID 227)
> java.lang.AssertionError: assertion failed
>       at scala.Predef$.assert(Predef.scala:165)
>       at org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:90)
>       at org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:87)
>       at org.apache.spark.graphx.impl.VertexPartitionBaseOps.leftJoin(VertexPartitionBaseOps.scala:140)
>       at org.apache.spark.graphx.impl.VertexPartitionBaseOps.leftJoin(VertexPartitionBaseOps.scala:133)
>       at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:159)
>       at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:156)
>       at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>       at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>       at org.apache.spark.scheduler.Task.run(Task.scala:70)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> 
> -- 
> Best regards, Roman Sokolov