Repository: spark Updated Branches: refs/heads/branch-1.4 8bde352bd -> bd057f8b5
[SPARK-7654][MLlib] Migrate MLlib to the DataFrame reader/writer API. Author: Reynold Xin <r...@databricks.com> Closes #6211 from rxin/mllib-reader and squashes the following commits: 79a2cb9 [Reynold Xin] [SPARK-7654][MLlib] Migrate MLlib to the DataFrame reader/writer API. (cherry picked from commit 161d0b4a41f453b21adde46a86e16c2743752799) Signed-off-by: Reynold Xin <r...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd057f8b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd057f8b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd057f8b Branch: refs/heads/branch-1.4 Commit: bd057f8b557948c6a6b24cbec0fa2f7c4950d10a Parents: 8bde352 Author: Reynold Xin <r...@databricks.com> Authored: Sat May 16 15:03:57 2015 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Sat May 16 15:04:26 2015 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/examples/mllib/DatasetExample.scala | 2 +- .../main/scala/org/apache/spark/examples/sql/RDDRelation.scala | 2 +- .../scala/org/apache/spark/mllib/classification/NaiveBayes.scala | 4 ++-- .../spark/mllib/classification/impl/GLMClassificationModel.scala | 2 +- .../org/apache/spark/mllib/clustering/GaussianMixtureModel.scala | 2 +- .../scala/org/apache/spark/mllib/clustering/KMeansModel.scala | 2 +- .../apache/spark/mllib/clustering/PowerIterationClustering.scala | 4 ++-- .../src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala | 2 +- .../spark/mllib/recommendation/MatrixFactorizationModel.scala | 4 ++-- .../org/apache/spark/mllib/regression/IsotonicRegression.scala | 2 +- .../apache/spark/mllib/regression/impl/GLMRegressionModel.scala | 2 +- .../org/apache/spark/mllib/tree/model/DecisionTreeModel.scala | 2 +- .../org/apache/spark/mllib/tree/model/treeEnsembleModels.scala | 2 +- 13 files changed, 16 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala index c95cca7..520893b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala @@ -103,7 +103,7 @@ object DatasetExample { tmpDir.deleteOnExit() val outputDir = new File(tmpDir, "dataset").toString println(s"Saving to $outputDir as Parquet file.") - df.saveAsParquetFile(outputDir) + df.write.parquet(outputDir) println(s"Loading Parquet file with UDT from $outputDir.") val newDataset = sqlContext.read.parquet(outputDir) http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala index acc8919..b11e320 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala @@ -58,7 +58,7 @@ object RDDRelation { df.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println) // Write out an RDD as a parquet file. - df.saveAsParquetFile("pair.parquet") + df.write.parquet("pair.parquet") // Read in parquet file. Parquet files are self-describing so the schmema is preserved. val parquetFile = sqlContext.read.parquet("pair.parquet") http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index af24ab6..ac0ebec 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -140,7 +140,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] { // Create Parquet data. val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF() - dataRDD.saveAsParquetFile(dataPath(path)) + dataRDD.write.parquet(dataPath(path)) } def load(sc: SparkContext, path: String): NaiveBayesModel = { @@ -186,7 +186,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] { // Create Parquet data. val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF() - dataRDD.saveAsParquetFile(dataPath(path)) + dataRDD.write.parquet(dataPath(path)) } def load(sc: SparkContext, path: String): NaiveBayesModel = { http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala index 3b6790c..d842ec5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala @@ -62,7 +62,7 @@ private[classification] object GLMClassificationModel { // Create Parquet data. val data = Data(weights, intercept, threshold) - sc.parallelize(Seq(data), 1).toDF().saveAsParquetFile(Loader.dataPath(path)) + sc.parallelize(Seq(data), 1).toDF().write.parquet(Loader.dataPath(path)) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index c22862c..731b43a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -126,7 +126,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] { val dataArray = Array.tabulate(weights.length) { i => Data(weights(i), gaussians(i).mu, gaussians(i).sigma) } - sc.parallelize(dataArray, 1).toDF().saveAsParquetFile(Loader.dataPath(path)) + sc.parallelize(dataArray, 1).toDF().write.parquet(Loader.dataPath(path)) } def load(sc: SparkContext, path: String): GaussianMixtureModel = { http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala index ba228b1..252e166 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -110,7 +110,7 @@ object KMeansModel extends Loader[KMeansModel] { val dataRDD = sc.parallelize(model.clusterCenters.zipWithIndex).map { case (point, id) => Cluster(id, point) }.toDF() - dataRDD.saveAsParquetFile(Loader.dataPath(path)) + dataRDD.write.parquet(Loader.dataPath(path)) } def load(sc: SparkContext, path: String): KMeansModel = { http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala index aa53e88..1ed01c9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala @@ -74,7 +74,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) val dataRDD = model.assignments.toDF() - dataRDD.saveAsParquetFile(Loader.dataPath(path)) + dataRDD.write.parquet(Loader.dataPath(path)) } def load(sc: SparkContext, path: String): PowerIterationClusteringModel = { @@ -86,7 +86,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode assert(formatVersion == thisFormatVersion) val k = (metadata \ "k").extract[Int] - val assignments = sqlContext.parquetFile(Loader.dataPath(path)) + val assignments = sqlContext.read.parquet(Loader.dataPath(path)) Loader.checkSchema[PowerIterationClustering.Assignment](assignments.schema) val assignmentsRDD = assignments.map { http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index 98e8311..731f757 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -580,7 +580,7 @@ object Word2VecModel extends Loader[Word2VecModel] { sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) val dataArray = model.toSeq.map { case (w, v) => Data(w, v) } - sc.parallelize(dataArray.toSeq, 1).toDF().saveAsParquetFile(Loader.dataPath(path)) + sc.parallelize(dataArray.toSeq, 1).toDF().write.parquet(Loader.dataPath(path)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 88c2148..b960fbc 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -281,8 +281,8 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rank" -> model.rank))) sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path)) - model.userFeatures.toDF("id", "features").saveAsParquetFile(userPath(path)) - model.productFeatures.toDF("id", "features").saveAsParquetFile(productPath(path)) + model.userFeatures.toDF("id", "features").write.parquet(userPath(path)) + model.productFeatures.toDF("id", "features").write.parquet(productPath(path)) } def load(sc: SparkContext, path: String): MatrixFactorizationModel = { http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 4ce541a..22b9b22 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -184,7 +184,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] { sqlContext.createDataFrame( boundaries.toSeq.zip(predictions).map { case (b, p) => Data(b, p) } - ).saveAsParquetFile(dataPath(path)) + ).write.parquet(dataPath(path)) } def load(sc: SparkContext, path: String): (Array[Double], Array[Double]) = { http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala index b55944f..2aa0e9e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala @@ -60,7 +60,7 @@ private[regression] object GLMRegressionModel { val data = Data(weights, intercept) val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF() // TODO: repartition with 1 partition after SPARK-5532 gets fixed - dataRDD.saveAsParquetFile(Loader.dataPath(path)) + dataRDD.write.parquet(Loader.dataPath(path)) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala index 331af42..a558f84 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala @@ -223,7 +223,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging { val dataRDD: DataFrame = sc.parallelize(nodes) .map(NodeData.apply(0, _)) .toDF() - dataRDD.saveAsParquetFile(Loader.dataPath(path)) + dataRDD.write.parquet(Loader.dataPath(path)) } def load(sc: SparkContext, path: String, algo: String, numNodes: Int): DecisionTreeModel = { http://git-wip-us.apache.org/repos/asf/spark/blob/bd057f8b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala index 8341219..f9cd014 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala @@ -414,7 +414,7 @@ private[tree] object TreeEnsembleModel extends Logging { val dataRDD = sc.parallelize(model.trees.zipWithIndex).flatMap { case (tree, treeId) => tree.topNode.subtreeIterator.toSeq.map(node => NodeData(treeId, node)) }.toDF() - dataRDD.saveAsParquetFile(Loader.dataPath(path)) + dataRDD.write.parquet(Loader.dataPath(path)) } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org