Yep, I already found it. I added one line after loading the graph, and
could successfully count triangles on "newgraph":

    val graph = GraphLoader.edgeListFile(sc, "....", ...)
    val newgraph = graph.convertToCanonicalEdges()
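For reference, the complete fix as a minimal sketch for spark-shell (sc is
predefined there; the file path and load options are taken from the first
mail, quoted below):

    import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
    import org.apache.spark.storage.StorageLevel

    // Load the edge list with spill-to-disk storage, as in the first mail.
    val graph = GraphLoader.edgeListFile(sc,
      "/home/ubuntu/data/soc-LiveJournal1/lj.stdout",
      edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
      vertexStorageLevel = StorageLevel.MEMORY_AND_DISK_SER)
      .partitionBy(PartitionStrategy.RandomVertexCut)

    // The added line: orient every edge so that srcId < dstId and merge
    // duplicates, which is the precondition triangleCount() asserts on.
    val newgraph = graph.convertToCanonicalEdges()

    // Each triangle is counted once at each of its 3 vertices, hence /3.
    val triangleNum =
      newgraph.triangleCount().vertices.map(_._2).reduce(_ + _) / 3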
Next I will test it on bigger (several GB) networks. I am using Spark 1.3
and 1.4 but haven't seen this function in
https://spark.apache.org/docs/latest/graphx-programming-guide.html
Thanks a lot, guys!

On 26.06.2015 at 13:50, "Ted Yu" <yuzhih...@gmail.com> wrote:

> See SPARK-4917, which went into Spark 1.3.0.
>
> On Fri, Jun 26, 2015 at 2:27 AM, Robin East <robin.e...@xense.co.uk> wrote:
>
>> You'll get this issue if you just take the first 2000 lines of that
>> file. The problem is that triangleCount() expects srcId < dstId, which
>> is not the case in the file (e.g. vertex 28). You can get around this
>> by calling graph.convertToCanonicalEdges(), which removes bi-directional
>> edges and ensures srcId < dstId. Which version of Spark are you on? I
>> can't remember which version that method was introduced in.
>>
>> Robin
>>
>> On 26 Jun 2015, at 09:44, Roman Sokolov <ole...@gmail.com> wrote:
>>
>> Ok, but what does that mean? I did not change the core files of Spark,
>> so is it a bug there?
>> PS: on small datasets (<500 MB) I have no problem.
>> On 25.06.2015 at 18:02, "Ted Yu" <yuzhih...@gmail.com> wrote:
>>
>>> The assertion failure from TriangleCount.scala corresponds with the
>>> following lines:
>>>
>>> g.outerJoinVertices(counters) {
>>>   (vid, _, optCounter: Option[Int]) =>
>>>     val dblCount = optCounter.getOrElse(0)
>>>     // double count should be even (divisible by two)
>>>     assert((dblCount & 1) == 0)
>>>
>>> Cheers
>>>
>>> On Thu, Jun 25, 2015 at 6:20 AM, Roman Sokolov <ole...@gmail.com> wrote:
>>>
>>>> Hello!
>>>> I am trying to compute the number of triangles with GraphX, but I get
>>>> an out-of-memory / heap-space error even though the dataset is very
>>>> small (1 GB). I run the code in spark-shell on a machine with 16 GB of
>>>> RAM (I also tried with 2 workers on separate machines, 8 GB RAM each).
>>>> So I have 15x more memory than the dataset size, but it is not enough.
>>>> What should I do with terabyte-sized datasets? How do people process
>>>> them? I have read a lot of documentation and 2 Spark books, and still
>>>> have no clue :(
>>>>
>>>> I tried running with these options, to no effect:
>>>> ./bin/spark-shell --executor-memory 6g --driver-memory 9g
>>>> --total-executor-cores 100
>>>>
>>>> The code is simple:
>>>>
>>>> val graph = GraphLoader.edgeListFile(sc,
>>>>   "/home/ubuntu/data/soc-LiveJournal1/lj.stdout",
>>>>   edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
>>>>   vertexStorageLevel = StorageLevel.MEMORY_AND_DISK_SER)
>>>>   .partitionBy(PartitionStrategy.RandomVertexCut)
>>>>
>>>> println(graph.numEdges)
>>>> println(graph.numVertices)
>>>>
>>>> val triangleNum = graph.triangleCount().vertices.map(x => x._2)
>>>>   .reduce(_ + _) / 3
>>>>
>>>> (The dataset is from
>>>> http://konect.uni-koblenz.de/downloads/tsv/soc-LiveJournal1.tar.bz2;
>>>> the first two lines contain % characters, so they have to be removed.)
>>>>
>>>> UPD: today I tried on a 32 GB machine (from spark-shell again) and got
>>>> another error:
>>>>
>>>> [Stage 8:> (0 + 4) / 32]15/06/25 13:03:05 WARN
>>>> ShippableVertexPartitionOps: Joining two VertexPartitions with
>>>> different indexes is slow.
>>>> 15/06/25 13:03:05 ERROR Executor: Exception in task 3.0 in stage 8.0 (TID 227)
>>>> java.lang.AssertionError: assertion failed
>>>>     at scala.Predef$.assert(Predef.scala:165)
>>>>     at org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:90)
>>>>     at org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:87)
>>>>     at org.apache.spark.graphx.impl.VertexPartitionBaseOps.leftJoin(VertexPartitionBaseOps.scala:140)
>>>>     at org.apache.spark.graphx.impl.VertexPartitionBaseOps.leftJoin(VertexPartitionBaseOps.scala:133)
>>>>     at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:159)
>>>>     at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:156)
>>>>     at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>     at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> --
>>>> Best regards, Roman Sokolov
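For anyone hitting the same assertion: a quick pre-flight check (a sketch,
not from the thread; "graph" is the value loaded as above) is to count the
edges that violate the srcId < dstId precondition before calling
triangleCount():

    // Count edges whose orientation violates the canonical form (this also
    // catches self-loops). A non-zero count means the graph needs
    // convertToCanonicalEdges() first.
    val badEdges = graph.edges.filter(e => e.srcId >= e.dstId).count()
    println(s"non-canonical edges: $badEdges")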