Yep, I already found it. So I added 1 line:

val graph = GraphLoader.edgeListFile(sc, "....", ...)
val newgraph = graph.convertToCanonicalEdges()

and could successfully count triangles on "newgraph". Next I will test it on
larger (several GB) networks.
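
To illustrate why the canonical form matters for the count, here is a minimal plain-Scala sketch. It does not use Spark and is not the GraphX implementation. The names CanonicalTriangles, canonicalize and countTriangles are hypothetical, and dropping self-loops is an assumption of this sketch. It orients every edge so that the source ID is smaller than the destination ID, removes duplicates, and then counts each triangle once.

```scala
object CanonicalTriangles {
  type Edge = (Long, Long)

  // Orient each edge so that src < dst and remove duplicates.
  // Self-loops are dropped as well (an assumption of this sketch).
  def canonicalize(edges: Seq[Edge]): Set[Edge] =
    edges.collect {
      case (a, b) if a != b => if (a < b) (a, b) else (b, a)
    }.toSet

  // Count every triangle in the undirected graph exactly once.
  def countTriangles(edges: Seq[Edge]): Long = {
    val canon = canonicalize(edges).toSeq

    // Undirected neighbour sets built from the canonical edges.
    val neighbours: Map[Long, Set[Long]] =
      canon
        .flatMap { case (a, b) => Seq(a -> b, b -> a) }
        .groupBy(_._1)
        .map { case (v, pairs) => v -> pairs.map(_._2).toSet }

    // Every common neighbour of an edge's endpoints closes a triangle.
    // Each triangle is found once per edge, so three times in total.
    val perEdge = canon.map { case (a, b) =>
      neighbours(a).intersect(neighbours(b)).size.toLong
    }
    perEdge.sum / 3
  }
}
```

For the edge list (1,2), (2,1), (2,3), (3,1), (3,4), countTriangles returns 1. The bi-directional pair (1,2)/(2,1) collapses into a single canonical edge, so the triangle 1-2-3 is not counted twice.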

I am using Spark 1.3 and 1.4, but I haven't seen this function documented in
https://spark.apache.org/docs/latest/graphx-programming-guide.html

Thanks a lot guys!
On 26.06.2015 13:50, "Ted Yu" <yuzhih...@gmail.com> wrote:

> See SPARK-4917 which went into Spark 1.3.0
>
> On Fri, Jun 26, 2015 at 2:27 AM, Robin East <robin.e...@xense.co.uk>
> wrote:
>
>> You’ll get this issue if you just take the first 2000 lines of that file.
>> The problem is triangleCount() expects srcId < dstId which is not the case
>> in the file (e.g. vertex 28). You can get round this by calling
>> graph.convertToCanonicalEdges() which removes bi-directional edges and
>> ensures srcId < dstId. Which version of Spark are you on? Can’t remember
>> what version that method was introduced in.
>>
>> Robin
>>
>> On 26 Jun 2015, at 09:44, Roman Sokolov <ole...@gmail.com> wrote:
>>
>> OK, but what does it mean? I did not change the core files of Spark, so
>> is it a bug there?
>> PS: on small datasets (<500 MB) I have no problem.
>> On 25.06.2015 18:02, "Ted Yu" <yuzhih...@gmail.com> wrote:
>>
>>> The assertion failure from TriangleCount.scala corresponds with the
>>> following lines:
>>>
>>>     g.outerJoinVertices(counters) {
>>>       (vid, _, optCounter: Option[Int]) =>
>>>         val dblCount = optCounter.getOrElse(0)
>>>         // double count should be even (divisible by two)
>>>         assert((dblCount & 1) == 0)
>>>
>>> Cheers
>>>
>>> On Thu, Jun 25, 2015 at 6:20 AM, Roman Sokolov <ole...@gmail.com> wrote:
>>>
>>>> Hello!
>>>> I am trying to compute the number of triangles with GraphX, but I get a
>>>> memory or heap size error, even though the dataset is very small (1 GB). I
>>>> run the code in spark-shell on a machine with 16 GB RAM (I also tried 2
>>>> workers on separate machines with 8 GB RAM each). So I have 15x more memory
>>>> than the dataset size, but it is not enough. What should I do with
>>>> terabyte-sized datasets? How do people process them? I have read a lot of
>>>> documentation and 2 Spark books, and still have no clue :(
>>>>
>>>> I tried running with the following options, with no effect:
>>>> ./bin/spark-shell --executor-memory 6g --driver-memory 9g --total-executor-cores 100
>>>>
>>>> The code is simple:
>>>>
>>>> import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
>>>> import org.apache.spark.storage.StorageLevel
>>>>
>>>> val graph = GraphLoader.edgeListFile(
>>>>   sc,
>>>>   "/home/ubuntu/data/soc-LiveJournal1/lj.stdout",
>>>>   edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
>>>>   vertexStorageLevel = StorageLevel.MEMORY_AND_DISK_SER
>>>> ).partitionBy(PartitionStrategy.RandomVertexCut)
>>>>
>>>> println(graph.numEdges)
>>>> println(graph.numVertices)
>>>>
>>>> val triangleNum =
>>>>   graph.triangleCount().vertices.map(x => x._2).reduce(_ + _) / 3
>>>>
>>>> (The dataset is from here:
>>>> http://konect.uni-koblenz.de/downloads/tsv/soc-LiveJournal1.tar.bz2
>>>> The first two lines contain % characters, so they have to be removed.)
>>>>
>>>>
>>>> UPD: today I tried on a 32 GB machine (from spark-shell again) and now got
>>>> a different error:
>>>>
>>>> [Stage 8:>                                                         (0 + 4) / 32]
>>>> 15/06/25 13:03:05 WARN ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.
>>>> 15/06/25 13:03:05 ERROR Executor: Exception in task 3.0 in stage 8.0 (TID 227)
>>>> java.lang.AssertionError: assertion failed
>>>> at scala.Predef$.assert(Predef.scala:165)
>>>> at org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:90)
>>>> at org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:87)
>>>> at org.apache.spark.graphx.impl.VertexPartitionBaseOps.leftJoin(VertexPartitionBaseOps.scala:140)
>>>> at org.apache.spark.graphx.impl.VertexPartitionBaseOps.leftJoin(VertexPartitionBaseOps.scala:133)
>>>> at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:159)
>>>> at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:156)
>>>> at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>> at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Best regards, Roman Sokolov
>>>>
>>>>
>>>>
>>>
>>
>
