Repository: spark Updated Branches: refs/heads/master 5c3912e5c -> 99dfcedbf
[SPARK-13457][SQL] Removes DataFrame RDD operations ## What changes were proposed in this pull request? This is another attempt at PR #11323. This PR removes DataFrame RDD operations except for `foreach` and `foreachPartition`, which are actions rather than transformations. Former call sites now invoke the corresponding methods on `DataFrame.rdd` instead. PR #11323 was reverted because it introduced a regression: `DataFrame.foreach` and `DataFrame.foreachPartition` wrap the underlying RDD operations with `withNewExecutionId` to track Spark jobs, but #11323 removed both methods and with them that job tracking. ## How was this patch tested? No extra tests were added; existing tests cover the change. Author: Cheng Lian <l...@databricks.com> Closes #11388 from liancheng/remove-df-rdd-ops. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99dfcedb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99dfcedb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99dfcedb Branch: refs/heads/master Commit: 99dfcedbfd4c83c7b6a343456f03e8c6e29968c5 Parents: 5c3912e Author: Cheng Lian <l...@databricks.com> Authored: Sat Feb 27 00:28:30 2016 +0800 Committer: Cheng Lian <l...@databricks.com> Committed: Sat Feb 27 00:28:30 2016 +0800 ---------------------------------------------------------------------- .../spark/examples/ml/DataFrameExample.scala | 2 +- .../spark/examples/ml/DecisionTreeExample.scala | 8 +++---- .../spark/examples/ml/OneVsRestExample.scala | 2 +- .../spark/examples/mllib/LDAExample.scala | 1 + .../apache/spark/examples/sql/RDDRelation.scala | 2 +- .../spark/examples/sql/hive/HiveFromSpark.scala | 2 +- .../scala/org/apache/spark/ml/Predictor.scala | 6 +++-- .../ml/classification/LogisticRegression.scala | 13 +++++++---- .../spark/ml/clustering/BisectingKMeans.scala | 4 ++-- .../org/apache/spark/ml/clustering/KMeans.scala | 6 ++--- .../org/apache/spark/ml/clustering/LDA.scala | 1 + .../BinaryClassificationEvaluator.scala | 9
++++---- .../MulticlassClassificationEvaluator.scala | 6 ++--- .../ml/evaluation/RegressionEvaluator.scala | 3 ++- .../apache/spark/ml/feature/ChiSqSelector.scala | 2 +- .../spark/ml/feature/CountVectorizer.scala | 2 +- .../scala/org/apache/spark/ml/feature/IDF.scala | 2 +- .../apache/spark/ml/feature/MaxAbsScaler.scala | 2 +- .../apache/spark/ml/feature/MinMaxScaler.scala | 2 +- .../apache/spark/ml/feature/OneHotEncoder.scala | 2 +- .../scala/org/apache/spark/ml/feature/PCA.scala | 2 +- .../spark/ml/feature/StandardScaler.scala | 2 +- .../apache/spark/ml/feature/StringIndexer.scala | 1 + .../apache/spark/ml/feature/VectorIndexer.scala | 2 +- .../org/apache/spark/ml/feature/Word2Vec.scala | 2 +- .../apache/spark/ml/recommendation/ALS.scala | 1 + .../ml/regression/AFTSurvivalRegression.scala | 2 +- .../ml/regression/IsotonicRegression.scala | 6 ++--- .../spark/ml/regression/LinearRegression.scala | 16 ++++++++----- .../spark/mllib/api/python/PythonMLLibAPI.scala | 8 +++---- .../spark/mllib/clustering/KMeansModel.scala | 2 +- .../spark/mllib/clustering/LDAModel.scala | 4 ++-- .../clustering/PowerIterationClustering.scala | 2 +- .../BinaryClassificationMetrics.scala | 2 +- .../mllib/evaluation/MulticlassMetrics.scala | 2 +- .../mllib/evaluation/MultilabelMetrics.scala | 4 +++- .../mllib/evaluation/RegressionMetrics.scala | 2 +- .../spark/mllib/feature/ChiSqSelector.scala | 2 +- .../org/apache/spark/mllib/fpm/FPGrowth.scala | 2 +- .../MatrixFactorizationModel.scala | 12 +++++----- .../mllib/tree/model/DecisionTreeModel.scala | 2 +- .../mllib/tree/model/treeEnsembleModels.scala | 2 +- .../LogisticRegressionSuite.scala | 2 +- .../MultilayerPerceptronClassifierSuite.scala | 5 ++-- .../ml/classification/OneVsRestSuite.scala | 6 ++--- .../ml/clustering/BisectingKMeansSuite.scala | 3 ++- .../spark/ml/clustering/KMeansSuite.scala | 3 ++- .../apache/spark/ml/clustering/LDASuite.scala | 2 +- .../spark/ml/feature/OneHotEncoderSuite.scala | 4 ++-- 
.../spark/ml/feature/StringIndexerSuite.scala | 6 ++--- .../spark/ml/feature/VectorIndexerSuite.scala | 5 ++-- .../apache/spark/ml/feature/Word2VecSuite.scala | 8 +++---- .../spark/ml/recommendation/ALSSuite.scala | 7 +++--- .../spark/ml/regression/GBTRegressorSuite.scala | 2 +- .../ml/regression/IsotonicRegressionSuite.scala | 6 ++--- .../ml/regression/LinearRegressionSuite.scala | 17 +++++++------- .../scala/org/apache/spark/sql/DataFrame.scala | 24 -------------------- .../org/apache/spark/sql/GroupedData.scala | 1 + .../org/apache/spark/sql/api/r/SQLUtils.scala | 2 +- .../spark/sql/DataFrameAggregateSuite.scala | 2 +- .../parquet/ParquetFilterSuite.scala | 2 +- .../datasources/parquet/ParquetIOSuite.scala | 2 +- .../spark/sql/hive/HiveSparkSubmitSuite.scala | 2 +- .../sql/hive/execution/HiveQuerySuite.scala | 2 ++ .../spark/sql/hive/orc/OrcQuerySuite.scala | 5 ++-- .../apache/spark/sql/hive/parquetSuites.scala | 2 +- 66 files changed, 138 insertions(+), 139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala index 0a477ab..7e608a2 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala @@ -79,7 +79,7 @@ object DataFrameExample { labelSummary.show() // Convert features column to an RDD of vectors. 
- val features = df.select("features").map { case Row(v: Vector) => v } + val features = df.select("features").rdd.map { case Row(v: Vector) => v } val featureSummary = features.aggregate(new MultivariateOnlineSummarizer())( (summary, feat) => summary.add(feat), (sum1, sum2) => sum1.merge(sum2)) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala index a37d12a..d2560cc 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala @@ -310,8 +310,8 @@ object DecisionTreeExample { data: DataFrame, labelColName: String): Unit = { val fullPredictions = model.transform(data).cache() - val predictions = fullPredictions.select("prediction").map(_.getDouble(0)) - val labels = fullPredictions.select(labelColName).map(_.getDouble(0)) + val predictions = fullPredictions.select("prediction").rdd.map(_.getDouble(0)) + val labels = fullPredictions.select(labelColName).rdd.map(_.getDouble(0)) // Print number of classes for reference val numClasses = MetadataUtils.getNumClasses(fullPredictions.schema(labelColName)) match { case Some(n) => n @@ -335,8 +335,8 @@ object DecisionTreeExample { data: DataFrame, labelColName: String): Unit = { val fullPredictions = model.transform(data).cache() - val predictions = fullPredictions.select("prediction").map(_.getDouble(0)) - val labels = fullPredictions.select(labelColName).map(_.getDouble(0)) + val predictions = fullPredictions.select("prediction").rdd.map(_.getDouble(0)) + val labels = fullPredictions.select(labelColName).rdd.map(_.getDouble(0)) val RMSE = new 
RegressionMetrics(predictions.zip(labels)).rootMeanSquaredError println(s" Root mean squared error (RMSE): $RMSE") } http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala index ccee3b2..a0bb5da 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala @@ -155,7 +155,7 @@ object OneVsRestExample { // evaluate the model val predictionsAndLabels = predictions.select("prediction", "label") - .map(row => (row.getDouble(0), row.getDouble(1))) + .rdd.map(row => (row.getDouble(0), row.getDouble(1))) val metrics = new MulticlassMetrics(predictionsAndLabels) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala index d283235..038b2fe 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala @@ -221,6 +221,7 @@ object LDAExample { val model = pipeline.fit(df) val documents = model.transform(df) .select("features") + .rdd .map { case Row(features: Vector) => features } .zipWithIndex() .map(_.swap) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala ---------------------------------------------------------------------- diff --git 
a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala index a2f0fcd..620ff07 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala @@ -52,7 +52,7 @@ object RDDRelation { val rddFromSql = sqlContext.sql("SELECT key, value FROM records WHERE key < 10") println("Result of RDD.map:") - rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println) + rddFromSql.rdd.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println) // Queries can also be written using a LINQ-like Scala DSL. df.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala index 4e427f5..b654a2c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala @@ -63,7 +63,7 @@ object HiveFromSpark { val rddFromSql = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key") println("Result of RDD.map:") - val rddAsStrings = rddFromSql.map { + val rddAsStrings = rddFromSql.rdd.map { case Row(key: Int, value: String) => s"Key: $key, Value: $value" } http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala 
b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala index d1388b5..4b27ee6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala @@ -122,8 +122,10 @@ abstract class Predictor[ * and put it in an RDD with strong types. */ protected def extractLabeledPoints(dataset: DataFrame): RDD[LabeledPoint] = { - dataset.select($(labelCol), $(featuresCol)) - .map { case Row(label: Double, features: Vector) => LabeledPoint(label, features) } + dataset.select($(labelCol), $(featuresCol)).rdd.map { + case Row(label: Double, features: Vector) => + LabeledPoint(label, features) + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index ac01245..0d329d2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -263,10 +263,11 @@ class LogisticRegression @Since("1.2.0") ( protected[spark] def train(dataset: DataFrame, handlePersistence: Boolean): LogisticRegressionModel = { val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol)) - val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).map { - case Row(label: Double, weight: Double, features: Vector) => - Instance(label, weight, features) - } + val instances: RDD[Instance] = + dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map { + case Row(label: Double, weight: Double, features: Vector) => + Instance(label, weight, features) + } if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) @@ -790,6 +791,7 @@ sealed trait 
LogisticRegressionSummary extends Serializable { /** * :: Experimental :: * Logistic regression training results. + * * @param predictions dataframe outputted by the model's `transform` method. * @param probabilityCol field in "predictions" which gives the calibrated probability of * each instance as a vector. @@ -813,6 +815,7 @@ class BinaryLogisticRegressionTrainingSummary private[classification] ( /** * :: Experimental :: * Binary Logistic regression results for a given model. + * * @param predictions dataframe outputted by the model's `transform` method. * @param probabilityCol field in "predictions" which gives the calibrated probability of * each instance. @@ -837,7 +840,7 @@ class BinaryLogisticRegressionSummary private[classification] ( // TODO: Allow the user to vary the number of bins using a setBins method in // BinaryClassificationMetrics. For now the default is set to 100. @transient private val binaryMetrics = new BinaryClassificationMetrics( - predictions.select(probabilityCol, labelCol).map { + predictions.select(probabilityCol, labelCol).rdd.map { case Row(score: Vector, label: Double) => (score(1), label) }, 100 ) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala index 45d293b..f014a1d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala @@ -112,7 +112,7 @@ class BisectingKMeansModel private[ml] ( @Since("2.0.0") def computeCost(dataset: DataFrame): Double = { SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT) - val data = dataset.select(col($(featuresCol))).map { case Row(point: Vector) => point } + 
val data = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point } parentModel.computeCost(data) } } @@ -176,7 +176,7 @@ class BisectingKMeans @Since("2.0.0") ( @Since("2.0.0") override def fit(dataset: DataFrame): BisectingKMeansModel = { - val rdd = dataset.select(col($(featuresCol))).map { case Row(point: Vector) => point } + val rdd = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point } val bkm = new MLlibBisectingKMeans() .setK($(k)) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index c6a3eac..79332b0 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -130,7 +130,7 @@ class KMeansModel private[ml] ( @Since("1.6.0") def computeCost(dataset: DataFrame): Double = { SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT) - val data = dataset.select(col($(featuresCol))).map { case Row(point: Vector) => point } + val data = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point } parentModel.computeCost(data) } @@ -260,7 +260,7 @@ class KMeans @Since("1.5.0") ( @Since("1.5.0") override def fit(dataset: DataFrame): KMeansModel = { - val rdd = dataset.select(col($(featuresCol))).map { case Row(point: Vector) => point } + val rdd = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point } val algo = new MLlibKMeans() .setK($(k)) @@ -303,7 +303,7 @@ class KMeansSummary private[clustering] ( * Size of each cluster. 
*/ @Since("2.0.0") - lazy val size: Array[Int] = cluster.map { + lazy val size: Array[Int] = cluster.rdd.map { case Row(clusterIdx: Int) => (clusterIdx, 1) }.reduceByKey(_ + _).collect().sortBy(_._1).map(_._2) } http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala index 99383e7..6304b20 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala @@ -803,6 +803,7 @@ private[clustering] object LDA extends DefaultParamsReadable[LDA] { dataset .withColumn("docId", monotonicallyIncreasingId()) .select("docId", featuresCol) + .rdd .map { case Row(docId: Long, features: Vector) => (docId, features) } http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala index a1d36c4..00f3125 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala @@ -84,11 +84,10 @@ class BinaryClassificationEvaluator @Since("1.4.0") (@Since("1.4.0") override va SchemaUtils.checkColumnType(schema, $(labelCol), DoubleType) // TODO: When dataset metadata has been implemented, check rawPredictionCol vector length = 2. 
- val scoreAndLabels = dataset.select($(rawPredictionCol), $(labelCol)) - .map { - case Row(rawPrediction: Vector, label: Double) => (rawPrediction(1), label) - case Row(rawPrediction: Double, label: Double) => (rawPrediction, label) - } + val scoreAndLabels = dataset.select($(rawPredictionCol), $(labelCol)).rdd.map { + case Row(rawPrediction: Vector, label: Double) => (rawPrediction(1), label) + case Row(rawPrediction: Double, label: Double) => (rawPrediction, label) + } val metrics = new BinaryClassificationMetrics(scoreAndLabels) val metric = $(metricName) match { case "areaUnderROC" => metrics.areaUnderROC() http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala index a921153..55ff443 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala @@ -74,9 +74,9 @@ class MulticlassClassificationEvaluator @Since("1.5.0") (@Since("1.5.0") overrid SchemaUtils.checkColumnType(schema, $(predictionCol), DoubleType) SchemaUtils.checkColumnType(schema, $(labelCol), DoubleType) - val predictionAndLabels = dataset.select($(predictionCol), $(labelCol)) - .map { case Row(prediction: Double, label: Double) => - (prediction, label) + val predictionAndLabels = dataset.select($(predictionCol), $(labelCol)).rdd.map { + case Row(prediction: Double, label: Double) => + (prediction, label) } val metrics = new MulticlassMetrics(predictionAndLabels) val metric = $(metricName) match { 
http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala index b6b25ec..adee61e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala @@ -85,7 +85,8 @@ final class RegressionEvaluator @Since("1.4.0") (@Since("1.4.0") override val ui val predictionAndLabels = dataset .select(col($(predictionCol)).cast(DoubleType), col($(labelCol)).cast(DoubleType)) - .map { case Row(prediction: Double, label: Double) => + .rdd. + map { case Row(prediction: Double, label: Double) => (prediction, label) } val metrics = new RegressionMetrics(predictionAndLabels) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala index 7b565ef..4abc459 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala @@ -79,7 +79,7 @@ final class ChiSqSelector(override val uid: String) override def fit(dataset: DataFrame): ChiSqSelectorModel = { transformSchema(dataset.schema, logging = true) - val input = dataset.select($(labelCol), $(featuresCol)).map { + val input = dataset.select($(labelCol), $(featuresCol)).rdd.map { case Row(label: Double, features: Vector) => LabeledPoint(label, features) } 
http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala index a6dfe58..cf15145 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala @@ -133,7 +133,7 @@ class CountVectorizer(override val uid: String) override def fit(dataset: DataFrame): CountVectorizerModel = { transformSchema(dataset.schema, logging = true) val vocSize = $(vocabSize) - val input = dataset.select($(inputCol)).map(_.getAs[Seq[String]](0)) + val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0)) val minDf = if ($(minDF) >= 1.0) { $(minDF) } else { http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala index 9e7eee4..cebbe5c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala @@ -79,7 +79,7 @@ final class IDF(override val uid: String) extends Estimator[IDFModel] with IDFBa override def fit(dataset: DataFrame): IDFModel = { transformSchema(dataset.schema, logging = true) - val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v } + val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v } val idf = new feature.IDF($(minDocFreq)).fit(input) copyValues(new IDFModel(uid, idf).setParent(this)) } 
http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala index 15c308b..09fad23 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala @@ -69,7 +69,7 @@ class MaxAbsScaler @Since("2.0.0") (override val uid: String) override def fit(dataset: DataFrame): MaxAbsScalerModel = { transformSchema(dataset.schema, logging = true) - val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v } + val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v } val summary = Statistics.colStats(input) val minVals = summary.min.toArray val maxVals = summary.max.toArray http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala index ad0458d..18be5c0 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala @@ -108,7 +108,7 @@ class MinMaxScaler(override val uid: String) override def fit(dataset: DataFrame): MinMaxScalerModel = { transformSchema(dataset.schema, logging = true) - val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v } + val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v } val summary = Statistics.colStats(input) copyValues(new MinMaxScalerModel(uid, summary.min, summary.max).setParent(this)) } 
http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala index 3425404..e9df161 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala @@ -130,7 +130,7 @@ class OneHotEncoder(override val uid: String) extends Transformer transformSchema(dataset.schema)(outputColName)) if (outputAttrGroup.size < 0) { // If the number of attributes is unknown, we check the values from the input column. - val numAttrs = dataset.select(col(inputColName).cast(DoubleType)).map(_.getDouble(0)) + val numAttrs = dataset.select(col(inputColName).cast(DoubleType)).rdd.map(_.getDouble(0)) .aggregate(0.0)( (m, x) => { assert(x >=0.0 && x == x.toInt, http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala index 0e07dfa..80b124f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala @@ -70,7 +70,7 @@ class PCA (override val uid: String) extends Estimator[PCAModel] with PCAParams */ override def fit(dataset: DataFrame): PCAModel = { transformSchema(dataset.schema, logging = true) - val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v} + val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v} val pca = new feature.PCA(k = $(k)) val pcaModel = pca.fit(input) copyValues(new PCAModel(uid, pcaModel.pc, 
pcaModel.explainedVariance).setParent(this)) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala index 6a0b6c2..9952d3b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala @@ -87,7 +87,7 @@ class StandardScaler(override val uid: String) extends Estimator[StandardScalerM override def fit(dataset: DataFrame): StandardScalerModel = { transformSchema(dataset.schema, logging = true) - val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v } + val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v } val scaler = new feature.StandardScaler(withMean = $(withMean), withStd = $(withStd)) val scalerModel = scaler.fit(input) copyValues(new StandardScalerModel(uid, scalerModel.std, scalerModel.mean).setParent(this)) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala index 555f113..7dd794b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala @@ -83,6 +83,7 @@ class StringIndexer(override val uid: String) extends Estimator[StringIndexerMod override def fit(dataset: DataFrame): StringIndexerModel = { val counts = dataset.select(col($(inputCol)).cast(StringType)) + .rdd .map(_.getString(0)) .countByValue() val labels = 
counts.toSeq.sortBy(-_._2).map(_._1).toArray http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala index 2a52684..5c11760 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala @@ -113,7 +113,7 @@ class VectorIndexer(override val uid: String) extends Estimator[VectorIndexerMod val firstRow = dataset.select($(inputCol)).take(1) require(firstRow.length == 1, s"VectorIndexer cannot be fit on an empty dataset.") val numFeatures = firstRow(0).getAs[Vector](0).size - val vectorDataset = dataset.select($(inputCol)).map { case Row(v: Vector) => v } + val vectorDataset = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v } val maxCats = $(maxCategories) val categoryStats: VectorIndexer.CategoryStats = vectorDataset.mapPartitions { iter => val localCatStats = new VectorIndexer.CategoryStats(numFeatures, maxCats) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala index 2b6b3c3..a4c3d27 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala @@ -138,7 +138,7 @@ final class Word2Vec(override val uid: String) extends Estimator[Word2VecModel] override def fit(dataset: DataFrame): Word2VecModel = { transformSchema(dataset.schema, logging = true) - val input = 
dataset.select($(inputCol)).map(_.getAs[Seq[String]](0)) + val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0)) val wordVectors = new feature.Word2Vec() .setLearningRate($(stepSize)) .setMinCount($(minCount)) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 4be4d6a..dacdac9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -392,6 +392,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel] val r = if ($(ratingCol) != "") col($(ratingCol)).cast(FloatType) else lit(1.0f) val ratings = dataset .select(col($(userCol)).cast(IntegerType), col($(itemCol)).cast(IntegerType), r) + .rdd .map { row => Rating(row.getInt(0), row.getInt(1), row.getFloat(2)) } http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index 1e5b4cb..e4339d6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -184,7 +184,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S * and put it in an RDD with strong types. 
*/ protected[ml] def extractAFTPoints(dataset: DataFrame): RDD[AFTPoint] = { - dataset.select($(featuresCol), $(labelCol), $(censorCol)).map { + dataset.select($(featuresCol), $(labelCol), $(censorCol)).rdd.map { case Row(features: Vector, label: Double, censor: Double) => AFTPoint(features, label, censor) } http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala index 1573bb4..36b006c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala @@ -90,9 +90,9 @@ private[regression] trait IsotonicRegressionBase extends Params with HasFeatures } else { lit(1.0) } - dataset.select(col($(labelCol)), f, w) - .map { case Row(label: Double, feature: Double, weight: Double) => - (label, feature, weight) + dataset.select(col($(labelCol)), f, w).rdd.map { + case Row(label: Double, feature: Double, weight: Double) => + (label, feature, weight) } } http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index ccfb5c4..8f78fd1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -158,7 +158,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String override protected def train(dataset: DataFrame): 
LinearRegressionModel = { // Extract the number of features before deciding optimization solver. - val numFeatures = dataset.select(col($(featuresCol))).limit(1).map { + val numFeatures = dataset.select(col($(featuresCol))).limit(1).rdd.map { case Row(features: Vector) => features.size }.first() val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol)) @@ -170,7 +170,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String // For low dimensional data, WeightedLeastSquares is more efficiently since the // training algorithm only requires one pass through the data. (SPARK-10668) val instances: RDD[Instance] = dataset.select( - col($(labelCol)), w, col($(featuresCol))).map { + col($(labelCol)), w, col($(featuresCol))).rdd.map { case Row(label: Double, weight: Double, features: Vector) => Instance(label, weight, features) } @@ -196,10 +196,11 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String return lrModel.setSummary(trainingSummary) } - val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).map { - case Row(label: Double, weight: Double, features: Vector) => - Instance(label, weight, features) - } + val instances: RDD[Instance] = + dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map { + case Row(label: Double, weight: Double, features: Vector) => + Instance(label, weight, features) + } val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) @@ -513,6 +514,7 @@ object LinearRegressionModel extends MLReadable[LinearRegressionModel] { * :: Experimental :: * Linear regression training results. Currently, the training summary ignores the * training coefficients except for the objective trace. + * * @param predictions predictions outputted by the model's `transform` method. * @param objectiveHistory objective function (scaled loss + regularization) at each iteration. 
*/ @@ -537,6 +539,7 @@ class LinearRegressionTrainingSummary private[regression] ( /** * :: Experimental :: * Linear regression results evaluated on a dataset. + * * @param predictions predictions outputted by the model's `transform` method. */ @Since("1.5.0") @@ -551,6 +554,7 @@ class LinearRegressionSummary private[regression] ( @transient private val metrics = new RegressionMetrics( predictions .select(predictionCol, labelCol) + .rdd .map { case Row(pred: Double, label: Double) => (pred, label) }, !model.getFitIntercept) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 93cf16e..ca0ed95 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1052,7 +1052,7 @@ private[python] class PythonMLLibAPI extends Serializable { * Java stub for the constructor of Python mllib RankingMetrics */ def newRankingMetrics(predictionAndLabels: DataFrame): RankingMetrics[Any] = { - new RankingMetrics(predictionAndLabels.map( + new RankingMetrics(predictionAndLabels.rdd.map( r => (r.getSeq(0).toArray[Any], r.getSeq(1).toArray[Any]))) } @@ -1135,7 +1135,7 @@ private[python] class PythonMLLibAPI extends Serializable { def createIndexedRowMatrix(rows: DataFrame, numRows: Long, numCols: Int): IndexedRowMatrix = { // We use DataFrames for serialization of IndexedRows from Python, // so map each Row in the DataFrame back to an IndexedRow. 
- val indexedRows = rows.map { + val indexedRows = rows.rdd.map { case Row(index: Long, vector: Vector) => IndexedRow(index, vector) } new IndexedRowMatrix(indexedRows, numRows, numCols) @@ -1147,7 +1147,7 @@ private[python] class PythonMLLibAPI extends Serializable { def createCoordinateMatrix(rows: DataFrame, numRows: Long, numCols: Long): CoordinateMatrix = { // We use DataFrames for serialization of MatrixEntry entries from // Python, so map each Row in the DataFrame back to a MatrixEntry. - val entries = rows.map { + val entries = rows.rdd.map { case Row(i: Long, j: Long, value: Double) => MatrixEntry(i, j, value) } new CoordinateMatrix(entries, numRows, numCols) @@ -1161,7 +1161,7 @@ private[python] class PythonMLLibAPI extends Serializable { // We use DataFrames for serialization of sub-matrix blocks from // Python, so map each Row in the DataFrame back to a // ((blockRowIndex, blockColIndex), sub-matrix) tuple. - val blockTuples = blocks.map { + val blockTuples = blocks.rdd.map { case Row(Row(blockRowIndex: Long, blockColIndex: Long), subMatrix: Matrix) => ((blockRowIndex.toInt, blockColIndex.toInt), subMatrix) } http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala index 26c6235..3b91fe8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -143,7 +143,7 @@ object KMeansModel extends Loader[KMeansModel] { val k = (metadata \ "k").extract[Int] val centroids = sqlContext.read.parquet(Loader.dataPath(path)) Loader.checkSchema[Cluster](centroids.schema) - val localCentroids = centroids.map(Cluster.apply).collect() + val localCentroids = 
centroids.rdd.map(Cluster.apply).collect() assert(k == localCentroids.size) new KMeansModel(localCentroids.sortBy(_.id).map(_.point)) } http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala index b30ecb8..25d67a3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala @@ -896,11 +896,11 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] { Loader.checkSchema[EdgeData](edgeDataFrame.schema) val globalTopicTotals: LDA.TopicCounts = dataFrame.first().getAs[Vector](0).toBreeze.toDenseVector - val vertices: RDD[(VertexId, LDA.TopicCounts)] = vertexDataFrame.map { + val vertices: RDD[(VertexId, LDA.TopicCounts)] = vertexDataFrame.rdd.map { case Row(ind: Long, vec: Vector) => (ind, vec.toBreeze.toDenseVector) } - val edges: RDD[Edge[LDA.TokenCount]] = edgeDataFrame.map { + val edges: RDD[Edge[LDA.TokenCount]] = edgeDataFrame.rdd.map { case Row(srcId: Long, dstId: Long, prop: Double) => Edge(srcId, dstId, prop) } val graph: Graph[LDA.TopicCounts, LDA.TokenCount] = Graph(vertices, edges) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala index feacafe..9732dfa 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala +++ 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala @@ -93,7 +93,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode val assignments = sqlContext.read.parquet(Loader.dataPath(path)) Loader.checkSchema[PowerIterationClustering.Assignment](assignments.schema) - val assignmentsRDD = assignments.map { + val assignmentsRDD = assignments.rdd.map { case Row(id: Long, cluster: Int) => PowerIterationClustering.Assignment(id, cluster) } http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala index 12cf220..319c547 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala @@ -58,7 +58,7 @@ class BinaryClassificationMetrics @Since("1.3.0") ( * @param scoreAndLabels a DataFrame with two double columns: score and label */ private[mllib] def this(scoreAndLabels: DataFrame) = - this(scoreAndLabels.map(r => (r.getDouble(0), r.getDouble(1)))) + this(scoreAndLabels.rdd.map(r => (r.getDouble(0), r.getDouble(1)))) /** * Unpersist intermediate RDDs used in the computation. 
http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala index c510496..3029b15 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala @@ -38,7 +38,7 @@ class MulticlassMetrics @Since("1.1.0") (predictionAndLabels: RDD[(Double, Doubl * @param predictionAndLabels a DataFrame with two double columns: prediction and label */ private[mllib] def this(predictionAndLabels: DataFrame) = - this(predictionAndLabels.map(r => (r.getDouble(0), r.getDouble(1)))) + this(predictionAndLabels.rdd.map(r => (r.getDouble(0), r.getDouble(1)))) private lazy val labelCountByClass: Map[Double, Long] = predictionAndLabels.values.countByValue() private lazy val labelCount: Long = labelCountByClass.values.sum http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala index c100b3c..daf6ff4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala @@ -35,7 +35,9 @@ class MultilabelMetrics @Since("1.2.0") (predictionAndLabels: RDD[(Array[Double] * @param predictionAndLabels a DataFrame with two double array columns: prediction and label */ private[mllib] def this(predictionAndLabels: DataFrame) = - 
this(predictionAndLabels.map(r => (r.getSeq[Double](0).toArray, r.getSeq[Double](1).toArray))) + this(predictionAndLabels.rdd.map { r => + (r.getSeq[Double](0).toArray, r.getSeq[Double](1).toArray) + }) private lazy val numDocs: Long = predictionAndLabels.count() http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala index 18c90b2..0f4c97e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala @@ -46,7 +46,7 @@ class RegressionMetrics @Since("2.0.0") ( * prediction and observation */ private[mllib] def this(predictionAndObservations: DataFrame) = - this(predictionAndObservations.map(r => (r.getDouble(0), r.getDouble(1)))) + this(predictionAndObservations.rdd.map(r => (r.getDouble(0), r.getDouble(1)))) /** * Use MultivariateOnlineSummarizer to calculate summary statistics of observations and errors. http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala index 33728bf..4f0e13f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala @@ -161,7 +161,7 @@ object ChiSqSelectorModel extends Loader[ChiSqSelectorModel] { // Check schema explicitly since erasure makes it hard to use match-case for checking. 
Loader.checkSchema[Data](dataFrame.schema) - val features = dataArray.map { + val features = dataArray.rdd.map { case Row(feature: Int) => (feature) }.collect() http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala index 85d6093..b35d721 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala @@ -134,7 +134,7 @@ object FPGrowthModel extends Loader[FPGrowthModel[_]] { } def loadImpl[Item: ClassTag](freqItemsets: DataFrame, sample: Item): FPGrowthModel[Item] = { - val freqItemsetsRDD = freqItemsets.select("items", "freq").map { x => + val freqItemsetsRDD = freqItemsets.select("items", "freq").rdd.map { x => val items = x.getAs[Seq[Item]](0).toArray val freq = x.getLong(1) new FreqItemset(items, freq) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 628cf1d..c91729a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -369,13 +369,13 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { assert(className == thisClassName) assert(formatVersion == thisFormatVersion) val rank = (metadata \ "rank").extract[Int] - val userFeatures = sqlContext.read.parquet(userPath(path)) - 
.map { case Row(id: Int, features: Seq[_]) => + val userFeatures = sqlContext.read.parquet(userPath(path)).rdd.map { + case Row(id: Int, features: Seq[_]) => + (id, features.asInstanceOf[Seq[Double]].toArray) + } + val productFeatures = sqlContext.read.parquet(productPath(path)).rdd.map { + case Row(id: Int, features: Seq[_]) => (id, features.asInstanceOf[Seq[Double]].toArray) - } - val productFeatures = sqlContext.read.parquet(productPath(path)) - .map { case Row(id: Int, features: Seq[_]) => - (id, features.asInstanceOf[Seq[Double]].toArray) } new MatrixFactorizationModel(rank, userFeatures, productFeatures) } http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala index 89c470d..ec5d7b9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala @@ -247,7 +247,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging { val dataRDD = sqlContext.read.parquet(datapath) // Check schema explicitly since erasure makes it hard to use match-case for checking. Loader.checkSchema[NodeData](dataRDD.schema) - val nodes = dataRDD.map(NodeData.apply) + val nodes = dataRDD.rdd.map(NodeData.apply) // Build node data into a tree. 
val trees = constructTrees(nodes) assert(trees.size == 1, http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala index feabcee..59713c3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala @@ -473,7 +473,7 @@ private[tree] object TreeEnsembleModel extends Logging { treeAlgo: String): Array[DecisionTreeModel] = { val datapath = Loader.dataPath(path) val sqlContext = SQLContext.getOrCreate(sc) - val nodes = sqlContext.read.parquet(datapath).map(NodeData.apply) + val nodes = sqlContext.read.parquet(datapath).rdd.map(NodeData.apply) val trees = constructTrees(nodes) trees.map(new DecisionTreeModel(_, Algo.fromString(treeAlgo))) } http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index 972c086..cfb9bbf 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -735,7 +735,7 @@ class LogisticRegressionSuite val model1 = trainer1.fit(binaryDataset) val model2 = trainer2.fit(binaryDataset) - val histogram = binaryDataset.map { case Row(label: Double, features: Vector) => label } + val histogram = binaryDataset.rdd.map { case Row(label: Double, 
features: Vector) => label } .treeAggregate(new MultiClassSummarizer)( seqOp = (c, v) => (c, v) match { case (classSummarizer: MultiClassSummarizer, label: Double) => classSummarizer.add(label) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala index a326432..602b5a8 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala @@ -76,8 +76,9 @@ class MultilayerPerceptronClassifierSuite extends SparkFunSuite with MLlibTestSp val model = trainer.fit(dataFrame) val numFeatures = dataFrame.select("features").first().getAs[Vector](0).size assert(model.numFeatures === numFeatures) - val mlpPredictionAndLabels = model.transform(dataFrame).select("prediction", "label") - .map { case Row(p: Double, l: Double) => (p, l) } + val mlpPredictionAndLabels = model.transform(dataFrame).select("prediction", "label").rdd.map { + case Row(p: Double, l: Double) => (p, l) + } // train multinomial logistic regression val lr = new LogisticRegressionWithLBFGS() .setIntercept(true) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala index 445e50d..2ae74a2 100644 --- 
a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala @@ -81,9 +81,9 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext { val predictionColSchema = transformedDataset.schema(ovaModel.getPredictionCol) assert(MetadataUtils.getNumClasses(predictionColSchema) === Some(3)) - val ovaResults = transformedDataset - .select("prediction", "label") - .map(row => (row.getDouble(0), row.getDouble(1))) + val ovaResults = transformedDataset.select("prediction", "label").rdd.map { + row => (row.getDouble(0), row.getDouble(1)) + } val lr = new LogisticRegressionWithLBFGS().setIntercept(true).setNumClasses(numClasses) lr.optimizer.setRegParam(0.1).setNumIterations(100) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala index fc4a4ad..b719a8c 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala @@ -77,7 +77,8 @@ class BisectingKMeansSuite extends SparkFunSuite with MLlibTestSparkContext { expectedColumns.foreach { column => assert(transformed.columns.contains(column)) } - val clusters = transformed.select(predictionColName).map(_.getInt(0)).distinct().collect().toSet + val clusters = + transformed.select(predictionColName).rdd.map(_.getInt(0)).distinct().collect().toSet assert(clusters.size === k) assert(clusters === Set(0, 1, 2, 3, 4)) assert(model.computeCost(dataset) < 0.1) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala 
---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala index e5357ba..c684bc1 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala @@ -93,7 +93,8 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultR expectedColumns.foreach { column => assert(transformed.columns.contains(column)) } - val clusters = transformed.select(predictionColName).map(_.getInt(0)).distinct().collect().toSet + val clusters = + transformed.select(predictionColName).rdd.map(_.getInt(0)).distinct().collect().toSet assert(clusters.size === k) assert(clusters === Set(0, 1, 2, 3, 4)) assert(model.computeCost(dataset) < 0.1) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala index 97dbfd9..0327040 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala @@ -199,7 +199,7 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead // describeTopics val topics = model.describeTopics(3) assert(topics.count() === k) - assert(topics.select("topic").map(_.getInt(0)).collect().toSet === Range(0, k).toSet) + assert(topics.select("topic").rdd.map(_.getInt(0)).collect().toSet === Range(0, k).toSet) topics.select("termIndices").collect().foreach { case r: Row => val termIndices = r.getAs[Seq[Int]](0) assert(termIndices.length === 3 && termIndices.toSet.size === 3) 
http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala index 76d1205..e238b33 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala @@ -51,7 +51,7 @@ class OneHotEncoderSuite .setDropLast(false) val encoded = encoder.transform(transformed) - val output = encoded.select("id", "labelVec").map { r => + val output = encoded.select("id", "labelVec").rdd.map { r => val vec = r.getAs[Vector](1) (r.getInt(0), vec(0), vec(1), vec(2)) }.collect().toSet @@ -68,7 +68,7 @@ class OneHotEncoderSuite .setOutputCol("labelVec") val encoded = encoder.transform(transformed) - val output = encoded.select("id", "labelVec").map { r => + val output = encoded.select("id", "labelVec").rdd.map { r => val vec = r.getAs[Vector](1) (r.getInt(0), vec(0), vec(1)) }.collect().toSet http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala index 0dbaed2..b9533f8 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala @@ -52,7 +52,7 @@ class StringIndexerSuite val attr = Attribute.fromStructField(transformed.schema("labelIndex")) .asInstanceOf[NominalAttribute] assert(attr.values.get === Array("a", "c", "b")) - val output = transformed.select("id", "labelIndex").map { r 
=> + val output = transformed.select("id", "labelIndex").rdd.map { r => (r.getInt(0), r.getDouble(1)) }.collect().toSet // a -> 0, b -> 2, c -> 1 @@ -83,7 +83,7 @@ class StringIndexerSuite val attr = Attribute.fromStructField(transformed.schema("labelIndex")) .asInstanceOf[NominalAttribute] assert(attr.values.get === Array("b", "a")) - val output = transformed.select("id", "labelIndex").map { r => + val output = transformed.select("id", "labelIndex").rdd.map { r => (r.getInt(0), r.getDouble(1)) }.collect().toSet // a -> 1, b -> 0 @@ -102,7 +102,7 @@ class StringIndexerSuite val attr = Attribute.fromStructField(transformed.schema("labelIndex")) .asInstanceOf[NominalAttribute] assert(attr.values.get === Array("100", "300", "200")) - val output = transformed.select("id", "labelIndex").map { r => + val output = transformed.select("id", "labelIndex").rdd.map { r => (r.getInt(0), r.getDouble(1)) }.collect().toSet // 100 -> 0, 200 -> 2, 300 -> 1 http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala index 67817fa..d4f836e 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala @@ -159,7 +159,7 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext // Chose correct categorical features assert(categoryMaps.keys.toSet === categoricalFeatures) val transformed = model.transform(data).select("indexed") - val indexedRDD: RDD[Vector] = transformed.map(_.getAs[Vector](0)) + val indexedRDD: RDD[Vector] = transformed.rdd.map(_.getAs[Vector](0)) val featureAttrs = AttributeGroup.fromStructField(transformed.schema("indexed")) 
assert(featureAttrs.name === "indexed") assert(featureAttrs.attributes.get.length === model.numFeatures) @@ -216,7 +216,8 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext val points = data.collect().map(_.getAs[Vector](0)) val vectorIndexer = getIndexer.setMaxCategories(maxCategories) val model = vectorIndexer.fit(data) - val indexedPoints = model.transform(data).select("indexed").map(_.getAs[Vector](0)).collect() + val indexedPoints = + model.transform(data).select("indexed").rdd.map(_.getAs[Vector](0)).collect() points.zip(indexedPoints).foreach { case (orig: SparseVector, indexed: SparseVector) => assert(orig.indices.length == indexed.indices.length) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala index f094c55..1671fb6 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala @@ -100,7 +100,7 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul .setSeed(42L) .fit(docDF) - val realVectors = model.getVectors.sort("word").select("vector").map { + val realVectors = model.getVectors.sort("word").select("vector").rdd.map { case Row(v: Vector) => v }.collect() // These expectations are just magic values, characterizing the current @@ -134,7 +134,7 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul .fit(docDF) val expectedSimilarity = Array(0.2608488929093532, -0.8271274846926078) - val (synonyms, similarity) = model.findSynonyms("a", 2).map { + val (synonyms, similarity) = model.findSynonyms("a", 2).rdd.map { case Row(w: String, sim: Double) => (w, sim) }.collect().unzip @@ -161,7 
+161,7 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul .setSeed(42L) .fit(docDF) - val (synonyms, similarity) = model.findSynonyms("a", 6).map { + val (synonyms, similarity) = model.findSynonyms("a", 6).rdd.map { case Row(w: String, sim: Double) => (w, sim) }.collect().unzip @@ -174,7 +174,7 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul .setWindowSize(10) .fit(docDF) - val (synonymsLarger, similarityLarger) = model.findSynonyms("a", 6).map { + val (synonymsLarger, similarityLarger) = model.findSynonyms("a", 6).rdd.map { case Row(w: String, sim: Double) => (w, sim) }.collect().unzip // The similarity score should be very different with the larger window http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index ff0d8f5..2bedd70 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -342,11 +342,10 @@ class ALSSuite .setSeed(0) val alpha = als.getAlpha val model = als.fit(training.toDF()) - val predictions = model.transform(test.toDF()) - .select("rating", "prediction") - .map { case Row(rating: Float, prediction: Float) => + val predictions = model.transform(test.toDF()).select("rating", "prediction").rdd.map { + case Row(rating: Float, prediction: Float) => (rating.toDouble, prediction.toDouble) - } + } val rmse = if (implicitPrefs) { // TODO: Use a better (rank-based?) evaluation metric for implicit feedback. 
http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala index 0932660..244db86 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala @@ -87,7 +87,7 @@ class GBTRegressorSuite extends SparkFunSuite with MLlibTestSparkContext { // copied model must have the same parent. MLTestingUtils.checkCopy(model) val preds = model.transform(df) - val predictions = preds.select("prediction").map(_.getDouble(0)) + val predictions = preds.select("prediction").rdd.map(_.getDouble(0)) // Checks based on SPARK-8736 (to ensure it is not doing classification) assert(predictions.max() > 2) assert(predictions.min() < -1) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala index f067c29..b8874b4 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala @@ -46,7 +46,7 @@ class IsotonicRegressionSuite val predictions = model .transform(dataset) - .select("prediction").map { case Row(pred) => + .select("prediction").rdd.map { case Row(pred) => pred }.collect() @@ -66,7 +66,7 @@ class IsotonicRegressionSuite val predictions = model .transform(features) - .select("prediction").map { + .select("prediction").rdd.map 
{ case Row(pred) => pred }.collect() @@ -160,7 +160,7 @@ class IsotonicRegressionSuite val predictions = model .transform(features) - .select("prediction").map { + .select("prediction").rdd.map { case Row(pred) => pred }.collect() http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala index 3ae108d..9dee04c 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala @@ -686,17 +686,18 @@ class LinearRegressionSuite // Residuals in [[LinearRegressionResults]] should equal those manually computed val expectedResiduals = datasetWithDenseFeature.select("features", "label") + .rdd .map { case Row(features: DenseVector, label: Double) => - val prediction = - features(0) * model.coefficients(0) + features(1) * model.coefficients(1) + - model.intercept - label - prediction - } - .zip(model.summary.residuals.map(_.getDouble(0))) + val prediction = + features(0) * model.coefficients(0) + features(1) * model.coefficients(1) + + model.intercept + label - prediction + } + .zip(model.summary.residuals.rdd.map(_.getDouble(0))) .collect() .foreach { case (manualResidual: Double, resultResidual: Double) => - assert(manualResidual ~== resultResidual relTol 1E-5) - } + assert(manualResidual ~== resultResidual relTol 1E-5) + } /* # Use the following R code to generate model training results. 
http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index abb8fe5..5f5b7f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1427,30 +1427,6 @@ class DataFrame private[sql]( def transform[U](t: DataFrame => DataFrame): DataFrame = t(this) /** - * Returns a new RDD by applying a function to all rows of this DataFrame. - * @group rdd - * @since 1.3.0 - */ - def map[R: ClassTag](f: Row => R): RDD[R] = rdd.map(f) - - /** - * Returns a new RDD by first applying a function to all rows of this [[DataFrame]], - * and then flattening the results. - * @group rdd - * @since 1.3.0 - */ - def flatMap[R: ClassTag](f: Row => TraversableOnce[R]): RDD[R] = rdd.flatMap(f) - - /** - * Returns a new RDD by applying a function to each partition of this DataFrame. - * @group rdd - * @since 1.3.0 - */ - def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] = { - rdd.mapPartitions(f) - } - - /** * Applies a function `f` to all rows. 
* @group rdd * @since 1.3.0 http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala index f06d161..a7258d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala @@ -306,6 +306,7 @@ class GroupedData protected[sql]( val values = df.select(pivotColumn) .distinct() .sort(pivotColumn) // ensure that the output columns are in a consistent logical order + .rdd .map(_.get(0)) .take(maxValues + 1) .toSeq http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index d912aeb..68a2517 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -100,7 +100,7 @@ private[r] object SQLUtils { } def dfToRowRDD(df: DataFrame): JavaRDD[Array[Byte]] = { - df.map(r => rowToRBytes(r)) + df.rdd.map(r => rowToRBytes(r)) } private[this] def doConversion(data: Object, dataType: DataType): Object = { http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index f54bff9..7d96ef6 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -257,7 +257,7 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext { } test("count") { - assert(testData2.count() === testData2.map(_ => 1).count()) + assert(testData2.count() === testData2.rdd.map(_ => 1).count()) checkAnswer( testData2.agg(count('a), sumDistinct('a)), // non-partial http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index fbffe86..bd51154 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -101,7 +101,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex (implicit df: DataFrame): Unit = { def checkBinaryAnswer(df: DataFrame, expected: Seq[Row]) = { assertResult(expected.map(_.getAs[Array[Byte]](0).mkString(",")).sorted) { - df.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted + df.rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted } } http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 3c74464..c85eedd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -599,7 +599,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { test("null and non-null strings") { // Create a dataset where the first values are NULL and then some non-null values. The // number of non-nulls needs to be bigger than the ParquetReader batch size. - val data = sqlContext.range(200).map { i => + val data = sqlContext.range(200).rdd.map { i => if (i.getLong(0) < 150) Row(None) else Row("a") } http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index f141a9b..12a5542 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -210,7 +210,7 @@ object SparkSubmitClassLoaderTest extends Logging { } // Second, we load classes at the executor side. 
logInfo("Testing load classes at the executor side.") - val result = df.mapPartitions { x => + val result = df.rdd.mapPartitions { x => var exception: String = null try { Utils.classForName(args(0)) http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 3208ebc..1002487 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -664,11 +664,13 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { test("implement identity function using case statement") { val actual = sql("SELECT (CASE key WHEN key THEN key END) FROM src") + .rdd .map { case Row(i: Int) => i } .collect() .toSet val expected = sql("SELECT key FROM src") + .rdd .map { case Row(i: Int) => i } .collect() .toSet http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index b11d1d9..6824951 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -119,6 +119,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { // expr = (not leaf-0) assertResult(10) { sql("SELECT name, contacts FROM t where age > 5") + .rdd .flatMap(_.getAs[Seq[_]]("contacts")) .count() } @@ -131,7 +132,7 @@ class 
OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { val df = sql("SELECT name, contacts FROM t WHERE age > 5 AND age < 8") assert(df.count() === 2) assertResult(4) { - df.flatMap(_.getAs[Seq[_]]("contacts")).count() + df.rdd.flatMap(_.getAs[Seq[_]]("contacts")).count() } } @@ -143,7 +144,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { val df = sql("SELECT name, contacts FROM t WHERE age < 2 OR age > 8") assert(df.count() === 3) assertResult(6) { - df.flatMap(_.getAs[Seq[_]]("contacts")).count() + df.rdd.flatMap(_.getAs[Seq[_]]("contacts")).count() } } } http://git-wip-us.apache.org/repos/asf/spark/blob/99dfcedb/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 68d5c7d..a127cf6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -854,7 +854,7 @@ abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils with test(s"hive udfs $table") { checkAnswer( sql(s"SELECT concat(stringField, stringField) FROM $table"), - sql(s"SELECT stringField FROM $table").map { + sql(s"SELECT stringField FROM $table").rdd.map { case Row(s: String) => Row(s + s) }.collect().toSeq) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org