[SPARK-12593][SQL] Converts resolved logical plan back to SQL

This PR enables Spark SQL to convert resolved logical plans back to SQL 
query strings.  For now, the major use case is canonicalizing view definitions 
for Spark SQL native view support.  The main entry point is `SQLBuilder.toSQL`, 
which returns `Some(sql)` when the logical plan is recognized and `None` 
otherwise.
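
For illustration, here is a minimal sketch of how the builder might be invoked 
on an analyzed plan.  The constructor signature, the `hiveContext` handle, and 
the `src` table are assumptions made for this sketch; the real implementation 
is the `org.apache.spark.sql.hive.SQLBuilder` class added by this PR:

```scala
import org.apache.spark.sql.hive.{HiveContext, SQLBuilder}

// Hypothetical usage sketch: take the *analyzed* (not optimized) plan of a
// HiveQL query and try to turn it back into a SQL string.
def toCanonicalSql(hiveContext: HiveContext): Option[String] = {
  val analyzedPlan =
    hiveContext.sql("SELECT key, value FROM src WHERE key < 10")
      .queryExecution.analyzed

  // Some(sql) when the plan is recognized, None otherwise.
  new SQLBuilder(analyzedPlan, hiveContext).toSQL
}
```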

The current version is still a work in progress and is quite limited.  Known 
limitations include:

1.  The logical plan must be analyzed but not optimized

    The optimizer erases `Subquery` operators, which carry scope information 
    necessary for SQL generation.  Future versions should be able to recover 
    the erased scope information by inserting subqueries where necessary.

1.  The logical plan must be created from a HiveQL query string

    Query plans built by composing arbitrary DataFrame API calls are not 
    supported yet.  Operators within these query plans need to be rearranged 
    into a canonical form that is more suitable for direct SQL generation.  
    For example, the following query plan

    ```
    Filter (a#1 < 10)
     +- MetastoreRelation default, src, None
    ```

    needs to be canonicalized into the following form before SQL generation:

    ```
    Project [a#1, b#2, c#3]
     +- Filter (a#1 < 10)
         +- MetastoreRelation default, src, None
    ```

    Otherwise, the SQL generation process would have to handle a large number 
    of special cases (a sketch of such a canonicalization rule follows this 
    list).

1.  Only a fraction of expressions and basic logical plan operators are 
    supported in this PR

    Currently, 95.7% (1720 out of 1798) of the query plans in 
    `HiveCompatibilitySuite` can be successfully converted to SQL query strings.

    Known unsupported components are:

    - Expressions
      - Some math expressions
      - Some string expressions (possibly buggy)
      - Null expressions
      - Calendar interval literals
      - Some date/time expressions
      - Complex type creators
      - Special `NOT` expressions, e.g. `NOT LIKE` and `NOT IN`
    - Logical plan operators/patterns
      - Cube, rollup, and grouping sets
      - Script transformation
      - Generator
      - Distinct aggregation patterns that fit the `DistinctAggregationRewriter` 
        analysis rule
      - Window functions

    Support for window functions, generators, cubes, etc. will be added in 
    follow-up PRs.
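
To illustrate the canonicalization mentioned in limitation 2, the sketch below 
shows one hypothetical Catalyst rule that makes the implicit "select all 
columns" explicit by wrapping a top-level `Filter` in a `Project` over its 
output.  The rule is not part of this PR; it only demonstrates the kind of 
rearrangement described above:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative sketch only: if the plan root is a bare Filter, add an explicit
// Project listing all output attributes, so that SQL generation always sees a
// Project on top and needs no special case for the missing SELECT clause.
object AddExplicitProject extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
    case f: Filter => Project(f.output, f)
    case other     => other
  }
}
```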

This PR leverages `HiveCompatibilitySuite` for testing SQL generation in a 
"round-trip" manner:

*   For every SELECT query, we try to convert its analyzed plan back to SQL
*   If the query plan is convertible, we parse the generated SQL into a new 
    logical plan
*   We run the new logical plan instead of the original one

If the query plan is inconvertible, the test case simply falls back to the 
original logic.
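
A simplified sketch of this round-trip check is shown below.  It is not the 
actual `HiveComparisonTest` code; the `hiveContext` parameter and the naive 
result comparison (which assumes a deterministic row order) are simplifying 
assumptions:

```scala
import org.apache.spark.sql.hive.{HiveContext, SQLBuilder}

// Simplified round-trip sketch: regenerate SQL from the analyzed plan and, if
// that succeeds, run the regenerated query instead of the original plan.
def roundTrip(hiveContext: HiveContext, originalSql: String): Unit = {
  val original = hiveContext.sql(originalSql)
  val analyzed = original.queryExecution.analyzed

  new SQLBuilder(analyzed, hiveContext).toSQL match {
    case Some(regeneratedSql) =>
      // Parse the generated SQL into a new logical plan and run that instead.
      val regenerated = hiveContext.sql(regeneratedSql)
      assert(regenerated.collect().toSeq == original.collect().toSeq)
    case None =>
      // Inconvertible plan: simply fall back to the original query.
      original.collect()
  }
}
```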

TODO

- [x] Fix failed test cases
- [x] Support for more basic expressions and logical plan operators (e.g. 
      distinct aggregation)
- [x] Comments and documentation

Author: Cheng Lian <l...@databricks.com>

Closes #10541 from liancheng/sql-generation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9447cac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9447cac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9447cac

Branch: refs/heads/master
Commit: d9447cac747823e71b676c08c75f4aab34de12a2
Parents: 659fd9d
Author: Cheng Lian <l...@databricks.com>
Authored: Fri Jan 8 14:08:13 2016 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Fri Jan 8 14:08:13 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/parser/SparkSqlParser.g  |  48 ++--
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  20 +-
 .../spark/sql/catalyst/analysis/Catalog.scala   |   4 +-
 .../spark/sql/catalyst/expressions/Cast.scala   |   8 +
 .../sql/catalyst/expressions/Expression.scala   |  23 +-
 .../catalyst/expressions/InputFileName.scala    |   1 +
 .../expressions/MonotonicallyIncreasingID.scala |   4 +
 .../sql/catalyst/expressions/SortOrder.scala    |  14 +-
 .../expressions/aggregate/interfaces.scala      |  14 +-
 .../sql/catalyst/expressions/arithmetic.scala   |   8 +
 .../expressions/complexTypeExtractors.scala     |   2 +
 .../expressions/conditionalExpressions.scala    |  41 +++-
 .../expressions/datetimeExpressions.scala       |  22 ++
 .../expressions/decimalExpressions.scala        |   3 +
 .../sql/catalyst/expressions/literals.scala     |  37 ++-
 .../catalyst/expressions/mathExpressions.scala  |   2 +
 .../spark/sql/catalyst/expressions/misc.scala   |   4 +
 .../catalyst/expressions/namedExpressions.scala |  12 +
 .../catalyst/expressions/nullExpressions.scala  |   6 +
 .../sql/catalyst/expressions/predicates.scala   |  19 ++
 .../expressions/randomExpressions.scala         |   3 +
 .../expressions/regexpExpressions.scala         |   2 +
 .../expressions/stringExpressions.scala         |  28 ++-
 .../sql/catalyst/optimizer/Optimizer.scala      |  52 ++++
 .../spark/sql/catalyst/plans/joinTypes.scala    |  24 +-
 .../catalyst/plans/logical/basicOperators.scala |   1 +
 .../spark/sql/catalyst/rules/RuleExecutor.scala |   2 +-
 .../spark/sql/catalyst/util/package.scala       |  14 ++
 .../org/apache/spark/sql/types/ArrayType.scala  |   2 +
 .../org/apache/spark/sql/types/DataType.scala   |   2 +
 .../org/apache/spark/sql/types/MapType.scala    |   2 +
 .../org/apache/spark/sql/types/StructType.scala |   5 +
 .../spark/sql/types/UserDefinedType.scala       |   2 +
 .../sql/catalyst/analysis/AnalysisSuite.scala   |  38 ---
 .../optimizer/ComputeCurrentTimeSuite.scala     |  68 ++++++
 .../optimizer/FilterPushdownSuite.scala         |   6 +-
 .../datasources/parquet/ParquetRelation.scala   |  16 +-
 .../hive/execution/HiveCompatibilitySuite.scala |  12 +-
 .../HiveWindowFunctionQuerySuite.scala          |   1 +
 .../org/apache/spark/sql/hive/HiveQl.scala      |   3 +-
 .../org/apache/spark/sql/hive/SQLBuilder.scala  | 244 +++++++++++++++++++
 .../org/apache/spark/sql/hive/hiveUDFs.scala    |  48 ++--
 .../sql/hive/ExpressionSQLBuilderSuite.scala    |  75 ++++++
 .../spark/sql/hive/LogicalPlanToSQLSuite.scala  | 146 +++++++++++
 .../apache/spark/sql/hive/SQLBuilderTest.scala  |  74 ++++++
 .../sql/hive/execution/HiveComparisonTest.scala |  70 +++++-
 .../sql/hive/execution/HiveQuerySuite.scala     |   1 +
 47 files changed, 1087 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g
 
b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g
index b04bb67..2c13d30 100644
--- 
a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g
+++ 
b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/SparkSqlParser.g
@@ -1,9 +1,9 @@
 /**
-   Licensed to the Apache Software Foundation (ASF) under one or more 
-   contributor license agreements.  See the NOTICE file distributed with 
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with 
+   (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at
 
        http://www.apache.org/licenses/LICENSE-2.0
@@ -582,7 +582,7 @@ import java.util.HashMap;
 
     return header;
   }
-  
+
   @Override
   public String getErrorMessage(RecognitionException e, String[] tokenNames) {
     String msg = null;
@@ -619,7 +619,7 @@ import java.util.HashMap;
     }
     return msg;
   }
-  
+
   public void pushMsg(String msg, RecognizerSharedState state) {
     // ANTLR generated code does not wrap the @init code wit this backtracking 
check,
     //  even if the matching @after has it. If we have parser rules with that 
are doing
@@ -639,7 +639,7 @@ import java.util.HashMap;
   // counter to generate unique union aliases
   private int aliasCounter;
   private String generateUnionAlias() {
-    return "_u" + (++aliasCounter);
+    return "u_" + (++aliasCounter);
   }
   private char [] excludedCharForColumnName = {'.', ':'};
   private boolean containExcludedCharForCreateTableColumnName(String input) {
@@ -1235,7 +1235,7 @@ alterTblPartitionStatementSuffixSkewedLocation
   : KW_SET KW_SKEWED KW_LOCATION skewedLocations
   -> ^(TOK_ALTERTABLE_SKEWED_LOCATION skewedLocations)
   ;
-  
+
 skewedLocations
 @init { pushMsg("skewed locations", state); }
 @after { popMsg(state); }
@@ -1264,7 +1264,7 @@ alterStatementSuffixLocation
   -> ^(TOK_ALTERTABLE_LOCATION $newLoc)
   ;
 
-       
+
 alterStatementSuffixSkewedby
 @init {pushMsg("alter skewed by statement", state);}
 @after{popMsg(state);}
@@ -1336,10 +1336,10 @@ tabTypeExpr
    (identifier (DOT^
    (
    (KW_ELEM_TYPE) => KW_ELEM_TYPE
-   | 
+   |
    (KW_KEY_TYPE) => KW_KEY_TYPE
-   | 
-   (KW_VALUE_TYPE) => KW_VALUE_TYPE 
+   |
+   (KW_VALUE_TYPE) => KW_VALUE_TYPE
    | identifier
    ))*
    )?
@@ -1376,7 +1376,7 @@ descStatement
 analyzeStatement
 @init { pushMsg("analyze statement", state); }
 @after { popMsg(state); }
-    : KW_ANALYZE KW_TABLE (parttype=tableOrPartition) KW_COMPUTE KW_STATISTICS 
((noscan=KW_NOSCAN) | (partialscan=KW_PARTIALSCAN) 
+    : KW_ANALYZE KW_TABLE (parttype=tableOrPartition) KW_COMPUTE KW_STATISTICS 
((noscan=KW_NOSCAN) | (partialscan=KW_PARTIALSCAN)
                                                       | (KW_FOR KW_COLUMNS 
(statsColumnName=columnNameList)?))?
       -> ^(TOK_ANALYZE $parttype $noscan? $partialscan? KW_COLUMNS? 
$statsColumnName?)
     ;
@@ -1389,7 +1389,7 @@ showStatement
     | KW_SHOW KW_COLUMNS (KW_FROM|KW_IN) tableName ((KW_FROM|KW_IN) 
db_name=identifier)?
     -> ^(TOK_SHOWCOLUMNS tableName $db_name?)
     | KW_SHOW KW_FUNCTIONS (KW_LIKE 
showFunctionIdentifier|showFunctionIdentifier)?  -> ^(TOK_SHOWFUNCTIONS 
KW_LIKE? showFunctionIdentifier?)
-    | KW_SHOW KW_PARTITIONS tabName=tableName partitionSpec? -> 
^(TOK_SHOWPARTITIONS $tabName partitionSpec?) 
+    | KW_SHOW KW_PARTITIONS tabName=tableName partitionSpec? -> 
^(TOK_SHOWPARTITIONS $tabName partitionSpec?)
     | KW_SHOW KW_CREATE (
         (KW_DATABASE|KW_SCHEMA) => (KW_DATABASE|KW_SCHEMA) db_name=identifier 
-> ^(TOK_SHOW_CREATEDATABASE $db_name)
         |
@@ -1398,7 +1398,7 @@ showStatement
     | KW_SHOW KW_TABLE KW_EXTENDED ((KW_FROM|KW_IN) db_name=identifier)? 
KW_LIKE showStmtIdentifier partitionSpec?
     -> ^(TOK_SHOW_TABLESTATUS showStmtIdentifier $db_name? partitionSpec?)
     | KW_SHOW KW_TBLPROPERTIES tableName (LPAREN prptyName=StringLiteral 
RPAREN)? -> ^(TOK_SHOW_TBLPROPERTIES tableName $prptyName?)
-    | KW_SHOW KW_LOCKS 
+    | KW_SHOW KW_LOCKS
       (
       (KW_DATABASE|KW_SCHEMA) => (KW_DATABASE|KW_SCHEMA) (dbName=Identifier) 
(isExtended=KW_EXTENDED)? -> ^(TOK_SHOWDBLOCKS $dbName $isExtended?)
       |
@@ -1511,7 +1511,7 @@ showCurrentRole
 setRole
 @init {pushMsg("set role", state);}
 @after {popMsg(state);}
-    : KW_SET KW_ROLE 
+    : KW_SET KW_ROLE
     (
     (KW_ALL) => (all=KW_ALL) -> ^(TOK_SHOW_SET_ROLE Identifier[$all.text])
     |
@@ -1966,7 +1966,7 @@ columnNameOrderList
 skewedValueElement
 @init { pushMsg("skewed value element", state); }
 @after { popMsg(state); }
-    : 
+    :
       skewedColumnValues
      | skewedColumnValuePairList
     ;
@@ -1980,8 +1980,8 @@ skewedColumnValuePairList
 skewedColumnValuePair
 @init { pushMsg("column value pair", state); }
 @after { popMsg(state); }
-    : 
-      LPAREN colValues=skewedColumnValues RPAREN 
+    :
+      LPAREN colValues=skewedColumnValues RPAREN
       -> ^(TOK_TABCOLVALUES $colValues)
     ;
 
@@ -2001,11 +2001,11 @@ skewedColumnValue
 skewedValueLocationElement
 @init { pushMsg("skewed value location element", state); }
 @after { popMsg(state); }
-    : 
+    :
       skewedColumnValue
      | skewedColumnValuePair
     ;
-    
+
 columnNameOrder
 @init { pushMsg("column name order", state); }
 @after { popMsg(state); }
@@ -2118,7 +2118,7 @@ unionType
 @after { popMsg(state); }
     : KW_UNIONTYPE LESSTHAN colTypeList GREATERTHAN -> ^(TOK_UNIONTYPE 
colTypeList)
     ;
-    
+
 setOperator
 @init { pushMsg("set operator", state); }
 @after { popMsg(state); }
@@ -2172,7 +2172,7 @@ fromStatement[boolean topLevel]
                    {adaptor.create(Identifier, generateUnionAlias())}
                   )
                )
-              ^(TOK_INSERT 
+              ^(TOK_INSERT
                  ^(TOK_DESTINATION ^(TOK_DIR TOK_TMP_FILE))
                  ^(TOK_SELECT ^(TOK_SELEXPR TOK_ALLCOLREF))
                )
@@ -2414,8 +2414,8 @@ setColumnsClause
    KW_SET columnAssignmentClause (COMMA columnAssignmentClause)* -> 
^(TOK_SET_COLUMNS_CLAUSE columnAssignmentClause* )
    ;
 
-/* 
-  UPDATE <table> 
+/*
+  UPDATE <table>
   SET col1 = val1, col2 = val2... WHERE ...
 */
 updateStatement

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e362b55..8a33af8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -86,8 +86,7 @@ class Analyzer(
       HiveTypeCoercion.typeCoercionRules ++
       extendedResolutionRules : _*),
     Batch("Nondeterministic", Once,
-      PullOutNondeterministic,
-      ComputeCurrentTime),
+      PullOutNondeterministic),
     Batch("UDF", Once,
       HandleNullInputsForUDF),
     Batch("Cleanup", fixedPoint,
@@ -1230,23 +1229,6 @@ object CleanupAliases extends Rule[LogicalPlan] {
 }
 
 /**
- * Computes the current date and time to make sure we return the same result 
in a single query.
- */
-object ComputeCurrentTime extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = {
-    val dateExpr = CurrentDate()
-    val timeExpr = CurrentTimestamp()
-    val currentDate = Literal.create(dateExpr.eval(EmptyRow), 
dateExpr.dataType)
-    val currentTime = Literal.create(timeExpr.eval(EmptyRow), 
timeExpr.dataType)
-
-    plan transformAllExpressions {
-      case CurrentDate() => currentDate
-      case CurrentTimestamp() => currentTime
-    }
-  }
-}
-
-/**
  * Replace the `UpCast` expression by `Cast`, and throw exceptions if the cast 
may truncate.
  */
 object ResolveUpCast extends Rule[LogicalPlan] {

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index e8b2fcf..a8f89ce 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -110,7 +110,9 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog 
{
 
     // If an alias was specified by the lookup, wrap the plan in a subquery so 
that attributes are
     // properly qualified with this alias.
-    alias.map(a => Subquery(a, 
tableWithQualifiers)).getOrElse(tableWithQualifiers)
+    alias
+      .map(a => Subquery(a, tableWithQualifiers))
+      .getOrElse(tableWithQualifiers)
   }
 
   override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] 
= {

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index d82d3ed..6f199cf 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -931,6 +931,14 @@ case class Cast(child: Expression, dataType: DataType) 
extends UnaryExpression {
         $evPrim = $result.copy();
       """
   }
+
+  override def sql: String = dataType match {
+    // HiveQL doesn't allow casting to complex types. For logical plans 
translated from HiveQL, this
+    // type of casting can only be introduced by the analyzer, and can be 
omitted when converting
+    // back to SQL query string.
+    case _: ArrayType | _: MapType | _: StructType => child.sql
+    case _ => s"CAST(${child.sql} AS ${dataType.sql})"
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 6a9c121..d621951 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -18,9 +18,10 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, TypeCheckResult, 
UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.catalyst.util.sequenceOption
 import org.apache.spark.sql.types._
 
 
////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -223,6 +224,15 @@ abstract class Expression extends TreeNode[Expression] {
   protected def toCommentSafeString: String = this.toString
     .replace("*/", "\\*\\/")
     .replace("\\u", "\\\\u")
+
+  /**
+   * Returns SQL representation of this expression.  For expressions that 
don't have a SQL
+   * representation (e.g. `ScalaUDF`), this method should throw an 
`UnsupportedOperationException`.
+   */
+  @throws[UnsupportedOperationException](cause = "Expression doesn't have a 
SQL representation")
+  def sql: String = throw new UnsupportedOperationException(
+    s"Cannot map expression $this to its SQL representation"
+  )
 }
 
 
@@ -356,6 +366,8 @@ abstract class UnaryExpression extends Expression {
       """
     }
   }
+
+  override def sql: String = s"($prettyName(${child.sql}))"
 }
 
 
@@ -456,6 +468,8 @@ abstract class BinaryExpression extends Expression {
       """
     }
   }
+
+  override def sql: String = s"$prettyName(${left.sql}, ${right.sql})"
 }
 
 
@@ -492,6 +506,8 @@ abstract class BinaryOperator extends BinaryExpression with 
ExpectsInputTypes {
       TypeCheckResult.TypeCheckSuccess
     }
   }
+
+  override def sql: String = s"(${left.sql} $symbol ${right.sql})"
 }
 
 
@@ -593,4 +609,9 @@ abstract class TernaryExpression extends Expression {
       """
     }
   }
+
+  override def sql: String = {
+    val childrenSQL = children.map(_.sql).mkString(", ")
+    s"$prettyName($childrenSQL)"
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
index f33833c..827dce8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
@@ -49,4 +49,5 @@ case class InputFileName() extends LeafExpression with 
Nondeterministic {
       "org.apache.spark.rdd.SqlNewHadoopRDDState.getInputFileName();"
   }
 
+  override def sql: String = prettyName
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
index d0b78e1..94f8801 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
@@ -78,4 +78,8 @@ private[sql] case class MonotonicallyIncreasingID() extends 
LeafExpression with
       $countTerm++;
     """
   }
+
+  override def prettyName: String = "monotonically_increasing_id"
+
+  override def sql: String = s"$prettyName()"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index 3add722..1cb1b9d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -24,9 +24,17 @@ import org.apache.spark.sql.types._
 import 
org.apache.spark.util.collection.unsafe.sort.PrefixComparators.BinaryPrefixComparator
 import 
org.apache.spark.util.collection.unsafe.sort.PrefixComparators.DoublePrefixComparator
 
-abstract sealed class SortDirection
-case object Ascending extends SortDirection
-case object Descending extends SortDirection
+abstract sealed class SortDirection {
+  def sql: String
+}
+
+case object Ascending extends SortDirection {
+  override def sql: String = "ASC"
+}
+
+case object Descending extends SortDirection {
+  override def sql: String = "DESC"
+}
 
 /**
  * An expression that can be used to sort a tuple.  This class extends 
expression primarily so that

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index b47f32d..ddd99c5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.expressions.aggregate
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, 
CodegenFallback, GeneratedExpressionCode}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util.sequenceOption
 import org.apache.spark.sql.types._
 
 /** The mode of an [[AggregateFunction]]. */
@@ -93,11 +94,13 @@ private[sql] case class AggregateExpression(
 
   override def prettyString: String = aggregateFunction.prettyString
 
-  override def toString: String = 
s"(${aggregateFunction},mode=$mode,isDistinct=$isDistinct)"
+  override def toString: String = 
s"($aggregateFunction,mode=$mode,isDistinct=$isDistinct)"
+
+  override def sql: String = aggregateFunction.sql(isDistinct)
 }
 
 /**
- * AggregateFunction2 is the superclass of two aggregation function interfaces:
+ * AggregateFunction is the superclass of two aggregation function interfaces:
  *
  *  - [[ImperativeAggregate]] is for aggregation functions that are specified 
in terms of
  *    initialize(), update(), and merge() functions that operate on Row-based 
aggregation buffers.
@@ -163,6 +166,11 @@ sealed abstract class AggregateFunction extends Expression 
with ImplicitCastInpu
   def toAggregateExpression(isDistinct: Boolean): AggregateExpression = {
     AggregateExpression(aggregateFunction = this, mode = Complete, isDistinct 
= isDistinct)
   }
+
+  def sql(isDistinct: Boolean): String = {
+    val distinct = if (isDistinct) "DISTINCT " else " "
+    s"$prettyName($distinct${children.map(_.sql).mkString(", ")})"
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index 61a17fd..7bd851c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -54,6 +54,8 @@ case class UnaryMinus(child: Expression) extends 
UnaryExpression with ExpectsInp
       numeric.negate(input)
     }
   }
+
+  override def sql: String = s"(-${child.sql})"
 }
 
 case class UnaryPositive(child: Expression) extends UnaryExpression with 
ExpectsInputTypes {
@@ -67,6 +69,8 @@ case class UnaryPositive(child: Expression) extends 
UnaryExpression with Expects
     defineCodeGen(ctx, ev, c => c)
 
   protected override def nullSafeEval(input: Any): Any = input
+
+  override def sql: String = s"(+${child.sql})"
 }
 
 /**
@@ -91,6 +95,8 @@ case class Abs(child: Expression) extends UnaryExpression 
with ExpectsInputTypes
   }
 
   protected override def nullSafeEval(input: Any): Any = numeric.abs(input)
+
+  override def sql: String = s"$prettyName(${child.sql})"
 }
 
 abstract class BinaryArithmetic extends BinaryOperator {
@@ -513,4 +519,6 @@ case class Pmod(left: Expression, right: Expression) 
extends BinaryArithmetic {
     val r = a % n
     if (r.compare(Decimal.ZERO) < 0) {(r + n) % n} else r
   }
+
+  override def sql: String = s"$prettyName(${left.sql}, ${right.sql})"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala
index 9c73239..5bd97cc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala
@@ -130,6 +130,8 @@ case class GetStructField(child: Expression, ordinal: Int, 
name: Option[String]
       }
     })
   }
+
+  override def sql: String = child.sql + s".`${childSchema(ordinal).name}`"
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
index f79c867..19da849 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.util.TypeUtils
+import org.apache.spark.sql.catalyst.util.{sequenceOption, TypeUtils}
 import org.apache.spark.sql.types._
 
 
@@ -74,6 +74,8 @@ case class If(predicate: Expression, trueValue: Expression, 
falseValue: Expressi
   }
 
   override def toString: String = s"if ($predicate) $trueValue else 
$falseValue"
+
+  override def sql: String = s"(IF(${predicate.sql}, ${trueValue.sql}, 
${falseValue.sql}))"
 }
 
 trait CaseWhenLike extends Expression {
@@ -110,7 +112,7 @@ trait CaseWhenLike extends Expression {
 
   override def nullable: Boolean = {
     // If no value is nullable and no elseValue is provided, the whole 
statement defaults to null.
-    thenList.exists(_.nullable) || (elseValue.map(_.nullable).getOrElse(true))
+    thenList.exists(_.nullable) || elseValue.map(_.nullable).getOrElse(true)
   }
 }
 
@@ -206,6 +208,23 @@ case class CaseWhen(branches: Seq[Expression]) extends 
CaseWhenLike {
       case Seq(elseValue) => s" ELSE $elseValue"
     }.mkString
   }
+
+  override def sql: String = {
+    val branchesSQL = branches.map(_.sql)
+    val (cases, maybeElse) = if (branches.length % 2 == 0) {
+      (branchesSQL, None)
+    } else {
+      (branchesSQL.init, Some(branchesSQL.last))
+    }
+
+    val head = s"CASE "
+    val tail = maybeElse.map(e => s" ELSE $e").getOrElse("") + " END"
+    val body = cases.grouped(2).map {
+      case Seq(whenExpr, thenExpr) => s"WHEN $whenExpr THEN $thenExpr"
+    }.mkString(" ")
+
+    head + body + tail
+  }
 }
 
 // scalastyle:off
@@ -310,6 +329,24 @@ case class CaseKeyWhen(key: Expression, branches: 
Seq[Expression]) extends CaseW
       case Seq(elseValue) => s" ELSE $elseValue"
     }.mkString
   }
+
+  override def sql: String = {
+    val keySQL = key.sql
+    val branchesSQL = branches.map(_.sql)
+    val (cases, maybeElse) = if (branches.length % 2 == 0) {
+      (branchesSQL, None)
+    } else {
+      (branchesSQL.init, Some(branchesSQL.last))
+    }
+
+    val head = s"CASE $keySQL "
+    val tail = maybeElse.map(e => s" ELSE $e").getOrElse("") + " END"
+    val body = cases.grouped(2).map {
+      case Seq(whenExpr, thenExpr) => s"WHEN $whenExpr THEN $thenExpr"
+    }.mkString(" ")
+
+    head + body + tail
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 3d65946..17f1df0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -44,6 +44,8 @@ case class CurrentDate() extends LeafExpression with 
CodegenFallback {
   override def eval(input: InternalRow): Any = {
     DateTimeUtils.millisToDays(System.currentTimeMillis())
   }
+
+  override def prettyName: String = "current_date"
 }
 
 /**
@@ -61,6 +63,8 @@ case class CurrentTimestamp() extends LeafExpression with 
CodegenFallback {
   override def eval(input: InternalRow): Any = {
     System.currentTimeMillis() * 1000L
   }
+
+  override def prettyName: String = "current_timestamp"
 }
 
 /**
@@ -85,6 +89,8 @@ case class DateAdd(startDate: Expression, days: Expression)
       s"""${ev.value} = $sd + $d;"""
     })
   }
+
+  override def prettyName: String = "date_add"
 }
 
 /**
@@ -108,6 +114,8 @@ case class DateSub(startDate: Expression, days: Expression)
       s"""${ev.value} = $sd - $d;"""
     })
   }
+
+  override def prettyName: String = "date_sub"
 }
 
 case class Hour(child: Expression) extends UnaryExpression with 
ImplicitCastInputTypes {
@@ -309,6 +317,8 @@ case class ToUnixTimestamp(timeExp: Expression, format: 
Expression) extends Unix
   def this(time: Expression) = {
     this(time, Literal("yyyy-MM-dd HH:mm:ss"))
   }
+
+  override def prettyName: String = "to_unix_timestamp"
 }
 
 /**
@@ -332,6 +342,8 @@ case class UnixTimestamp(timeExp: Expression, format: 
Expression) extends UnixTi
   def this() = {
     this(CurrentTimestamp())
   }
+
+  override def prettyName: String = "unix_timestamp"
 }
 
 abstract class UnixTime extends BinaryExpression with ExpectsInputTypes {
@@ -437,6 +449,8 @@ abstract class UnixTime extends BinaryExpression with 
ExpectsInputTypes {
         """
     }
   }
+
+  override def prettyName: String = "unix_time"
 }
 
 /**
@@ -451,6 +465,8 @@ case class FromUnixTime(sec: Expression, format: Expression)
   override def left: Expression = sec
   override def right: Expression = format
 
+  override def prettyName: String = "from_unixtime"
+
   def this(unix: Expression) = {
     this(unix, Literal("yyyy-MM-dd HH:mm:ss"))
   }
@@ -733,6 +749,8 @@ case class AddMonths(startDate: Expression, numMonths: 
Expression)
       s"""$dtu.dateAddMonths($sd, $m)"""
     })
   }
+
+  override def prettyName: String = "add_months"
 }
 
 /**
@@ -758,6 +776,8 @@ case class MonthsBetween(date1: Expression, date2: 
Expression)
       s"""$dtu.monthsBetween($l, $r)"""
     })
   }
+
+  override def prettyName: String = "months_between"
 }
 
 /**
@@ -823,6 +843,8 @@ case class ToDate(child: Expression) extends 
UnaryExpression with ImplicitCastIn
   override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): 
String = {
     defineCodeGen(ctx, ev, d => d)
   }
+
+  override def prettyName: String = "to_date"
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
index c54bcdd..5f8b544 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
@@ -73,6 +73,7 @@ case class PromotePrecision(child: Expression) extends 
UnaryExpression {
   override def gen(ctx: CodeGenContext): GeneratedExpressionCode = 
child.gen(ctx)
   override protected def genCode(ctx: CodeGenContext, ev: 
GeneratedExpressionCode): String = ""
   override def prettyName: String = "promote_precision"
+  override def sql: String = child.sql
 }
 
 /**
@@ -107,4 +108,6 @@ case class CheckOverflow(child: Expression, dataType: 
DecimalType) extends Unary
   }
 
   override def toString: String = s"CheckOverflow($child, $dataType)"
+
+  override def sql: String = child.sql
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index 672cc9c..0eb915f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -21,9 +21,9 @@ import java.sql.{Date, Timestamp}
 
 import org.json4s.JsonAST._
 
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types._
 
@@ -214,6 +214,41 @@ case class Literal protected (value: Any, dataType: 
DataType)
       }
     }
   }
+
+  override def sql: String = (value, dataType) match {
+    case (_, NullType | _: ArrayType | _: MapType | _: StructType) if value == 
null =>
+      "NULL"
+
+    case _ if value == null =>
+      s"CAST(NULL AS ${dataType.sql})"
+
+    case (v: UTF8String, StringType) =>
+      // Escapes all backslashes and double quotes.
+      "\"" + v.toString.replace("\\", "\\\\").replace("\"", "\\\"") + "\""
+
+    case (v: Byte, ByteType) =>
+      s"CAST($v AS ${ByteType.simpleString.toUpperCase})"
+
+    case (v: Short, ShortType) =>
+      s"CAST($v AS ${ShortType.simpleString.toUpperCase})"
+
+    case (v: Long, LongType) =>
+      s"CAST($v AS ${LongType.simpleString.toUpperCase})"
+
+    case (v: Float, FloatType) =>
+      s"CAST($v AS ${FloatType.simpleString.toUpperCase})"
+
+    case (v: Decimal, DecimalType.Fixed(precision, scale)) =>
+      s"CAST($v AS ${DecimalType.simpleString.toUpperCase}($precision, 
$scale))"
+
+    case (v: Int, DateType) =>
+      s"DATE '${DateTimeUtils.toJavaDate(v)}'"
+
+    case (v: Long, TimestampType) =>
+      s"TIMESTAMP('${DateTimeUtils.toJavaTimestamp(v)}')"
+
+    case _ => value.toString
+  }
 }
 
 // TODO: Specialize

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
index 002f592..66d8631 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
@@ -70,6 +70,8 @@ abstract class UnaryMathExpression(val f: Double => Double, 
name: String)
   override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): 
String = {
     defineCodeGen(ctx, ev, c => s"java.lang.Math.${funcName}($c)")
   }
+
+  override def sql: String = s"$name(${child.sql})"
 }
 
 abstract class UnaryLogExpression(f: Double => Double, name: String)

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
index fd95b12..cc406a3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
@@ -220,4 +220,8 @@ case class Murmur3Hash(children: Seq[Expression], seed: 
Int) extends Expression
       final int ${ev.value} = ${unsafeRow.value}.hashCode($seed);
     """
   }
+
+  override def prettyName: String = "hash"
+
+  override def sql: String = s"$prettyName(${children.map(_.sql).mkString(", 
")}, $seed)"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index eefd9c7..eee708c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -164,6 +164,12 @@ case class Alias(child: Expression, name: String)(
         explicitMetadata == a.explicitMetadata
     case _ => false
   }
+
+  override def sql: String = {
+    val qualifiersString =
+      if (qualifiers.isEmpty) "" else qualifiers.map("`" + _ + 
"`").mkString("", ".", ".")
+    s"${child.sql} AS $qualifiersString`$name`"
+  }
 }
 
 /**
@@ -271,6 +277,12 @@ case class AttributeReference(
   // Since the expression id is not in the first constructor it is missing 
from the default
   // tree string.
   override def simpleString: String = s"$name#${exprId.id}: 
${dataType.simpleString}"
+
+  override def sql: String = {
+    val qualifiersString =
+      if (qualifiers.isEmpty) "" else qualifiers.map("`" + _ + 
"`").mkString("", ".", ".")
+    s"$qualifiersString`$name`"
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala
index df4747d..89aec2b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala
@@ -83,6 +83,8 @@ case class Coalesce(children: Seq[Expression]) extends 
Expression {
       """
     }.mkString("\n")
   }
+
+  override def sql: String = s"$prettyName(${children.map(_.sql).mkString(", 
")})"
 }
 
 
@@ -193,6 +195,8 @@ case class IsNull(child: Expression) extends 
UnaryExpression with Predicate {
     ev.value = eval.isNull
     eval.code
   }
+
+  override def sql: String = s"(${child.sql} IS NULL)"
 }
 
 
@@ -212,6 +216,8 @@ case class IsNotNull(child: Expression) extends 
UnaryExpression with Predicate {
     ev.value = s"(!(${eval.isNull}))"
     eval.code
   }
+
+  override def sql: String = s"(${child.sql} IS NOT NULL)"
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 304b438..bca12a8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -101,6 +101,8 @@ case class Not(child: Expression)
   override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): 
String = {
     defineCodeGen(ctx, ev, c => s"!($c)")
   }
+
+  override def sql: String = s"(NOT ${child.sql})"
 }
 
 
@@ -176,6 +178,13 @@ case class In(value: Expression, list: Seq[Expression]) 
extends Predicate
       }
     """
   }
+
+  override def sql: String = {
+    val childrenSQL = children.map(_.sql)
+    val valueSQL = childrenSQL.head
+    val listSQL = childrenSQL.tail.mkString(", ")
+    s"($valueSQL IN ($listSQL))"
+  }
 }
 
 /**
@@ -226,6 +235,12 @@ case class InSet(child: Expression, hset: Set[Any]) 
extends UnaryExpression with
       }
      """
   }
+
+  override def sql: String = {
+    val valueSQL = child.sql
+    val listSQL = hset.toSeq.map(Literal(_).sql).mkString(", ")
+    s"($valueSQL IN ($listSQL))"
+  }
 }
 
 case class And(left: Expression, right: Expression) extends BinaryOperator 
with Predicate {
@@ -274,6 +289,8 @@ case class And(left: Expression, right: Expression) extends 
BinaryOperator with
       }
      """
   }
+
+  override def sql: String = s"(${left.sql} AND ${right.sql})"
 }
 
 
@@ -323,6 +340,8 @@ case class Or(left: Expression, right: Expression) extends 
BinaryOperator with P
       }
      """
   }
+
+  override def sql: String = s"(${left.sql} OR ${right.sql})"
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
index 8bde8cb..8de47e9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
@@ -49,6 +49,9 @@ abstract class RDG extends LeafExpression with 
Nondeterministic {
   override def nullable: Boolean = false
 
   override def dataType: DataType = DoubleType
+
+  // NOTE: Even if the user doesn't provide a seed, Spark SQL adds a default 
seed.
+  override def sql: String = s"$prettyName($seed)"
 }
 
 /** Generate a random column with i.i.d. uniformly distributed values in [0, 
1). */

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
index adef605..db26663 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
@@ -59,6 +59,8 @@ trait StringRegexExpression extends ImplicitCastInputTypes {
       matches(regex, input1.asInstanceOf[UTF8String].toString)
     }
   }
+
+  override def sql: String = s"${left.sql} ${prettyName.toUpperCase} 
${right.sql}"
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index 50c8b9d..931f752 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -23,6 +23,7 @@ import java.util.{HashMap, Locale, Map => JMap}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.catalyst.util.sequenceOption
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{ByteArray, UTF8String}
 
@@ -61,6 +62,8 @@ case class Concat(children: Seq[Expression]) extends 
Expression with ImplicitCas
       }
     """
   }
+
+  override def sql: String = s"$prettyName(${children.map(_.sql).mkString(", 
")})"
 }
 
 
@@ -153,6 +156,8 @@ case class ConcatWs(children: Seq[Expression])
       """
     }
   }
+
+  override def sql: String = s"$prettyName(${children.map(_.sql).mkString(", 
")})"
 }
 
 trait String2StringExpression extends ImplicitCastInputTypes {
@@ -292,24 +297,24 @@ case class StringTranslate(srcExpr: Expression, 
matchingExpr: Expression, replac
     val termDict = ctx.freshName("dict")
     val classNameDict = classOf[JMap[Character, Character]].getCanonicalName
 
-    ctx.addMutableState("UTF8String", termLastMatching, s"${termLastMatching} 
= null;")
-    ctx.addMutableState("UTF8String", termLastReplace, s"${termLastReplace} = 
null;")
-    ctx.addMutableState(classNameDict, termDict, s"${termDict} = null;")
+    ctx.addMutableState("UTF8String", termLastMatching, s"$termLastMatching = 
null;")
+    ctx.addMutableState("UTF8String", termLastReplace, s"$termLastReplace = 
null;")
+    ctx.addMutableState(classNameDict, termDict, s"$termDict = null;")
 
     nullSafeCodeGen(ctx, ev, (src, matching, replace) => {
       val check = if (matchingExpr.foldable && replaceExpr.foldable) {
-        s"${termDict} == null"
+        s"$termDict == null"
       } else {
-        s"!${matching}.equals(${termLastMatching}) || 
!${replace}.equals(${termLastReplace})"
+        s"!$matching.equals($termLastMatching) || 
!$replace.equals($termLastReplace)"
       }
       s"""if ($check) {
         // Not all of them is literal or matching or replace value changed
-        ${termLastMatching} = ${matching}.clone();
-        ${termLastReplace} = ${replace}.clone();
-        ${termDict} = org.apache.spark.sql.catalyst.expressions.StringTranslate
-          .buildDict(${termLastMatching}, ${termLastReplace});
+        $termLastMatching = $matching.clone();
+        $termLastReplace = $replace.clone();
+        $termDict = org.apache.spark.sql.catalyst.expressions.StringTranslate
+          .buildDict($termLastMatching, $termLastReplace);
       }
-      ${ev.value} = ${src}.translate(${termDict});
+      ${ev.value} = $src.translate($termDict);
       """
     })
   }
@@ -340,6 +345,8 @@ case class FindInSet(left: Expression, right: Expression) 
extends BinaryExpressi
   }
 
   override def dataType: DataType = IntegerType
+
+  override def prettyName: String = "find_in_set"
 }
 
 /**
@@ -832,7 +839,6 @@ case class Base64(child: Expression) extends 
UnaryExpression with ImplicitCastIn
             org.apache.commons.codec.binary.Base64.encodeBase64($child));
        """})
   }
-
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0b1c742..f8121a7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -37,6 +37,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
     // SubQueries are only needed for analysis and can be removed before 
execution.
     Batch("Remove SubQueries", FixedPoint(100),
       EliminateSubQueries) ::
+    Batch("Compute Current Time", Once,
+      ComputeCurrentTime) ::
     Batch("Aggregate", FixedPoint(100),
       ReplaceDistinctWithAggregate,
       RemoveLiteralFromGroupExpressions) ::
@@ -333,6 +335,39 @@ object ProjectCollapsing extends Rule[LogicalPlan] {
         )
         Project(cleanedProjection, child)
       }
+
+    // TODO Eliminate duplicate code
+    // This clause is identical to the one above except that the inner 
operator is an `Aggregate`
+    // rather than a `Project`.
+    case p @ Project(projectList1, agg @ Aggregate(_, projectList2, child)) =>
+      // Create a map of Aliases to their values from the child projection.
+      // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> 
Alias(a + b, c)).
+      val aliasMap = AttributeMap(projectList2.collect {
+        case a: Alias => (a.toAttribute, a)
+      })
+
+      // We only collapse these two Projects if their overlapped expressions 
are all
+      // deterministic.
+      val hasNondeterministic = projectList1.exists(_.collect {
+        case a: Attribute if aliasMap.contains(a) => aliasMap(a).child
+      }.exists(!_.deterministic))
+
+      if (hasNondeterministic) {
+        p
+      } else {
+        // Substitute any attributes that are produced by the child 
projection, so that we safely
+        // eliminate it.
+        // e.g., 'SELECT c + 1 FROM (SELECT a + b AS C ...' produces 'SELECT a 
+ b + 1 ...'
+        // TODO: Fix TransformBase to avoid the cast below.
+        val substitutedProjection = projectList1.map(_.transform {
+          case a: Attribute => aliasMap.getOrElse(a, a)
+        }).asInstanceOf[Seq[NamedExpression]]
+        // collapse 2 projects may introduce unnecessary Aliases, trim them 
here.
+        val cleanedProjection = substitutedProjection.map(p =>
+          
CleanupAliases.trimNonTopLevelAliases(p).asInstanceOf[NamedExpression]
+        )
+        agg.copy(aggregateExpressions = cleanedProjection)
+      }
   }
 }
 
@@ -976,3 +1011,20 @@ object RemoveLiteralFromGroupExpressions extends 
Rule[LogicalPlan] {
       a.copy(groupingExpressions = newGrouping)
   }
 }
+
+/**
+ * Computes the current date and time to make sure we return the same result 
in a single query.
+ */
+object ComputeCurrentTime extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    val dateExpr = CurrentDate()
+    val timeExpr = CurrentTimestamp()
+    val currentDate = Literal.create(dateExpr.eval(EmptyRow), 
dateExpr.dataType)
+    val currentTime = Literal.create(timeExpr.eval(EmptyRow), 
timeExpr.dataType)
+
+    plan transformAllExpressions {
+      case CurrentDate() => currentDate
+      case CurrentTimestamp() => currentTime
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
index 77dec7c..a5f6764 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
@@ -37,14 +37,26 @@ object JoinType {
   }
 }
 
-sealed abstract class JoinType
+sealed abstract class JoinType {
+  def sql: String
+}
 
-case object Inner extends JoinType
+case object Inner extends JoinType {
+  override def sql: String = "INNER"
+}
 
-case object LeftOuter extends JoinType
+case object LeftOuter extends JoinType {
+  override def sql: String = "LEFT OUTER"
+}
 
-case object RightOuter extends JoinType
+case object RightOuter extends JoinType {
+  override def sql: String = "RIGHT OUTER"
+}
 
-case object FullOuter extends JoinType
+case object FullOuter extends JoinType {
+  override def sql: String = "FULL OUTER"
+}
 
-case object LeftSemi extends JoinType
+case object LeftSemi extends JoinType {
+  override def sql: String = "LEFT SEMI"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 79759b5..64957db 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -423,6 +423,7 @@ case class Limit(limitExpr: Expression, child: LogicalPlan) 
extends UnaryNode {
 }
 
 case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
+
   override def output: Seq[Attribute] = 
child.output.map(_.withQualifiers(alias :: Nil))
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 62ea731..9ebacb4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -37,7 +37,7 @@ object RuleExecutor {
     val maxSize = map.keys.map(_.toString.length).max
     map.toSeq.sortBy(_._2).reverseMap { case (k, v) =>
       s"${k.padTo(maxSize, " ").mkString} $v"
-    }.mkString("\n")
+    }.mkString("\n", "\n", "")
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index 7129347..7a0d0de 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -130,6 +130,20 @@ package object util {
     ret
   }
 
+  /**
+   * Converts a `Seq` of `Option[T]` to an `Option` of `Seq[T]`.
+   */
+  def sequenceOption[T](seq: Seq[Option[T]]): Option[Seq[T]] = seq match {
+    case xs if xs.isEmpty =>
+      Option(Seq.empty[T])
+
+    case xs =>
+      for {
+        head <- xs.head
+        tail <- sequenceOption(xs.tail)
+      } yield head +: tail
+  }
+
   /* FIX ME
   implicit class debugLogging(a: Any) {
     def debugLogging() {
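
`sequenceOption` flips a sequence of `Option`s into an `Option` of a sequence: the result is
defined only when every element is defined. A small sketch of the contract:

```
import org.apache.spark.sql.catalyst.util.sequenceOption

// All elements defined: the values are collected in order.
assert(sequenceOption(Seq(Some(1), Some(2), Some(3))) == Some(Seq(1, 2, 3)))

// A single None collapses the whole result to None.
assert(sequenceOption(Seq(Some(1), None, Some(3))).isEmpty)

// The empty input is vacuously successful.
assert(sequenceOption(Seq.empty[Option[Int]]) == Some(Seq.empty[Int]))
```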

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala
index 6533622..520e344 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala
@@ -77,6 +77,8 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT
 
   override def simpleString: String = s"array<${elementType.simpleString}>"
 
+  override def sql: String = s"ARRAY<${elementType.sql}>"
+
   override private[spark] def asNullable: ArrayType =
     ArrayType(elementType.asNullable, containsNull = true)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
index 136a97e..92cf8d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -65,6 +65,8 @@ abstract class DataType extends AbstractDataType {
   /** Readable string representation for the type with truncation */
   private[sql] def simpleString(maxNumberFields: Int): String = simpleString
 
+  def sql: String = simpleString.toUpperCase
+
   /**
    * Check if `this` and `other` are the same data type when ignoring nullability
    * (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`).

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
index 00461e5..5474954 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
@@ -62,6 +62,8 @@ case class MapType(
 
   override def simpleString: String = s"map<${keyType.simpleString},${valueType.simpleString}>"
 
+  override def sql: String = s"MAP<${keyType.sql}, ${valueType.sql}>"
+
   override private[spark] def asNullable: MapType =
     MapType(keyType.asNullable, valueType.asNullable, valueContainsNull = true)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 34382bf..9b5c86a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -279,6 +279,11 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
     s"struct<${fieldTypes.mkString(",")}>"
   }
 
+  override def sql: String = {
+    val fieldTypes = fields.map(f => s"`${f.name}`: ${f.dataType.sql}")
+    s"STRUCT<${fieldTypes.mkString(", ")}>"
+  }
+
   private[sql] override def simpleString(maxNumberFields: Int): String = {
     val builder = new StringBuilder
     val fieldTypes = fields.take(maxNumberFields).map {
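
Together with the `DataType.sql` default above (which simply upper-cases `simpleString`), the
overrides in `ArrayType`, `MapType`, and `StructType` give every Catalyst type an SQL-style type
string that the SQL generation code can embed in generated query strings. A sketch of the
expected renderings:

```
import org.apache.spark.sql.types._

// Simple types fall back to DataType.sql, i.e. simpleString.toUpperCase.
assert(IntegerType.sql == "INT")
assert(StringType.sql == "STRING")

// Complex types recurse into their element, key/value, and field types.
assert(ArrayType(IntegerType).sql == "ARRAY<INT>")
assert(MapType(StringType, DoubleType).sql == "MAP<STRING, DOUBLE>")

val struct = StructType(Seq(
  StructField("a", IntegerType),
  StructField("b", StringType)))
assert(struct.sql == "STRUCT<`a`: INT, `b`: STRING>")
```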

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
index 4305903..d7a2c23 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
@@ -84,6 +84,8 @@ abstract class UserDefinedType[UserType] extends DataType with Serializable {
 
   override private[sql] def acceptsType(dataType: DataType) =
     this.getClass == dataType.getClass
+
+  override def sql: String = sqlType.sql
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index fa823e3..cf84855 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 
 class AnalysisSuite extends AnalysisTest {
@@ -238,43 +237,6 @@ class AnalysisSuite extends AnalysisTest {
     checkAnalysis(plan, expected)
   }
 
-  test("analyzer should replace current_timestamp with literals") {
-    val in = Project(Seq(Alias(CurrentTimestamp(), "a")(), Alias(CurrentTimestamp(), "b")()),
-      LocalRelation())
-
-    val min = System.currentTimeMillis() * 1000
-    val plan = in.analyze.asInstanceOf[Project]
-    val max = (System.currentTimeMillis() + 1) * 1000
-
-    val lits = new scala.collection.mutable.ArrayBuffer[Long]
-    plan.transformAllExpressions { case e: Literal =>
-      lits += e.value.asInstanceOf[Long]
-      e
-    }
-    assert(lits.size == 2)
-    assert(lits(0) >= min && lits(0) <= max)
-    assert(lits(1) >= min && lits(1) <= max)
-    assert(lits(0) == lits(1))
-  }
-
-  test("analyzer should replace current_date with literals") {
-    val in = Project(Seq(Alias(CurrentDate(), "a")(), Alias(CurrentDate(), "b")()), LocalRelation())
-
-    val min = DateTimeUtils.millisToDays(System.currentTimeMillis())
-    val plan = in.analyze.asInstanceOf[Project]
-    val max = DateTimeUtils.millisToDays(System.currentTimeMillis())
-
-    val lits = new scala.collection.mutable.ArrayBuffer[Int]
-    plan.transformAllExpressions { case e: Literal =>
-      lits += e.value.asInstanceOf[Int]
-      e
-    }
-    assert(lits.size == 2)
-    assert(lits(0) >= min && lits(0) <= max)
-    assert(lits(1) >= min && lits(1) <= max)
-    assert(lits(0) == lits(1))
-  }
-
   test("SPARK-12102: Ignore nullablity when comparing two sides of case") {
     val relation = LocalRelation('a.struct('x.int), 'b.struct('x.int.withNullability(false)))
     val plan = relation.select(CaseWhen(Seq(Literal(true), 'a, 'b)).as("val"))

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
new file mode 100644
index 0000000..10ed4e4
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, Literal}
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+
+class ComputeCurrentTimeSuite extends PlanTest {
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Seq(Batch("ComputeCurrentTime", Once, ComputeCurrentTime))
+  }
+
+  test("analyzer should replace current_timestamp with literals") {
+    val in = Project(Seq(Alias(CurrentTimestamp(), "a")(), Alias(CurrentTimestamp(), "b")()),
+      LocalRelation())
+
+    val min = System.currentTimeMillis() * 1000
+    val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
+    val max = (System.currentTimeMillis() + 1) * 1000
+
+    val lits = new scala.collection.mutable.ArrayBuffer[Long]
+    plan.transformAllExpressions { case e: Literal =>
+      lits += e.value.asInstanceOf[Long]
+      e
+    }
+    assert(lits.size == 2)
+    assert(lits(0) >= min && lits(0) <= max)
+    assert(lits(1) >= min && lits(1) <= max)
+    assert(lits(0) == lits(1))
+  }
+
+  test("analyzer should replace current_date with literals") {
+    val in = Project(Seq(Alias(CurrentDate(), "a")(), Alias(CurrentDate(), "b")()), LocalRelation())
+
+    val min = DateTimeUtils.millisToDays(System.currentTimeMillis())
+    val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
+    val max = DateTimeUtils.millisToDays(System.currentTimeMillis())
+
+    val lits = new scala.collection.mutable.ArrayBuffer[Int]
+    plan.transformAllExpressions { case e: Literal =>
+      lits += e.value.asInstanceOf[Int]
+      e
+    }
+    assert(lits.size == 2)
+    assert(lits(0) >= min && lits(0) <= max)
+    assert(lits(1) >= min && lits(1) <= max)
+    assert(lits(0) == lits(1))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index b998636..f9f3bd5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -75,8 +75,7 @@ class FilterPushdownSuite extends PlanTest {
     val correctAnswer =
       testRelation
         .select('a)
-        .groupBy('a)('a)
-        .select('a).analyze
+        .groupBy('a)('a).analyze
 
     comparePlans(optimized, correctAnswer)
   }
@@ -91,8 +90,7 @@ class FilterPushdownSuite extends PlanTest {
     val correctAnswer =
       testRelation
         .select('a)
-        .groupBy('a)('a as 'c)
-        .select('c).analyze
+        .groupBy('a)('a as 'c).analyze
 
     comparePlans(optimized, correctAnswer)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 4b375de..ca8d010 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.net.URI
-import java.util.{List => JList}
 import java.util.logging.{Logger => JLogger}
+import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -32,24 +32,24 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.task.JobContextImpl
-import org.apache.parquet.{Log => ApacheParquetLog}
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.hadoop.util.ContextUtil
 import org.apache.parquet.schema.MessageType
+import org.apache.parquet.{Log => ApacheParquetLog}
 import org.slf4j.bridge.SLF4JBridgeHandler
 
-import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.catalyst.util.LegacyTypeStringParser
+import org.apache.spark.sql.catalyst.{InternalRow, SqlParser, TableIdentifier}
+import org.apache.spark.sql.execution.datasources.{PartitionSpec, _}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
 
 private[sql] class DefaultSource extends BucketedHadoopFsRelationProvider with DataSourceRegister {
 
@@ -147,6 +147,12 @@ private[sql] class ParquetRelation(
     .get(ParquetRelation.METASTORE_SCHEMA)
     .map(DataType.fromJson(_).asInstanceOf[StructType])
 
+  // If this relation is converted from a Hive metastore table, this method returns the name of the
+  // original Hive metastore table.
+  private[sql] def metastoreTableName: Option[TableIdentifier] = {
+    parameters.get(ParquetRelation.METASTORE_TABLE_NAME).map(SqlParser.parseTableIdentifier)
+  }
+
   private lazy val metadataCache: MetadataCache = {
     val meta = new MetadataCache
     meta.refresh()
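
`metastoreTableName` recovers the identity of the original Hive table from the data source options
when a metastore table has been converted into a `ParquetRelation`. A hedged sketch of how the
stored name is parsed (assuming the option carries a possibly database-qualified name such as
`default.src`):

```
import org.apache.spark.sql.catalyst.SqlParser

// A qualified name parses into database and table parts; an unqualified
// name leaves the database empty.
val qualified = SqlParser.parseTableIdentifier("default.src")
assert(qualified.database == Some("default"))
assert(qualified.table == "src")
```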

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index bd1a52e..afd2f61 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -41,9 +41,12 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
   private val originalColumnBatchSize = TestHive.conf.columnBatchSize
   private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning
 
-  def testCases = hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
+  def testCases: Seq[(String, File)] = {
+    hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
+  }
 
   override def beforeAll() {
+    super.beforeAll()
     TestHive.cacheTables = true
     // Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*)
     TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
@@ -68,10 +71,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
 
     // For debugging dump some statistics about how much time was spent in various optimizer rules.
     logWarning(RuleExecutor.dumpTimeSpent())
+    super.afterAll()
   }
 
   /** A list of tests deemed out of scope currently and thus completely disregarded. */
-  override def blackList = Seq(
+  override def blackList: Seq[String] = Seq(
     // These tests use hooks that are not on the classpath and thus break all subsequent execution.
     "hook_order",
     "hook_context_cs",
@@ -106,7 +110,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "alter_merge",
     "alter_concatenate_indexed_table",
     "protectmode2",
-    //"describe_table",
+    // "describe_table",
     "describe_comment_nonascii",
 
     "create_merge_compressed",
@@ -323,7 +327,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
    * The set of tests that are believed to be working in catalyst. Tests not on whiteList or
    * blacklist are implicitly marked as ignored.
    */
-  override def whiteList = Seq(
+  override def whiteList: Seq[String] = Seq(
     "add_part_exist",
     "add_part_multiple",
     "add_partition_no_whitelist",

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
index 98bbdf0..bad3ca6 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
@@ -104,6 +104,7 @@ class HiveWindowFunctionQuerySuite extends HiveComparisonTest with BeforeAndAfte
     TimeZone.setDefault(originalTimeZone)
     Locale.setDefault(originalLocale)
     TestHive.reset()
+    super.afterAll()
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index bf3fe12..5b13dbe 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -668,7 +668,8 @@ private[hive] object HiveQl extends SparkQl with Logging {
         Option(FunctionRegistry.getFunctionInfo(functionName.toLowerCase)).getOrElse(
           sys.error(s"Couldn't find function $functionName"))
       val functionClassName = functionInfo.getFunctionClass.getName
-      HiveGenericUDTF(new HiveFunctionWrapper(functionClassName), children.map(nodeToExpr))
+      HiveGenericUDTF(
+        functionName, new HiveFunctionWrapper(functionClassName), children.map(nodeToExpr))
     case other => super.nodeToGenerator(node)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d9447cac/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
new file mode 100644
index 0000000..1c91005
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.optimizer.ProjectCollapsing
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * A builder class used to convert a resolved logical plan into a SQL query string.  Note that not
+ * all resolved logical plans are convertible.  They either don't have corresponding SQL
+ * representations (e.g. logical plans that operate on local Scala collections), or are simply not
+ * supported by this builder (yet).
+ */
+class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Logging {
+  def this(df: DataFrame) = this(df.queryExecution.analyzed, df.sqlContext)
+
+  def toSQL: Option[String] = {
+    val canonicalizedPlan = Canonicalizer.execute(logicalPlan)
+    val maybeSQL = try {
+      toSQL(canonicalizedPlan)
+    } catch { case cause: UnsupportedOperationException =>
+      logInfo(s"Failed to build SQL query string because: ${cause.getMessage}")
+      None
+    }
+
+    if (maybeSQL.isDefined) {
+      logDebug(
+        s"""Built SQL query string successfully from given logical plan:
+           |
+           |# Original logical plan:
+           |${logicalPlan.treeString}
+           |# Canonicalized logical plan:
+           |${canonicalizedPlan.treeString}
+           |# Built SQL query string:
+           |${maybeSQL.get}
+         """.stripMargin)
+    } else {
+      logDebug(
+        s"""Failed to build SQL query string from given logical plan:
+           |
+           |# Original logical plan:
+           |${logicalPlan.treeString}
+           |# Canonicalized logical plan:
+           |${canonicalizedPlan.treeString}
+         """.stripMargin)
+    }
+
+    maybeSQL
+  }
+
+  private def projectToSQL(
+      projectList: Seq[NamedExpression],
+      child: LogicalPlan,
+      isDistinct: Boolean): Option[String] = {
+    for {
+      childSQL <- toSQL(child)
+      listSQL = projectList.map(_.sql).mkString(", ")
+      maybeFrom = child match {
+        case OneRowRelation => " "
+        case _ => " FROM "
+      }
+      distinct = if (isDistinct) " DISTINCT " else " "
+    } yield s"SELECT$distinct$listSQL$maybeFrom$childSQL"
+  }
+
+  private def aggregateToSQL(
+      groupingExprs: Seq[Expression],
+      aggExprs: Seq[Expression],
+      child: LogicalPlan): Option[String] = {
+    val aggSQL = aggExprs.map(_.sql).mkString(", ")
+    val groupingSQL = groupingExprs.map(_.sql).mkString(", ")
+    val maybeGroupBy = if (groupingSQL.isEmpty) "" else " GROUP BY "
+    val maybeFrom = child match {
+      case OneRowRelation => " "
+      case _ => " FROM "
+    }
+
+    toSQL(child).map { childSQL =>
+      s"SELECT $aggSQL$maybeFrom$childSQL$maybeGroupBy$groupingSQL"
+    }
+  }
+
+  private def toSQL(node: LogicalPlan): Option[String] = node match {
+    case Distinct(Project(list, child)) =>
+      projectToSQL(list, child, isDistinct = true)
+
+    case Project(list, child) =>
+      projectToSQL(list, child, isDistinct = false)
+
+    case Aggregate(groupingExprs, aggExprs, child) =>
+      aggregateToSQL(groupingExprs, aggExprs, child)
+
+    case Limit(limit, child) =>
+      for {
+        childSQL <- toSQL(child)
+        limitSQL = limit.sql
+      } yield s"$childSQL LIMIT $limitSQL"
+
+    case Filter(condition, child) =>
+      for {
+        childSQL <- toSQL(child)
+        whereOrHaving = child match {
+          case _: Aggregate => "HAVING"
+          case _ => "WHERE"
+        }
+        conditionSQL = condition.sql
+      } yield s"$childSQL $whereOrHaving $conditionSQL"
+
+    case Union(left, right) =>
+      for {
+        leftSQL <- toSQL(left)
+        rightSQL <- toSQL(right)
+      } yield s"$leftSQL UNION ALL $rightSQL"
+
+    // ParquetRelation converted from Hive metastore table
+    case Subquery(alias, LogicalRelation(r: ParquetRelation, _)) =>
+      // There seems to be a bug related to the `ParquetConversions` analysis rule.  The problem is
+      // that the metastore database name and table name are not always propagated to converted
+      // `ParquetRelation` instances via data source options.  Here we use the subquery alias as a
+      // workaround.
+      Some(s"`$alias`")
+
+    case Subquery(alias, child) =>
+      toSQL(child).map(childSQL => s"($childSQL) AS $alias")
+
+    case Join(left, right, joinType, condition) =>
+      for {
+        leftSQL <- toSQL(left)
+        rightSQL <- toSQL(right)
+        joinTypeSQL = joinType.sql
+        conditionSQL = condition.map(" ON " + _.sql).getOrElse("")
+      } yield s"$leftSQL $joinTypeSQL JOIN $rightSQL$conditionSQL"
+
+    case MetastoreRelation(database, table, alias) =>
+      val aliasSQL = alias.map(a => s" AS `$a`").getOrElse("")
+      Some(s"`$database`.`$table`$aliasSQL")
+
+    case Sort(orders, _, RepartitionByExpression(partitionExprs, child, _))
+        if orders.map(_.child) == partitionExprs =>
+      for {
+        childSQL <- toSQL(child)
+        partitionExprsSQL = partitionExprs.map(_.sql).mkString(", ")
+      } yield s"$childSQL CLUSTER BY $partitionExprsSQL"
+
+    case Sort(orders, global, child) =>
+      for {
+        childSQL <- toSQL(child)
+        ordersSQL = orders.map { case SortOrder(e, dir) => s"${e.sql} ${dir.sql}" }.mkString(", ")
+        orderOrSort = if (global) "ORDER" else "SORT"
+      } yield s"$childSQL $orderOrSort BY $ordersSQL"
+
+    case RepartitionByExpression(partitionExprs, child, _) =>
+      for {
+        childSQL <- toSQL(child)
+        partitionExprsSQL = partitionExprs.map(_.sql).mkString(", ")
+      } yield s"$childSQL DISTRIBUTE BY $partitionExprsSQL"
+
+    case OneRowRelation =>
+      Some("")
+
+    case _ => None
+  }
+
+  object Canonicalizer extends RuleExecutor[LogicalPlan] {
+    override protected def batches: Seq[Batch] = Seq(
+      Batch("Canonicalizer", FixedPoint(100),
+        // The `WidenSetOperationTypes` analysis rule may introduce extra `Project`s over
+        // `Aggregate`s to perform type casting.  This rule merges these `Project`s into
+        // `Aggregate`s.
+        ProjectCollapsing,
+
+        // Used to handle other auxiliary `Project`s added by analyzer (e.g.
+        // `ResolveAggregateFunctions` rule)
+        RecoverScopingInfo
+      )
+    )
+
+    object RecoverScopingInfo extends Rule[LogicalPlan] {
+      override def apply(tree: LogicalPlan): LogicalPlan = tree transform {
+        // This branch handles aggregate functions within HAVING clauses.  For example:
+        //
+        //   SELECT key FROM src GROUP BY key HAVING max(value) > "val_255"
+        //
+        // This kind of query results in query plans of the following form because of analysis rule
+        // `ResolveAggregateFunctions`:
+        //
+        //   Project ...
+        //    +- Filter ...
+        //        +- Aggregate ...
+        //            +- MetastoreRelation default, src, None
+        case plan @ Project(_, Filter(_, _: Aggregate)) =>
+          wrapChildWithSubquery(plan)
+
+        case plan @ Project(_,
+          _: Subquery | _: Filter | _: Join | _: MetastoreRelation | OneRowRelation | _: Limit
+        ) => plan
+
+        case plan: Project =>
+          wrapChildWithSubquery(plan)
+      }
+
+      def wrapChildWithSubquery(project: Project): Project = project match {
+        case Project(projectList, child) =>
+          val alias = SQLBuilder.newSubqueryName
+          val childAttributes = child.outputSet
+          val aliasedProjectList = projectList.map(_.transform {
+            case a: Attribute if childAttributes.contains(a) =>
+              a.withQualifiers(alias :: Nil)
+          }.asInstanceOf[NamedExpression])
+
+          Project(aliasedProjectList, Subquery(alias, child))
+      }
+    }
+  }
+}
+
+object SQLBuilder {
+  private val nextSubqueryId = new AtomicLong(0)
+
+  private def newSubqueryName: String = s"gen_subquery_${nextSubqueryId.getAndIncrement()}"
+}
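
As a usage sketch (the `hiveContext` value and the `src` table are assumptions for illustration
only), the builder is driven from a `DataFrame` or an analyzed plan and returns `None` rather than
throwing when the plan is not convertible. For a HAVING query like the one discussed in
`RecoverScopingInfo`, the canonicalizer first wraps the child in a generated subquery before the
SQL string is assembled:

```
import org.apache.spark.sql.hive.SQLBuilder

// Assumes a HiveContext named `hiveContext` and a Hive table `default.src`.
val df = hiveContext.sql(
  "SELECT key FROM src GROUP BY key HAVING max(value) > \"val_255\"")

new SQLBuilder(df).toSQL match {
  case Some(sql) => println(s"Regenerated SQL: $sql")  // re-parsed and re-run in round-trip tests
  case None      => println("Plan not convertible to SQL")
}
```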

