[Minor] [SQL] Cleans up DataFrame variable names and toDF() calls Although we've migrated to the DataFrame API, lots of code still uses `rdd` or `srdd` as local variable names. This PR tries to address these naming inconsistencies and some other minor DataFrame related style issues.
<!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/4670) <!-- Reviewable:end --> Author: Cheng Lian <l...@databricks.com> Closes #4670 from liancheng/df-cleanup and squashes the following commits: 3e14448 [Cheng Lian] Cleans up DataFrame variable names and toDF() calls (cherry picked from commit 61ab08549cb6fceb6de1b5c490c55a89d4bd28fa) Signed-off-by: Reynold Xin <r...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bd33ce6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bd33ce6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bd33ce6 Branch: refs/heads/branch-1.3 Commit: 2bd33ce62b4c43be4dbac6b46f979c36a0e9aba8 Parents: f8f9a64 Author: Cheng Lian <l...@databricks.com> Authored: Tue Feb 17 23:36:20 2015 -0800 Committer: Reynold Xin <r...@databricks.com> Committed: Tue Feb 17 23:36:30 2015 -0800 ---------------------------------------------------------------------- .../examples/ml/CrossValidatorExample.scala | 2 +- .../spark/examples/ml/DeveloperApiExample.scala | 4 +- .../apache/spark/examples/ml/MovieLensALS.scala | 6 +-- .../spark/examples/ml/SimpleParamsExample.scala | 6 +-- .../ml/SimpleTextClassificationPipeline.scala | 4 +- .../spark/examples/mllib/DatasetExample.scala | 2 +- .../apache/spark/examples/sql/RDDRelation.scala | 2 +- .../spark/examples/sql/hive/HiveFromSpark.scala | 2 +- .../spark/mllib/classification/NaiveBayes.scala | 2 +- .../impl/GLMClassificationModel.scala | 2 +- .../regression/impl/GLMRegressionModel.scala | 2 +- .../mllib/tree/model/DecisionTreeModel.scala | 2 +- .../mllib/tree/model/treeEnsembleModels.scala | 2 +- .../spark/ml/recommendation/ALSSuite.scala | 4 +- .../scala/org/apache/spark/sql/DataFrame.scala | 2 +- .../scala/org/apache/spark/sql/SQLContext.scala | 4 +- .../apache/spark/sql/parquet/ParquetTest.scala | 6 +-- .../org/apache/spark/sql/CachedTableSuite.scala | 14 +++--- .../org/apache/spark/sql/DataFrameSuite.scala | 26 +++++----- .../scala/org/apache/spark/sql/JoinSuite.scala | 8 +-- .../scala/org/apache/spark/sql/QueryTest.scala | 46 ++++++++--------- .../org/apache/spark/sql/SQLQuerySuite.scala | 6 +-- .../sql/ScalaReflectionRelationSuite.scala | 10 ++-- .../scala/org/apache/spark/sql/TestData.scala | 48 ++++++++++-------- .../apache/spark/sql/jdbc/JDBCWriteSuite.scala | 46 ++++++++--------- .../spark/sql/jdbc/MySQLIntegration.scala | 53 +++++++++----------- .../spark/sql/jdbc/PostgresIntegration.scala | 30 ++++++----- .../spark/sql/parquet/ParquetFilterSuite.scala | 40 +++++++-------- .../spark/sql/parquet/ParquetIOSuite.scala | 28 +++++------ .../spark/sql/parquet/ParquetQuerySuite.scala | 2 +- .../spark/sql/parquet/ParquetSchemaSuite.scala | 4 +- .../sql/hive/InsertIntoHiveTableSuite.scala | 4 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 8 +-- .../apache/spark/sql/hive/StatisticsSuite.scala | 38 +++++++------- .../sql/hive/execution/HiveQuerySuite.scala | 20 ++++---- .../hive/execution/HiveResolutionSuite.scala | 6 +-- .../spark/sql/hive/execution/HiveUdfSuite.scala | 18 +++---- 37 files changed, 250 insertions(+), 259 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala index f024194..7ab892c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala @@ -90,7 +90,7 @@ object CrossValidatorExample { crossval.setNumFolds(2) // Use 3+ in practice // Run cross-validation, and choose the best set of parameters. - val cvModel = crossval.fit(training.toDF) + val cvModel = crossval.fit(training.toDF()) // Prepare test documents, which are unlabeled. val test = sc.parallelize(Seq( http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala index 54aadd2..df26798 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala @@ -58,7 +58,7 @@ object DeveloperApiExample { lr.setMaxIter(10) // Learn a LogisticRegression model. This uses the parameters stored in lr. - val model = lr.fit(training.toDF) + val model = lr.fit(training.toDF()) // Prepare test data. val test = sc.parallelize(Seq( @@ -67,7 +67,7 @@ object DeveloperApiExample { LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)))) // Make predictions on test data. - val sumPredictions: Double = model.transform(test.toDF) + val sumPredictions: Double = model.transform(test.toDF()) .select("features", "label", "prediction") .collect() .map { case Row(features: Vector, label: Double, prediction: Double) => http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala index adaf796..96b2dd4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala @@ -137,9 +137,9 @@ object MovieLensALS { .setRegParam(params.regParam) .setNumBlocks(params.numBlocks) - val model = als.fit(training.toDF) + val model = als.fit(training.toDF()) - val predictions = model.transform(test.toDF).cache() + val predictions = model.transform(test.toDF()).cache() // Evaluate the model. // TODO: Create an evaluator to compute RMSE. @@ -158,7 +158,7 @@ object MovieLensALS { // Inspect false positives. predictions.registerTempTable("prediction") - sc.textFile(params.movies).map(Movie.parseMovie).toDF.registerTempTable("movie") + sc.textFile(params.movies).map(Movie.parseMovie).toDF().registerTempTable("movie") sqlContext.sql( """ |SELECT userId, prediction.movieId, title, rating, prediction http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala index c5bb551..e8af5c1 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala @@ -58,7 +58,7 @@ object SimpleParamsExample { .setRegParam(0.01) // Learn a LogisticRegression model. This uses the parameters stored in lr. - val model1 = lr.fit(training.toDF) + val model1 = lr.fit(training.toDF()) // Since model1 is a Model (i.e., a Transformer produced by an Estimator), // we can view the parameters it used during fit(). // This prints the parameter (name: value) pairs, where names are unique IDs for this @@ -77,7 +77,7 @@ object SimpleParamsExample { // Now learn a new model using the paramMapCombined parameters. // paramMapCombined overrides all parameters set earlier via lr.set* methods. - val model2 = lr.fit(training.toDF, paramMapCombined) + val model2 = lr.fit(training.toDF(), paramMapCombined) println("Model 2 was fit using parameters: " + model2.fittingParamMap) // Prepare test data. @@ -90,7 +90,7 @@ object SimpleParamsExample { // LogisticRegression.transform will only use the 'features' column. // Note that model2.transform() outputs a 'myProbability' column instead of the usual // 'probability' column since we renamed the lr.probabilityCol parameter previously. - model2.transform(test.toDF) + model2.transform(test.toDF()) .select("features", "label", "myProbability", "prediction") .collect() .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) => http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala index 8b47f88..a11db6f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala @@ -69,7 +69,7 @@ object SimpleTextClassificationPipeline { .setStages(Array(tokenizer, hashingTF, lr)) // Fit the pipeline to training documents. - val model = pipeline.fit(training.toDF) + val model = pipeline.fit(training.toDF()) // Prepare test documents, which are unlabeled. val test = sc.parallelize(Seq( @@ -79,7 +79,7 @@ object SimpleTextClassificationPipeline { Document(7L, "apache hadoop"))) // Make predictions on test documents. - model.transform(test.toDF) + model.transform(test.toDF()) .select("id", "text", "probability", "prediction") .collect() .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) => http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala index c98c68a..e943d6c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala @@ -81,7 +81,7 @@ object DatasetExample { println(s"Loaded ${origData.count()} instances from file: ${params.input}") // Convert input data to DataFrame explicitly. - val df: DataFrame = origData.toDF + val df: DataFrame = origData.toDF() println(s"Inferred schema:\n${df.schema.prettyJson}") println(s"Converted to DataFrame with ${df.count()} records") http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala index 79d3d5a..6331d1c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala @@ -34,7 +34,7 @@ object RDDRelation { // Importing the SQL context gives access to all the SQL functions and implicit conversions. import sqlContext.implicits._ - val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF + val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF() // Any RDD containing case classes can be registered as a table. The schema of the table is // automatically inferred using scala reflection. df.registerTempTable("records") http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala index 7128deb..b7ba60e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala @@ -68,7 +68,7 @@ object HiveFromSpark { // You can also register RDDs as temporary tables within a HiveContext. val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))) - rdd.toDF.registerTempTable("records") + rdd.toDF().registerTempTable("records") // Queries can then join RDD data with data stored in Hive. println("Result of SELECT *:") http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index dd7a946..b11fd4f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -102,7 +102,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] { sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path)) // Create Parquet data. - val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF + val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF() dataRDD.saveAsParquetFile(dataPath(path)) } http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala index 0a358f2..8956189 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala @@ -62,7 +62,7 @@ private[classification] object GLMClassificationModel { // Create Parquet data. val data = Data(weights, intercept, threshold) - sc.parallelize(Seq(data), 1).toDF.saveAsParquetFile(Loader.dataPath(path)) + sc.parallelize(Seq(data), 1).toDF().saveAsParquetFile(Loader.dataPath(path)) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala index 7b27aaa..bd7e340 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala @@ -58,7 +58,7 @@ private[regression] object GLMRegressionModel { // Create Parquet data. val data = Data(weights, intercept) - val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF + val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF() // TODO: repartition with 1 partition after SPARK-5532 gets fixed dataRDD.saveAsParquetFile(Loader.dataPath(path)) } http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala index 5dac62b..060fd5b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala @@ -197,7 +197,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] { val nodes = model.topNode.subtreeIterator.toSeq val dataRDD: DataFrame = sc.parallelize(nodes) .map(NodeData.apply(0, _)) - .toDF + .toDF() dataRDD.saveAsParquetFile(Loader.dataPath(path)) } http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala index e507f24..4897906 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala @@ -289,7 +289,7 @@ private[tree] object TreeEnsembleModel { // Create Parquet data. val dataRDD = sc.parallelize(model.trees.zipWithIndex).flatMap { case (tree, treeId) => tree.topNode.subtreeIterator.toSeq.map(node => NodeData(treeId, node)) - }.toDF + }.toDF() dataRDD.saveAsParquetFile(Loader.dataPath(path)) } http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index b118a8d..376c362 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -358,8 +358,8 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging { .setNumUserBlocks(numUserBlocks) .setNumItemBlocks(numItemBlocks) val alpha = als.getAlpha - val model = als.fit(training.toDF) - val predictions = model.transform(test.toDF) + val model = als.fit(training.toDF()) + val predictions = model.transform(test.toDF()) .select("rating", "prediction") .map { case Row(rating: Float, prediction: Float) => (rating.toDouble, prediction.toDouble) http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index fa5fe84..5007a5a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -124,7 +124,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * from a RDD of tuples into a [[DataFrame]] with meaningful names. For example: * {{{ * val rdd: RDD[(Int, String)] = ... - * rdd.toDF // this implicit conversion creates a DataFrame with column name _1 and _2 + * rdd.toDF() // this implicit conversion creates a DataFrame with column name _1 and _2 * rdd.toDF("id", "name") // this creates a DataFrame with column name "id" and "name" * }}} * @group basic http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 709b350..db32fa8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -887,8 +887,8 @@ class SQLContext(@transient val sparkContext: SparkContext) * Registers the given [[DataFrame]] as a temporary table in the catalog. Temporary tables exist * only during the lifetime of this instance of SQLContext. */ - private[sql] def registerDataFrameAsTable(rdd: DataFrame, tableName: String): Unit = { - catalog.registerTable(Seq(tableName), rdd.logicalPlan) + private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = { + catalog.registerTable(Seq(tableName), df.logicalPlan) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala index 052728c..0fa2fe9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala @@ -99,7 +99,7 @@ private[sql] trait ParquetTest { * Writes `data` to a Parquet file and reads it back as a [[DataFrame]], * which is then passed to `f`. The Parquet file will be deleted after `f` returns. */ - protected def withParquetRDD[T <: Product: ClassTag: TypeTag] + protected def withParquetDataFrame[T <: Product: ClassTag: TypeTag] (data: Seq[T]) (f: DataFrame => Unit): Unit = { withParquetFile(data)(path => f(sqlContext.parquetFile(path))) @@ -120,8 +120,8 @@ private[sql] trait ParquetTest { protected def withParquetTable[T <: Product: ClassTag: TypeTag] (data: Seq[T], tableName: String) (f: => Unit): Unit = { - withParquetRDD(data) { rdd => - sqlContext.registerDataFrameAsTable(rdd, tableName) + withParquetDataFrame(data) { df => + sqlContext.registerDataFrameAsTable(df, tableName) withTempTable(tableName)(f) } } http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 691dae0..e70e866 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -18,17 +18,15 @@ package org.apache.spark.sql import scala.concurrent.duration._ -import scala.language.implicitConversions -import scala.language.postfixOps +import scala.language.{implicitConversions, postfixOps} import org.scalatest.concurrent.Eventually._ import org.apache.spark.sql.TestData._ import org.apache.spark.sql.columnar._ -import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.TestSQLContext._ import org.apache.spark.sql.test.TestSQLContext.implicits._ -import org.apache.spark.storage.{StorageLevel, RDDBlockId} +import org.apache.spark.storage.{RDDBlockId, StorageLevel} case class BigData(s: String) @@ -59,15 +57,15 @@ class CachedTableSuite extends QueryTest { test("unpersist an uncached table will not raise exception") { assert(None == cacheManager.lookupCachedData(testData)) - testData.unpersist(true) + testData.unpersist(blocking = true) assert(None == cacheManager.lookupCachedData(testData)) - testData.unpersist(false) + testData.unpersist(blocking = false) assert(None == cacheManager.lookupCachedData(testData)) testData.persist() assert(None != cacheManager.lookupCachedData(testData)) - testData.unpersist(true) + testData.unpersist(blocking = true) assert(None == cacheManager.lookupCachedData(testData)) - testData.unpersist(false) + testData.unpersist(blocking = false) assert(None == cacheManager.lookupCachedData(testData)) } http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 0da619d..f31bc38 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql -import org.apache.spark.sql.TestData._ - import scala.language.postfixOps import org.apache.spark.sql.functions._ @@ -251,20 +249,20 @@ class DataFrameSuite extends QueryTest { Seq(Row(3,1), Row(3,2), Row(2,1), Row(2,2), Row(1,1), Row(1,2))) checkAnswer( - arrayData.toDF.orderBy('data.getItem(0).asc), - arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(0)).toSeq) + arrayData.toDF().orderBy('data.getItem(0).asc), + arrayData.toDF().collect().sortBy(_.getAs[Seq[Int]](0)(0)).toSeq) checkAnswer( - arrayData.toDF.orderBy('data.getItem(0).desc), - arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(0)).reverse.toSeq) + arrayData.toDF().orderBy('data.getItem(0).desc), + arrayData.toDF().collect().sortBy(_.getAs[Seq[Int]](0)(0)).reverse.toSeq) checkAnswer( - arrayData.toDF.orderBy('data.getItem(1).asc), - arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(1)).toSeq) + arrayData.toDF().orderBy('data.getItem(1).asc), + arrayData.toDF().collect().sortBy(_.getAs[Seq[Int]](0)(1)).toSeq) checkAnswer( - arrayData.toDF.orderBy('data.getItem(1).desc), - arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(1)).reverse.toSeq) + arrayData.toDF().orderBy('data.getItem(1).desc), + arrayData.toDF().collect().sortBy(_.getAs[Seq[Int]](0)(1)).reverse.toSeq) } test("limit") { @@ -273,11 +271,11 @@ class DataFrameSuite extends QueryTest { testData.take(10).toSeq) checkAnswer( - arrayData.toDF.limit(1), + arrayData.toDF().limit(1), arrayData.take(1).map(r => Row.fromSeq(r.productIterator.toSeq))) checkAnswer( - mapData.toDF.limit(1), + mapData.toDF().limit(1), mapData.take(1).map(r => Row.fromSeq(r.productIterator.toSeq))) } @@ -411,7 +409,7 @@ class DataFrameSuite extends QueryTest { } test("addColumn") { - val df = testData.toDF.withColumn("newCol", col("key") + 1) + val df = testData.toDF().withColumn("newCol", col("key") + 1) checkAnswer( df, testData.collect().map { case Row(key: Int, value: String) => @@ -421,7 +419,7 @@ class DataFrameSuite extends QueryTest { } test("renameColumn") { - val df = testData.toDF.withColumn("newCol", col("key") + 1) + val df = testData.toDF().withColumn("newCol", col("key") + 1) .withColumnRenamed("value", "valueRenamed") checkAnswer( df, http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index fd73065..dd0948a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -40,8 +40,8 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { } def assertJoin(sqlString: String, c: Class[_]): Any = { - val rdd = sql(sqlString) - val physical = rdd.queryExecution.sparkPlan + val df = sql(sqlString) + val physical = df.queryExecution.sparkPlan val operators = physical.collect { case j: ShuffledHashJoin => j case j: HashOuterJoin => j @@ -410,8 +410,8 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { } test("left semi join") { - val rdd = sql("SELECT * FROM testData2 LEFT SEMI JOIN testData ON key = a") - checkAnswer(rdd, + val df = sql("SELECT * FROM testData2 LEFT SEMI JOIN testData ON key = a") + checkAnswer(df, Row(1, 1) :: Row(1, 2) :: Row(2, 1) :: http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index dfb6858..9b4dd6c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -35,36 +35,36 @@ class QueryTest extends PlanTest { /** * Runs the plan and makes sure the answer contains all of the keywords, or the * none of keywords are listed in the answer - * @param rdd the [[DataFrame]] to be executed + * @param df the [[DataFrame]] to be executed * @param exists true for make sure the keywords are listed in the output, otherwise * to make sure none of the keyword are not listed in the output * @param keywords keyword in string array */ - def checkExistence(rdd: DataFrame, exists: Boolean, keywords: String*) { - val outputs = rdd.collect().map(_.mkString).mkString + def checkExistence(df: DataFrame, exists: Boolean, keywords: String*) { + val outputs = df.collect().map(_.mkString).mkString for (key <- keywords) { if (exists) { - assert(outputs.contains(key), s"Failed for $rdd ($key doens't exist in result)") + assert(outputs.contains(key), s"Failed for $df ($key doesn't exist in result)") } else { - assert(!outputs.contains(key), s"Failed for $rdd ($key existed in the result)") + assert(!outputs.contains(key), s"Failed for $df ($key existed in the result)") } } } /** * Runs the plan and makes sure the answer matches the expected result. - * @param rdd the [[DataFrame]] to be executed + * @param df the [[DataFrame]] to be executed * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s. */ - protected def checkAnswer(rdd: DataFrame, expectedAnswer: Seq[Row]): Unit = { - QueryTest.checkAnswer(rdd, expectedAnswer) match { + protected def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Unit = { + QueryTest.checkAnswer(df, expectedAnswer) match { case Some(errorMessage) => fail(errorMessage) case None => } } - protected def checkAnswer(rdd: DataFrame, expectedAnswer: Row): Unit = { - checkAnswer(rdd, Seq(expectedAnswer)) + protected def checkAnswer(df: DataFrame, expectedAnswer: Row): Unit = { + checkAnswer(df, Seq(expectedAnswer)) } def sqlTest(sqlString: String, expectedAnswer: Seq[Row])(implicit sqlContext: SQLContext): Unit = { @@ -95,11 +95,11 @@ object QueryTest { * If there was exception during the execution or the contents of the DataFrame does not * match the expected result, an error message will be returned. Otherwise, a [[None]] will * be returned. - * @param rdd the [[DataFrame]] to be executed + * @param df the [[DataFrame]] to be executed * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s. */ - def checkAnswer(rdd: DataFrame, expectedAnswer: Seq[Row]): Option[String] = { - val isSorted = rdd.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty + def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Option[String] = { + val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty def prepareAnswer(answer: Seq[Row]): Seq[Row] = { // Converts data to types that we can do equality comparison using Scala collections. // For BigDecimal type, the Scala type has a better definition of equality test (similar to @@ -110,14 +110,14 @@ object QueryTest { case o => o }) } - if (!isSorted) converted.sortBy(_.toString) else converted + if (!isSorted) converted.sortBy(_.toString()) else converted } - val sparkAnswer = try rdd.collect().toSeq catch { + val sparkAnswer = try df.collect().toSeq catch { case e: Exception => val errorMessage = s""" |Exception thrown while executing query: - |${rdd.queryExecution} + |${df.queryExecution} |== Exception == |$e |${org.apache.spark.sql.catalyst.util.stackTraceToString(e)} @@ -129,17 +129,17 @@ object QueryTest { val errorMessage = s""" |Results do not match for query: - |${rdd.logicalPlan} + |${df.logicalPlan} |== Analyzed Plan == - |${rdd.queryExecution.analyzed} + |${df.queryExecution.analyzed} |== Physical Plan == - |${rdd.queryExecution.executedPlan} + |${df.queryExecution.executedPlan} |== Results == |${sideBySide( s"== Correct Answer - ${expectedAnswer.size} ==" +: - prepareAnswer(expectedAnswer).map(_.toString), + prepareAnswer(expectedAnswer).map(_.toString()), s"== Spark Answer - ${sparkAnswer.size} ==" +: - prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")} + prepareAnswer(sparkAnswer).map(_.toString())).mkString("\n")} """.stripMargin return Some(errorMessage) } @@ -147,8 +147,8 @@ object QueryTest { return None } - def checkAnswer(rdd: DataFrame, expectedAnswer: java.util.List[Row]): String = { - checkAnswer(rdd, expectedAnswer.toSeq) match { + def checkAnswer(df: DataFrame, expectedAnswer: java.util.List[Row]): String = { + checkAnswer(df, expectedAnswer.toSeq) match { case Some(errorMessage) => errorMessage case None => null } http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 97684f7..097bf0d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1034,10 +1034,10 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { test("Supporting relational operator '<=>' in Spark SQL") { val nullCheckData1 = TestData(1,"1") :: TestData(2,null) :: Nil val rdd1 = sparkContext.parallelize((0 to 1).map(i => nullCheckData1(i))) - rdd1.toDF.registerTempTable("nulldata1") + rdd1.toDF().registerTempTable("nulldata1") val nullCheckData2 = TestData(1,"1") :: TestData(2,null) :: Nil val rdd2 = sparkContext.parallelize((0 to 1).map(i => nullCheckData2(i))) - rdd2.toDF.registerTempTable("nulldata2") + rdd2.toDF().registerTempTable("nulldata2") checkAnswer(sql("SELECT nulldata1.key FROM nulldata1 join " + "nulldata2 on nulldata1.value <=> nulldata2.value"), (1 to 2).map(i => Row(i))) @@ -1046,7 +1046,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { test("Multi-column COUNT(DISTINCT ...)") { val data = TestData(1,"val_1") :: TestData(2,"val_2") :: Nil val rdd = sparkContext.parallelize((0 to 1).map(i => data(i))) - rdd.toDF.registerTempTable("distinctData") + rdd.toDF().registerTempTable("distinctData") checkAnswer(sql("SELECT COUNT(DISTINCT key,value) FROM distinctData"), Row(2)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala index 9a48f8d..23df6e7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala @@ -82,7 +82,7 @@ class ScalaReflectionRelationSuite extends FunSuite { val data = ReflectData("a", 1, 1L, 1.toFloat, 1.toDouble, 1.toShort, 1.toByte, true, new java.math.BigDecimal(1), new Date(12345), new Timestamp(12345), Seq(1,2,3)) val rdd = sparkContext.parallelize(data :: Nil) - rdd.toDF.registerTempTable("reflectData") + rdd.toDF().registerTempTable("reflectData") assert(sql("SELECT * FROM reflectData").collect().head === Row("a", 1, 1L, 1.toFloat, 1.toDouble, 1.toShort, 1.toByte, true, @@ -93,7 +93,7 @@ class ScalaReflectionRelationSuite extends FunSuite { test("query case class RDD with nulls") { val data = NullReflectData(null, null, null, null, null, null, null) val rdd = sparkContext.parallelize(data :: Nil) - rdd.toDF.registerTempTable("reflectNullData") + rdd.toDF().registerTempTable("reflectNullData") assert(sql("SELECT * FROM reflectNullData").collect().head === Row.fromSeq(Seq.fill(7)(null))) } @@ -101,7 +101,7 @@ class ScalaReflectionRelationSuite extends FunSuite { test("query case class RDD with Nones") { val data = OptionalReflectData(None, None, None, None, None, None, None) val rdd = sparkContext.parallelize(data :: Nil) - rdd.toDF.registerTempTable("reflectOptionalData") + rdd.toDF().registerTempTable("reflectOptionalData") assert(sql("SELECT * FROM reflectOptionalData").collect().head === Row.fromSeq(Seq.fill(7)(null))) } @@ -109,7 +109,7 @@ class ScalaReflectionRelationSuite extends FunSuite { // Equality is broken for Arrays, so we test that separately. test("query binary data") { val rdd = sparkContext.parallelize(ReflectBinary(Array[Byte](1)) :: Nil) - rdd.toDF.registerTempTable("reflectBinary") + rdd.toDF().registerTempTable("reflectBinary") val result = sql("SELECT data FROM reflectBinary").collect().head(0).asInstanceOf[Array[Byte]] assert(result.toSeq === Seq[Byte](1)) @@ -128,7 +128,7 @@ class ScalaReflectionRelationSuite extends FunSuite { Map(10 -> Some(100L), 20 -> Some(200L), 30 -> None), Nested(None, "abc"))) val rdd = sparkContext.parallelize(data :: Nil) - rdd.toDF.registerTempTable("reflectComplexData") + rdd.toDF().registerTempTable("reflectComplexData") assert(sql("SELECT * FROM reflectComplexData").collect().head === new GenericRow(Array[Any]( http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala index c511eb1..637f59b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala @@ -29,11 +29,11 @@ case class TestData(key: Int, value: String) object TestData { val testData = TestSQLContext.sparkContext.parallelize( - (1 to 100).map(i => TestData(i, i.toString))).toDF + (1 to 100).map(i => TestData(i, i.toString))).toDF() testData.registerTempTable("testData") val negativeData = TestSQLContext.sparkContext.parallelize( - (1 to 100).map(i => TestData(-i, (-i).toString))).toDF + (1 to 100).map(i => TestData(-i, (-i).toString))).toDF() negativeData.registerTempTable("negativeData") case class LargeAndSmallInts(a: Int, b: Int) @@ -44,7 +44,7 @@ object TestData { LargeAndSmallInts(2147483645, 1) :: LargeAndSmallInts(2, 2) :: LargeAndSmallInts(2147483646, 1) :: - LargeAndSmallInts(3, 2) :: Nil).toDF + LargeAndSmallInts(3, 2) :: Nil).toDF() largeAndSmallInts.registerTempTable("largeAndSmallInts") case class TestData2(a: Int, b: Int) @@ -55,7 +55,7 @@ object TestData { TestData2(2, 1) :: TestData2(2, 2) :: TestData2(3, 1) :: - TestData2(3, 2) :: Nil, 2).toDF + TestData2(3, 2) :: Nil, 2).toDF() testData2.registerTempTable("testData2") case class DecimalData(a: BigDecimal, b: BigDecimal) @@ -67,7 +67,7 @@ object TestData { DecimalData(2, 1) :: DecimalData(2, 2) :: DecimalData(3, 1) :: - DecimalData(3, 2) :: Nil).toDF + DecimalData(3, 2) :: Nil).toDF() decimalData.registerTempTable("decimalData") case class BinaryData(a: Array[Byte], b: Int) @@ -77,14 +77,14 @@ object TestData { BinaryData("22".getBytes(), 5) :: BinaryData("122".getBytes(), 3) :: BinaryData("121".getBytes(), 2) :: - BinaryData("123".getBytes(), 4) :: Nil).toDF + BinaryData("123".getBytes(), 4) :: Nil).toDF() binaryData.registerTempTable("binaryData") case class TestData3(a: Int, b: Option[Int]) val testData3 = TestSQLContext.sparkContext.parallelize( TestData3(1, None) :: - TestData3(2, Some(2)) :: Nil).toDF + TestData3(2, Some(2)) :: Nil).toDF() testData3.registerTempTable("testData3") val emptyTableData = logical.LocalRelation($"a".int, $"b".int) @@ -97,7 +97,7 @@ object TestData { UpperCaseData(3, "C") :: UpperCaseData(4, "D") :: UpperCaseData(5, "E") :: - UpperCaseData(6, "F") :: Nil).toDF + UpperCaseData(6, "F") :: Nil).toDF() upperCaseData.registerTempTable("upperCaseData") case class LowerCaseData(n: Int, l: String) @@ -106,7 +106,7 @@ object TestData { LowerCaseData(1, "a") :: LowerCaseData(2, "b") :: LowerCaseData(3, "c") :: - LowerCaseData(4, "d") :: Nil).toDF + LowerCaseData(4, "d") :: Nil).toDF() lowerCaseData.registerTempTable("lowerCaseData") case class ArrayData(data: Seq[Int], nestedData: Seq[Seq[Int]]) @@ -114,7 +114,7 @@ object TestData { TestSQLContext.sparkContext.parallelize( ArrayData(Seq(1,2,3), Seq(Seq(1,2,3))) :: ArrayData(Seq(2,3,4), Seq(Seq(2,3,4))) :: Nil) - arrayData.toDF.registerTempTable("arrayData") + arrayData.toDF().registerTempTable("arrayData") case class MapData(data: scala.collection.Map[Int, String]) val mapData = @@ -124,18 +124,18 @@ object TestData { MapData(Map(1 -> "a3", 2 -> "b3", 3 -> "c3")) :: MapData(Map(1 -> "a4", 2 -> "b4")) :: MapData(Map(1 -> "a5")) :: Nil) - mapData.toDF.registerTempTable("mapData") + mapData.toDF().registerTempTable("mapData") case class StringData(s: String) val repeatedData = TestSQLContext.sparkContext.parallelize(List.fill(2)(StringData("test"))) - repeatedData.toDF.registerTempTable("repeatedData") + repeatedData.toDF().registerTempTable("repeatedData") val nullableRepeatedData = TestSQLContext.sparkContext.parallelize( List.fill(2)(StringData(null)) ++ List.fill(2)(StringData("test"))) - nullableRepeatedData.toDF.registerTempTable("nullableRepeatedData") + nullableRepeatedData.toDF().registerTempTable("nullableRepeatedData") case class NullInts(a: Integer) val nullInts = @@ -144,7 +144,7 @@ object TestData { NullInts(2) :: NullInts(3) :: NullInts(null) :: Nil - ).toDF + ).toDF() nullInts.registerTempTable("nullInts") val allNulls = @@ -152,7 +152,7 @@ object TestData { NullInts(null) :: NullInts(null) :: NullInts(null) :: - NullInts(null) :: Nil).toDF + NullInts(null) :: Nil).toDF() allNulls.registerTempTable("allNulls") case class NullStrings(n: Int, s: String) @@ -160,11 +160,15 @@ object TestData { TestSQLContext.sparkContext.parallelize( NullStrings(1, "abc") :: NullStrings(2, "ABC") :: - NullStrings(3, null) :: Nil).toDF + NullStrings(3, null) :: Nil).toDF() nullStrings.registerTempTable("nullStrings") case class TableName(tableName: String) - TestSQLContext.sparkContext.parallelize(TableName("test") :: Nil).toDF.registerTempTable("tableName") + TestSQLContext + .sparkContext + .parallelize(TableName("test") :: Nil) + .toDF() + .registerTempTable("tableName") val unparsedStrings = TestSQLContext.sparkContext.parallelize( @@ -177,22 +181,22 @@ object TestData { val timestamps = TestSQLContext.sparkContext.parallelize((1 to 3).map { i => TimestampField(new Timestamp(i)) }) - timestamps.toDF.registerTempTable("timestamps") + timestamps.toDF().registerTempTable("timestamps") case class IntField(i: Int) // An RDD with 4 elements and 8 partitions val withEmptyParts = TestSQLContext.sparkContext.parallelize((1 to 4).map(IntField), 8) - withEmptyParts.toDF.registerTempTable("withEmptyParts") + withEmptyParts.toDF().registerTempTable("withEmptyParts") case class Person(id: Int, name: String, age: Int) case class Salary(personId: Int, salary: Double) val person = TestSQLContext.sparkContext.parallelize( Person(0, "mike", 30) :: - Person(1, "jim", 20) :: Nil).toDF + Person(1, "jim", 20) :: Nil).toDF() person.registerTempTable("person") val salary = TestSQLContext.sparkContext.parallelize( Salary(0, 2000.0) :: - Salary(1, 1000.0) :: Nil).toDF + Salary(1, 1000.0) :: Nil).toDF() salary.registerTempTable("salary") case class ComplexData(m: Map[Int, String], s: TestData, a: Seq[Int], b: Boolean) @@ -200,6 +204,6 @@ object TestData { TestSQLContext.sparkContext.parallelize( ComplexData(Map(1 -> "1"), TestData(1, "1"), Seq(1), true) :: ComplexData(Map(2 -> "2"), TestData(2, "2"), Seq(2), false) - :: Nil).toDF + :: Nil).toDF() complexData.registerTempTable("complexData") } http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index ad2fbc3..ee5c762 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -17,13 +17,13 @@ package org.apache.spark.sql.jdbc -import java.math.BigDecimal +import java.sql.DriverManager + +import org.scalatest.{BeforeAndAfter, FunSuite} + import org.apache.spark.sql.Row -import org.apache.spark.sql.types._ import org.apache.spark.sql.test._ -import org.scalatest.{FunSuite, BeforeAndAfter} -import java.sql.DriverManager -import TestSQLContext._ +import org.apache.spark.sql.types._ class JDBCWriteSuite extends FunSuite with BeforeAndAfter { val url = "jdbc:h2:mem:testdb2" @@ -54,53 +54,53 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter { StructField("seq", IntegerType) :: Nil) test("Basic CREATE") { - val srdd = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2) + val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2) - srdd.createJDBCTable(url, "TEST.BASICCREATETEST", false) + df.createJDBCTable(url, "TEST.BASICCREATETEST", false) assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").count) assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").collect()(0).length) } test("CREATE with overwrite") { - val srdd = TestSQLContext.createDataFrame(sc.parallelize(arr2x3), schema3) - val srdd2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2) + val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x3), schema3) + val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2) - srdd.createJDBCTable(url, "TEST.DROPTEST", false) + df.createJDBCTable(url, "TEST.DROPTEST", false) assert(2 == TestSQLContext.jdbc(url, "TEST.DROPTEST").count) assert(3 == TestSQLContext.jdbc(url, "TEST.DROPTEST").collect()(0).length) - srdd2.createJDBCTable(url, "TEST.DROPTEST", true) + df2.createJDBCTable(url, "TEST.DROPTEST", true) assert(1 == TestSQLContext.jdbc(url, "TEST.DROPTEST").count) assert(2 == TestSQLContext.jdbc(url, "TEST.DROPTEST").collect()(0).length) } test("CREATE then INSERT to append") { - val srdd = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2) - val srdd2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2) + val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2) + val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2) - srdd.createJDBCTable(url, "TEST.APPENDTEST", false) - srdd2.insertIntoJDBC(url, "TEST.APPENDTEST", false) + df.createJDBCTable(url, "TEST.APPENDTEST", false) + df2.insertIntoJDBC(url, "TEST.APPENDTEST", false) assert(3 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").count) assert(2 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").collect()(0).length) } test("CREATE then INSERT to truncate") { - val srdd = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2) - val srdd2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2) + val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2) + val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2) - srdd.createJDBCTable(url, "TEST.TRUNCATETEST", false) - srdd2.insertIntoJDBC(url, "TEST.TRUNCATETEST", true) + df.createJDBCTable(url, "TEST.TRUNCATETEST", false) + df2.insertIntoJDBC(url, "TEST.TRUNCATETEST", true) assert(1 == TestSQLContext.jdbc(url, "TEST.TRUNCATETEST").count) assert(2 == TestSQLContext.jdbc(url, "TEST.TRUNCATETEST").collect()(0).length) } test("Incompatible INSERT to append") { - val srdd = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2) - val srdd2 = TestSQLContext.createDataFrame(sc.parallelize(arr2x3), schema3) + val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2) + val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr2x3), schema3) - srdd.createJDBCTable(url, "TEST.INCOMPATIBLETEST", false) + df.createJDBCTable(url, "TEST.INCOMPATIBLETEST", false) intercept[org.apache.spark.SparkException] { - srdd2.insertIntoJDBC(url, "TEST.INCOMPATIBLETEST", true) + df2.insertIntoJDBC(url, "TEST.INCOMPATIBLETEST", true) } } http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala index 4f38110..5b8a76f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala @@ -18,18 +18,13 @@ package org.apache.spark.sql.jdbc import java.math.BigDecimal -import java.sql.{Date, DriverManager, Timestamp} -import com.spotify.docker.client.{DefaultDockerClient, DockerClient} +import java.sql.{Date, Timestamp} + +import com.spotify.docker.client.DockerClient import com.spotify.docker.client.messages.ContainerConfig -import org.scalatest.{FunSuite, BeforeAndAfterAll, Ignore} +import org.scalatest.{BeforeAndAfterAll, FunSuite, Ignore} -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.SparkContext._ -import org.apache.spark.sql._ import org.apache.spark.sql.test._ -import TestSQLContext._ - -import org.apache.spark.sql.jdbc._ class MySQLDatabase { val docker: DockerClient = DockerClientFactory.get() @@ -37,9 +32,9 @@ class MySQLDatabase { println("Pulling mysql") docker.pull("mysql") println("Configuring container") - val config = (ContainerConfig.builder().image("mysql") - .env("MYSQL_ROOT_PASSWORD=rootpass") - .build()) + val config = ContainerConfig.builder().image("mysql") + .env("MYSQL_ROOT_PASSWORD=rootpass") + .build() println("Creating container") val id = docker.createContainer(config).id println("Starting container " + id) @@ -57,11 +52,10 @@ class MySQLDatabase { println("Closing docker client") DockerClientFactory.close(docker) } catch { - case e: Exception => { + case e: Exception => println(e) println("You may need to clean this up manually.") throw e - } } } } @@ -86,10 +80,9 @@ class MySQLDatabase { println("Database is up.") return; } catch { - case e: java.sql.SQLException => { + case e: java.sql.SQLException => lastException = e java.lang.Thread.sleep(250) - } } } } @@ -143,8 +136,8 @@ class MySQLDatabase { } test("Basic test") { - val rdd = TestSQLContext.jdbc(url(ip, "foo"), "tbl") - val rows = rdd.collect + val df = TestSQLContext.jdbc(url(ip, "foo"), "tbl") + val rows = df.collect() assert(rows.length == 2) val types = rows(0).toSeq.map(x => x.getClass.toString) assert(types.length == 2) @@ -153,8 +146,8 @@ class MySQLDatabase { } test("Numeric types") { - val rdd = TestSQLContext.jdbc(url(ip, "foo"), "numbers") - val rows = rdd.collect + val df = TestSQLContext.jdbc(url(ip, "foo"), "numbers") + val rows = df.collect() assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) assert(types.length == 9) @@ -181,8 +174,8 @@ class MySQLDatabase { } test("Date types") { - val rdd = TestSQLContext.jdbc(url(ip, "foo"), "dates") - val rows = rdd.collect + val df = TestSQLContext.jdbc(url(ip, "foo"), "dates") + val rows = df.collect() assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) assert(types.length == 5) @@ -199,8 +192,8 @@ class MySQLDatabase { } test("String types") { - val rdd = TestSQLContext.jdbc(url(ip, "foo"), "strings") - val rows = rdd.collect + val df = TestSQLContext.jdbc(url(ip, "foo"), "strings") + val rows = df.collect() assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) assert(types.length == 9) @@ -225,11 +218,11 @@ class MySQLDatabase { } test("Basic write test") { - val rdd1 = TestSQLContext.jdbc(url(ip, "foo"), "numbers") - val rdd2 = TestSQLContext.jdbc(url(ip, "foo"), "dates") - val rdd3 = TestSQLContext.jdbc(url(ip, "foo"), "strings") - rdd1.createJDBCTable(url(ip, "foo"), "numberscopy", false) - rdd2.createJDBCTable(url(ip, "foo"), "datescopy", false) - rdd3.createJDBCTable(url(ip, "foo"), "stringscopy", false) + val df1 = TestSQLContext.jdbc(url(ip, "foo"), "numbers") + val df2 = TestSQLContext.jdbc(url(ip, "foo"), "dates") + val df3 = TestSQLContext.jdbc(url(ip, "foo"), "strings") + df1.createJDBCTable(url(ip, "foo"), "numberscopy", false) + df2.createJDBCTable(url(ip, "foo"), "datescopy", false) + df3.createJDBCTable(url(ip, "foo"), "stringscopy", false) } } http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala index 7b47fee..e17be99 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala @@ -17,13 +17,13 @@ package org.apache.spark.sql.jdbc -import java.math.BigDecimal -import org.apache.spark.sql.test._ -import org.scalatest.{FunSuite, BeforeAndAfterAll, Ignore} import java.sql.DriverManager -import TestSQLContext._ -import com.spotify.docker.client.{DefaultDockerClient, DockerClient} + +import com.spotify.docker.client.DockerClient import com.spotify.docker.client.messages.ContainerConfig +import org.scalatest.{BeforeAndAfterAll, FunSuite, Ignore} + +import org.apache.spark.sql.test._ class PostgresDatabase { val docker: DockerClient = DockerClientFactory.get() @@ -31,9 +31,9 @@ class PostgresDatabase { println("Pulling postgres") docker.pull("postgres") println("Configuring container") - val config = (ContainerConfig.builder().image("postgres") - .env("POSTGRES_PASSWORD=rootpass") - .build()) + val config = ContainerConfig.builder().image("postgres") + .env("POSTGRES_PASSWORD=rootpass") + .build() println("Creating container") val id = docker.createContainer(config).id println("Starting container " + id) @@ -51,11 +51,10 @@ class PostgresDatabase { println("Closing docker client") DockerClientFactory.close(docker) } catch { - case e: Exception => { + case e: Exception => println(e) println("You may need to clean this up manually.") throw e - } } } } @@ -79,10 +78,9 @@ class PostgresDatabase { println("Database is up.") return; } catch { - case e: java.sql.SQLException => { + case e: java.sql.SQLException => lastException = e java.lang.Thread.sleep(250) - } } } } @@ -113,8 +111,8 @@ class PostgresDatabase { } test("Type mapping for various types") { - val rdd = TestSQLContext.jdbc(url(db.ip), "public.bar") - val rows = rdd.collect + val df = TestSQLContext.jdbc(url(db.ip), "public.bar") + val rows = df.collect() assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) assert(types.length == 10) @@ -142,8 +140,8 @@ class PostgresDatabase { } test("Basic write test") { - val rdd = TestSQLContext.jdbc(url(db.ip), "public.bar") - rdd.createJDBCTable(url(db.ip), "public.barcopy", false) + val df = TestSQLContext.jdbc(url(db.ip), "public.bar") + df.createJDBCTable(url(db.ip), "public.barcopy", false) // Test only that it doesn't bomb out. } } http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala index eb2d5f2..4d32e84 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala @@ -45,7 +45,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest { val sqlContext = TestSQLContext private def checkFilterPredicate( - rdd: DataFrame, + df: DataFrame, predicate: Predicate, filterClass: Class[_ <: FilterPredicate], checker: (DataFrame, Seq[Row]) => Unit, @@ -53,7 +53,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest { val output = predicate.collect { case a: Attribute => a }.distinct withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") { - val query = rdd + val query = df .select(output.map(e => Column(e)): _*) .where(Column(predicate)) @@ -85,36 +85,36 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest { private def checkFilterPredicate (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row]) - (implicit rdd: DataFrame): Unit = { - checkFilterPredicate(rdd, predicate, filterClass, checkAnswer(_, _: Seq[Row]), expected) + (implicit df: DataFrame): Unit = { + checkFilterPredicate(df, predicate, filterClass, checkAnswer(_, _: Seq[Row]), expected) } private def checkFilterPredicate[T] (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: T) - (implicit rdd: DataFrame): Unit = { - checkFilterPredicate(predicate, filterClass, Seq(Row(expected)))(rdd) + (implicit df: DataFrame): Unit = { + checkFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df) } private def checkBinaryFilterPredicate (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row]) - (implicit rdd: DataFrame): Unit = { - def checkBinaryAnswer(rdd: DataFrame, expected: Seq[Row]) = { + (implicit df: DataFrame): Unit = { + def checkBinaryAnswer(df: DataFrame, expected: Seq[Row]) = { assertResult(expected.map(_.getAs[Array[Byte]](0).mkString(",")).toSeq.sorted) { - rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted + df.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted } } - checkFilterPredicate(rdd, predicate, filterClass, checkBinaryAnswer _, expected) + checkFilterPredicate(df, predicate, filterClass, checkBinaryAnswer _, expected) } private def checkBinaryFilterPredicate (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Array[Byte]) - (implicit rdd: DataFrame): Unit = { - checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(rdd) + (implicit df: DataFrame): Unit = { + checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df) } test("filter pushdown - boolean") { - withParquetRDD((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit rdd => + withParquetDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df => checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], Seq(Row(true), Row(false))) @@ -124,7 +124,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest { } test("filter pushdown - short") { - withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toShort)))) { implicit rdd => + withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toShort)))) { implicit df => checkFilterPredicate(Cast('_1, IntegerType) === 1, classOf[Eq[_]], 1) checkFilterPredicate( Cast('_1, IntegerType) !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_))) @@ -151,7 +151,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest { } test("filter pushdown - integer") { - withParquetRDD((1 to 4).map(i => Tuple1(Option(i)))) { implicit rdd => + withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df => checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_))) @@ -176,7 +176,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest { } test("filter pushdown - long") { - withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toLong)))) { implicit rdd => + withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toLong)))) { implicit df => checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_))) @@ -201,7 +201,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest { } test("filter pushdown - float") { - withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { implicit rdd => + withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { implicit df => checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_))) @@ -226,7 +226,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest { } test("filter pushdown - double") { - withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { implicit rdd => + withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { implicit df => checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_))) @@ -251,7 +251,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest { } test("filter pushdown - string") { - withParquetRDD((1 to 4).map(i => Tuple1(i.toString))) { implicit rdd => + withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df => checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) checkFilterPredicate( '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.toString))) @@ -282,7 +282,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest { def b: Array[Byte] = int.toString.getBytes("UTF-8") } - withParquetRDD((1 to 4).map(i => Tuple1(i.b))) { implicit rdd => + withParquetDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df => checkBinaryFilterPredicate('_1 === 1.b, classOf[Eq[_]], 1.b) checkBinaryFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala index 208f357..36f3406 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala @@ -73,7 +73,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { * Writes `data` to a Parquet file, reads it back and check file contents. */ protected def checkParquetFile[T <: Product : ClassTag: TypeTag](data: Seq[T]): Unit = { - withParquetRDD(data)(r => checkAnswer(r, data.map(Row.fromTuple))) + withParquetDataFrame(data)(r => checkAnswer(r, data.map(Row.fromTuple))) } test("basic data types (without binary)") { @@ -85,9 +85,9 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { test("raw binary") { val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte))) - withParquetRDD(data) { rdd => + withParquetDataFrame(data) { df => assertResult(data.map(_._1.mkString(",")).sorted) { - rdd.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted + df.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted } } } @@ -106,7 +106,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { sparkContext .parallelize(0 to 1000) .map(i => Tuple1(i / 100.0)) - .toDF + .toDF() // Parquet doesn't allow column names with spaces, have to add an alias here .select($"_1" cast decimal as "dec") @@ -147,9 +147,9 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { test("struct") { val data = (1 to 4).map(i => Tuple1((i, s"val_$i"))) - withParquetRDD(data) { rdd => + withParquetDataFrame(data) { df => // Structs are converted to `Row`s - checkAnswer(rdd, data.map { case Tuple1(struct) => + checkAnswer(df, data.map { case Tuple1(struct) => Row(Row(struct.productIterator.toSeq: _*)) }) } @@ -157,9 +157,9 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { test("nested struct with array of array as field") { val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i"))))) - withParquetRDD(data) { rdd => + withParquetDataFrame(data) { df => // Structs are converted to `Row`s - checkAnswer(rdd, data.map { case Tuple1(struct) => + checkAnswer(df, data.map { case Tuple1(struct) => Row(Row(struct.productIterator.toSeq: _*)) }) } @@ -167,8 +167,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { test("nested map with struct as value type") { val data = (1 to 4).map(i => Tuple1(Map(i -> (i, s"val_$i")))) - withParquetRDD(data) { rdd => - checkAnswer(rdd, data.map { case Tuple1(m) => + withParquetDataFrame(data) { df => + checkAnswer(df, data.map { case Tuple1(m) => Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*))) }) } @@ -182,8 +182,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { null.asInstanceOf[java.lang.Float], null.asInstanceOf[java.lang.Double]) - withParquetRDD(allNulls :: Nil) { rdd => - val rows = rdd.collect() + withParquetDataFrame(allNulls :: Nil) { df => + val rows = df.collect() assert(rows.size === 1) assert(rows.head === Row(Seq.fill(5)(null): _*)) } @@ -195,8 +195,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { None.asInstanceOf[Option[Long]], None.asInstanceOf[Option[String]]) - withParquetRDD(allNones :: Nil) { rdd => - val rows = rdd.collect() + withParquetDataFrame(allNones :: Nil) { df => + val rows = df.collect() assert(rows.size === 1) assert(rows.head === Row(Seq.fill(3)(null): _*)) } http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 8b4d05e..b98ba09 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -68,7 +68,7 @@ class ParquetQuerySuiteBase extends QueryTest with ParquetTest { val selfJoin = sql("SELECT * FROM t x JOIN t y WHERE x._1 = y._1") val queryOutput = selfJoin.queryExecution.analyzed.output - assertResult(4, "Field count mismatche")(queryOutput.size) + assertResult(4, "Field count mismatches")(queryOutput.size) assertResult(2, "Duplicated expression ID in query plan:\n $selfJoin") { queryOutput.filter(_.name == "_1").map(_.exprId).size } http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala index 2e6c2d5..ad880e2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala @@ -36,8 +36,8 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { private def testSchema[T <: Product: ClassTag: TypeTag]( testName: String, messageType: String, isThriftDerived: Boolean = false): Unit = { test(testName) { - val actual = ParquetTypesConverter.convertFromAttributes(ScalaReflection.attributesFor[T], - isThriftDerived) + val actual = ParquetTypesConverter.convertFromAttributes( + ScalaReflection.attributesFor[T], isThriftDerived) val expected = MessageTypeParser.parseMessageType(messageType) actual.checkContains(expected) expected.checkContains(actual) http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 9fcb04c..d4b175f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -37,7 +37,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter { import org.apache.spark.sql.hive.test.TestHive.implicits._ val testData = TestHive.sparkContext.parallelize( - (1 to 100).map(i => TestData(i, i.toString))).toDF + (1 to 100).map(i => TestData(i, i.toString))).toDF() before { // Since every we are doing tests for DDL statements, @@ -65,7 +65,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter { // Make sure the table has been updated. checkAnswer( sql("SELECT * FROM createAndInsertTest"), - testData.toDF.collect().toSeq ++ testData.toDF.collect().toSeq + testData.toDF().collect().toSeq ++ testData.toDF().collect().toSeq ) // Now overwrite. http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index e5156ae..0bd8277 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -154,7 +154,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test("check change without refresh") { val tempDir = File.createTempFile("sparksql", "json") tempDir.delete() - sparkContext.parallelize(("a", "b") :: Nil).toDF + sparkContext.parallelize(("a", "b") :: Nil).toDF() .toJSON.saveAsTextFile(tempDir.getCanonicalPath) sql( @@ -171,7 +171,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { Row("a", "b")) FileUtils.deleteDirectory(tempDir) - sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toDF + sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toDF() .toJSON.saveAsTextFile(tempDir.getCanonicalPath) // Schema is cached so the new column does not show. The updated values in existing columns @@ -192,7 +192,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test("drop, change, recreate") { val tempDir = File.createTempFile("sparksql", "json") tempDir.delete() - sparkContext.parallelize(("a", "b") :: Nil).toDF + sparkContext.parallelize(("a", "b") :: Nil).toDF() .toJSON.saveAsTextFile(tempDir.getCanonicalPath) sql( @@ -209,7 +209,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { Row("a", "b")) FileUtils.deleteDirectory(tempDir) - sparkContext.parallelize(("a", "b", "c") :: Nil).toDF + sparkContext.parallelize(("a", "b", "c") :: Nil).toDF() .toJSON.saveAsTextFile(tempDir.getCanonicalPath) sql("DROP TABLE jsonTable") http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 6f07fd5..1e05a02 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -127,11 +127,11 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { } test("estimates the size of a test MetastoreRelation") { - val rdd = sql("""SELECT * FROM src""") - val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation => + val df = sql("""SELECT * FROM src""") + val sizes = df.queryExecution.analyzed.collect { case mr: MetastoreRelation => mr.statistics.sizeInBytes } - assert(sizes.size === 1, s"Size wrong for:\n ${rdd.queryExecution}") + assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}") assert(sizes(0).equals(BigInt(5812)), s"expected exact size 5812 for test table 'src', got: ${sizes(0)}") } @@ -145,10 +145,10 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { ct: ClassTag[_]) = { before() - var rdd = sql(query) + var df = sql(query) // Assert src has a size smaller than the threshold. - val sizes = rdd.queryExecution.analyzed.collect { + val sizes = df.queryExecution.analyzed.collect { case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes } assert(sizes.size === 2 && sizes(0) <= conf.autoBroadcastJoinThreshold @@ -157,21 +157,21 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { // Using `sparkPlan` because for relevant patterns in HashJoin to be // matched, other strategies need to be applied. - var bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j } + var bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j } assert(bhj.size === 1, - s"actual query plans do not contain broadcast join: ${rdd.queryExecution}") + s"actual query plans do not contain broadcast join: ${df.queryExecution}") - checkAnswer(rdd, expectedAnswer) // check correctness of output + checkAnswer(df, expectedAnswer) // check correctness of output TestHive.conf.settings.synchronized { val tmp = conf.autoBroadcastJoinThreshold sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""") - rdd = sql(query) - bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j } + df = sql(query) + bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j } assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off") - val shj = rdd.queryExecution.sparkPlan.collect { case j: ShuffledHashJoin => j } + val shj = df.queryExecution.sparkPlan.collect { case j: ShuffledHashJoin => j } assert(shj.size === 1, "ShuffledHashJoin should be planned when BroadcastHashJoin is turned off") @@ -199,10 +199,10 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { |left semi JOIN src b ON a.key=86 and a.key = b.key""".stripMargin val answer = Row(86, "val_86") - var rdd = sql(leftSemiJoinQuery) + var df = sql(leftSemiJoinQuery) // Assert src has a size smaller than the threshold. - val sizes = rdd.queryExecution.analyzed.collect { + val sizes = df.queryExecution.analyzed.collect { case r if implicitly[ClassTag[MetastoreRelation]].runtimeClass .isAssignableFrom(r.getClass) => r.statistics.sizeInBytes @@ -213,25 +213,25 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { // Using `sparkPlan` because for relevant patterns in HashJoin to be // matched, other strategies need to be applied. - var bhj = rdd.queryExecution.sparkPlan.collect { + var bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastLeftSemiJoinHash => j } assert(bhj.size === 1, - s"actual query plans do not contain broadcast join: ${rdd.queryExecution}") + s"actual query plans do not contain broadcast join: ${df.queryExecution}") - checkAnswer(rdd, answer) // check correctness of output + checkAnswer(df, answer) // check correctness of output TestHive.conf.settings.synchronized { val tmp = conf.autoBroadcastJoinThreshold sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1") - rdd = sql(leftSemiJoinQuery) - bhj = rdd.queryExecution.sparkPlan.collect { + df = sql(leftSemiJoinQuery) + bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastLeftSemiJoinHash => j } assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off") - val shj = rdd.queryExecution.sparkPlan.collect { + val shj = df.queryExecution.sparkPlan.collect { case j: LeftSemiJoinHash => j } assert(shj.size === 1, http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 955f3f5..bb0a67d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -429,7 +429,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { |'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES |('serialization.last.column.takes.rest'='true') FROM src; """.stripMargin.replaceAll("\n", " ")) - + createQueryTest("LIKE", "SELECT * FROM src WHERE value LIKE '%1%'") @@ -567,7 +567,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { TestHive.sparkContext.parallelize( TestData(1, "str1") :: TestData(2, "str2") :: Nil) - testData.toDF.registerTempTable("REGisteredTABle") + testData.toDF().registerTempTable("REGisteredTABle") assertResult(Array(Row(2, "str2"))) { sql("SELECT tablealias.A, TABLEALIAS.b FROM reGisteredTABle TableAlias " + @@ -583,8 +583,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { test("SPARK-1704: Explain commands as a DataFrame") { sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") - val rdd = sql("explain select key, count(value) from src group by key") - assert(isExplanation(rdd)) + val df = sql("explain select key, count(value) from src group by key") + assert(isExplanation(df)) TestHive.reset() } @@ -592,7 +592,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { test("SPARK-2180: HAVING support in GROUP BY clauses (positive)") { val fixture = List(("foo", 2), ("bar", 1), ("foo", 4), ("bar", 3)) .zipWithIndex.map {case Pair(Pair(value, attr), key) => HavingRow(key, value, attr)} - TestHive.sparkContext.parallelize(fixture).toDF.registerTempTable("having_test") + TestHive.sparkContext.parallelize(fixture).toDF().registerTempTable("having_test") val results = sql("SELECT value, max(attr) AS attr FROM having_test GROUP BY value HAVING attr > 3") .collect() @@ -740,7 +740,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { TestHive.sparkContext.parallelize( TestData(1, "str1") :: TestData(1, "str2") :: Nil) - testData.toDF.registerTempTable("test_describe_commands2") + testData.toDF().registerTempTable("test_describe_commands2") assertResult( Array( @@ -900,8 +900,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { } test("SPARK-3414 regression: should store analyzed logical plan when registering a temp table") { - sparkContext.makeRDD(Seq.empty[LogEntry]).toDF.registerTempTable("rawLogs") - sparkContext.makeRDD(Seq.empty[LogFile]).toDF.registerTempTable("logFiles") + sparkContext.makeRDD(Seq.empty[LogEntry]).toDF().registerTempTable("rawLogs") + sparkContext.makeRDD(Seq.empty[LogFile]).toDF().registerTempTable("logFiles") sql( """ @@ -979,8 +979,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { val testVal = "test.val.0" val nonexistentKey = "nonexistent" val KV = "([^=]+)=([^=]*)".r - def collectResults(rdd: DataFrame): Set[(String, String)] = - rdd.collect().map { + def collectResults(df: DataFrame): Set[(String, String)] = + df.collect().map { case Row(key: String, value: String) => key -> value case Row(KV(key, value)) => key -> value }.toSet http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala index 6fc4cc1..f4440e5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala @@ -77,7 +77,7 @@ class HiveResolutionSuite extends HiveComparisonTest { test("case insensitivity with scala reflection") { // Test resolution with Scala Reflection sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil) - .toDF.registerTempTable("caseSensitivityTest") + .toDF().registerTempTable("caseSensitivityTest") val query = sql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest") assert(query.schema.fields.map(_.name) === Seq("a", "b", "A", "B", "a", "b", "A", "B"), @@ -88,14 +88,14 @@ class HiveResolutionSuite extends HiveComparisonTest { ignore("case insensitivity with scala reflection joins") { // Test resolution with Scala Reflection sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil) - .toDF.registerTempTable("caseSensitivityTest") + .toDF().registerTempTable("caseSensitivityTest") sql("SELECT * FROM casesensitivitytest a JOIN casesensitivitytest b ON a.a = b.a").collect() } test("nested repeated resolution") { sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil) - .toDF.registerTempTable("nestedRepeatedTest") + .toDF().registerTempTable("nestedRepeatedTest") assert(sql("SELECT nestedArray[0].a FROM nestedRepeatedTest").collect().head(0) === 1) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org