You’ll get this issue if you just take the first 2000 lines of that file. The problem is that triangleCount() expects srcId < dstId, which is not the case in the file (e.g. vertex 28). You can get round this by calling graph.convertToCanonicalEdges(), which removes bi-directional edges and ensures srcId < dstId. Which version of Spark are you on? I can’t remember what version that method was introduced in.
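Something like this should work (an untested sketch; it assumes Spark 1.2+, where GraphOps.convertToCanonicalEdges() is available):

    // Canonicalize the edge list first: drop bi-directional duplicates and
    // enforce srcId < dstId, so TriangleCount's even-double-count assertion holds.
    // Re-partition afterwards, since canonicalization rewrites edge directions.
    val canonical = graph.convertToCanonicalEdges()
      .partitionBy(PartitionStrategy.RandomVertexCut)
    val triangleNum = canonical.triangleCount().vertices.map(_._2).reduce(_ + _) / 3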
Robin

> On 26 Jun 2015, at 09:44, Roman Sokolov <ole...@gmail.com> wrote:
>
> Ok, but what does that mean? I did not change the core files of Spark, so is
> it a bug there?
> PS: on small datasets (<500 MB) I have no problem.
>
> On 25.06.2015 at 18:02, "Ted Yu" <yuzhih...@gmail.com> wrote:
>
> The assertion failure corresponds to the following lines of TriangleCount.scala:
>
>     g.outerJoinVertices(counters) {
>       (vid, _, optCounter: Option[Int]) =>
>         val dblCount = optCounter.getOrElse(0)
>         // double count should be even (divisible by two)
>         assert((dblCount & 1) == 0)
>
> Cheers
>
> On Thu, Jun 25, 2015 at 6:20 AM, Roman Sokolov <ole...@gmail.com> wrote:
>
> Hello!
> I am trying to compute the number of triangles with GraphX, but I get a
> memory / heap size error even though the dataset is very small (1 GB). I run
> the code in spark-shell on a machine with 16 GB RAM (I also tried with 2
> workers on separate machines with 8 GB RAM each). So I have 15x more memory
> than the dataset size, but it is not enough. What should I do with
> terabyte-sized datasets? How do people process them? I have read a lot of
> documentation and 2 Spark books, and still have no clue :(
>
> I tried to run with these options, with no effect:
>
>     ./bin/spark-shell --executor-memory 6g --driver-memory 9g --total-executor-cores 100
>
> The code is simple:
>
>     val graph = GraphLoader.edgeListFile(sc,
>         "/home/ubuntu/data/soc-LiveJournal1/lj.stdout",
>         edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
>         vertexStorageLevel = StorageLevel.MEMORY_AND_DISK_SER)
>       .partitionBy(PartitionStrategy.RandomVertexCut)
>
>     println(graph.numEdges)
>     println(graph.numVertices)
>
>     val triangleNum = graph.triangleCount().vertices.map(x => x._2).reduce(_ + _) / 3
>
> (The dataset is from
> http://konect.uni-koblenz.de/downloads/tsv/soc-LiveJournal1.tar.bz2; the
> first two lines contain % characters, so they have to be removed.)
>
> UPD: today I tried on a 32 GB machine (from spark-shell again) and now got
> another error:
>
> [Stage 8:> (0 + 4) / 32]15/06/25 13:03:05 WARN ShippableVertexPartitionOps:
> Joining two VertexPartitions with different indexes is slow.
> 15/06/25 13:03:05 ERROR Executor: Exception in task 3.0 in stage 8.0 (TID 227)
> java.lang.AssertionError: assertion failed
>     at scala.Predef$.assert(Predef.scala:165)
>     at org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:90)
>     at org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:87)
>     at org.apache.spark.graphx.impl.VertexPartitionBaseOps.leftJoin(VertexPartitionBaseOps.scala:140)
>     at org.apache.spark.graphx.impl.VertexPartitionBaseOps.leftJoin(VertexPartitionBaseOps.scala:133)
>     at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:159)
>     at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:156)
>     at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>     at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>     at org.apache.spark.scheduler.Task.run(Task.scala:70)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>
> --
> Best regards, Roman Sokolov
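A side note on the dataset preparation mentioned above: GraphLoader.edgeListFile skips comment lines beginning with "#" but not with "%", so the two KONECT header lines do have to be stripped before loading. A minimal sketch of that one-off step; the raw file name out.soc-LiveJournal1 and the output path are assumptions:

    // Hypothetical preprocessing: drop the '%' header lines so every remaining
    // line parses as a "srcId dstId" pair. saveAsTextFile writes a directory
    // of part files, which GraphLoader.edgeListFile can read directly.
    sc.textFile("/home/ubuntu/data/soc-LiveJournal1/out.soc-LiveJournal1")
      .filter(line => !line.startsWith("%"))
      .saveAsTextFile("/home/ubuntu/data/soc-LiveJournal1/lj.stdout")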