Repository: spark
Updated Branches:
  refs/heads/branch-1.3 c515634ef -> e1996aafa


[SPARK-5643][SQL] Add a show method to print the content of a DataFrame in tabular format.

An example:
```
year  month AVG('Adj Close) MAX('Adj Close)
1980  12    0.503218        0.595103
1981  01    0.523289        0.570307
1982  02    0.436504        0.475256
1983  03    0.410516        0.442194
1984  04    0.450090        0.483521
```
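
As a quick illustration of the new API (a hedged usage sketch, not part of this commit: the "stocks" table and column names are made up, and a SQLContext named sqlContext is assumed):

```
// Hypothetical usage sketch only; table and column names are illustrative.
val df = sqlContext.sql(
  "SELECT year, month, AVG(adjClose), MAX(adjClose) FROM stocks GROUP BY year, month")

// Prints the first 20 rows in the tabular format shown above, truncating any
// cell longer than 20 characters.
df.show()
```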

Author: Reynold Xin <r...@databricks.com>

Closes #4416 from rxin/SPARK-5643 and squashes the following commits:

d0e0d6e [Reynold Xin] [SQL] Minor update to data source and statistics documentation.
269da83 [Reynold Xin] Updated isLocal comment.
2cf3c27 [Reynold Xin] Moved logic into optimizer.
1a04d8b [Reynold Xin] [SPARK-5643][SQL] Add a show method to print the content of a DataFrame in columnar format.

(cherry picked from commit a052ed42501fee3641348337505b6176426653c4)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1996aaf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1996aaf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1996aaf

Branch: refs/heads/branch-1.3
Commit: e1996aafadec95fb365b1ce1b87300441cd272ef
Parents: c515634
Author: Reynold Xin <r...@databricks.com>
Authored: Sun Feb 8 18:56:51 2015 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Sun Feb 8 18:57:03 2015 -0800

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 18 ++++++-
 .../catalyst/plans/logical/LogicalPlan.scala    |  7 ++-
 .../optimizer/ConvertToLocalRelationSuite.scala | 57 ++++++++++++++++++++
 .../scala/org/apache/spark/sql/DataFrame.scala  | 21 +++++++-
 .../org/apache/spark/sql/DataFrameImpl.scala    | 41 ++++++++++++--
 .../apache/spark/sql/IncomputableColumn.scala   |  6 ++-
 .../spark/sql/execution/basicOperators.scala    |  7 +--
 .../apache/spark/sql/sources/interfaces.scala   | 15 +++---
 8 files changed, 151 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e1996aaf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 8c8f289..3bc48c9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -50,7 +50,9 @@ object DefaultOptimizer extends Optimizer {
       CombineFilters,
       PushPredicateThroughProject,
       PushPredicateThroughJoin,
-      ColumnPruning) :: Nil
+      ColumnPruning) ::
+    Batch("LocalRelation", FixedPoint(100),
+      ConvertToLocalRelation) :: Nil
 }
 
 /**
@@ -610,3 +612,17 @@ object DecimalAggregates extends Rule[LogicalPlan] {
         DecimalType(prec + 4, scale + 4))
   }
 }
+
+/**
+ * Converts local operations (i.e. ones that don't require data exchange) on LocalRelation to
+ * another LocalRelation.
+ *
+ * This is relatively simple as it currently handles only a single case: Project.
+ */
+object ConvertToLocalRelation extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Project(projectList, LocalRelation(output, data)) =>
+      val projection = new InterpretedProjection(projectList, output)
+      LocalRelation(projectList.map(_.toAttribute), data.map(projection))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e1996aaf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 8d30528..7cf4b81 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -29,12 +29,15 @@ import org.apache.spark.sql.catalyst.trees
 /**
 * Estimates of various statistics.  The default estimation logic simply lazily multiplies the
 * corresponding statistic produced by the children.  To override this behavior, override
- * `statistics` and assign it an overriden version of `Statistics`.
+ * `statistics` and assign it an overridden version of `Statistics`.
  *
- * '''NOTE''': concrete and/or overriden versions of statistics fields should pay attention to the
+ * '''NOTE''': concrete and/or overridden versions of statistics fields should pay attention to the
 * performance of the implementations.  The reason is that estimations might get triggered in
  * performance-critical processes, such as query plan planning.
  *
+ * Note that we are using a BigInt here since it is easy to overflow a 64-bit integer in
+ * cardinality estimation (e.g. cartesian joins).
+ *
 * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
  *                    defaults to the product of children's `sizeInBytes`.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/e1996aaf/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala
new file mode 100644
index 0000000..cf42d43
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+
+class ConvertToLocalRelationSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("LocalRelation", FixedPoint(100),
+        ConvertToLocalRelation) :: Nil
+  }
+
+  test("Project on LocalRelation should be turned into a single LocalRelation") {
+    val testRelation = LocalRelation(
+      LocalRelation('a.int, 'b.int).output,
+      Row(1, 2) ::
+      Row(4, 5) :: Nil)
+
+    val correctAnswer = LocalRelation(
+      LocalRelation('a1.int, 'b1.int).output,
+      Row(1, 3) ::
+      Row(4, 6) :: Nil)
+
+    val projectOnLocal = testRelation.select(
+      UnresolvedAttribute("a").as("a1"),
+      (UnresolvedAttribute("b") + 1).as("b1"))
+
+    val optimized = Optimize(projectOnLocal.analyze)
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e1996aaf/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 8ad6526..17ea3cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -102,7 +102,7 @@ trait DataFrame extends RDDApi[Row] {
    * }}}
    */
   @scala.annotation.varargs
-  def toDataFrame(colName: String, colNames: String*): DataFrame
+  def toDataFrame(colNames: String*): DataFrame
 
   /** Returns the schema of this [[DataFrame]]. */
   def schema: StructType
@@ -117,6 +117,25 @@ trait DataFrame extends RDDApi[Row] {
   def printSchema(): Unit
 
   /**
+   * Returns true if the `collect` and `take` methods can be run locally
+   * (without any Spark executors).
+   */
+  def isLocal: Boolean
+
+  /**
+   * Displays the [[DataFrame]] in a tabular form. For example:
+   * {{{
+   *   year  month AVG('Adj Close) MAX('Adj Close)
+   *   1980  12    0.503218        0.595103
+   *   1981  01    0.523289        0.570307
+   *   1982  02    0.436504        0.475256
+   *   1983  03    0.410516        0.442194
+   *   1984  04    0.450090        0.483521
+   * }}}
+   */
+  def show(): Unit
+
+  /**
    * Cartesian join with another [[DataFrame]].
    *
   * Note that cartesian joins are very expensive without an extra filter that can be pushed down.

http://git-wip-us.apache.org/repos/asf/spark/blob/e1996aaf/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index 789bcf6..fa05a5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -90,14 +90,13 @@ private[sql] class DataFrameImpl protected[sql](
     }
   }
 
-  override def toDataFrame(colName: String, colNames: String*): DataFrame = {
-    val newNames = colName +: colNames
-    require(schema.size == newNames.size,
+  override def toDataFrame(colNames: String*): DataFrame = {
+    require(schema.size == colNames.size,
      "The number of columns doesn't match.\n" +
        "Old column names: " + schema.fields.map(_.name).mkString(", ") + "\n" +
-        "New column names: " + newNames.mkString(", "))
+        "New column names: " + colNames.mkString(", "))
 
-    val newCols = schema.fieldNames.zip(newNames).map { case (oldName, newName) =>
+    val newCols = schema.fieldNames.zip(colNames).map { case (oldName, newName) =>
       apply(oldName).as(newName)
     }
     select(newCols :_*)
@@ -113,6 +112,38 @@ private[sql] class DataFrameImpl protected[sql](
 
   override def printSchema(): Unit = println(schema.treeString)
 
+  override def isLocal: Boolean = {
+    logicalPlan.isInstanceOf[LocalRelation]
+  }
+
+  override def show(): Unit = {
+    val data = take(20)
+    val numCols = schema.fieldNames.length
+
+    // For cells that are beyond 20 characters, replace it with the first 17 and "..."
+    val rows: Seq[Seq[String]] = schema.fieldNames.toSeq +: data.map { row =>
+      row.toSeq.map { cell =>
+        val str = if (cell == null) "null" else cell.toString
+        if (str.length > 20) str.substring(0, 17) + "..." else str
+      } : Seq[String]
+    }
+
+    // Compute the width of each column
+    val colWidths = Array.fill(numCols)(0)
+    for (row <- rows) {
+      for ((cell, i) <- row.zipWithIndex)  {
+        colWidths(i) = math.max(colWidths(i), cell.length)
+      }
+    }
+
+    // Pad the cells and print them
+    println(rows.map { row =>
+      row.zipWithIndex.map { case (cell, i) =>
+        String.format(s"%-${colWidths(i)}s", cell)
+      }.mkString(" ")
+    }.mkString("\n"))
+  }
+
   override def join(right: DataFrame): DataFrame = {
     Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1996aaf/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
index 6043fb4..782f6e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
@@ -48,7 +48,7 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
 
   protected[sql] override def logicalPlan: LogicalPlan = err()
 
-  override def toDataFrame(colName: String, colNames: String*): DataFrame = err()
+  override def toDataFrame(colNames: String*): DataFrame = err()
 
   override def schema: StructType = err()
 
@@ -58,6 +58,10 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
 
   override def printSchema(): Unit = err()
 
+  override def show(): Unit = err()
+
+  override def isLocal: Boolean = false
+
   override def join(right: DataFrame): DataFrame = err()
 
   override def join(right: DataFrame, joinExprs: Column): DataFrame = err()

http://git-wip-us.apache.org/repos/asf/spark/blob/e1996aaf/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 66aed5d..4dc506c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -17,9 +17,6 @@
 
 package org.apache.spark.sql.execution
 
-import scala.collection.mutable.ArrayBuffer
-import scala.reflect.runtime.universe.TypeTag
-
 import org.apache.spark.{SparkEnv, HashPartitioner, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{RDD, ShuffledRDD}
@@ -40,7 +37,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends
 
   @transient lazy val buildProjection = newMutableProjection(projectList, child.output)
 
-  def execute() = child.execute().mapPartitions { iter =>
+  override def execute() = child.execute().mapPartitions { iter =>
     val resuableProjection = buildProjection()
     iter.map(resuableProjection)
   }
@@ -55,7 +52,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
 
   @transient lazy val conditionEvaluator = newPredicate(condition, child.output)
 
-  def execute() = child.execute().mapPartitions { iter =>
+  override def execute() = child.execute().mapPartitions { iter =>
     iter.filter(conditionEvaluator)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1996aaf/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index a640ba5..5eecc30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -87,13 +87,13 @@ trait CreatableRelationProvider {
 
 /**
  * ::DeveloperApi::
- * Represents a collection of tuples with a known schema.  Classes that extend BaseRelation must
- * be able to produce the schema of their data in the form of a [[StructType]]  Concrete
+ * Represents a collection of tuples with a known schema. Classes that extend BaseRelation must
+ * be able to produce the schema of their data in the form of a [[StructType]]. Concrete
 * implementation should inherit from one of the descendant `Scan` classes, which define various
  * abstract methods for execution.
  *
 * BaseRelations must also define a equality function that only returns true when the two
- * instances will return the same data.  This equality function is used when determining when
+ * instances will return the same data. This equality function is used when determining when
  * it is safe to substitute cached results for a given relation.
  */
 @DeveloperApi
@@ -102,13 +102,16 @@ abstract class BaseRelation {
   def schema: StructType
 
   /**
-   * Returns an estimated size of this relation in bytes.  This information is used by the planner
+   * Returns an estimated size of this relation in bytes. This information is used by the planner
    * to decided when it is safe to broadcast a relation and can be overridden by sources that
    * know the size ahead of time. By default, the system will assume that tables are too
-   * large to broadcast.  This method will be called multiple times during query planning
+   * large to broadcast. This method will be called multiple times during query planning
    * and thus should not perform expensive operations for each invocation.
+   *
+   * Note that it is always better to overestimate size than underestimate, because underestimation
+   * could lead to execution plans that are suboptimal (i.e. broadcasting a very large table).
    */
-  def sizeInBytes = sqlContext.conf.defaultSizeInBytes
+  def sizeInBytes: Long = sqlContext.conf.defaultSizeInBytes
 }
 
 /**
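
As context for the sizeInBytes change above, a data source that knows its size can override the new method. The sketch below is a hypothetical example, not part of this commit: the relation name, schema, and 10 MB figure are made up, and it assumes the Spark 1.3 sources and types APIs.

```
// Hypothetical example only: a relation that reports a known, deliberately
// overestimated size so the planner never broadcasts it by mistake.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

case class KnownSizeRelation(sqlContext: SQLContext) extends BaseRelation {
  override def schema: StructType =
    StructType(StructField("id", IntegerType) :: Nil)

  // Overestimating is safer than underestimating: an underestimate could lead
  // the planner to broadcast a table that is actually very large.
  override def sizeInBytes: Long = 10L * 1024 * 1024
}
```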


