Repository: spark Updated Branches: refs/heads/master 3ddf051ee -> 589b12f8e
[SPARK-7654] [MLLIB] Migrate MLlib to the DataFrame reader/writer API parquetFile -> read.parquet rxin Author: Xiangrui Meng <m...@databricks.com> Closes #6281 from mengxr/SPARK-7654 and squashes the following commits: a79b612 [Xiangrui Meng] parquetFile -> read.parquet Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/589b12f8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/589b12f8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/589b12f8 Branch: refs/heads/master Commit: 589b12f8e62ec5d10713ce057756ebc791e7ddc6 Parents: 3ddf051 Author: Xiangrui Meng <m...@databricks.com> Authored: Wed May 20 07:46:17 2015 -0700 Committer: Xiangrui Meng <m...@databricks.com> Committed: Wed May 20 07:46:17 2015 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/mllib/classification/NaiveBayes.scala | 4 ++-- .../spark/mllib/classification/impl/GLMClassificationModel.scala | 2 +- .../org/apache/spark/mllib/clustering/GaussianMixtureModel.scala | 2 +- .../scala/org/apache/spark/mllib/clustering/KMeansModel.scala | 2 +- .../src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala | 2 +- .../spark/mllib/recommendation/MatrixFactorizationModel.scala | 4 ++-- .../org/apache/spark/mllib/regression/IsotonicRegression.scala | 2 +- .../apache/spark/mllib/regression/impl/GLMRegressionModel.scala | 2 +- .../org/apache/spark/mllib/tree/model/DecisionTreeModel.scala | 2 +- .../org/apache/spark/mllib/tree/model/treeEnsembleModels.scala | 2 +- 10 files changed, 12 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/589b12f8/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index 53fb2cb..cffe9ef 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -153,7 +153,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] { def load(sc: SparkContext, path: String): NaiveBayesModel = { val sqlContext = new SQLContext(sc) // Load Parquet data. - val dataRDD = sqlContext.parquetFile(dataPath(path)) + val dataRDD = sqlContext.read.parquet(dataPath(path)) // Check schema explicitly since erasure makes it hard to use match-case for checking. checkSchema[Data](dataRDD.schema) val dataArray = dataRDD.select("labels", "pi", "theta", "modelType").take(1) @@ -199,7 +199,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] { def load(sc: SparkContext, path: String): NaiveBayesModel = { val sqlContext = new SQLContext(sc) // Load Parquet data. - val dataRDD = sqlContext.parquetFile(dataPath(path)) + val dataRDD = sqlContext.read.parquet(dataPath(path)) // Check schema explicitly since erasure makes it hard to use match-case for checking. checkSchema[Data](dataRDD.schema) val dataArray = dataRDD.select("labels", "pi", "theta").take(1) http://git-wip-us.apache.org/repos/asf/spark/blob/589b12f8/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala index d842ec5..fe09f6b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala @@ -75,7 +75,7 @@ private[classification] object GLMClassificationModel { def loadData(sc: SparkContext, path: String, modelClass: String): Data = { val datapath = Loader.dataPath(path) val sqlContext = new SQLContext(sc) - val dataRDD = sqlContext.parquetFile(datapath) + val dataRDD = sqlContext.read.parquet(datapath) val dataArray = dataRDD.select("weights", "intercept", "threshold").take(1) assert(dataArray.size == 1, s"Unable to load $modelClass data from: $datapath") val data = dataArray(0) http://git-wip-us.apache.org/repos/asf/spark/blob/589b12f8/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index 731b43a..86353ae 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -132,7 +132,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] { def load(sc: SparkContext, path: String): GaussianMixtureModel = { val dataPath = Loader.dataPath(path) val sqlContext = new SQLContext(sc) - val dataFrame = sqlContext.parquetFile(dataPath) + val dataFrame = sqlContext.read.parquet(dataPath) val dataArray = dataFrame.select("weight", "mu", "sigma").collect() // Check schema explicitly since erasure makes it hard to use match-case for checking. http://git-wip-us.apache.org/repos/asf/spark/blob/589b12f8/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala index 252e166..8ecb3df 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -120,7 +120,7 @@ object KMeansModel extends Loader[KMeansModel] { assert(className == thisClassName) assert(formatVersion == thisFormatVersion) val k = (metadata \ "k").extract[Int] - val centriods = sqlContext.parquetFile(Loader.dataPath(path)) + val centriods = sqlContext.read.parquet(Loader.dataPath(path)) Loader.checkSchema[Cluster](centriods.schema) val localCentriods = centriods.map(Cluster.apply).collect() assert(k == localCentriods.size) http://git-wip-us.apache.org/repos/asf/spark/blob/589b12f8/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index f65f782..9106b73 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -559,7 +559,7 @@ object Word2VecModel extends Loader[Word2VecModel] { def load(sc: SparkContext, path: String): Word2VecModel = { val dataPath = Loader.dataPath(path) val sqlContext = new SQLContext(sc) - val dataFrame = sqlContext.parquetFile(dataPath) + val dataFrame = sqlContext.read.parquet(dataPath) val dataArray = dataFrame.select("word", "vector").collect() http://git-wip-us.apache.org/repos/asf/spark/blob/589b12f8/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index b960fbc..93aa41e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -292,11 +292,11 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { assert(className == thisClassName) assert(formatVersion == thisFormatVersion) val rank = (metadata \ "rank").extract[Int] - val userFeatures = sqlContext.parquetFile(userPath(path)) + val userFeatures = sqlContext.read.parquet(userPath(path)) .map { case Row(id: Int, features: Seq[_]) => (id, features.asInstanceOf[Seq[Double]].toArray) } - val productFeatures = sqlContext.parquetFile(productPath(path)) + val productFeatures = sqlContext.read.parquet(productPath(path)) .map { case Row(id: Int, features: Seq[_]) => (id, features.asInstanceOf[Seq[Double]].toArray) } http://git-wip-us.apache.org/repos/asf/spark/blob/589b12f8/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 22b9b22..3ea63dd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -189,7 +189,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] { def load(sc: SparkContext, path: String): (Array[Double], Array[Double]) = { val sqlContext = new SQLContext(sc) - val dataRDD = sqlContext.parquetFile(dataPath(path)) + val dataRDD = sqlContext.read.parquet(dataPath(path)) checkSchema[Data](dataRDD.schema) val dataArray = dataRDD.select("boundary", "prediction").collect() http://git-wip-us.apache.org/repos/asf/spark/blob/589b12f8/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala index 2aa0e9e..317d3a5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala @@ -72,7 +72,7 @@ private[regression] object GLMRegressionModel { def loadData(sc: SparkContext, path: String, modelClass: String, numFeatures: Int): Data = { val datapath = Loader.dataPath(path) val sqlContext = new SQLContext(sc) - val dataRDD = sqlContext.parquetFile(datapath) + val dataRDD = sqlContext.read.parquet(datapath) val dataArray = dataRDD.select("weights", "intercept").take(1) assert(dataArray.size == 1, s"Unable to load $modelClass data from: $datapath") val data = dataArray(0) http://git-wip-us.apache.org/repos/asf/spark/blob/589b12f8/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala index a558f84..25bb145 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala @@ -230,7 +230,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging { val datapath = Loader.dataPath(path) val sqlContext = new SQLContext(sc) // Load Parquet data. - val dataRDD = sqlContext.parquetFile(datapath) + val dataRDD = sqlContext.read.parquet(datapath) // Check schema explicitly since erasure makes it hard to use match-case for checking. Loader.checkSchema[NodeData](dataRDD.schema) val nodes = dataRDD.map(NodeData.apply) http://git-wip-us.apache.org/repos/asf/spark/blob/589b12f8/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala index f9cd014..1e3333d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala @@ -437,7 +437,7 @@ private[tree] object TreeEnsembleModel extends Logging { treeAlgo: String): Array[DecisionTreeModel] = { val datapath = Loader.dataPath(path) val sqlContext = new SQLContext(sc) - val nodes = sqlContext.parquetFile(datapath).map(NodeData.apply) + val nodes = sqlContext.read.parquet(datapath).map(NodeData.apply) val trees = constructTrees(nodes) trees.map(new DecisionTreeModel(_, Algo.fromString(treeAlgo))) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org