Repository: spark
Updated Branches:
  refs/heads/branch-1.3 c294216cd -> ef739d9ea


[SQL] Add toString to DataFrame/Column

Author: Michael Armbrust <mich...@databricks.com>

Closes #4436 from marmbrus/dfToString and squashes the following commits:

8a3c35f [Michael Armbrust] Merge remote-tracking branch 'origin/master' into 
dfToString
b72a81b [Michael Armbrust] add toString

(cherry picked from commit de80b1ba4d3c4b1b3316d482d62e4668b996f6ac)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef739d9e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef739d9e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef739d9e

Branch: refs/heads/branch-1.3
Commit: ef739d9ea3e2e71a7e7335d0c4228984545d49bb
Parents: c294216
Author: Michael Armbrust <mich...@databricks.com>
Authored: Tue Feb 10 13:14:01 2015 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Tue Feb 10 13:14:08 2015 -0800

----------------------------------------------------------------------
 python/pyspark/sql/dataframe.py                 |  2 +-
 .../sql/catalyst/expressions/Expression.scala   | 12 ++++++++
 .../catalyst/expressions/namedExpressions.scala | 20 ++++++++++++++
 .../scala/org/apache/spark/sql/DataFrame.scala  |  8 ++++++
 .../org/apache/spark/sql/DataFrameImpl.scala    | 10 +++----
 .../apache/spark/sql/IncomputableColumn.scala   |  2 ++
 .../spark/sql/execution/debug/package.scala     | 11 +++++++-
 .../org/apache/spark/sql/DataFrameSuite.scala   | 29 ++++++++++++++++++++
 8 files changed, 86 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ef739d9e/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index cda704e..04be65f 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -447,7 +447,7 @@ class DataFrame(object):
         `select` that accepts SQL expressions.
 
         >>> df.selectExpr("age * 2", "abs(age)").collect()
