Repository: spark
Updated Branches:
  refs/heads/master 644e31524 -> 7c92b49d6


[SPARK-1986][GraphX] Move lib.Analytics to org.apache.spark.examples

This makes the driver runnable through run-example, e.g.:

  ~/spark/bin/run-example GraphXAnalytics triangles /soc-LiveJournal1.txt --numEPart=256
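
The driver's other task types and options (all parsed in the diff below) are invoked the same way. run-example resolves class names under org.apache.spark.examples, so after this move the driver should also be reachable as graphx.Analytics; the dataset paths here are illustrative:

  ~/spark/bin/run-example graphx.Analytics pagerank /soc-LiveJournal1.txt --numEPart=256 --numIter=10
  ~/spark/bin/run-example graphx.Analytics cc /soc-LiveJournal1.txt --numEPart=256
  ~/spark/bin/run-example graphx.Analytics triangles /soc-LiveJournal1.txt --numEPart=256 --partStrategy=RandomVertexCut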

Author: Larry Xiao <xia...@sjtu.edu.cn>

Closes #1766 from larryxiao/1986 and squashes the following commits:

bb77cd9 [Larry Xiao] [SPARK-1986][GraphX]move lib.Analytics to org.apache.spark.examples


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c92b49d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c92b49d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c92b49d

Branch: refs/heads/master
Commit: 7c92b49d6b62f88fcde883aacb60c5e32ae54b30
Parents: 644e315
Author: Larry Xiao <xia...@sjtu.edu.cn>
Authored: Tue Sep 2 18:29:08 2014 -0700
Committer: Ankur Dave <ankurd...@gmail.com>
Committed: Tue Sep 2 18:29:08 2014 -0700

----------------------------------------------------------------------
 .../spark/examples/graphx/Analytics.scala       | 162 +++++++++++++++++++
 .../examples/graphx/LiveJournalPageRank.scala   |   2 +-
 .../org/apache/spark/graphx/lib/Analytics.scala | 161 ------------------
 3 files changed, 163 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7c92b49d/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala
new file mode 100644
index 0000000..c4317a6
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.graphx
+
+import scala.collection.mutable
+import org.apache.spark._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.lib._
+import org.apache.spark.graphx.PartitionStrategy._
+
+/**
+ * Driver program for running graph algorithms.
+ */
+object Analytics extends Logging {
+
+  def main(args: Array[String]): Unit = {
+    if (args.length < 2) {
+      System.err.println(
+        "Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> 
[other options]")
+      System.exit(1)
+    }
+
+    val taskType = args(0)
+    val fname = args(1)
+    val optionsList = args.drop(2).map { arg =>
+      arg.dropWhile(_ == '-').split('=') match {
+        case Array(opt, v) => (opt -> v)
+        case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+      }
+    }
+    val options = mutable.Map(optionsList: _*)
+
+    def pickPartitioner(v: String): PartitionStrategy = {
+      // TODO: Use reflection rather than listing all the partitioning strategies here.
+      v match {
+        case "RandomVertexCut" => RandomVertexCut
+        case "EdgePartition1D" => EdgePartition1D
+        case "EdgePartition2D" => EdgePartition2D
+        case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
+        case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + v)
+      }
+    }
+
+    val conf = new SparkConf()
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrator", 
"org.apache.spark.graphx.GraphKryoRegistrator")
+      .set("spark.locality.wait", "100000")
+
+    val numEPart = options.remove("numEPart").map(_.toInt).getOrElse {
+      println("Set the number of edge partitions using --numEPart.")
+      sys.exit(1)
+    }
+    val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy")
+      .map(pickPartitioner(_))
+    val edgeStorageLevel = options.remove("edgeStorageLevel")
+      .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
+    val vertexStorageLevel = options.remove("vertexStorageLevel")
+      .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
+
+    taskType match {
+      case "pagerank" =>
+        val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F)
+        val outFname = options.remove("output").getOrElse("")
+        val numIterOpt = options.remove("numIter").map(_.toInt)
+
+        options.foreach {
+          case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+        }
+
+        println("======================================")
+        println("|             PageRank               |")
+        println("======================================")
+
+        val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
+
+        val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+          minEdgePartitions = numEPart,
+          edgeStorageLevel = edgeStorageLevel,
+          vertexStorageLevel = vertexStorageLevel).cache()
+        val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
+
+        println("GRAPHX: Number of vertices " + graph.vertices.count)
+        println("GRAPHX: Number of edges " + graph.edges.count)
+
+        val pr = (numIterOpt match {
+          case Some(numIter) => PageRank.run(graph, numIter)
+          case None => PageRank.runUntilConvergence(graph, tol)
+        }).vertices.cache()
+
+        println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _))
+
+        if (!outFname.isEmpty) {
+          logWarning("Saving pageranks of pages to " + outFname)
+          pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
+        }
+
+        sc.stop()
+
+      case "cc" =>
+        options.foreach {
+          case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+        }
+
+        println("======================================")
+        println("|      Connected Components          |")
+        println("======================================")
+
+        val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
+        val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+          minEdgePartitions = numEPart,
+          edgeStorageLevel = edgeStorageLevel,
+          vertexStorageLevel = vertexStorageLevel).cache()
+        val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
+
+        val cc = ConnectedComponents.run(graph)
+        println("Components: " + cc.vertices.map{ case (vid,data) => 
data}.distinct())
+        sc.stop()
+
+      case "triangles" =>
+        options.foreach {
+          case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+        }
+
+        println("======================================")
+        println("|      Triangle Count                |")
+        println("======================================")
+
+        val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
+        val graph = GraphLoader.edgeListFile(sc, fname,
+          canonicalOrientation = true,
+          minEdgePartitions = numEPart,
+          edgeStorageLevel = edgeStorageLevel,
+          vertexStorageLevel = vertexStorageLevel)
+        // TriangleCount requires the graph to be partitioned
+          .partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache()
+        val triangles = TriangleCount.run(graph)
+        println("Triangles: " + triangles.vertices.map {
+          case (vid, data) => data.toLong
+        }.reduce(_ + _) / 3)
+        sc.stop()
+
+      case _ =>
+        println("Invalid task type.")
+    }
+  }
+}
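
A note on the repeated line "val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))" above: folding over an Option applies the function exactly once when the value is Some and returns the initial graph untouched when it is None, so the graph is repartitioned only if --partStrategy was supplied. A minimal, self-contained sketch of the idiom (plain Scala; the names below are illustrative, not from the commit):

object OptionFoldSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for Graph.partitionBy: combines a "graph" with a strategy name.
    def partition(g: String, strategy: String): String = s"$g partitioned by $strategy"
    val graph = "graph"
    // None: foldLeft returns the initial value unchanged.
    println((None: Option[String]).foldLeft(graph)(partition))
    // Some: foldLeft applies the function exactly once.
    println((Some("RandomVertexCut"): Option[String]).foldLeft(graph)(partition))
  }
}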

http://git-wip-us.apache.org/repos/asf/spark/blob/7c92b49d/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala
index 6ef3b62..bdc8fa7 100644
--- a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala
@@ -20,7 +20,7 @@ package org.apache.spark.examples.graphx
 import org.apache.spark.SparkContext._
 import org.apache.spark._
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.lib.Analytics
+import org.apache.spark.examples.graphx.Analytics
 
 /**
 * Uses GraphX to run PageRank on a LiveJournal social network graph. Download the dataset from
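
Since LiveJournalPageRank only delegates to the shared driver, the one-line import change above is the whole update it needs. A sketch of that delegation pattern against the new package (the wrapper object and dataset path are illustrative, not part of the commit):

import org.apache.spark.examples.graphx.Analytics  // new location as of this commit

object DelegatingPageRank {
  def main(args: Array[String]): Unit = {
    // Prepend the task type and forward everything to the shared driver.
    Analytics.main(Array("pagerank", "soc-LiveJournal1.txt", "--numEPart=256"))
  }
}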

http://git-wip-us.apache.org/repos/asf/spark/blob/7c92b49d/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
deleted file mode 100644
index c1513a0..0000000
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.graphx.lib
-
-import scala.collection.mutable
-import org.apache.spark._
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.graphx._
-import org.apache.spark.graphx.PartitionStrategy._
-
-/**
- * Driver program for running graph algorithms.
- */
-object Analytics extends Logging {
-
-  def main(args: Array[String]): Unit = {
-    if (args.length < 2) {
-      System.err.println(
-        "Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> 
[other options]")
-      System.exit(1)
-    }
-
-    val taskType = args(0)
-    val fname = args(1)
-    val optionsList = args.drop(2).map { arg =>
-      arg.dropWhile(_ == '-').split('=') match {
-        case Array(opt, v) => (opt -> v)
-        case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
-      }
-    }
-    val options = mutable.Map(optionsList: _*)
-
-    def pickPartitioner(v: String): PartitionStrategy = {
-      // TODO: Use reflection rather than listing all the partitioning strategies here.
-      v match {
-        case "RandomVertexCut" => RandomVertexCut
-        case "EdgePartition1D" => EdgePartition1D
-        case "EdgePartition2D" => EdgePartition2D
-        case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
-        case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + v)
-      }
-    }
-
-    val conf = new SparkConf()
-      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .set("spark.kryo.registrator", 
"org.apache.spark.graphx.GraphKryoRegistrator")
-      .set("spark.locality.wait", "100000")
-
-    val numEPart = options.remove("numEPart").map(_.toInt).getOrElse {
-      println("Set the number of edge partitions using --numEPart.")
-      sys.exit(1)
-    }
-    val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy")
-      .map(pickPartitioner(_))
-    val edgeStorageLevel = options.remove("edgeStorageLevel")
-      .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
-    val vertexStorageLevel = options.remove("vertexStorageLevel")
-      .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
-
-    taskType match {
-      case "pagerank" =>
-        val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F)
-        val outFname = options.remove("output").getOrElse("")
-        val numIterOpt = options.remove("numIter").map(_.toInt)
-
-        options.foreach {
-          case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
-        }
-
-        println("======================================")
-        println("|             PageRank               |")
-        println("======================================")
-
-        val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
-
-        val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
-          minEdgePartitions = numEPart,
-          edgeStorageLevel = edgeStorageLevel,
-          vertexStorageLevel = vertexStorageLevel).cache()
-        val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
-
-        println("GRAPHX: Number of vertices " + graph.vertices.count)
-        println("GRAPHX: Number of edges " + graph.edges.count)
-
-        val pr = (numIterOpt match {
-          case Some(numIter) => PageRank.run(graph, numIter)
-          case None => PageRank.runUntilConvergence(graph, tol)
-        }).vertices.cache()
-
-        println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _))
-
-        if (!outFname.isEmpty) {
-          logWarning("Saving pageranks of pages to " + outFname)
-          pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
-        }
-
-        sc.stop()
-
-      case "cc" =>
-        options.foreach {
-          case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
-        }
-
-        println("======================================")
-        println("|      Connected Components          |")
-        println("======================================")
-
-        val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
-        val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
-          minEdgePartitions = numEPart,
-          edgeStorageLevel = edgeStorageLevel,
-          vertexStorageLevel = vertexStorageLevel).cache()
-        val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
-
-        val cc = ConnectedComponents.run(graph)
-        println("Components: " + cc.vertices.map{ case (vid,data) => 
data}.distinct())
-        sc.stop()
-
-      case "triangles" =>
-        options.foreach {
-          case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
-        }
-
-        println("======================================")
-        println("|      Triangle Count                |")
-        println("======================================")
-
-        val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
-        val graph = GraphLoader.edgeListFile(sc, fname,
-          canonicalOrientation = true,
-          minEdgePartitions = numEPart,
-          edgeStorageLevel = edgeStorageLevel,
-          vertexStorageLevel = vertexStorageLevel)
-        // TriangleCount requires the graph to be partitioned
-          .partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache()
-        val triangles = TriangleCount.run(graph)
-        println("Triangles: " + triangles.vertices.map {
-          case (vid, data) => data.toLong
-        }.reduce(_ + _) / 3)
-        sc.stop()
-
-      case _ =>
-        println("Invalid task type.")
-    }
-  }
-}
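
The deleted file matches the code added under examples above except for its package declaration and imports, so nothing is lost from the graphx module itself. Code that scripted against the old lib.Analytics can reach the same computation directly through the GraphX API; a minimal sketch, assuming a local master and a placeholder edge-list path, using only calls that appear in the diff:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
import org.apache.spark.graphx.lib.PageRank

object PageRankSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("PageRankSketch").setMaster("local[*]"))
    // Load an edge list ("srcId dstId" per line), partition, and cache, as the driver does.
    val graph = GraphLoader.edgeListFile(sc, "edges.txt", minEdgePartitions = 4)
      .partitionBy(PartitionStrategy.RandomVertexCut).cache()
    // Fixed-iteration PageRank: the same call as the driver's --numIter path.
    val ranks = PageRank.run(graph, 10).vertices.cache()
    println("Total rank: " + ranks.map(_._2).reduce(_ + _))
    sc.stop()
  }
}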

