Repository: spark
Updated Branches:
  refs/heads/branch-1.4 8bde352bd -> bd057f8b5


[SPARK-7654][MLlib] Migrate MLlib to the DataFrame reader/writer API.

Author: Reynold Xin <r...@databricks.com>

Closes #6211 from rxin/mllib-reader and squashes the following commits:

79a2cb9 [Reynold Xin] [SPARK-7654][MLlib] Migrate MLlib to the DataFrame reader/writer API.

(cherry picked from commit 161d0b4a41f453b21adde46a86e16c2743752799)
Signed-off-by: Reynold Xin <r...@databricks.com>
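
For context, the change is mechanical: every call to the deprecated
DataFrame.saveAsParquetFile and SQLContext.parquetFile is replaced with the
DataFrameReader/DataFrameWriter API introduced in Spark 1.4. A minimal,
self-contained sketch of the before/after (the SQLContext setup, the Record
case class, and the output path are illustrative, not part of the patch):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Illustrative case class; any product type with a schema works.
    case class Record(key: Int, value: String)

    object ReaderWriterMigration {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("migration").setMaster("local[*]"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        val df = sc.parallelize(Seq(Record(1, "a"), Record(2, "b"))).toDF()

        // Before: df.saveAsParquetFile("records.parquet")  (deprecated)
        // After: the DataFrameWriter entry point.
        df.write.parquet("records.parquet")

        // Before: sqlContext.parquetFile("records.parquet")  (deprecated)
        // After: the DataFrameReader entry point.
        val loaded = sqlContext.read.parquet("records.parquet")
        loaded.show()
        sc.stop()
      }
    }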


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd057f8b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd057f8b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd057f8b

Branch: refs/heads/branch-1.4
Commit: bd057f8b557948c6a6b24cbec0fa2f7c4950d10a
Parents: 8bde352
Author: Reynold Xin <r...@databricks.com>
Authored: Sat May 16 15:03:57 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Sat May 16 15:04:26 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/examples/mllib/DatasetExample.scala   | 2 +-
 .../main/scala/org/apache/spark/examples/sql/RDDRelation.scala   | 2 +-
 .../scala/org/apache/spark/mllib/classification/NaiveBayes.scala | 4 ++--
 .../spark/mllib/classification/impl/GLMClassificationModel.scala | 2 +-
 .../org/apache/spark/mllib/clustering/GaussianMixtureModel.scala | 2 +-
 .../scala/org/apache/spark/mllib/clustering/KMeansModel.scala    | 2 +-
 .../apache/spark/mllib/clustering/PowerIterationClustering.scala | 4 ++--
 .../src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala | 2 +-
 .../spark/mllib/recommendation/MatrixFactorizationModel.scala    | 4 ++--
 .../org/apache/spark/mllib/regression/IsotonicRegression.scala   | 2 +-
 .../apache/spark/mllib/regression/impl/GLMRegressionModel.scala  | 2 +-
 .../org/apache/spark/mllib/tree/model/DecisionTreeModel.scala    | 2 +-
 .../org/apache/spark/mllib/tree/model/treeEnsembleModels.scala   | 2 +-
 13 files changed, 16 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala
index c95cca7..520893b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala
@@ -103,7 +103,7 @@ object DatasetExample {
     tmpDir.deleteOnExit()
     val outputDir = new File(tmpDir, "dataset").toString
     println(s"Saving to $outputDir as Parquet file.")
-    df.saveAsParquetFile(outputDir)
+    df.write.parquet(outputDir)
 
     println(s"Loading Parquet file with UDT from $outputDir.")
     val newDataset = sqlContext.read.parquet(outputDir)

http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
index acc8919..b11e320 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
@@ -58,7 +58,7 @@ object RDDRelation {
     df.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)
 
     // Write out an RDD as a parquet file.
-    df.saveAsParquetFile("pair.parquet")
+    df.write.parquet("pair.parquet")
 
     // Read in parquet file.  Parquet files are self-describing so the schema is preserved.
     val parquetFile = sqlContext.read.parquet("pair.parquet")

http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index af24ab6..ac0ebec 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -140,7 +140,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
 
       // Create Parquet data.
       val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF()
-      dataRDD.saveAsParquetFile(dataPath(path))
+      dataRDD.write.parquet(dataPath(path))
     }
 
     def load(sc: SparkContext, path: String): NaiveBayesModel = {
@@ -186,7 +186,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
 
       // Create Parquet data.
       val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF()
-      dataRDD.saveAsParquetFile(dataPath(path))
+      dataRDD.write.parquet(dataPath(path))
     }
 
     def load(sc: SparkContext, path: String): NaiveBayesModel = {

http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
index 3b6790c..d842ec5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
@@ -62,7 +62,7 @@ private[classification] object GLMClassificationModel {
 
       // Create Parquet data.
       val data = Data(weights, intercept, threshold)
-      sc.parallelize(Seq(data), 1).toDF().saveAsParquetFile(Loader.dataPath(path))
+      sc.parallelize(Seq(data), 1).toDF().write.parquet(Loader.dataPath(path))
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
index c22862c..731b43a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
@@ -126,7 +126,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
       val dataArray = Array.tabulate(weights.length) { i =>
         Data(weights(i), gaussians(i).mu, gaussians(i).sigma)
       }
-      sc.parallelize(dataArray, 1).toDF().saveAsParquetFile(Loader.dataPath(path))
+      sc.parallelize(dataArray, 1).toDF().write.parquet(Loader.dataPath(path))
     }
 
     def load(sc: SparkContext, path: String): GaussianMixtureModel = {

http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index ba228b1..252e166 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -110,7 +110,7 @@ object KMeansModel extends Loader[KMeansModel] {
       val dataRDD = sc.parallelize(model.clusterCenters.zipWithIndex).map { case (point, id) =>
         Cluster(id, point)
       }.toDF()
-      dataRDD.saveAsParquetFile(Loader.dataPath(path))
+      dataRDD.write.parquet(Loader.dataPath(path))
     }
 
     def load(sc: SparkContext, path: String): KMeansModel = {

http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index aa53e88..1ed01c9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -74,7 +74,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
       sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
 
       val dataRDD = model.assignments.toDF()
-      dataRDD.saveAsParquetFile(Loader.dataPath(path))
+      dataRDD.write.parquet(Loader.dataPath(path))
     }
 
     def load(sc: SparkContext, path: String): PowerIterationClusteringModel = {
@@ -86,7 +86,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
       assert(formatVersion == thisFormatVersion)
 
       val k = (metadata \ "k").extract[Int]
-      val assignments = sqlContext.parquetFile(Loader.dataPath(path))
+      val assignments = sqlContext.read.parquet(Loader.dataPath(path))
       Loader.checkSchema[PowerIterationClustering.Assignment](assignments.schema)
 
       val assignmentsRDD = assignments.map {
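
For reference, this is the hunk that also shows the read side of the
migration: sqlContext.parquetFile(path) becomes sqlContext.read.parquet(path),
and the loader then maps each Row back onto the case class. A hedged sketch of
that load pattern (the Assignment case class and the "/data" suffix are
illustrative stand-ins for MLlib's private helpers):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.{Row, SQLContext}

    object LoadSketch {
      // Illustrative stand-in for PowerIterationClustering.Assignment.
      case class Assignment(id: Long, cluster: Int)

      def load(sc: SparkContext, path: String): Array[Assignment] = {
        val sqlContext = new SQLContext(sc)
        // New reader API, as in the hunk above; "/data" mirrors Loader.dataPath.
        val assignments = sqlContext.read.parquet(path + "/data")
        // Pattern-match each Row back onto the case class, as the loader does.
        assignments.map {
          case Row(id: Long, cluster: Int) => Assignment(id, cluster)
        }.collect()
      }
    }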

http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index 98e8311..731f757 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -580,7 +580,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
       sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
 
       val dataArray = model.toSeq.map { case (w, v) => Data(w, v) }
-      sc.parallelize(dataArray.toSeq, 1).toDF().saveAsParquetFile(Loader.dataPath(path))
+      sc.parallelize(dataArray.toSeq, 1).toDF().write.parquet(Loader.dataPath(path))
     }
   }
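
Most of the writers above share the same on-disk layout: JSON metadata saved
as a single-partition text file alongside the Parquet data. A minimal sketch
of that pattern, with hypothetical metadataPath/dataPath helpers standing in
for MLlib's private Loader object:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    object SaveSketch {
      // Hypothetical stand-ins for Loader.metadataPath / Loader.dataPath.
      def metadataPath(path: String): String = path + "/metadata"
      def dataPath(path: String): String = path + "/data"

      // Illustrative record type for the Parquet payload.
      case class Data(word: String, vector: Seq[Float])

      def save(sc: SparkContext, path: String, data: Seq[Data]): Unit = {
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        val metadata = """{"class":"ExampleModel","version":"1.0"}"""
        // One partition so the metadata lands in a single part file.
        sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
        // The writer API this patch migrates to.
        sc.parallelize(data, 1).toDF().write.parquet(dataPath(path))
      }
    }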
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 88c2148..b960fbc 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -281,8 +281,8 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rank" -> model.rank)))
       sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
-      model.userFeatures.toDF("id", "features").saveAsParquetFile(userPath(path))
-      model.productFeatures.toDF("id", "features").saveAsParquetFile(productPath(path))
+      model.userFeatures.toDF("id", "features").write.parquet(userPath(path))
+      model.productFeatures.toDF("id", "features").write.parquet(productPath(path))
     }
 
     def load(sc: SparkContext, path: String): MatrixFactorizationModel = {

http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 4ce541a..22b9b22 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -184,7 +184,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
 
       sqlContext.createDataFrame(
         boundaries.toSeq.zip(predictions).map { case (b, p) => Data(b, p) }
-      ).saveAsParquetFile(dataPath(path))
+      ).write.parquet(dataPath(path))
     }
 
     def load(sc: SparkContext, path: String): (Array[Double], Array[Double]) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
index b55944f..2aa0e9e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
@@ -60,7 +60,7 @@ private[regression] object GLMRegressionModel {
       val data = Data(weights, intercept)
       val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF()
       // TODO: repartition with 1 partition after SPARK-5532 gets fixed
-      dataRDD.saveAsParquetFile(Loader.dataPath(path))
+      dataRDD.write.parquet(Loader.dataPath(path))
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
index 331af42..a558f84 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
@@ -223,7 +223,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
       val dataRDD: DataFrame = sc.parallelize(nodes)
         .map(NodeData.apply(0, _))
         .toDF()
-      dataRDD.saveAsParquetFile(Loader.dataPath(path))
+      dataRDD.write.parquet(Loader.dataPath(path))
     }
 
     def load(sc: SparkContext, path: String, algo: String, numNodes: Int): DecisionTreeModel = {

http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
index 8341219..f9cd014 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
@@ -414,7 +414,7 @@ private[tree] object TreeEnsembleModel extends Logging {
       val dataRDD = sc.parallelize(model.trees.zipWithIndex).flatMap { case (tree, treeId) =>
         tree.topNode.subtreeIterator.toSeq.map(node => NodeData(treeId, node))
       }.toDF()
-      dataRDD.saveAsParquetFile(Loader.dataPath(path))
+      dataRDD.write.parquet(Loader.dataPath(path))
     }
 
     /**