-        [Row(('age * 2)=4, Abs('age)=2), Row(('age * 2)=10, Abs('age)=5)]
+        [Row((age * 2)=4, Abs(age)=2), Row((age * 2)=10, Abs(age)=5)]
         """
         jexpr = ListConverter().convert(expr, 
self._sc._gateway._gateway_client)
         jdf = self._jdf.selectExpr(self._sc._jvm.PythonUtils.toSeq(jexpr))

http://git-wip-us.apache.org/repos/asf/spark/blob/ef739d9e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index cf14992..c32a4b8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.trees
 import org.apache.spark.sql.catalyst.trees.TreeNode
@@ -67,6 +68,17 @@ abstract class Expression extends TreeNode[Expression] {
   def childrenResolved = !children.exists(!_.resolved)
 
   /**
+   * Returns a string representation of this expression that does not have 
developer centric
+   * debugging information like the expression id.
+   */
+  def prettyString: String = {
+    transform {
+      case a: AttributeReference => PrettyAttribute(a.name)
+      case u: UnresolvedAttribute => PrettyAttribute(u.name)
+    }.toString
+  }
+
+  /**
    * A set of helper functions that return the correct descendant of 
`scala.math.Numeric[T]` type
    * and do any casting necessary of child evaluation.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/ef739d9e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index e6ab1fd..7f122e9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -190,6 +190,26 @@ case class AttributeReference(
   override def toString: String = s"$name#${exprId.id}$typeSuffix"
 }
 
+/**
+ * A place holder used when printing expressions without debugging information 
such as the
+ * expression id or the unresolved indicator.
+ */
+case class PrettyAttribute(name: String) extends Attribute with 
trees.LeafNode[Expression] {
+  type EvaluatedType = Any
+
+  override def toString = name
+
+  override def withNullability(newNullability: Boolean): Attribute = ???
+  override def newInstance(): Attribute = ???
+  override def withQualifiers(newQualifiers: Seq[String]): Attribute = ???
+  override def withName(newName: String): Attribute = ???
+  override def qualifiers: Seq[String] = ???
+  override def exprId: ExprId = ???
+  override def eval(input: Row): EvaluatedType = ???
+  override def nullable: Boolean = ???
+  override def dataType: DataType = ???
+}
+
 object VirtualColumn {
   val groupingIdName = "grouping__id"
   def newGroupingId = AttributeReference(groupingIdName, IntegerType, false)()

http://git-wip-us.apache.org/repos/asf/spark/blob/ef739d9e/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 6abfb78..04e0d09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
+import scala.util.control.NonFatal
+
 
 private[sql] object DataFrame {
   def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
@@ -92,6 +94,12 @@ trait DataFrame extends RDDApi[Row] {
    */
   def toDataFrame: DataFrame = this
 
+  override def toString =
+     try schema.map(f => s"${f.name}: 
${f.dataType.simpleString}").mkString("[", ", ", "]") catch {
+       case NonFatal(e) =>
+         s"Invalid tree; ${e.getMessage}:\n$queryExecution"
+     }
+
   /**
    * Returns a new [[DataFrame]] with columns renamed. This can be quite 
convenient in conversion
    * from a RDD of tuples into a [[DataFrame]] with meaningful names. For 
example:

http://git-wip-us.apache.org/repos/asf/spark/blob/ef739d9e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index 7339329..1ee16ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -201,13 +201,11 @@ private[sql] class DataFrameImpl protected[sql](
   override def as(alias: Symbol): DataFrame = Subquery(alias.name, logicalPlan)
 
   override def select(cols: Column*): DataFrame = {
-    val exprs = cols.zipWithIndex.map {
-      case (Column(expr: NamedExpression), _) =>
-        expr
-      case (Column(expr: Expression), _) =>
-        Alias(expr, expr.toString)()
+    val namedExpressions = cols.map {
+      case Column(expr: NamedExpression) => expr
+      case Column(expr: Expression) => Alias(expr, expr.prettyString)()
     }
-    Project(exprs.toSeq, logicalPlan)
+    Project(namedExpressions.toSeq, logicalPlan)
   }
 
   override def select(col: String, cols: String*): DataFrame = {

http://git-wip-us.apache.org/repos/asf/spark/blob/ef739d9e/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
index 0600dcc..ce0557b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
@@ -40,6 +40,8 @@ private[sql] class IncomputableColumn(protected[sql] val 
expr: Expression) exten
     throw new UnsupportedOperationException("Cannot run this method on an 
UncomputableColumn")
   }
 
+  override def toString = expr.prettyString
+
   override def isComputable: Boolean = false
 
   override val sqlContext: SQLContext = null

http://git-wip-us.apache.org/repos/asf/spark/blob/ef739d9e/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 5cc67cd..acef49a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.HashSet
 import org.apache.spark.{AccumulatorParam, Accumulator, SparkContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.SparkContext._
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{SQLConf, SQLContext, DataFrame, Row}
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.types._
 
@@ -38,6 +38,15 @@ import org.apache.spark.sql.types._
 package object debug {
 
   /**
+   * Augments [[SQLContext]] with debug methods.
+   */
+  implicit class DebugSQLContext(sqlContext: SQLContext) {
+    def debug() = {
+      sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false")
+    }
+  }
+
+  /**
    * :: DeveloperApi ::
    * Augments [[DataFrame]]s with debug methods.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/ef739d9e/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 5aa3db7..02623f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.TestData._
+
 import scala.language.postfixOps
 
 import org.apache.spark.sql.Dsl._
@@ -53,6 +55,33 @@ class DataFrameSuite extends QueryTest {
     TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, 
oldSetting.toString)
   }
 
+  test("dataframe toString") {
+    assert(testData.toString === "[key: int, value: string]")
+    assert(testData("key").toString === "[key: int]")
+  }
+
+  test("incomputable toString") {
+    assert($"test".toString === "test")
+  }
+
+  test("invalid plan toString, debug mode") {
+    val oldSetting = TestSQLContext.conf.dataFrameEagerAnalysis
+    TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "true")
+
+    // Turn on debug mode so we can see invalid query plans.
+    import org.apache.spark.sql.execution.debug._
+    TestSQLContext.debug()
+
+    val badPlan = testData.select('badColumn)
+
+    assert(badPlan.toString contains badPlan.queryExecution.toString,
+      "toString on bad query plans should include the query execution but 
was:\n" +
+        badPlan.toString)
+
+    // Set the flag back to original value before this test.
+    TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, 
oldSetting.toString)
+  }
+
   test("table scan") {
     checkAnswer(
       testData,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to