Repository: spark
Updated Branches:
  refs/heads/master 644e31524 -> 7c92b49d6
[SPARK-1986][GraphX]move lib.Analytics to org.apache.spark.examples to support ~/spark/bin/run-example GraphXAnalytics triangles /soc-LiveJournal1.txt --numEPart=256 Author: Larry Xiao <xia...@sjtu.edu.cn> Closes #1766 from larryxiao/1986 and squashes the following commits: bb77cd9 [Larry Xiao] [SPARK-1986][GraphX]move lib.Analytics to org.apache.spark.examples Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c92b49d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c92b49d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c92b49d Branch: refs/heads/master Commit: 7c92b49d6b62f88fcde883aacb60c5e32ae54b30 Parents: 644e315 Author: Larry Xiao <xia...@sjtu.edu.cn> Authored: Tue Sep 2 18:29:08 2014 -0700 Committer: Ankur Dave <ankurd...@gmail.com> Committed: Tue Sep 2 18:29:08 2014 -0700 ---------------------------------------------------------------------- .../spark/examples/graphx/Analytics.scala | 162 +++++++++++++++++++ .../examples/graphx/LiveJournalPageRank.scala | 2 +- .../org/apache/spark/graphx/lib/Analytics.scala | 161 ------------------ 3 files changed, 163 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7c92b49d/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala new file mode 100644 index 0000000..c4317a6 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.graphx + +import scala.collection.mutable +import org.apache.spark._ +import org.apache.spark.storage.StorageLevel +import org.apache.spark.graphx._ +import org.apache.spark.graphx.lib._ +import org.apache.spark.graphx.PartitionStrategy._ + +/** + * Driver program for running graph algorithms. + */ +object Analytics extends Logging { + + def main(args: Array[String]): Unit = { + if (args.length < 2) { + System.err.println( + "Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> [other options]") + System.exit(1) + } + + val taskType = args(0) + val fname = args(1) + val optionsList = args.drop(2).map { arg => + arg.dropWhile(_ == '-').split('=') match { + case Array(opt, v) => (opt -> v) + case _ => throw new IllegalArgumentException("Invalid argument: " + arg) + } + } + val options = mutable.Map(optionsList: _*) + + def pickPartitioner(v: String): PartitionStrategy = { + // TODO: Use reflection rather than listing all the partitioning strategies here. 
+ v match { + case "RandomVertexCut" => RandomVertexCut + case "EdgePartition1D" => EdgePartition1D + case "EdgePartition2D" => EdgePartition2D + case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut + case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + v) + } + } + + val conf = new SparkConf() + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") + .set("spark.locality.wait", "100000") + + val numEPart = options.remove("numEPart").map(_.toInt).getOrElse { + println("Set the number of edge partitions using --numEPart.") + sys.exit(1) + } + val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy") + .map(pickPartitioner(_)) + val edgeStorageLevel = options.remove("edgeStorageLevel") + .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY) + val vertexStorageLevel = options.remove("vertexStorageLevel") + .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY) + + taskType match { + case "pagerank" => + val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F) + val outFname = options.remove("output").getOrElse("") + val numIterOpt = options.remove("numIter").map(_.toInt) + + options.foreach { + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + println("======================================") + println("| PageRank |") + println("======================================") + + val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")")) + + val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, + minEdgePartitions = numEPart, + edgeStorageLevel = edgeStorageLevel, + vertexStorageLevel = vertexStorageLevel).cache() + val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) + + println("GRAPHX: Number of vertices " + graph.vertices.count) + println("GRAPHX: Number of edges " + graph.edges.count) + + val pr = 
(numIterOpt match { + case Some(numIter) => PageRank.run(graph, numIter) + case None => PageRank.runUntilConvergence(graph, tol) + }).vertices.cache() + + println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _)) + + if (!outFname.isEmpty) { + logWarning("Saving pageranks of pages to " + outFname) + pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname) + } + + sc.stop() + + case "cc" => + options.foreach { + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + println("======================================") + println("| Connected Components |") + println("======================================") + + val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")")) + val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, + minEdgePartitions = numEPart, + edgeStorageLevel = edgeStorageLevel, + vertexStorageLevel = vertexStorageLevel).cache() + val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) + + val cc = ConnectedComponents.run(graph) + println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct()) + sc.stop() + + case "triangles" => + options.foreach { + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + println("======================================") + println("| Triangle Count |") + println("======================================") + + val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")")) + val graph = GraphLoader.edgeListFile(sc, fname, + canonicalOrientation = true, + minEdgePartitions = numEPart, + edgeStorageLevel = edgeStorageLevel, + vertexStorageLevel = vertexStorageLevel) + // TriangleCount requires the graph to be partitioned + .partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache() + val triangles = TriangleCount.run(graph) + println("Triangles: " + triangles.vertices.map { + case (vid, data) => data.toLong + }.reduce(_ + _) / 3) + sc.stop() + + case _ => + println("Invalid 
task type.") + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/7c92b49d/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala index 6ef3b62..bdc8fa7 100644 --- a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala @@ -20,7 +20,7 @@ package org.apache.spark.examples.graphx import org.apache.spark.SparkContext._ import org.apache.spark._ import org.apache.spark.graphx._ -import org.apache.spark.graphx.lib.Analytics +import org.apache.spark.examples.graphx.Analytics /** * Uses GraphX to run PageRank on a LiveJournal social network graph. Download the dataset from http://git-wip-us.apache.org/repos/asf/spark/blob/7c92b49d/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala deleted file mode 100644 index c1513a0..0000000 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.graphx.lib - -import scala.collection.mutable -import org.apache.spark._ -import org.apache.spark.storage.StorageLevel -import org.apache.spark.graphx._ -import org.apache.spark.graphx.PartitionStrategy._ - -/** - * Driver program for running graph algorithms. - */ -object Analytics extends Logging { - - def main(args: Array[String]): Unit = { - if (args.length < 2) { - System.err.println( - "Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> [other options]") - System.exit(1) - } - - val taskType = args(0) - val fname = args(1) - val optionsList = args.drop(2).map { arg => - arg.dropWhile(_ == '-').split('=') match { - case Array(opt, v) => (opt -> v) - case _ => throw new IllegalArgumentException("Invalid argument: " + arg) - } - } - val options = mutable.Map(optionsList: _*) - - def pickPartitioner(v: String): PartitionStrategy = { - // TODO: Use reflection rather than listing all the partitioning strategies here. 
- v match { - case "RandomVertexCut" => RandomVertexCut - case "EdgePartition1D" => EdgePartition1D - case "EdgePartition2D" => EdgePartition2D - case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut - case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + v) - } - } - - val conf = new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") - .set("spark.locality.wait", "100000") - - val numEPart = options.remove("numEPart").map(_.toInt).getOrElse { - println("Set the number of edge partitions using --numEPart.") - sys.exit(1) - } - val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy") - .map(pickPartitioner(_)) - val edgeStorageLevel = options.remove("edgeStorageLevel") - .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY) - val vertexStorageLevel = options.remove("vertexStorageLevel") - .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY) - - taskType match { - case "pagerank" => - val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F) - val outFname = options.remove("output").getOrElse("") - val numIterOpt = options.remove("numIter").map(_.toInt) - - options.foreach { - case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) - } - - println("======================================") - println("| PageRank |") - println("======================================") - - val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")")) - - val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, - minEdgePartitions = numEPart, - edgeStorageLevel = edgeStorageLevel, - vertexStorageLevel = vertexStorageLevel).cache() - val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) - - println("GRAPHX: Number of vertices " + graph.vertices.count) - println("GRAPHX: Number of edges " + graph.edges.count) - - val pr = 
(numIterOpt match { - case Some(numIter) => PageRank.run(graph, numIter) - case None => PageRank.runUntilConvergence(graph, tol) - }).vertices.cache() - - println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _)) - - if (!outFname.isEmpty) { - logWarning("Saving pageranks of pages to " + outFname) - pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname) - } - - sc.stop() - - case "cc" => - options.foreach { - case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) - } - - println("======================================") - println("| Connected Components |") - println("======================================") - - val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")")) - val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, - minEdgePartitions = numEPart, - edgeStorageLevel = edgeStorageLevel, - vertexStorageLevel = vertexStorageLevel).cache() - val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) - - val cc = ConnectedComponents.run(graph) - println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct()) - sc.stop() - - case "triangles" => - options.foreach { - case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) - } - - println("======================================") - println("| Triangle Count |") - println("======================================") - - val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")")) - val graph = GraphLoader.edgeListFile(sc, fname, - canonicalOrientation = true, - minEdgePartitions = numEPart, - edgeStorageLevel = edgeStorageLevel, - vertexStorageLevel = vertexStorageLevel) - // TriangleCount requires the graph to be partitioned - .partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache() - val triangles = TriangleCount.run(graph) - println("Triangles: " + triangles.vertices.map { - case (vid, data) => data.toLong - }.reduce(_ + _) / 3) - sc.stop() - - case _ => - println("Invalid 
task type.")
-    }
-  }
-}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org