Repository: spark Updated Branches: refs/heads/master fa6bdc6e8 -> d931b01dc
[SQL] Two DataFrame fixes. - Removed DataFrame.apply for projection & filtering since they are extremely confusing. - Added implicits for RDD[Int], RDD[Long], and RDD[String] Author: Reynold Xin <r...@databricks.com> Closes #4543 from rxin/df-cleanup and squashes the following commits: 81ec915 [Reynold Xin] [SQL] More DataFrame fixes. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d931b01d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d931b01d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d931b01d Branch: refs/heads/master Commit: d931b01dcaaf009dcf68dcfe83428bd7f9e857cc Parents: fa6bdc6 Author: Reynold Xin <r...@databricks.com> Authored: Wed Feb 11 18:32:48 2015 -0800 Committer: Reynold Xin <r...@databricks.com> Committed: Wed Feb 11 18:32:48 2015 -0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/sql/DataFrame.scala | 39 ++++---------- .../org/apache/spark/sql/DataFrameImpl.scala | 24 +++------ .../apache/spark/sql/IncomputableColumn.scala | 4 -- .../scala/org/apache/spark/sql/SQLContext.scala | 54 ++++++++++++++++--- .../spark/sql/DataFrameImplicitsSuite.scala | 55 ++++++++++++++++++++ 5 files changed, 119 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d931b01d/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 17900c5..327cf87 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -85,6 +85,14 @@ trait DataFrame extends RDDApi[Row] { protected[sql] def logicalPlan: LogicalPlan + override def toString = + try { + schema.map(f => s"${f.name}: ${f.dataType.simpleString}").mkString("[", ", ", "]") + } catch { + case NonFatal(e) => + s"Invalid tree; ${e.getMessage}:\n$queryExecution" + } + /** Left here for backward compatibility. */ @deprecated("1.3.0", "use toDataFrame") def toSchemaRDD: DataFrame = this @@ -92,13 +100,9 @@ trait DataFrame extends RDDApi[Row] { /** * Returns the object itself. Used to force an implicit conversion from RDD to DataFrame in Scala. */ - def toDataFrame: DataFrame = this - - override def toString = - try schema.map(f => s"${f.name}: ${f.dataType.simpleString}").mkString("[", ", ", "]") catch { - case NonFatal(e) => - s"Invalid tree; ${e.getMessage}:\n$queryExecution" - } + // This is declared with parentheses to prevent the Scala compiler from treating + // `rdd.toDataFrame("1")` as invoking this toDataFrame and then apply on the returned DataFrame. + def toDataFrame(): DataFrame = this /** * Returns a new [[DataFrame]] with columns renamed. This can be quite convenient in conversion @@ -235,16 +239,6 @@ trait DataFrame extends RDDApi[Row] { def col(colName: String): Column /** - * Selects a set of expressions, wrapped in a Product. - * {{{ - * // The following two are equivalent: - * df.apply(($"colA", $"colB" + 1)) - * df.select($"colA", $"colB" + 1) - * }}} - */ - def apply(projection: Product): DataFrame - - /** * Returns a new [[DataFrame]] with an alias set. */ def as(alias: String): DataFrame @@ -318,17 +312,6 @@ trait DataFrame extends RDDApi[Row] { def where(condition: Column): DataFrame /** - * Filters rows using the given condition. This is a shorthand meant for Scala. - * {{{ - * // The following are equivalent: - * peopleDf.filter($"age" > 15) - * peopleDf.where($"age" > 15) - * peopleDf($"age" > 15) - * }}} - */ - def apply(condition: Column): DataFrame - - /** * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them. * See [[GroupedData]] for all the available aggregate functions. * http://git-wip-us.apache.org/repos/asf/spark/blob/d931b01d/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala index 41da442..3863df5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala @@ -49,8 +49,10 @@ private[sql] class DataFrameImpl protected[sql]( extends DataFrame { /** - * A constructor that automatically analyzes the logical plan. This reports error eagerly - * as the [[DataFrame]] is constructed. + * A constructor that automatically analyzes the logical plan. + * + * This reports error eagerly as the [[DataFrame]] is constructed, unless + * [[SQLConf.dataFrameEagerAnalysis]] is turned off. */ def this(sqlContext: SQLContext, logicalPlan: LogicalPlan) = { this(sqlContext, { @@ -158,7 +160,7 @@ private[sql] class DataFrameImpl protected[sql]( } override def show(): Unit = { - println(showString) + println(showString()) } override def join(right: DataFrame): DataFrame = { @@ -205,14 +207,6 @@ private[sql] class DataFrameImpl protected[sql]( Column(sqlContext, Project(Seq(expr), logicalPlan), expr) } - override def apply(projection: Product): DataFrame = { - require(projection.productArity >= 1) - select(projection.productIterator.map { - case c: Column => c - case o: Any => Column(Literal(o)) - }.toSeq :_*) - } - override def as(alias: String): DataFrame = Subquery(alias, logicalPlan) override def as(alias: Symbol): DataFrame = Subquery(alias.name, logicalPlan) @@ -259,10 +253,6 @@ private[sql] class DataFrameImpl protected[sql]( filter(condition) } - override def apply(condition: Column): DataFrame = { - filter(condition) - } - override def groupBy(cols: Column*): GroupedData = { new GroupedData(this, cols.map(_.expr)) } @@ -323,7 +313,7 @@ private[sql] class DataFrameImpl protected[sql]( override def count(): Long = groupBy().count().rdd.collect().head.getLong(0) override def repartition(numPartitions: Int): DataFrame = { - sqlContext.applySchema(rdd.repartition(numPartitions), schema) + sqlContext.createDataFrame(rdd.repartition(numPartitions), schema) } override def distinct: DataFrame = Distinct(logicalPlan) @@ -401,7 +391,7 @@ private[sql] class DataFrameImpl protected[sql]( val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null) new Iterator[String] { - override def hasNext() = iter.hasNext + override def hasNext = iter.hasNext override def next(): String = { JsonRDD.rowToJSON(rowSchema, gen)(iter.next()) gen.flush() http://git-wip-us.apache.org/repos/asf/spark/blob/d931b01d/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala index 494e49c..4f9d92d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala @@ -80,8 +80,6 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten override def col(colName: String): Column = err() - override def apply(projection: Product): DataFrame = err() - override def select(cols: Column*): DataFrame = err() override def select(col: String, cols: String*): DataFrame = err() @@ -98,8 +96,6 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten override def where(condition: Column): DataFrame = err() - override def apply(condition: Column): DataFrame = err() - override def groupBy(cols: Column*): GroupedData = err() override def groupBy(col1: String, cols: String*): GroupedData = err() http://git-wip-us.apache.org/repos/asf/spark/blob/d931b01d/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index fd121ce..ca5e62f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -180,21 +180,59 @@ class SQLContext(@transient val sparkContext: SparkContext) */ object implicits { // scalastyle:on - /** - * Creates a DataFrame from an RDD of case classes. - * - * @group userf - */ + + /** Creates a DataFrame from an RDD of case classes or tuples. */ implicit def rddToDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = { self.createDataFrame(rdd) } - /** - * Creates a DataFrame from a local Seq of Product. - */ + /** Creates a DataFrame from a local Seq of Product. */ implicit def localSeqToDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = { self.createDataFrame(data) } + + // Do NOT add more implicit conversions. They are likely to break source compatibility by + // making existing implicit conversions ambiguous. In particular, RDD[Double] is dangerous + // because of [[DoubleRDDFunctions]]. + + /** Creates a single column DataFrame from an RDD[Int]. */ + implicit def intRddToDataFrame(data: RDD[Int]): DataFrame = { + val dataType = IntegerType + val rows = data.mapPartitions { iter => + val row = new SpecificMutableRow(dataType :: Nil) + iter.map { v => + row.setInt(0, v) + row: Row + } + } + self.createDataFrame(rows, StructType(StructField("_1", dataType) :: Nil)) + } + + /** Creates a single column DataFrame from an RDD[Long]. */ + implicit def longRddToDataFrame(data: RDD[Long]): DataFrame = { + val dataType = LongType + val rows = data.mapPartitions { iter => + val row = new SpecificMutableRow(dataType :: Nil) + iter.map { v => + row.setLong(0, v) + row: Row + } + } + self.createDataFrame(rows, StructType(StructField("_1", dataType) :: Nil)) + } + + /** Creates a single column DataFrame from an RDD[String]. */ + implicit def stringRddToDataFrame(data: RDD[String]): DataFrame = { + val dataType = StringType + val rows = data.mapPartitions { iter => + val row = new SpecificMutableRow(dataType :: Nil) + iter.map { v => + row.setString(0, v) + row: Row + } + } + self.createDataFrame(rows, StructType(StructField("_1", dataType) :: Nil)) + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/d931b01d/sql/core/src/test/scala/org/apache/spark/sql/DataFrameImplicitsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameImplicitsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameImplicitsSuite.scala new file mode 100644 index 0000000..8fa830d --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameImplicitsSuite.scala @@ -0,0 +1,55 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql + +import org.apache.spark.sql.test.TestSQLContext.{sparkContext => sc} +import org.apache.spark.sql.test.TestSQLContext.implicits._ + + +class DataFrameImplicitsSuite extends QueryTest { + + test("RDD of tuples") { + checkAnswer( + sc.parallelize(1 to 10).map(i => (i, i.toString)).toDataFrame("intCol", "strCol"), + (1 to 10).map(i => Row(i, i.toString))) + } + + test("Seq of tuples") { + checkAnswer( + (1 to 10).map(i => (i, i.toString)).toDataFrame("intCol", "strCol"), + (1 to 10).map(i => Row(i, i.toString))) + } + + test("RDD[Int]") { + checkAnswer( + sc.parallelize(1 to 10).toDataFrame("intCol"), + (1 to 10).map(i => Row(i))) + } + + test("RDD[Long]") { + checkAnswer( + sc.parallelize(1L to 10L).toDataFrame("longCol"), + (1L to 10L).map(i => Row(i))) + } + + test("RDD[String]") { + checkAnswer( + sc.parallelize(1 to 10).map(_.toString).toDataFrame("stringCol"), + (1 to 10).map(i => Row(i.toString))) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org