[SPARK-5752][SQL] Don't implicitly convert RDDs directly to DataFrames - The old implicit would convert RDDs directly to DataFrames, and that added too many methods. - toDataFrame -> toDF - Dsl -> functions - implicits moved into SQLContext.implicits - addColumn -> withColumn - renameColumn -> withColumnRenamed
Python changes: - toDataFrame -> toDF - Dsl -> functions package - addColumn -> withColumn - renameColumn -> withColumnRenamed - add toDF functions to RDD on SQLContext init - add flatMap to DataFrame Author: Reynold Xin <r...@databricks.com> Author: Davies Liu <dav...@databricks.com> Closes #4556 from rxin/SPARK-5752 and squashes the following commits: 5ef9910 [Reynold Xin] More fix 61d3fca [Reynold Xin] Merge branch 'df5' of github.com:davies/spark into SPARK-5752 ff5832c [Reynold Xin] Fix python 749c675 [Reynold Xin] count(*) fixes. 5806df0 [Reynold Xin] Fix build break again. d941f3d [Reynold Xin] Fixed explode compilation break. fe1267a [Davies Liu] flatMap c4afb8e [Reynold Xin] style d9de47f [Davies Liu] add comment b783994 [Davies Liu] add comment for toDF e2154e5 [Davies Liu] schema() -> schema 3a1004f [Davies Liu] Dsl -> functions, toDF() fb256af [Reynold Xin] - toDataFrame -> toDF - Dsl -> functions - implicits moved into SQLContext.implicits - addColumn -> withColumn - renameColumn -> withColumnRenamed 0dd74eb [Reynold Xin] [SPARK-5752][SQL] Don't implicitly convert RDDs directly to DataFrames 97dd47c [Davies Liu] fix mistake 6168f74 [Davies Liu] fix test 1fc0199 [Davies Liu] fix test a075cd5 [Davies Liu] clean up, toPandas 663d314 [Davies Liu] add test for agg('*') 9e214d5 [Reynold Xin] count(*) fixes. 1ed7136 [Reynold Xin] Fix build break again. 921b2e3 [Reynold Xin] Fixed explode compilation break. 14698d4 [Davies Liu] flatMap ba3e12d [Reynold Xin] style d08c92d [Davies Liu] add comment 5c8b524 [Davies Liu] add comment for toDF a4e5e66 [Davies Liu] schema() -> schema d377fc9 [Davies Liu] Dsl -> functions, toDF() 6b3086c [Reynold Xin] - toDataFrame -> toDF - Dsl -> functions - implicits moved into SQLContext.implicits - addColumn -> withColumn - renameColumn -> withColumnRenamed 807e8b1 [Reynold Xin] [SPARK-5752][SQL] Don't implicitly convert RDDs directly to DataFrames (cherry picked from commit e98dfe627c5d0201464cdd0f363f391ea84c389a) Signed-off-by: Reynold Xin <r...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba91bf5f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba91bf5f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba91bf5f Branch: refs/heads/branch-1.3 Commit: ba91bf5f4f048a721d97eb5779957ec39b15319f Parents: db57479 Author: Reynold Xin <r...@databricks.com> Authored: Fri Feb 13 23:03:22 2015 -0800 Committer: Reynold Xin <r...@databricks.com> Committed: Fri Feb 13 23:03:31 2015 -0800 ---------------------------------------------------------------------- .../examples/ml/CrossValidatorExample.scala | 4 +- .../spark/examples/ml/DeveloperApiExample.scala | 4 +- .../apache/spark/examples/ml/MovieLensALS.scala | 6 +- .../spark/examples/ml/SimpleParamsExample.scala | 6 +- .../ml/SimpleTextClassificationPipeline.scala | 4 +- .../spark/examples/mllib/DatasetExample.scala | 8 +- .../apache/spark/examples/sql/RDDRelation.scala | 10 +- .../spark/examples/sql/hive/HiveFromSpark.scala | 2 +- .../scala/org/apache/spark/ml/Transformer.scala | 6 +- .../spark/ml/classification/Classifier.scala | 16 +- .../ml/classification/LogisticRegression.scala | 33 +- .../ProbabilisticClassifier.scala | 6 +- .../spark/ml/feature/StandardScaler.scala | 4 +- .../spark/ml/impl/estimator/Predictor.scala | 4 +- .../apache/spark/ml/recommendation/ALS.scala | 6 +- .../spark/mllib/classification/NaiveBayes.scala | 2 +- .../impl/GLMClassificationModel.scala | 2 +- .../MatrixFactorizationModel.scala | 4 +- .../regression/impl/GLMRegressionModel.scala | 2 +- .../mllib/tree/model/DecisionTreeModel.scala | 2 +- .../mllib/tree/model/treeEnsembleModels.scala | 2 +- .../spark/ml/recommendation/ALSSuite.scala | 4 +- python/docs/pyspark.sql.rst | 8 + python/pyspark/mllib/tests.py | 2 +- python/pyspark/sql/__init__.py | 3 +- python/pyspark/sql/context.py | 34 +- python/pyspark/sql/dataframe.py | 221 +++------- python/pyspark/sql/functions.py | 170 ++++++++ python/pyspark/sql/tests.py | 38 +- python/run-tests | 3 +- .../org/apache/spark/repl/SparkILoopInit.scala | 2 +- .../scala/org/apache/spark/repl/ReplSuite.scala | 2 +- .../org/apache/spark/repl/SparkILoop.scala | 2 +- .../sql/catalyst/analysis/unresolved.scala | 2 +- .../scala/org/apache/spark/sql/Column.scala | 21 +- .../scala/org/apache/spark/sql/DataFrame.scala | 25 +- .../org/apache/spark/sql/DataFrameHolder.scala | 30 ++ .../org/apache/spark/sql/DataFrameImpl.scala | 6 +- .../main/scala/org/apache/spark/sql/Dsl.scala | 428 ------------------- .../org/apache/spark/sql/GroupedData.scala | 19 +- .../apache/spark/sql/IncomputableColumn.scala | 6 +- .../scala/org/apache/spark/sql/SQLContext.scala | 35 +- .../apache/spark/sql/UserDefinedFunction.scala | 4 +- .../scala/org/apache/spark/sql/functions.scala | 425 ++++++++++++++++++ .../apache/spark/sql/parquet/ParquetTest.scala | 2 +- .../org/apache/spark/sql/api/java/JavaDsl.java | 2 +- .../org/apache/spark/sql/CachedTableSuite.scala | 7 +- .../spark/sql/ColumnExpressionSuite.scala | 10 +- .../spark/sql/DataFrameImplicitsSuite.scala | 10 +- .../org/apache/spark/sql/DataFrameSuite.scala | 51 ++- .../scala/org/apache/spark/sql/JoinSuite.scala | 3 +- .../org/apache/spark/sql/ListTablesSuite.scala | 2 +- .../org/apache/spark/sql/SQLQuerySuite.scala | 8 +- .../sql/ScalaReflectionRelationSuite.scala | 10 +- .../scala/org/apache/spark/sql/TestData.scala | 46 +- .../scala/org/apache/spark/sql/UDFSuite.scala | 2 +- .../apache/spark/sql/UserDefinedTypeSuite.scala | 4 +- .../columnar/InMemoryColumnarQuerySuite.scala | 8 +- .../columnar/PartitionBatchPruningSuite.scala | 5 +- .../spark/sql/execution/PlannerSuite.scala | 3 +- .../org/apache/spark/sql/json/JsonSuite.scala | 7 +- .../spark/sql/parquet/ParquetIOSuite.scala | 6 +- .../sql/hive/InsertIntoHiveTableSuite.scala | 8 +- .../apache/spark/sql/hive/ListTablesSuite.scala | 2 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 17 +- .../sql/hive/execution/HiveQuerySuite.scala | 12 +- .../hive/execution/HiveResolutionSuite.scala | 6 +- .../sql/hive/execution/HiveTableScanSuite.scala | 3 +- .../spark/sql/hive/execution/HiveUdfSuite.scala | 10 +- .../sql/hive/execution/SQLQuerySuite.scala | 11 +- .../spark/sql/parquet/parquetSuites.scala | 6 +- 71 files changed, 1012 insertions(+), 872 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala index a2893f7..f024194 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala @@ -90,7 +90,7 @@ object CrossValidatorExample { crossval.setNumFolds(2) // Use 3+ in practice // Run cross-validation, and choose the best set of parameters. - val cvModel = crossval.fit(training) + val cvModel = crossval.fit(training.toDF) // Prepare test documents, which are unlabeled. val test = sc.parallelize(Seq( @@ -100,7 +100,7 @@ object CrossValidatorExample { Document(7L, "apache hadoop"))) // Make predictions on test documents. cvModel uses the best model found (lrModel). - cvModel.transform(test) + cvModel.transform(test.toDF) .select("id", "text", "probability", "prediction") .collect() .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) => http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala index aed4423..54aadd2 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala @@ -58,7 +58,7 @@ object DeveloperApiExample { lr.setMaxIter(10) // Learn a LogisticRegression model. This uses the parameters stored in lr. - val model = lr.fit(training) + val model = lr.fit(training.toDF) // Prepare test data. val test = sc.parallelize(Seq( @@ -67,7 +67,7 @@ object DeveloperApiExample { LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)))) // Make predictions on test data. - val sumPredictions: Double = model.transform(test) + val sumPredictions: Double = model.transform(test.toDF) .select("features", "label", "prediction") .collect() .map { case Row(features: Vector, label: Double, prediction: Double) => http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala index 836ea2e..adaf796 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala @@ -137,9 +137,9 @@ object MovieLensALS { .setRegParam(params.regParam) .setNumBlocks(params.numBlocks) - val model = als.fit(training) + val model = als.fit(training.toDF) - val predictions = model.transform(test).cache() + val predictions = model.transform(test.toDF).cache() // Evaluate the model. // TODO: Create an evaluator to compute RMSE. @@ -158,7 +158,7 @@ object MovieLensALS { // Inspect false positives. predictions.registerTempTable("prediction") - sc.textFile(params.movies).map(Movie.parseMovie).registerTempTable("movie") + sc.textFile(params.movies).map(Movie.parseMovie).toDF.registerTempTable("movie") sqlContext.sql( """ |SELECT userId, prediction.movieId, title, rating, prediction http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala index 80c9f5f..c5bb551 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala @@ -58,7 +58,7 @@ object SimpleParamsExample { .setRegParam(0.01) // Learn a LogisticRegression model. This uses the parameters stored in lr. - val model1 = lr.fit(training) + val model1 = lr.fit(training.toDF) // Since model1 is a Model (i.e., a Transformer produced by an Estimator), // we can view the parameters it used during fit(). // This prints the parameter (name: value) pairs, where names are unique IDs for this @@ -77,7 +77,7 @@ object SimpleParamsExample { // Now learn a new model using the paramMapCombined parameters. // paramMapCombined overrides all parameters set earlier via lr.set* methods. - val model2 = lr.fit(training, paramMapCombined) + val model2 = lr.fit(training.toDF, paramMapCombined) println("Model 2 was fit using parameters: " + model2.fittingParamMap) // Prepare test data. @@ -90,7 +90,7 @@ object SimpleParamsExample { // LogisticRegression.transform will only use the 'features' column. // Note that model2.transform() outputs a 'myProbability' column instead of the usual // 'probability' column since we renamed the lr.probabilityCol parameter previously. - model2.transform(test) + model2.transform(test.toDF) .select("features", "label", "myProbability", "prediction") .collect() .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) => http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala index 968cb29..8b47f88 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala @@ -69,7 +69,7 @@ object SimpleTextClassificationPipeline { .setStages(Array(tokenizer, hashingTF, lr)) // Fit the pipeline to training documents. - val model = pipeline.fit(training) + val model = pipeline.fit(training.toDF) // Prepare test documents, which are unlabeled. val test = sc.parallelize(Seq( @@ -79,7 +79,7 @@ object SimpleTextClassificationPipeline { Document(7L, "apache hadoop"))) // Make predictions on test documents. - model.transform(test) + model.transform(test.toDF) .select("id", "text", "probability", "prediction") .collect() .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) => http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala index 89b6255..c98c68a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala @@ -81,18 +81,18 @@ object DatasetExample { println(s"Loaded ${origData.count()} instances from file: ${params.input}") // Convert input data to DataFrame explicitly. - val df: DataFrame = origData.toDataFrame + val df: DataFrame = origData.toDF println(s"Inferred schema:\n${df.schema.prettyJson}") println(s"Converted to DataFrame with ${df.count()} records") - // Select columns, using implicit conversion to DataFrames. - val labelsDf: DataFrame = origData.select("label") + // Select columns + val labelsDf: DataFrame = df.select("label") val labels: RDD[Double] = labelsDf.map { case Row(v: Double) => v } val numLabels = labels.count() val meanLabel = labels.fold(0.0)(_ + _) / numLabels println(s"Selected label column with average value $meanLabel") - val featuresDf: DataFrame = origData.select("features") + val featuresDf: DataFrame = df.select("features") val features: RDD[Vector] = featuresDf.map { case Row(v: Vector) => v } val featureSummary = features.aggregate(new MultivariateOnlineSummarizer())( (summary, feat) => summary.add(feat), http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala index 1eac3c8..79d3d5a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala @@ -19,7 +19,7 @@ package org.apache.spark.examples.sql import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.Dsl._ +import org.apache.spark.sql.functions._ // One method for defining the schema of an RDD is to make a case class with the desired column // names and types. @@ -34,10 +34,10 @@ object RDDRelation { // Importing the SQL context gives access to all the SQL functions and implicit conversions. import sqlContext.implicits._ - val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))) + val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF // Any RDD containing case classes can be registered as a table. The schema of the table is // automatically inferred using scala reflection. - rdd.registerTempTable("records") + df.registerTempTable("records") // Once tables have been registered, you can run SQL queries over them. println("Result of SELECT *:") @@ -55,10 +55,10 @@ object RDDRelation { rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println) // Queries can also be written using a LINQ-like Scala DSL. - rdd.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println) + df.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println) // Write out an RDD as a parquet file. - rdd.saveAsParquetFile("pair.parquet") + df.saveAsParquetFile("pair.parquet") // Read in parquet file. Parquet files are self-describing so the schmema is preserved. val parquetFile = sqlContext.parquetFile("pair.parquet") http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala index 15754cd..7128deb 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala @@ -68,7 +68,7 @@ object HiveFromSpark { // You can also register RDDs as temporary tables within a HiveContext. val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))) - rdd.registerTempTable("records") + rdd.toDF.registerTempTable("records") // Queries can then join RDD data with data stored in Hive. println("Result of SELECT *:") http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala b/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala index 2ec2ccd..9a58486 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala @@ -23,7 +23,7 @@ import org.apache.spark.Logging import org.apache.spark.annotation.AlphaComponent import org.apache.spark.ml.param._ import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.Dsl._ +import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ /** @@ -100,7 +100,7 @@ private[ml] abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, O override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { transformSchema(dataset.schema, paramMap, logging = true) val map = this.paramMap ++ paramMap - dataset.select($"*", callUDF( - this.createTransformFunc(map), outputDataType, dataset(map(inputCol))).as(map(outputCol))) + dataset.withColumn(map(outputCol), + callUDF(this.createTransformFunc(map), outputDataType, dataset(map(inputCol)))) } } http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala index 124ab30..c5fc89f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala @@ -21,7 +21,7 @@ import org.apache.spark.annotation.{DeveloperApi, AlphaComponent} import org.apache.spark.ml.impl.estimator.{PredictionModel, Predictor, PredictorParams} import org.apache.spark.ml.param.{Params, ParamMap, HasRawPredictionCol} import org.apache.spark.mllib.linalg.{Vector, VectorUDT} -import org.apache.spark.sql.Dsl._ +import org.apache.spark.sql.functions._ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.{DataType, DoubleType, StructType} @@ -182,24 +182,22 @@ private[ml] object ClassificationModel { if (map(model.rawPredictionCol) != "") { // output raw prediction val features2raw: FeaturesType => Vector = model.predictRaw - tmpData = tmpData.select($"*", - callUDF(features2raw, new VectorUDT, - col(map(model.featuresCol))).as(map(model.rawPredictionCol))) + tmpData = tmpData.withColumn(map(model.rawPredictionCol), + callUDF(features2raw, new VectorUDT, col(map(model.featuresCol)))) numColsOutput += 1 if (map(model.predictionCol) != "") { val raw2pred: Vector => Double = (rawPred) => { rawPred.toArray.zipWithIndex.maxBy(_._1)._2 } - tmpData = tmpData.select($"*", callUDF(raw2pred, DoubleType, - col(map(model.rawPredictionCol))).as(map(model.predictionCol))) + tmpData = tmpData.withColumn(map(model.predictionCol), + callUDF(raw2pred, DoubleType, col(map(model.rawPredictionCol)))) numColsOutput += 1 } } else if (map(model.predictionCol) != "") { // output prediction val features2pred: FeaturesType => Double = model.predict - tmpData = tmpData.select($"*", - callUDF(features2pred, DoubleType, - col(map(model.featuresCol))).as(map(model.predictionCol))) + tmpData = tmpData.withColumn(map(model.predictionCol), + callUDF(features2pred, DoubleType, col(map(model.featuresCol)))) numColsOutput += 1 } (numColsOutput, tmpData) http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index a9a5af5..21f61d8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -22,7 +22,7 @@ import org.apache.spark.ml.param._ import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS import org.apache.spark.mllib.linalg.{VectorUDT, BLAS, Vector, Vectors} import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.Dsl._ +import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.DoubleType import org.apache.spark.storage.StorageLevel @@ -130,44 +130,39 @@ class LogisticRegressionModel private[ml] ( var numColsOutput = 0 if (map(rawPredictionCol) != "") { val features2raw: Vector => Vector = (features) => predictRaw(features) - tmpData = tmpData.select($"*", - callUDF(features2raw, new VectorUDT, col(map(featuresCol))).as(map(rawPredictionCol))) + tmpData = tmpData.withColumn(map(rawPredictionCol), + callUDF(features2raw, new VectorUDT, col(map(featuresCol)))) numColsOutput += 1 } if (map(probabilityCol) != "") { if (map(rawPredictionCol) != "") { - val raw2prob: Vector => Vector = { (rawPreds: Vector) => + val raw2prob = udf { (rawPreds: Vector) => val prob1 = 1.0 / (1.0 + math.exp(-rawPreds(1))) - Vectors.dense(1.0 - prob1, prob1) + Vectors.dense(1.0 - prob1, prob1): Vector } - tmpData = tmpData.select($"*", - callUDF(raw2prob, new VectorUDT, col(map(rawPredictionCol))).as(map(probabilityCol))) + tmpData = tmpData.withColumn(map(probabilityCol), raw2prob(col(map(rawPredictionCol)))) } else { - val features2prob: Vector => Vector = (features: Vector) => predictProbabilities(features) - tmpData = tmpData.select($"*", - callUDF(features2prob, new VectorUDT, col(map(featuresCol))).as(map(probabilityCol))) + val features2prob = udf { (features: Vector) => predictProbabilities(features) : Vector } + tmpData = tmpData.withColumn(map(probabilityCol), features2prob(col(map(featuresCol)))) } numColsOutput += 1 } if (map(predictionCol) != "") { val t = map(threshold) if (map(probabilityCol) != "") { - val predict: Vector => Double = { probs: Vector => + val predict = udf { probs: Vector => if (probs(1) > t) 1.0 else 0.0 } - tmpData = tmpData.select($"*", - callUDF(predict, DoubleType, col(map(probabilityCol))).as(map(predictionCol))) + tmpData = tmpData.withColumn(map(predictionCol), predict(col(map(probabilityCol)))) } else if (map(rawPredictionCol) != "") { - val predict: Vector => Double = { rawPreds: Vector => + val predict = udf { rawPreds: Vector => val prob1 = 1.0 / (1.0 + math.exp(-rawPreds(1))) if (prob1 > t) 1.0 else 0.0 } - tmpData = tmpData.select($"*", - callUDF(predict, DoubleType, col(map(rawPredictionCol))).as(map(predictionCol))) + tmpData = tmpData.withColumn(map(predictionCol), predict(col(map(rawPredictionCol)))) } else { - val predict: Vector => Double = (features: Vector) => this.predict(features) - tmpData = tmpData.select($"*", - callUDF(predict, DoubleType, col(map(featuresCol))).as(map(predictionCol))) + val predict = udf { features: Vector => this.predict(features) } + tmpData = tmpData.withColumn(map(predictionCol), predict(col(map(featuresCol)))) } numColsOutput += 1 } http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala index 3851878..bd8caac 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala @@ -21,7 +21,7 @@ import org.apache.spark.annotation.{AlphaComponent, DeveloperApi} import org.apache.spark.ml.param.{HasProbabilityCol, ParamMap, Params} import org.apache.spark.mllib.linalg.{Vector, VectorUDT} import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.Dsl._ +import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DataType, StructType} @@ -122,8 +122,8 @@ private[spark] abstract class ProbabilisticClassificationModel[ val features2probs: FeaturesType => Vector = (features) => { tmpModel.predictProbabilities(features) } - outputData.select($"*", - callUDF(features2probs, new VectorUDT, col(map(featuresCol))).as(map(probabilityCol))) + outputData.withColumn(map(probabilityCol), + callUDF(features2probs, new VectorUDT, col(map(featuresCol)))) } else { if (numColsOutput == 0) { this.logWarning(s"$uid: ProbabilisticClassificationModel.transform() was called as NOOP" + http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala index 7623ec5..ddbd648 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala @@ -23,7 +23,7 @@ import org.apache.spark.ml.param._ import org.apache.spark.mllib.feature import org.apache.spark.mllib.linalg.{Vector, VectorUDT} import org.apache.spark.sql._ -import org.apache.spark.sql.Dsl._ +import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{StructField, StructType} /** @@ -88,7 +88,7 @@ class StandardScalerModel private[ml] ( transformSchema(dataset.schema, paramMap, logging = true) val map = this.paramMap ++ paramMap val scale = udf((v: Vector) => { scaler.transform(v) } : Vector) - dataset.select($"*", scale(col(map(inputCol))).as(map(outputCol))) + dataset.withColumn(map(outputCol), scale(col(map(inputCol)))) } private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala index e416c1e..7daeff9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala @@ -24,7 +24,7 @@ import org.apache.spark.mllib.linalg.{VectorUDT, Vector} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row} -import org.apache.spark.sql.Dsl._ +import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DataType, DoubleType, StructType} @@ -216,7 +216,7 @@ private[spark] abstract class PredictionModel[FeaturesType, M <: PredictionModel val pred: FeaturesType => Double = (features) => { tmpModel.predict(features) } - dataset.select($"*", callUDF(pred, DoubleType, col(map(featuresCol))).as(map(predictionCol))) + dataset.withColumn(map(predictionCol), callUDF(pred, DoubleType, col(map(featuresCol)))) } else { this.logWarning(s"$uid: Predictor.transform() was called as NOOP" + " since no output columns were set.") http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index aac4877..8d70e43 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -36,7 +36,7 @@ import org.apache.spark.ml.param._ import org.apache.spark.mllib.optimization.NNLS import org.apache.spark.rdd.RDD import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.Dsl._ +import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DoubleType, FloatType, IntegerType, StructField, StructType} import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils @@ -170,8 +170,8 @@ class ALSModel private[ml] ( override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { import dataset.sqlContext.implicits._ val map = this.paramMap ++ paramMap - val users = userFactors.toDataFrame("id", "features") - val items = itemFactors.toDataFrame("id", "features") + val users = userFactors.toDF("id", "features") + val items = itemFactors.toDF("id", "features") // Register a UDF for DataFrame, and then // create a new column named map(predictionCol) by running the predict UDF. http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index f9142bc..dd7a946 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -102,7 +102,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] { sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path)) // Create Parquet data. - val dataRDD: DataFrame = sc.parallelize(Seq(data), 1) + val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF dataRDD.saveAsParquetFile(dataPath(path)) } http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala index 1d11896..0a358f2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala @@ -62,7 +62,7 @@ private[classification] object GLMClassificationModel { // Create Parquet data. val data = Data(weights, intercept, threshold) - sc.parallelize(Seq(data), 1).saveAsParquetFile(Loader.dataPath(path)) + sc.parallelize(Seq(data), 1).toDF.saveAsParquetFile(Loader.dataPath(path)) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index a3a3b5d..c399496 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -187,8 +187,8 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rank" -> model.rank))) sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path)) - model.userFeatures.toDataFrame("id", "features").saveAsParquetFile(userPath(path)) - model.productFeatures.toDataFrame("id", "features").saveAsParquetFile(productPath(path)) + model.userFeatures.toDF("id", "features").saveAsParquetFile(userPath(path)) + model.productFeatures.toDF("id", "features").saveAsParquetFile(productPath(path)) } def load(sc: SparkContext, path: String): MatrixFactorizationModel = { http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala index f75de6f..7b27aaa 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala @@ -58,7 +58,7 @@ private[regression] object GLMRegressionModel { // Create Parquet data. val data = Data(weights, intercept) - val dataRDD: DataFrame = sc.parallelize(Seq(data), 1) + val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF // TODO: repartition with 1 partition after SPARK-5532 gets fixed dataRDD.saveAsParquetFile(Loader.dataPath(path)) } http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala index 373192a..5dac62b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala @@ -197,7 +197,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] { val nodes = model.topNode.subtreeIterator.toSeq val dataRDD: DataFrame = sc.parallelize(nodes) .map(NodeData.apply(0, _)) - .toDataFrame + .toDF dataRDD.saveAsParquetFile(Loader.dataPath(path)) } http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala index dbd69dc..e507f24 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala @@ -289,7 +289,7 @@ private[tree] object TreeEnsembleModel { // Create Parquet data. val dataRDD = sc.parallelize(model.trees.zipWithIndex).flatMap { case (tree, treeId) => tree.topNode.subtreeIterator.toSeq.map(node => NodeData(treeId, node)) - }.toDataFrame + }.toDF dataRDD.saveAsParquetFile(Loader.dataPath(path)) } http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index cb7d57d..b118a8d 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -358,8 +358,8 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging { .setNumUserBlocks(numUserBlocks) .setNumItemBlocks(numItemBlocks) val alpha = als.getAlpha - val model = als.fit(training) - val predictions = model.transform(test) + val model = als.fit(training.toDF) + val predictions = model.transform(test.toDF) .select("rating", "prediction") .map { case Row(rating: Float, prediction: Float) => (rating.toDouble, prediction.toDouble) http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/python/docs/pyspark.sql.rst ---------------------------------------------------------------------- diff --git a/python/docs/pyspark.sql.rst b/python/docs/pyspark.sql.rst index 80c6f02..e03379e 100644 --- a/python/docs/pyspark.sql.rst +++ b/python/docs/pyspark.sql.rst @@ -16,3 +16,11 @@ pyspark.sql.types module :members: :undoc-members: :show-inheritance: + + +pyspark.sql.functions module +------------------------ +.. automodule:: pyspark.sql.functions + :members: + :undoc-members: + :show-inheritance: http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/python/pyspark/mllib/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 49e5c9d..06207a0 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -335,7 +335,7 @@ class VectorUDTTests(PySparkTestCase): sqlCtx = SQLContext(self.sc) rdd = self.sc.parallelize([LabeledPoint(1.0, self.dv1), LabeledPoint(0.0, self.sv1)]) srdd = sqlCtx.inferSchema(rdd) - schema = srdd.schema() + schema = srdd.schema field = [f for f in schema.fields if f.name == "features"][0] self.assertEqual(field.dataType, self.udt) vectors = srdd.map(lambda p: p.features).collect() http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/python/pyspark/sql/__init__.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py index 0a5ba00..b9ffd69 100644 --- a/python/pyspark/sql/__init__.py +++ b/python/pyspark/sql/__init__.py @@ -34,9 +34,8 @@ public classes of Spark SQL: from pyspark.sql.context import SQLContext, HiveContext from pyspark.sql.types import Row -from pyspark.sql.dataframe import DataFrame, GroupedData, Column, Dsl, SchemaRDD +from pyspark.sql.dataframe import DataFrame, GroupedData, Column, SchemaRDD __all__ = [ 'SQLContext', 'HiveContext', 'DataFrame', 'GroupedData', 'Column', 'Row', - 'Dsl', ] http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/python/pyspark/sql/context.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 082f1b6..7683c1b 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -38,6 +38,25 @@ except ImportError: __all__ = ["SQLContext", "HiveContext"] +def _monkey_patch_RDD(sqlCtx): + def toDF(self, schema=None, sampleRatio=None): + """ + Convert current :class:`RDD` into a :class:`DataFrame` + + This is a shorthand for `sqlCtx.createDataFrame(rdd, schema, sampleRatio)` + + :param schema: a StructType or list of names of columns + :param samplingRatio: the sample ratio of rows used for inferring + :return: a DataFrame + + >>> rdd.toDF().collect() + [Row(name=u'Alice', age=1)] + """ + return sqlCtx.createDataFrame(self, schema, sampleRatio) + + RDD.toDF = toDF + + class SQLContext(object): """Main entry point for Spark SQL functionality. @@ -49,15 +68,20 @@ class SQLContext(object): def __init__(self, sparkContext, sqlContext=None): """Create a new SQLContext. + It will add a method called `toDF` to :class:`RDD`, which could be + used to convert an RDD into a DataFrame, it's a shorthand for + :func:`SQLContext.createDataFrame`. + :param sparkContext: The SparkContext to wrap. :param sqlContext: An optional JVM Scala SQLContext. If set, we do not instatiate a new SQLContext in the JVM, instead we make all calls to this object. >>> from datetime import datetime + >>> sqlCtx = SQLContext(sc) >>> allTypes = sc.parallelize([Row(i=1, s="string", d=1.0, l=1L, ... b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1), ... time=datetime(2014, 8, 1, 14, 1, 5))]) - >>> df = sqlCtx.createDataFrame(allTypes) + >>> df = allTypes.toDF() >>> df.registerTempTable("allTypes") >>> sqlCtx.sql('select i+1, d+1, not b, list[1], dict["s"], time, row.a ' ... 'from allTypes where b and i > 0').collect() @@ -70,6 +94,7 @@ class SQLContext(object): self._jsc = self._sc._jsc self._jvm = self._sc._jvm self._scala_SQLContext = sqlContext + _monkey_patch_RDD(self) @property def _ssql_ctx(self): @@ -442,7 +467,7 @@ class SQLContext(object): Row(f1=2, f2=None, f3=Row(field4=22,..., f4=[Row(field7=u'row2')]) Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None) - >>> df3 = sqlCtx.jsonFile(jsonFile, df1.schema()) + >>> df3 = sqlCtx.jsonFile(jsonFile, df1.schema) >>> sqlCtx.registerRDDAsTable(df3, "table2") >>> df4 = sqlCtx.sql( ... "SELECT field1 AS f1, field2 as f2, field3 as f3, " @@ -495,7 +520,7 @@ class SQLContext(object): Row(f1=2, f2=None, f3=Row(field4=22..., f4=[Row(field7=u'row2')]) Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None) - >>> df3 = sqlCtx.jsonRDD(json, df1.schema()) + >>> df3 = sqlCtx.jsonRDD(json, df1.schema) >>> sqlCtx.registerRDDAsTable(df3, "table2") >>> df4 = sqlCtx.sql( ... "SELECT field1 AS f1, field2 as f2, field3 as f3, " @@ -800,7 +825,8 @@ def _test(): Row(field1=2, field2="row2"), Row(field1=3, field2="row3")] ) - globs['df'] = sqlCtx.createDataFrame(rdd) + _monkey_patch_RDD(sqlCtx) + globs['df'] = rdd.toDF() jsonStrings = [ '{"field1": 1, "field2": "row1", "field3":{"field4":11}}', '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]},' http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/python/pyspark/sql/dataframe.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index b6f052e..1438fe5 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -21,21 +21,19 @@ import warnings import random import os from tempfile import NamedTemporaryFile -from itertools import imap from py4j.java_collections import ListConverter, MapConverter from pyspark.context import SparkContext -from pyspark.rdd import RDD, _prepare_for_python_RDD -from pyspark.serializers import BatchedSerializer, AutoBatchedSerializer, PickleSerializer, \ - UTF8Deserializer +from pyspark.rdd import RDD +from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deserializer from pyspark.storagelevel import StorageLevel from pyspark.traceback_utils import SCCallSiteSync from pyspark.sql.types import * from pyspark.sql.types import _create_cls, _parse_datatype_json_string -__all__ = ["DataFrame", "GroupedData", "Column", "Dsl", "SchemaRDD"] +__all__ = ["DataFrame", "GroupedData", "Column", "SchemaRDD"] class DataFrame(object): @@ -76,6 +74,7 @@ class DataFrame(object): self.sql_ctx = sql_ctx self._sc = sql_ctx and sql_ctx._sc self.is_cached = False + self._schema = None # initialized lazily @property def rdd(self): @@ -86,7 +85,7 @@ class DataFrame(object): if not hasattr(self, '_lazy_rdd'): jrdd = self._jdf.javaToPython() rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer())) - schema = self.schema() + schema = self.schema def applySchema(it): cls = _create_cls(schema) @@ -216,14 +215,17 @@ class DataFrame(object): self._sc._gateway._gateway_client) self._jdf.save(source, jmode, joptions) + @property def schema(self): """Returns the schema of this DataFrame (represented by a L{StructType}). - >>> df.schema() + >>> df.schema StructType(List(StructField(age,IntegerType,true),StructField(name,StringType,true))) """ - return _parse_datatype_json_string(self._jdf.schema().json()) + if self._schema is None: + self._schema = _parse_datatype_json_string(self._jdf.schema().json()) + return self._schema def printSchema(self): """Prints out the schema in the tree format. @@ -284,7 +286,7 @@ class DataFrame(object): with open(tempFile.name, 'rb') as tempFile: rs = list(BatchedSerializer(PickleSerializer()).load_stream(tempFile)) os.unlink(tempFile.name) - cls = _create_cls(self.schema()) + cls = _create_cls(self.schema) return [cls(r) for r in rs] def limit(self, num): @@ -310,14 +312,26 @@ class DataFrame(object): return self.limit(num).collect() def map(self, f): - """ Return a new RDD by applying a function to each Row, it's a - shorthand for df.rdd.map() + """ Return a new RDD by applying a function to each Row + + It's a shorthand for df.rdd.map() >>> df.map(lambda p: p.name).collect() [u'Alice', u'Bob'] """ return self.rdd.map(f) + def flatMap(self, f): + """ Return a new RDD by first applying a function to all elements of this, + and then flattening the results. + + It's a shorthand for df.rdd.flatMap() + + >>> df.flatMap(lambda p: p.name).collect() + [u'A', u'l', u'i', u'c', u'e', u'B', u'o', u'b'] + """ + return self.rdd.flatMap(f) + def mapPartitions(self, f, preservesPartitioning=False): """ Return a new RDD by applying a function to each partition. @@ -378,21 +392,6 @@ class DataFrame(object): rdd = self._jdf.sample(withReplacement, fraction, long(seed)) return DataFrame(rdd, self.sql_ctx) - # def takeSample(self, withReplacement, num, seed=None): - # """Return a fixed-size sampled subset of this DataFrame. - # - # >>> df = sqlCtx.inferSchema(rdd) - # >>> df.takeSample(False, 2, 97) - # [Row(field1=3, field2=u'row3'), Row(field1=1, field2=u'row1')] - # """ - # seed = seed if seed is not None else random.randint(0, sys.maxint) - # with SCCallSiteSync(self.context) as css: - # bytesInJava = self._jdf \ - # .takeSampleToPython(withReplacement, num, long(seed)) \ - # .iterator() - # cls = _create_cls(self.schema()) - # return map(cls, self._collect_iterator_through_file(bytesInJava)) - @property def dtypes(self): """Return all column names and their data types as a list. @@ -400,7 +399,7 @@ class DataFrame(object): >>> df.dtypes [('age', 'int'), ('name', 'string')] """ - return [(str(f.name), f.dataType.simpleString()) for f in self.schema().fields] + return [(str(f.name), f.dataType.simpleString()) for f in self.schema.fields] @property def columns(self): @@ -409,7 +408,7 @@ class DataFrame(object): >>> df.columns [u'age', u'name'] """ - return [f.name for f in self.schema().fields] + return [f.name for f in self.schema.fields] def join(self, other, joinExprs=None, joinType=None): """ @@ -586,8 +585,8 @@ class DataFrame(object): >>> df.agg({"age": "max"}).collect() [Row(MAX(age#0)=5)] - >>> from pyspark.sql import Dsl - >>> df.agg(Dsl.min(df.age)).collect() + >>> from pyspark.sql import functions as F + >>> df.agg(F.min(df.age)).collect() [Row(MIN(age#0)=2)] """ return self.groupBy().agg(*exprs) @@ -616,18 +615,18 @@ class DataFrame(object): """ return DataFrame(getattr(self._jdf, "except")(other._jdf), self.sql_ctx) - def addColumn(self, colName, col): + def withColumn(self, colName, col): """ Return a new :class:`DataFrame` by adding a column. - >>> df.addColumn('age2', df.age + 2).collect() + >>> df.withColumn('age2', df.age + 2).collect() [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)] """ return self.select('*', col.alias(colName)) - def renameColumn(self, existing, new): + def withColumnRenamed(self, existing, new): """ Rename an existing column to a new name - >>> df.renameColumn('age', 'age2').collect() + >>> df.withColumnRenamed('age', 'age2').collect() [Row(age2=2, name=u'Alice'), Row(age2=5, name=u'Bob')] """ cols = [Column(_to_java_column(c), self.sql_ctx).alias(new) @@ -635,11 +634,11 @@ class DataFrame(object): for c in self.columns] return self.select(*cols) - def to_pandas(self): + def toPandas(self): """ Collect all the rows and return a `pandas.DataFrame`. - >>> df.to_pandas() # doctest: +SKIP + >>> df.toPandas() # doctest: +SKIP age name 0 2 Alice 1 5 Bob @@ -687,10 +686,11 @@ class GroupedData(object): name to aggregate methods. >>> gdf = df.groupBy(df.name) - >>> gdf.agg({"age": "max"}).collect() - [Row(name=u'Bob', MAX(age#0)=5), Row(name=u'Alice', MAX(age#0)=2)] - >>> from pyspark.sql import Dsl - >>> gdf.agg(Dsl.min(df.age)).collect() + >>> gdf.agg({"*": "count"}).collect() + [Row(name=u'Bob', COUNT(1)=1), Row(name=u'Alice', COUNT(1)=1)] + + >>> from pyspark.sql import functions as F + >>> gdf.agg(F.min(df.age)).collect() [Row(MIN(age#0)=5), Row(MIN(age#0)=2)] """ assert exprs, "exprs should not be empty" @@ -742,12 +742,12 @@ class GroupedData(object): def _create_column_from_literal(literal): sc = SparkContext._active_spark_context - return sc._jvm.Dsl.lit(literal) + return sc._jvm.functions.lit(literal) def _create_column_from_name(name): sc = SparkContext._active_spark_context - return sc._jvm.Dsl.col(name) + return sc._jvm.functions.col(name) def _to_java_column(col): @@ -767,9 +767,9 @@ def _unary_op(name, doc="unary operator"): return _ -def _dsl_op(name, doc=''): +def _func_op(name, doc=''): def _(self): - jc = getattr(self._sc._jvm.Dsl, name)(self._jc) + jc = getattr(self._sc._jvm.functions, name)(self._jc) return Column(jc, self.sql_ctx) _.__doc__ = doc return _ @@ -818,7 +818,7 @@ class Column(DataFrame): super(Column, self).__init__(jc, sql_ctx) # arithmetic operators - __neg__ = _dsl_op("negate") + __neg__ = _func_op("negate") __add__ = _bin_op("plus") __sub__ = _bin_op("minus") __mul__ = _bin_op("multiply") @@ -842,7 +842,7 @@ class Column(DataFrame): # so use bitwise operators as boolean operators __and__ = _bin_op('and') __or__ = _bin_op('or') - __invert__ = _dsl_op('not') + __invert__ = _func_op('not') __rand__ = _bin_op("and") __ror__ = _bin_op("or") @@ -920,11 +920,11 @@ class Column(DataFrame): else: return 'Column<%s>' % self._jdf.toString() - def to_pandas(self): + def toPandas(self): """ Return a pandas.Series from the column - >>> df.age.to_pandas() # doctest: +SKIP + >>> df.age.toPandas() # doctest: +SKIP 0 2 1 5 dtype: int64 @@ -934,123 +934,6 @@ class Column(DataFrame): return pd.Series(data) -def _aggregate_func(name, doc=""): - """ Create a function for aggregator by name""" - def _(col): - sc = SparkContext._active_spark_context - jc = getattr(sc._jvm.Dsl, name)(_to_java_column(col)) - return Column(jc) - _.__name__ = name - _.__doc__ = doc - return staticmethod(_) - - -class UserDefinedFunction(object): - def __init__(self, func, returnType): - self.func = func - self.returnType = returnType - self._broadcast = None - self._judf = self._create_judf() - - def _create_judf(self): - f = self.func # put it in closure `func` - func = lambda _, it: imap(lambda x: f(*x), it) - ser = AutoBatchedSerializer(PickleSerializer()) - command = (func, None, ser, ser) - sc = SparkContext._active_spark_context - pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self) - ssql_ctx = sc._jvm.SQLContext(sc._jsc.sc()) - jdt = ssql_ctx.parseDataType(self.returnType.json()) - judf = sc._jvm.UserDefinedPythonFunction(f.__name__, bytearray(pickled_command), env, - includes, sc.pythonExec, broadcast_vars, - sc._javaAccumulator, jdt) - return judf - - def __del__(self): - if self._broadcast is not None: - self._broadcast.unpersist() - self._broadcast = None - - def __call__(self, *cols): - sc = SparkContext._active_spark_context - jcols = ListConverter().convert([_to_java_column(c) for c in cols], - sc._gateway._gateway_client) - jc = self._judf.apply(sc._jvm.PythonUtils.toSeq(jcols)) - return Column(jc) - - -class Dsl(object): - """ - A collections of builtin aggregators - """ - DSLS = { - 'lit': 'Creates a :class:`Column` of literal value.', - 'col': 'Returns a :class:`Column` based on the given column name.', - 'column': 'Returns a :class:`Column` based on the given column name.', - 'upper': 'Converts a string expression to upper case.', - 'lower': 'Converts a string expression to upper case.', - 'sqrt': 'Computes the square root of the specified float value.', - 'abs': 'Computes the absolutle value.', - - 'max': 'Aggregate function: returns the maximum value of the expression in a group.', - 'min': 'Aggregate function: returns the minimum value of the expression in a group.', - 'first': 'Aggregate function: returns the first value in a group.', - 'last': 'Aggregate function: returns the last value in a group.', - 'count': 'Aggregate function: returns the number of items in a group.', - 'sum': 'Aggregate function: returns the sum of all values in the expression.', - 'avg': 'Aggregate function: returns the average of the values in a group.', - 'mean': 'Aggregate function: returns the average of the values in a group.', - 'sumDistinct': 'Aggregate function: returns the sum of distinct values in the expression.', - } - - for _name, _doc in DSLS.items(): - locals()[_name] = _aggregate_func(_name, _doc) - del _name, _doc - - @staticmethod - def countDistinct(col, *cols): - """ Return a new Column for distinct count of (col, *cols) - - >>> from pyspark.sql import Dsl - >>> df.agg(Dsl.countDistinct(df.age, df.name).alias('c')).collect() - [Row(c=2)] - - >>> df.agg(Dsl.countDistinct("age", "name").alias('c')).collect() - [Row(c=2)] - """ - sc = SparkContext._active_spark_context - jcols = ListConverter().convert([_to_java_column(c) for c in cols], - sc._gateway._gateway_client) - jc = sc._jvm.Dsl.countDistinct(_to_java_column(col), - sc._jvm.PythonUtils.toSeq(jcols)) - return Column(jc) - - @staticmethod - def approxCountDistinct(col, rsd=None): - """ Return a new Column for approxiate distinct count of (col, *cols) - - >>> from pyspark.sql import Dsl - >>> df.agg(Dsl.approxCountDistinct(df.age).alias('c')).collect() - [Row(c=2)] - """ - sc = SparkContext._active_spark_context - if rsd is None: - jc = sc._jvm.Dsl.approxCountDistinct(_to_java_column(col)) - else: - jc = sc._jvm.Dsl.approxCountDistinct(_to_java_column(col), rsd) - return Column(jc) - - @staticmethod - def udf(f, returnType=StringType()): - """Create a user defined function (UDF) - - >>> slen = Dsl.udf(lambda s: len(s), IntegerType()) - >>> df.select(slen(df.name).alias('slen')).collect() - [Row(slen=5), Row(slen=3)] - """ - return UserDefinedFunction(f, returnType) - - def _test(): import doctest from pyspark.context import SparkContext @@ -1059,11 +942,9 @@ def _test(): globs = pyspark.sql.dataframe.__dict__.copy() sc = SparkContext('local[4]', 'PythonTest') globs['sc'] = sc - globs['sqlCtx'] = sqlCtx = SQLContext(sc) - rdd2 = sc.parallelize([Row(name='Alice', age=2), Row(name='Bob', age=5)]) - rdd3 = sc.parallelize([Row(name='Tom', height=80), Row(name='Bob', height=85)]) - globs['df'] = sqlCtx.inferSchema(rdd2) - globs['df2'] = sqlCtx.inferSchema(rdd3) + globs['sqlCtx'] = SQLContext(sc) + globs['df'] = sc.parallelize([Row(name='Alice', age=2), Row(name='Bob', age=5)]).toDF() + globs['df2'] = sc.parallelize([Row(name='Tom', height=80), Row(name='Bob', height=85)]).toDF() (failure_count, test_count) = doctest.testmod( pyspark.sql.dataframe, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE) http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/python/pyspark/sql/functions.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py new file mode 100644 index 0000000..39aa550 --- /dev/null +++ b/python/pyspark/sql/functions.py @@ -0,0 +1,170 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +A collections of builtin functions +""" + +from itertools import imap + +from py4j.java_collections import ListConverter + +from pyspark import SparkContext +from pyspark.rdd import _prepare_for_python_RDD +from pyspark.serializers import PickleSerializer, AutoBatchedSerializer +from pyspark.sql.types import StringType +from pyspark.sql.dataframe import Column, _to_java_column + + +__all__ = ['countDistinct', 'approxCountDistinct', 'udf'] + + +def _create_function(name, doc=""): + """ Create a function for aggregator by name""" + def _(col): + sc = SparkContext._active_spark_context + jc = getattr(sc._jvm.functions, name)(_to_java_column(col)) + return Column(jc) + _.__name__ = name + _.__doc__ = doc + return _ + + +_functions = { + 'lit': 'Creates a :class:`Column` of literal value.', + 'col': 'Returns a :class:`Column` based on the given column name.', + 'column': 'Returns a :class:`Column` based on the given column name.', + 'upper': 'Converts a string expression to upper case.', + 'lower': 'Converts a string expression to upper case.', + 'sqrt': 'Computes the square root of the specified float value.', + 'abs': 'Computes the absolutle value.', + + 'max': 'Aggregate function: returns the maximum value of the expression in a group.', + 'min': 'Aggregate function: returns the minimum value of the expression in a group.', + 'first': 'Aggregate function: returns the first value in a group.', + 'last': 'Aggregate function: returns the last value in a group.', + 'count': 'Aggregate function: returns the number of items in a group.', + 'sum': 'Aggregate function: returns the sum of all values in the expression.', + 'avg': 'Aggregate function: returns the average of the values in a group.', + 'mean': 'Aggregate function: returns the average of the values in a group.', + 'sumDistinct': 'Aggregate function: returns the sum of distinct values in the expression.', +} + + +for _name, _doc in _functions.items(): + globals()[_name] = _create_function(_name, _doc) +del _name, _doc +__all__ += _functions.keys() + + +def countDistinct(col, *cols): + """ Return a new Column for distinct count of `col` or `cols` + + >>> df.agg(countDistinct(df.age, df.name).alias('c')).collect() + [Row(c=2)] + + >>> df.agg(countDistinct("age", "name").alias('c')).collect() + [Row(c=2)] + """ + sc = SparkContext._active_spark_context + jcols = ListConverter().convert([_to_java_column(c) for c in cols], sc._gateway._gateway_client) + jc = sc._jvm.functions.countDistinct(_to_java_column(col), sc._jvm.PythonUtils.toSeq(jcols)) + return Column(jc) + + +def approxCountDistinct(col, rsd=None): + """ Return a new Column for approximate distinct count of `col` + + >>> df.agg(approxCountDistinct(df.age).alias('c')).collect() + [Row(c=2)] + """ + sc = SparkContext._active_spark_context + if rsd is None: + jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col)) + else: + jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col), rsd) + return Column(jc) + + +class UserDefinedFunction(object): + """ + User defined function in Python + """ + def __init__(self, func, returnType): + self.func = func + self.returnType = returnType + self._broadcast = None + self._judf = self._create_judf() + + def _create_judf(self): + f = self.func # put it in closure `func` + func = lambda _, it: imap(lambda x: f(*x), it) + ser = AutoBatchedSerializer(PickleSerializer()) + command = (func, None, ser, ser) + sc = SparkContext._active_spark_context + pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self) + ssql_ctx = sc._jvm.SQLContext(sc._jsc.sc()) + jdt = ssql_ctx.parseDataType(self.returnType.json()) + judf = sc._jvm.UserDefinedPythonFunction(f.__name__, bytearray(pickled_command), env, + includes, sc.pythonExec, broadcast_vars, + sc._javaAccumulator, jdt) + return judf + + def __del__(self): + if self._broadcast is not None: + self._broadcast.unpersist() + self._broadcast = None + + def __call__(self, *cols): + sc = SparkContext._active_spark_context + jcols = ListConverter().convert([_to_java_column(c) for c in cols], + sc._gateway._gateway_client) + jc = self._judf.apply(sc._jvm.PythonUtils.toSeq(jcols)) + return Column(jc) + + +def udf(f, returnType=StringType()): + """Create a user defined function (UDF) + + >>> slen = udf(lambda s: len(s), IntegerType()) + >>> df.select(slen(df.name).alias('slen')).collect() + [Row(slen=5), Row(slen=3)] + """ + return UserDefinedFunction(f, returnType) + + +def _test(): + import doctest + from pyspark.context import SparkContext + from pyspark.sql import Row, SQLContext + import pyspark.sql.dataframe + globs = pyspark.sql.dataframe.__dict__.copy() + sc = SparkContext('local[4]', 'PythonTest') + globs['sc'] = sc + globs['sqlCtx'] = SQLContext(sc) + globs['df'] = sc.parallelize([Row(name='Alice', age=2), Row(name='Bob', age=5)]).toDF() + globs['df2'] = sc.parallelize([Row(name='Tom', height=80), Row(name='Bob', height=85)]).toDF() + (failure_count, test_count) = doctest.testmod( + pyspark.sql.dataframe, globs=globs, + optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE) + globs['sc'].stop() + if failure_count: + exit(-1) + + +if __name__ == "__main__": + _test() http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/python/pyspark/sql/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 43e5c3a..aa80bca 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -96,7 +96,7 @@ class SQLTests(ReusedPySparkTestCase): cls.sqlCtx = SQLContext(cls.sc) cls.testData = [Row(key=i, value=str(i)) for i in range(100)] rdd = cls.sc.parallelize(cls.testData) - cls.df = cls.sqlCtx.createDataFrame(rdd) + cls.df = rdd.toDF() @classmethod def tearDownClass(cls): @@ -138,7 +138,7 @@ class SQLTests(ReusedPySparkTestCase): df = self.sqlCtx.jsonRDD(rdd) df.count() df.collect() - df.schema() + df.schema # cache and checkpoint self.assertFalse(df.is_cached) @@ -155,11 +155,11 @@ class SQLTests(ReusedPySparkTestCase): def test_apply_schema_to_row(self): df = self.sqlCtx.jsonRDD(self.sc.parallelize(["""{"a":2}"""])) - df2 = self.sqlCtx.createDataFrame(df.map(lambda x: x), df.schema()) + df2 = self.sqlCtx.createDataFrame(df.map(lambda x: x), df.schema) self.assertEqual(df.collect(), df2.collect()) rdd = self.sc.parallelize(range(10)).map(lambda x: Row(a=x)) - df3 = self.sqlCtx.createDataFrame(rdd, df.schema()) + df3 = self.sqlCtx.createDataFrame(rdd, df.schema) self.assertEqual(10, df3.count()) def test_serialize_nested_array_and_map(self): @@ -195,7 +195,7 @@ class SQLTests(ReusedPySparkTestCase): self.assertEqual(1, result.head()[0]) df2 = self.sqlCtx.createDataFrame(rdd, samplingRatio=1.0) - self.assertEqual(df.schema(), df2.schema()) + self.assertEqual(df.schema, df2.schema) self.assertEqual({}, df2.map(lambda r: r.d).first()) self.assertEqual([None, ""], df2.map(lambda r: r.s).collect()) df2.registerTempTable("test2") @@ -204,8 +204,7 @@ class SQLTests(ReusedPySparkTestCase): def test_struct_in_map(self): d = [Row(m={Row(i=1): Row(s="")})] - rdd = self.sc.parallelize(d) - df = self.sqlCtx.createDataFrame(rdd) + df = self.sc.parallelize(d).toDF() k, v = df.head().m.items()[0] self.assertEqual(1, k.i) self.assertEqual("", v.s) @@ -213,8 +212,7 @@ class SQLTests(ReusedPySparkTestCase): def test_convert_row_to_dict(self): row = Row(l=[Row(a=1, b='s')], d={"key": Row(c=1.0, d="2")}) self.assertEqual(1, row.asDict()['l'][0].a) - rdd = self.sc.parallelize([row]) - df = self.sqlCtx.createDataFrame(rdd) + df = self.sc.parallelize([row]).toDF() df.registerTempTable("test") row = self.sqlCtx.sql("select l, d from test").head() self.assertEqual(1, row.asDict()["l"][0].a) @@ -223,9 +221,8 @@ class SQLTests(ReusedPySparkTestCase): def test_infer_schema_with_udt(self): from pyspark.sql.tests import ExamplePoint, ExamplePointUDT row = Row(label=1.0, point=ExamplePoint(1.0, 2.0)) - rdd = self.sc.parallelize([row]) - df = self.sqlCtx.createDataFrame(rdd) - schema = df.schema() + df = self.sc.parallelize([row]).toDF() + schema = df.schema field = [f for f in schema.fields if f.name == "point"][0] self.assertEqual(type(field.dataType), ExamplePointUDT) df.registerTempTable("labeled_point") @@ -238,15 +235,14 @@ class SQLTests(ReusedPySparkTestCase): rdd = self.sc.parallelize([row]) schema = StructType([StructField("label", DoubleType(), False), StructField("point", ExamplePointUDT(), False)]) - df = self.sqlCtx.createDataFrame(rdd, schema) + df = rdd.toDF(schema) point = df.head().point self.assertEquals(point, ExamplePoint(1.0, 2.0)) def test_parquet_with_udt(self): from pyspark.sql.tests import ExamplePoint row = Row(label=1.0, point=ExamplePoint(1.0, 2.0)) - rdd = self.sc.parallelize([row]) - df0 = self.sqlCtx.createDataFrame(rdd) + df0 = self.sc.parallelize([row]).toDF() output_dir = os.path.join(self.tempdir.name, "labeled_point") df0.saveAsParquetFile(output_dir) df1 = self.sqlCtx.parquetFile(output_dir) @@ -280,10 +276,11 @@ class SQLTests(ReusedPySparkTestCase): self.assertEqual([99, 100], sorted(g.agg({'key': 'max', 'value': 'count'}).collect()[0])) self.assertEqual([Row(**{"AVG(key#0)": 49.5})], g.mean().collect()) - from pyspark.sql import Dsl - self.assertEqual((0, u'99'), tuple(g.agg(Dsl.first(df.key), Dsl.last(df.value)).first())) - self.assertTrue(95 < g.agg(Dsl.approxCountDistinct(df.key)).first()[0]) - self.assertEqual(100, g.agg(Dsl.countDistinct(df.value)).first()[0]) + from pyspark.sql import functions + self.assertEqual((0, u'99'), + tuple(g.agg(functions.first(df.key), functions.last(df.value)).first())) + self.assertTrue(95 < g.agg(functions.approxCountDistinct(df.key)).first()[0]) + self.assertEqual(100, g.agg(functions.countDistinct(df.value)).first()[0]) def test_save_and_load(self): df = self.df @@ -339,8 +336,7 @@ class HiveContextSQLTests(ReusedPySparkTestCase): cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc()) cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext) cls.testData = [Row(key=i, value=str(i)) for i in range(100)] - rdd = cls.sc.parallelize(cls.testData) - cls.df = cls.sqlCtx.inferSchema(rdd) + cls.df = cls.sc.parallelize(cls.testData).toDF() @classmethod def tearDownClass(cls): http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/python/run-tests ---------------------------------------------------------------------- diff --git a/python/run-tests b/python/run-tests index 077ad60..a2c2f37 100755 --- a/python/run-tests +++ b/python/run-tests @@ -35,7 +35,7 @@ rm -rf metastore warehouse function run_test() { echo "Running test: $1" | tee -a $LOG_FILE - SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 >> $LOG_FILE 2>&1 + SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 > $LOG_FILE 2>&1 FAILED=$((PIPESTATUS[0]||$FAILED)) @@ -67,6 +67,7 @@ function run_sql_tests() { run_test "pyspark/sql/types.py" run_test "pyspark/sql/context.py" run_test "pyspark/sql/dataframe.py" + run_test "pyspark/sql/functions.py" run_test "pyspark/sql/tests.py" } http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala index 0cf2de6..05faef8 100644 --- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala @@ -137,7 +137,7 @@ private[repl] trait SparkILoopInit { command("import org.apache.spark.SparkContext._") command("import sqlContext.implicits._") command("import sqlContext.sql") - command("import org.apache.spark.sql.Dsl._") + command("import org.apache.spark.sql.functions._") } } http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 201f267..529914a 100644 --- a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -262,7 +262,7 @@ class ReplSuite extends FunSuite { |val sqlContext = new org.apache.spark.sql.SQLContext(sc) |import sqlContext.implicits._ |case class TestCaseClass(value: Int) - |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDataFrame.collect() + |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF.collect() """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala index 1bd2a69..7a5e94d 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -77,7 +77,7 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter) command("import org.apache.spark.SparkContext._") command("import sqlContext.implicits._") command("import sqlContext.sql") - command("import org.apache.spark.sql.Dsl._") + command("import org.apache.spark.sql.functions._") } } http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index f959a50..a7cd412 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -152,7 +152,7 @@ case class MultiAlias(child: Expression, names: Seq[String]) override lazy val resolved = false - override def newInstance = this + override def newInstance() = this override def withNullability(newNullability: Boolean) = this http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 9d5d6e7..f6ecee1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -17,12 +17,11 @@ package org.apache.spark.sql -import scala.annotation.tailrec import scala.language.implicitConversions -import org.apache.spark.sql.Dsl.lit +import org.apache.spark.sql.functions.lit import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.{Subquery, Project, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{Project, LogicalPlan} import org.apache.spark.sql.catalyst.analysis.UnresolvedGetField import org.apache.spark.sql.types._ @@ -127,7 +126,7 @@ trait Column extends DataFrame { * df.select( -df("amount") ) * * // Java: - * import static org.apache.spark.sql.Dsl.*; + * import static org.apache.spark.sql.functions.*; * df.select( negate(col("amount") ); * }}} */ @@ -140,7 +139,7 @@ trait Column extends DataFrame { * df.filter( !df("isActive") ) * * // Java: - * import static org.apache.spark.sql.Dsl.*; + * import static org.apache.spark.sql.functions.*; * df.filter( not(df.col("isActive")) ); * }} */ @@ -153,7 +152,7 @@ trait Column extends DataFrame { * df.filter( df("colA") === df("colB") ) * * // Java - * import static org.apache.spark.sql.Dsl.*; + * import static org.apache.spark.sql.functions.*; * df.filter( col("colA").equalTo(col("colB")) ); * }}} */ @@ -168,7 +167,7 @@ trait Column extends DataFrame { * df.filter( df("colA") === df("colB") ) * * // Java - * import static org.apache.spark.sql.Dsl.*; + * import static org.apache.spark.sql.functions.*; * df.filter( col("colA").equalTo(col("colB")) ); * }}} */ @@ -182,7 +181,7 @@ trait Column extends DataFrame { * df.select( !(df("colA") === df("colB")) ) * * // Java: - * import static org.apache.spark.sql.Dsl.*; + * import static org.apache.spark.sql.functions.*; * df.filter( col("colA").notEqual(col("colB")) ); * }}} */ @@ -198,7 +197,7 @@ trait Column extends DataFrame { * df.select( !(df("colA") === df("colB")) ) * * // Java: - * import static org.apache.spark.sql.Dsl.*; + * import static org.apache.spark.sql.functions.*; * df.filter( col("colA").notEqual(col("colB")) ); * }}} */ @@ -213,7 +212,7 @@ trait Column extends DataFrame { * people.select( people("age") > 21 ) * * // Java: - * import static org.apache.spark.sql.Dsl.*; + * import static org.apache.spark.sql.functions.*; * people.select( people("age").gt(21) ); * }}} */ @@ -228,7 +227,7 @@ trait Column extends DataFrame { * people.select( people("age") > lit(21) ) * * // Java: - * import static org.apache.spark.sql.Dsl.*; + * import static org.apache.spark.sql.functions.*; * people.select( people("age").gt(21) ); * }}} */ http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 4f8f19e..e21e989 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -48,7 +48,7 @@ private[sql] object DataFrame { * }}} * * Once created, it can be manipulated using the various domain-specific-language (DSL) functions - * defined in: [[DataFrame]] (this class), [[Column]], [[Dsl]] for the DSL. + * defined in: [[DataFrame]] (this class), [[Column]], [[functions]] for the DSL. * * To select a column from the data frame, use the apply method: * {{{ @@ -94,27 +94,27 @@ trait DataFrame extends RDDApi[Row] with Serializable { } /** Left here for backward compatibility. */ - @deprecated("1.3.0", "use toDataFrame") + @deprecated("1.3.0", "use toDF") def toSchemaRDD: DataFrame = this /** * Returns the object itself. Used to force an implicit conversion from RDD to DataFrame in Scala. */ // This is declared with parentheses to prevent the Scala compiler from treating - // `rdd.toDataFrame("1")` as invoking this toDataFrame and then apply on the returned DataFrame. - def toDataFrame(): DataFrame = this + // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame. + def toDF(): DataFrame = this /** * Returns a new [[DataFrame]] with columns renamed. This can be quite convenient in conversion * from a RDD of tuples into a [[DataFrame]] with meaningful names. For example: * {{{ * val rdd: RDD[(Int, String)] = ... - * rdd.toDataFrame // this implicit conversion creates a DataFrame with column name _1 and _2 - * rdd.toDataFrame("id", "name") // this creates a DataFrame with column name "id" and "name" + * rdd.toDF // this implicit conversion creates a DataFrame with column name _1 and _2 + * rdd.toDF("id", "name") // this creates a DataFrame with column name "id" and "name" * }}} */ @scala.annotation.varargs - def toDataFrame(colNames: String*): DataFrame + def toDF(colNames: String*): DataFrame /** Returns the schema of this [[DataFrame]]. */ def schema: StructType @@ -132,7 +132,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { def explain(extended: Boolean): Unit /** Only prints the physical plan to the console for debugging purpose. */ - def explain(): Unit = explain(false) + def explain(): Unit = explain(extended = false) /** * Returns true if the `collect` and `take` methods can be run locally @@ -179,11 +179,11 @@ trait DataFrame extends RDDApi[Row] with Serializable { * * {{{ * // Scala: - * import org.apache.spark.sql.dsl._ + * import org.apache.spark.sql.functions._ * df1.join(df2, "outer", $"df1Key" === $"df2Key") * * // Java: - * import static org.apache.spark.sql.Dsl.*; + * import static org.apache.spark.sql.functions.*; * df1.join(df2, "outer", col("df1Key") === col("df2Key")); * }}} * @@ -483,12 +483,12 @@ trait DataFrame extends RDDApi[Row] with Serializable { /** * Returns a new [[DataFrame]] by adding a column. */ - def addColumn(colName: String, col: Column): DataFrame + def withColumn(colName: String, col: Column): DataFrame /** * Returns a new [[DataFrame]] with a column renamed. */ - def renameColumn(existingName: String, newName: String): DataFrame + def withColumnRenamed(existingName: String, newName: String): DataFrame /** * Returns the first `n` rows. @@ -520,6 +520,7 @@ trait DataFrame extends RDDApi[Row] with Serializable { * Returns a new RDD by applying a function to each partition of this DataFrame. */ override def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] + /** * Applies a function `f` to all rows. */ http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameHolder.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameHolder.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameHolder.scala new file mode 100644 index 0000000..a3187fe --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameHolder.scala @@ -0,0 +1,30 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql + +/** + * A container for a [[DataFrame]], used for implicit conversions. + */ +private[sql] case class DataFrameHolder(df: DataFrame) { + + // This is declared with parentheses to prevent the Scala compiler from treating + // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame. + def toDF(): DataFrame = df + + def toDF(colNames: String*): DataFrame = df.toDF(colNames :_*) +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org