Repository: spark
Updated Branches:
  refs/heads/master 82183f7b5 -> 00d176d2f


[SPARK-20392][SQL] Set barrier to prevent re-entering a tree

## What changes were proposed in this pull request?

The SQL `Analyzer` traverses the whole query plan even when most of it has 
already been analyzed. This needlessly increases the time spent on query 
analysis, especially for long pipelines such as those in ML.

This patch adds a logical node called `AnalysisBarrier` that wraps an analyzed 
logical plan to prevent it from being analyzed again. The barrier is applied to 
the analyzed logical plan in `Dataset`. It doesn't change the output of the 
wrapped logical plan; it merely acts as a wrapper that hides the plan from the 
analyzer. New operations on the Dataset are built on top of the barrier, so 
only the newly created nodes are analyzed.

The barrier is removed at the end of the analysis stage by the new 
`EliminateBarriers` rule, as illustrated in the sketch below.
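
A minimal, self-contained sketch of the mechanism (toy classes, not Spark's
actual API): because a barrier node reports no children, a bottom-up transform
such as `transformUp` never descends into the wrapped subtree.

```scala
// Toy tree with the same traversal shape as Catalyst's transformUp.
sealed trait Plan {
  def children: Seq[Plan]
  def withChildren(newChildren: Seq[Plan]): Plan
  def transformUp(rule: PartialFunction[Plan, Plan]): Plan = {
    val afterChildren = withChildren(children.map(_.transformUp(rule)))
    rule.applyOrElse(afterChildren, identity[Plan])
  }
}
case class Relation(name: String) extends Plan {
  def children: Seq[Plan] = Nil
  def withChildren(c: Seq[Plan]): Plan = this
}
case class Project(expr: String, child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withChildren(c: Seq[Plan]): Plan = copy(child = c.head)
}
// The barrier: wraps a plan but pretends to be a leaf, so traversals
// never descend into `hidden`.
case class Barrier(hidden: Plan) extends Plan {
  def children: Seq[Plan] = Nil
  def withChildren(c: Seq[Plan]): Plan = this
}

object BarrierDemo extends App {
  var visited = 0
  val countingRule: PartialFunction[Plan, Plan] = { case p => visited += 1; p }

  val alreadyAnalyzed = Project("a", Relation("t"))
  // A new operation built on top of the barrier, the way Dataset does it.
  Project("b", Barrier(alreadyAnalyzed)).transformUp(countingRule)
  println(visited) // 2 -- only the new Project and the Barrier are visited
}
```

This mirrors what the patch does in `Dataset`: each new operation is built on
top of `AnalysisBarrier(logicalPlan)`, so a long chain of transformations
re-analyzes only the nodes added since the last analysis.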

## How was this patch tested?

Added tests.

Author: Liang-Chi Hsieh <vii...@gmail.com>

Closes #19873 from viirya/SPARK-20392-reopen.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00d176d2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00d176d2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00d176d2

Branch: refs/heads/master
Commit: 00d176d2fe7bbdf55cb3146a9cb04ca99b1858b7
Parents: 82183f7
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Tue Dec 5 21:43:41 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Tue Dec 5 21:43:41 2017 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 77 ++++++++++-------
 .../sql/catalyst/analysis/CheckAnalysis.scala   |  4 -
 .../catalyst/analysis/DecimalPrecision.scala    |  2 +-
 .../analysis/ResolveTableValuedFunctions.scala  |  2 +-
 .../analysis/SubstituteUnresolvedOrdinals.scala |  2 +-
 .../sql/catalyst/analysis/TypeCoercion.scala    | 32 ++++---
 .../catalyst/analysis/timeZoneAnalysis.scala    |  2 +-
 .../spark/sql/catalyst/analysis/view.scala      |  2 +-
 .../spark/sql/catalyst/optimizer/subquery.scala |  2 +-
 .../catalyst/plans/logical/LogicalPlan.scala    | 49 -----------
 .../plans/logical/basicLogicalOperators.scala   | 19 ++++
 .../sql/catalyst/analysis/AnalysisSuite.scala   | 14 +++
 .../sql/catalyst/plans/LogicalPlanSuite.scala   | 42 ++++++---
 .../scala/org/apache/spark/sql/Dataset.scala    | 91 ++++++++++----------
 .../spark/sql/execution/datasources/rules.scala |  2 +-
 .../spark/sql/execution/PlannerSuite.scala      |  2 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  |  6 +-
 17 files changed, 185 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/00d176d2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e5c93b5..0d5e866 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -165,14 +165,15 @@ class Analyzer(
     Batch("Subquery", Once,
       UpdateOuterReferences),
     Batch("Cleanup", fixedPoint,
-      CleanupAliases)
+      CleanupAliases,
+      EliminateBarriers)
   )
 
   /**
    * Analyze cte definitions and substitute child plan with analyzed cte 
definitions.
    */
   object CTESubstitution extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators  {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp  {
       case With(child, relations) =>
         substituteCTE(child, relations.foldLeft(Seq.empty[(String, 
LogicalPlan)]) {
           case (resolved, (name, relation)) =>
@@ -200,7 +201,7 @@ class Analyzer(
    * Substitute child plan with WindowSpecDefinitions.
    */
   object WindowsSubstitution extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       // Lookup WindowSpecDefinitions. This rule works with unresolved 
children.
       case WithWindowDefinition(windowDefinitions, child) =>
         child.transform {
@@ -242,7 +243,7 @@ class Analyzer(
     private def hasUnresolvedAlias(exprs: Seq[NamedExpression]) =
       exprs.exists(_.find(_.isInstanceOf[UnresolvedAlias]).isDefined)
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       case Aggregate(groups, aggs, child) if child.resolved && 
hasUnresolvedAlias(aggs) =>
         Aggregate(groups, assignAliases(aggs), child)
 
@@ -611,7 +612,7 @@ class Analyzer(
       case _ => plan
     }
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if 
child.resolved =>
         EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
           case v: View =>
@@ -666,7 +667,9 @@ class Analyzer(
      * Generate a new logical plan for the right child with different 
expression IDs
      * for all conflicting attributes.
      */
-    private def dedupRight (left: LogicalPlan, right: LogicalPlan): LogicalPlan = {
+    private def dedupRight (left: LogicalPlan, originalRight: LogicalPlan): LogicalPlan = {
+      // Remove analysis barrier if any.
+      val right = EliminateBarriers(originalRight)
       val conflictingAttributes = left.outputSet.intersect(right.outputSet)
       logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} 
" +
         s"between $left and $right")
@@ -709,7 +712,7 @@ class Analyzer(
            * that this rule cannot handle. When that is the case, there must 
be another rule
            * that resolves these conflicts. Otherwise, the analysis will fail.
            */
-          right
+          originalRight
         case Some((oldRelation, newRelation)) =>
           val attributeRewrites = 
AttributeMap(oldRelation.output.zip(newRelation.output))
           val newRight = right transformUp {
@@ -722,7 +725,7 @@ class Analyzer(
                 s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, 
attributeRewrites))
             }
           }
-          newRight
+          AnalysisBarrier(newRight)
       }
     }
 
@@ -799,7 +802,7 @@ class Analyzer(
       case _ => e.mapChildren(resolve(_, q))
     }
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       case p: LogicalPlan if !p.childrenResolved => p
 
       // If the projection list contains Stars, expand it.
@@ -993,7 +996,7 @@ class Analyzer(
    * have no effect on the results.
    */
   object ResolveOrdinalInOrderByAndGroupBy extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       case p if !p.childrenResolved => p
       // Replace the index with the related attribute for ORDER BY,
       // which is a 1-base position of the projection list.
@@ -1049,7 +1052,7 @@ class Analyzer(
       }}
     }
 
-    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       case agg @ Aggregate(groups, aggs, child)
           if conf.groupByAliases && child.resolved && aggs.forall(_.resolved) 
&&
             groups.exists(!_.resolved) =>
@@ -1073,11 +1076,13 @@ class Analyzer(
    * The HAVING clause could also used a grouping columns that is not 
presented in the SELECT.
    */
   object ResolveMissingReferences extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       // Skip sort with aggregate. This will be handled in 
ResolveAggregateFunctions
+      case sa @ Sort(_, _, AnalysisBarrier(child: Aggregate)) => sa
       case sa @ Sort(_, _, child: Aggregate) => sa
 
-      case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
+      case s @ Sort(order, _, originalChild) if !s.resolved && originalChild.resolved =>
+        val child = EliminateBarriers(originalChild)
         try {
           val newOrder = order.map(resolveExpressionRecursively(_, 
child).asInstanceOf[SortOrder])
           val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
@@ -1098,7 +1103,8 @@ class Analyzer(
           case ae: AnalysisException => s
         }
 
-      case f @ Filter(cond, child) if !f.resolved && child.resolved =>
+      case f @ Filter(cond, originalChild) if !f.resolved && originalChild.resolved =>
+        val child = EliminateBarriers(originalChild)
         try {
           val newCond = resolveExpressionRecursively(cond, child)
           val requiredAttrs = newCond.references.filter(_.resolved)
@@ -1125,7 +1131,7 @@ class Analyzer(
      */
     private def addMissingAttr(plan: LogicalPlan, missingAttrs: AttributeSet): 
LogicalPlan = {
       if (missingAttrs.isEmpty) {
-        return plan
+        return AnalysisBarrier(plan)
       }
       plan match {
         case p: Project =>
@@ -1197,7 +1203,7 @@ class Analyzer(
    * Replaces [[UnresolvedFunction]]s with concrete [[Expression]]s.
    */
   object ResolveFunctions extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       case q: LogicalPlan =>
         q transformExpressions {
           case u if !u.childrenResolved => u // Skip until children are 
resolved.
@@ -1334,7 +1340,7 @@ class Analyzer(
     /**
      * Resolve and rewrite all subqueries in an operator tree..
      */
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       // In case of HAVING (a filter after an aggregate) we use both the 
aggregate and
       // its child for resolution.
       case f @ Filter(_, a: Aggregate) if f.childrenResolved =>
@@ -1350,7 +1356,7 @@ class Analyzer(
    */
   object ResolveSubqueryColumnAliases extends Rule[LogicalPlan] {
 
-     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+     def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       case u @ UnresolvedSubqueryColumnAliases(columnNames, child) if 
child.resolved =>
         // Resolves output attributes if a query has alias names in its 
subquery:
         // e.g., SELECT * FROM (SELECT 1 AS a, 1 AS b) t(col1, col2)
@@ -1373,7 +1379,7 @@ class Analyzer(
    * Turns projections that contain aggregate expressions into aggregations.
    */
   object GlobalAggregates extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       case Project(projectList, child) if containsAggregates(projectList) =>
         Aggregate(Nil, projectList, child)
     }
@@ -1399,7 +1405,9 @@ class Analyzer(
    * underlying aggregate operator and then projected away after the original 
operator.
    */
   object ResolveAggregateFunctions extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+      case filter @ Filter(havingCondition, AnalysisBarrier(aggregate: Aggregate)) =>
+        apply(Filter(havingCondition, aggregate)).mapChildren(AnalysisBarrier)
       case filter @ Filter(havingCondition,
              aggregate @ Aggregate(grouping, originalAggExprs, child))
           if aggregate.resolved =>
@@ -1459,6 +1467,8 @@ class Analyzer(
           case ae: AnalysisException => filter
         }
 
+      case sort @ Sort(sortOrder, global, AnalysisBarrier(aggregate: Aggregate)) =>
+        apply(Sort(sortOrder, global, aggregate)).mapChildren(AnalysisBarrier)
       case sort @ Sort(sortOrder, global, aggregate: Aggregate) if 
aggregate.resolved =>
 
         // Try resolving the ordering as though it is in the aggregate clause.
@@ -1571,7 +1581,7 @@ class Analyzer(
       }
     }
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       case Project(projectList, _) if projectList.exists(hasNestedGenerator) =>
         val nestedGenerator = projectList.find(hasNestedGenerator).get
         throw new AnalysisException("Generators are not supported when it's 
nested in " +
@@ -1629,7 +1639,7 @@ class Analyzer(
    * that wrap the [[Generator]].
    */
   object ResolveGenerate extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       case g: Generate if !g.child.resolved || !g.generator.resolved => g
       case g: Generate if !g.resolved =>
         g.copy(generatorOutput = makeGeneratorOutput(g.generator, 
g.generatorOutput.map(_.name)))
@@ -1946,7 +1956,7 @@ class Analyzer(
    * put them into an inner Project and finally project them away at the outer 
Project.
    */
   object PullOutNondeterministic extends Rule[LogicalPlan] {
-    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       case p if !p.resolved => p // Skip unresolved nodes.
       case p: Project => p
       case f: Filter => f
@@ -1991,7 +2001,7 @@ class Analyzer(
    * and we should return null if the input is null.
    */
   object HandleNullInputsForUDF extends Rule[LogicalPlan] {
-    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       case p if !p.resolved => p // Skip unresolved nodes.
 
       case p => p transformExpressionsUp {
@@ -2056,7 +2066,7 @@ class Analyzer(
    * Then apply a Project on a normal Join to eliminate natural or using join.
    */
   object ResolveNaturalAndUsingJoin extends Rule[LogicalPlan] {
-    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       case j @ Join(left, right, UsingJoin(joinType, usingCols), condition)
           if left.resolved && right.resolved && j.duplicateResolved =>
         commonNaturalJoinProcessing(left, right, joinType, usingCols, None)
@@ -2121,7 +2131,7 @@ class Analyzer(
    * to the given input attributes.
    */
   object ResolveDeserializer extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       case p if !p.childrenResolved => p
       case p if p.resolved => p
 
@@ -2207,7 +2217,7 @@ class Analyzer(
    * constructed is an inner class.
    */
   object ResolveNewInstance extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       case p if !p.childrenResolved => p
       case p if p.resolved => p
 
@@ -2241,7 +2251,7 @@ class Analyzer(
         "type of the field in the target object")
     }
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       case p if !p.childrenResolved => p
       case p if p.resolved => p
 
@@ -2300,7 +2310,7 @@ object CleanupAliases extends Rule[LogicalPlan] {
     case other => trimAliases(other)
   }
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
     case Project(projectList, child) =>
       val cleanedProjectList =
         
projectList.map(trimNonTopLevelAliases(_).asInstanceOf[NamedExpression])
@@ -2329,6 +2339,13 @@ object CleanupAliases extends Rule[LogicalPlan] {
   }
 }
 
+/** Removes the analysis barrier nodes. */
+object EliminateBarriers extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+    case AnalysisBarrier(child) => child
+  }
+}
+
 /**
  * Ignore event time watermark in batch query, which is only supported in 
Structured Streaming.
  * TODO: add this rule into analyzer rule list.
@@ -2379,7 +2396,7 @@ object TimeWindowing extends Rule[LogicalPlan] {
    * @return the logical plan that will generate the time windows using the 
Expand operator, with
    *         the Filter operator for correctness and Project for usability.
    */
-  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
     case p: LogicalPlan if p.children.size == 1 =>
       val child = p.children.head
       val windowExpressions =

http://git-wip-us.apache.org/repos/asf/spark/blob/00d176d2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index b5e8bdd..6894aed 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -78,8 +78,6 @@ trait CheckAnalysis extends PredicateHelper {
     // We transform up and order the rules so as to catch the first possible 
failure instead
     // of the result of cascading resolution failures.
     plan.foreachUp {
-      case p if p.analyzed => // Skip already analyzed sub-plans
-
       case u: UnresolvedRelation =>
         u.failAnalysis(s"Table or view not found: ${u.tableIdentifier}")
 
@@ -353,8 +351,6 @@ trait CheckAnalysis extends PredicateHelper {
       case o if !o.resolved => failAnalysis(s"unresolved operator 
${o.simpleString}")
       case _ =>
     }
-
-    plan.foreach(_.setAnalyzed())
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/00d176d2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala
index 070bc54..a8100b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala
@@ -78,7 +78,7 @@ object DecimalPrecision extends TypeCoercionRule {
     PromotePrecision(Cast(e, dataType))
   }
 
-  override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transformUp {
     // fix decimal precision for expressions
     case q => q.transformExpressionsUp(
       
decimalAndDecimal.orElse(integralAndDecimalLiteral).orElse(nondecimalAndDecimal))

http://git-wip-us.apache.org/repos/asf/spark/blob/00d176d2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
index 7358f9e..a214e59 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
@@ -103,7 +103,7 @@ object ResolveTableValuedFunctions extends 
Rule[LogicalPlan] {
       })
   )
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) 
=>
       val resolvedFunc = 
builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match {
         case Some(tvf) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/00d176d2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinals.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinals.scala
index 860d20f..f9fd0df 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinals.scala
@@ -33,7 +33,7 @@ class SubstituteUnresolvedOrdinals(conf: SQLConf) extends 
Rule[LogicalPlan] {
     case _ => false
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case s: Sort if conf.orderByOrdinal && s.order.exists(o => 
isIntLiteral(o.child)) =>
       val newOrders = s.order.map {
         case order @ SortOrder(ordinal @ Literal(index: Int, IntegerType), _, 
_, _) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/00d176d2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 1ee2f6e..2f306f5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -247,9 +247,7 @@ object TypeCoercion {
    */
   object WidenSetOperationTypes extends Rule[LogicalPlan] {
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-      case p if p.analyzed => p
-
+    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
       case s @ SetOperation(left, right) if s.childrenResolved &&
           left.output.length == right.output.length && !s.resolved =>
         val newChildren: Seq[LogicalPlan] = 
buildNewChildrenWithWiderTypes(left :: right :: Nil)
@@ -321,7 +319,8 @@ object TypeCoercion {
       }
     }
 
-    override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
+    override protected def coerceTypes(
+        plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e
 
@@ -380,7 +379,8 @@ object TypeCoercion {
       }
     }
 
-    override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
+    override protected def coerceTypes(
+        plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e
 
@@ -439,7 +439,7 @@ object TypeCoercion {
     private val trueValues = Seq(1.toByte, 1.toShort, 1, 1L, Decimal.ONE)
     private val falseValues = Seq(0.toByte, 0.toShort, 0, 0L, Decimal.ZERO)
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e
 
@@ -480,7 +480,8 @@ object TypeCoercion {
    * This ensure that the types for various functions are as expected.
    */
   object FunctionArgumentConversion extends TypeCoercionRule {
-    override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
+    override protected def coerceTypes(
+        plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e
 
@@ -570,7 +571,8 @@ object TypeCoercion {
    * converted to fractional types.
    */
   object Division extends TypeCoercionRule {
-    override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
+    override protected def coerceTypes(
+        plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
       // Skip nodes who has not been resolved yet,
       // as this is an extra rule which should be applied at last.
       case e if !e.childrenResolved => e
@@ -592,7 +594,8 @@ object TypeCoercion {
    * Coerces the type of different branches of a CASE WHEN statement to a 
common type.
    */
   object CaseWhenCoercion extends TypeCoercionRule {
-    override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
+    override protected def coerceTypes(
+        plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
       case c: CaseWhen if c.childrenResolved && !c.valueTypesEqual =>
         val maybeCommonType = findWiderCommonType(c.valueTypes)
         maybeCommonType.map { commonType =>
@@ -622,7 +625,8 @@ object TypeCoercion {
    * Coerces the type of different branches of If statement to a common type.
    */
   object IfCoercion extends TypeCoercionRule {
-    override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
+    override protected def coerceTypes(
+        plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
       case e if !e.childrenResolved => e
       // Find tightest common type for If, if the true value and false value 
have different types.
       case i @ If(pred, left, right) if left.dataType != right.dataType =>
@@ -662,7 +666,7 @@ object TypeCoercion {
 
     private val acceptedTypes = Seq(DateType, TimestampType, StringType)
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e
 
@@ -679,7 +683,8 @@ object TypeCoercion {
    * Casts types according to the expected input types for [[Expression]]s.
    */
   object ImplicitTypeCasts extends TypeCoercionRule {
-    override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
+    override protected def coerceTypes(
+        plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e
 
@@ -796,7 +801,8 @@ object TypeCoercion {
    * Cast WindowFrame boundaries to the type they operate upon.
    */
   object WindowFrameCoercion extends TypeCoercionRule {
-    override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
+    override protected def coerceTypes(
+        plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
       case s @ WindowSpecDefinition(_, Seq(order), 
SpecifiedWindowFrame(RangeFrame, lower, upper))
           if order.resolved =>
         s.copy(frameSpecification = SpecifiedWindowFrame(

http://git-wip-us.apache.org/repos/asf/spark/blob/00d176d2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala
index a27aa84..af1f916 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala
@@ -38,7 +38,7 @@ case class ResolveTimeZone(conf: SQLConf) extends 
Rule[LogicalPlan] {
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan =
-    plan.resolveExpressions(transformTimeZoneExprs)
+    plan.transformAllExpressions(transformTimeZoneExprs)
 
   def resolveTimeZones(e: Expression): Expression = 
e.transform(transformTimeZoneExprs)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/00d176d2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
index ea46dd7..3bbe41c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
@@ -48,7 +48,7 @@ import org.apache.spark.sql.internal.SQLConf
  * completely resolved during the batch of Resolution.
  */
 case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with 
CastSupport {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case v @ View(desc, output, child) if child.resolved && output != 
child.output =>
       val resolver = conf.resolver
       val queryColumnNames = desc.viewQueryColumnNames

http://git-wip-us.apache.org/repos/asf/spark/blob/00d176d2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
index 64b2856..2673bea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
@@ -270,7 +270,7 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] 
with PredicateHelper
   /**
    * Pull up the correlated predicates and rewrite all subqueries in an 
operator tree..
    */
-  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case f @ Filter(_, a: Aggregate) =>
       rewriteSubQueries(f, Seq(a, a.child))
     // Only a few unary nodes (Project/Filter/Aggregate) can contain 
subqueries.

http://git-wip-us.apache.org/repos/asf/spark/blob/00d176d2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 1418882..a38458a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -33,58 +33,9 @@ abstract class LogicalPlan
   with QueryPlanConstraints
   with Logging {
 
-  private var _analyzed: Boolean = false
-
-  /**
-   * Marks this plan as already analyzed. This should only be called by [[CheckAnalysis]].
-   */
-  private[catalyst] def setAnalyzed(): Unit = { _analyzed = true }
-
-  /**
-   * Returns true if this node and its children have already been gone through analysis and
-   * verification.  Note that this is only an optimization used to avoid analyzing trees that
-   * have already been analyzed, and can be reset by transformations.
-   */
-  def analyzed: Boolean = _analyzed
-
   /** Returns true if this subtree has data from a streaming data source. */
   def isStreaming: Boolean = children.exists(_.isStreaming == true)
 
-  /**
-   * Returns a copy of this node where `rule` has been recursively applied first to all of its
-   * children and then itself (post-order). When `rule` does not apply to a given node, it is left
-   * unchanged.  This function is similar to `transformUp`, but skips sub-trees that have already
-   * been marked as analyzed.
-   *
-   * @param rule the function use to transform this nodes children
-   */
-  def resolveOperators(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
-    if (!analyzed) {
-      val afterRuleOnChildren = mapChildren(_.resolveOperators(rule))
-      if (this fastEquals afterRuleOnChildren) {
-        CurrentOrigin.withOrigin(origin) {
-          rule.applyOrElse(this, identity[LogicalPlan])
-        }
-      } else {
-        CurrentOrigin.withOrigin(origin) {
-          rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
-        }
-      }
-    } else {
-      this
-    }
-  }
-
-  /**
-   * Recursively transforms the expressions of a tree, skipping nodes that have already
-   * been analyzed.
-   */
-  def resolveExpressions(r: PartialFunction[Expression, Expression]): LogicalPlan = {
-    this resolveOperators  {
-      case p => p.transformExpressions(r)
-    }
-  }
-
   override def verboseStringWithSuffix: String = {
     super.verboseString + statsCache.map(", " + _.toString).getOrElse("")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/00d176d2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index ba5f97d..cd47455 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -883,3 +883,22 @@ case class Deduplicate(
 
   override def output: Seq[Attribute] = child.output
 }
+
+/**
+ * A logical plan node that sets a barrier for analysis.
+ *
+ * The SQL Analyzer traverses the whole query plan even when most of it has
+ * already been analyzed. This needlessly increases the time spent on query
+ * analysis, especially for long pipelines such as those in ML.
+ *
+ * This node wraps an analyzed logical plan to prevent it from being analyzed
+ * again. The barrier is applied to the analyzed logical plan in Dataset. It
+ * doesn't change the output of the wrapped plan; it merely acts as a wrapper
+ * that hides the plan from the analyzer. New operations on the Dataset are
+ * built on top of the barrier, so only the newly created nodes are analyzed.
+ *
+ * The barrier is removed at the end of the analysis stage.
+ */
+case class AnalysisBarrier(child: LogicalPlan) extends LeafNode {
+  override def output: Seq[Attribute] = child.output
+  override def isStreaming: Boolean = child.isStreaming
+  override def doCanonicalize(): LogicalPlan = child.canonicalized
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/00d176d2/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 109fb32..f451420 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -543,4 +543,18 @@ class AnalysisSuite extends AnalysisTest with Matchers {
       checkPartitioning(numPartitions = 10, exprs = SortOrder('a.attr, 
Ascending), 'b.attr)
     }
   }
+
+  test("SPARK-20392: analysis barrier") {
+    // [[AnalysisBarrier]] will be removed after analysis
+    checkAnalysis(
+      Project(Seq(UnresolvedAttribute("tbl.a")),
+        AnalysisBarrier(SubqueryAlias("tbl", testRelation))),
+      Project(testRelation.output, SubqueryAlias("tbl", testRelation)))
+
+    // Verify we won't go through a plan wrapped in a barrier.
+    // Since we wrap an unresolved plan and the analyzer won't go through it,
+    // it remains unresolved.
+    val barrier = AnalysisBarrier(Project(Seq(UnresolvedAttribute("tbl.b")),
+      SubqueryAlias("tbl", testRelation)))
+    assertAnalysisError(barrier, Seq("cannot resolve '`tbl.b`'"))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/00d176d2/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
index cdf912d..1404174 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
@@ -23,8 +23,8 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types.IntegerType
 
 /**
- * This suite is used to test [[LogicalPlan]]'s `resolveOperators` and make sure it can correctly
- * skips sub-trees that have already been marked as analyzed.
+ * This suite is used to test [[LogicalPlan]]'s `transformUp`/`transformDown` together with the
+ * analysis barrier, and to make sure they correctly skip sub-trees that have already been
+ * analyzed.
  */
 class LogicalPlanSuite extends SparkFunSuite {
   private var invocationCount = 0
@@ -36,39 +36,53 @@ class LogicalPlanSuite extends SparkFunSuite {
 
   private val testRelation = LocalRelation()
 
-  test("resolveOperator runs on operators") {
+  test("transformUp runs on operators") {
     invocationCount = 0
     val plan = Project(Nil, testRelation)
-    plan resolveOperators function
+    plan transformUp function
 
     assert(invocationCount === 1)
+
+    invocationCount = 0
+    plan transformDown function
+    assert(invocationCount === 1)
   }
 
-  test("resolveOperator runs on operators recursively") {
+  test("transformUp runs on operators recursively") {
     invocationCount = 0
     val plan = Project(Nil, Project(Nil, testRelation))
-    plan resolveOperators function
+    plan transformUp function
 
     assert(invocationCount === 2)
+
+    invocationCount = 0
+    plan transformDown function
+    assert(invocationCount === 2)
   }
 
-  test("resolveOperator skips all ready resolved plans") {
+  test("transformUp skips already resolved plans wrapped in analysis barrier") {
     invocationCount = 0
-    val plan = Project(Nil, Project(Nil, testRelation))
-    plan.foreach(_.setAnalyzed())
-    plan resolveOperators function
+    val plan = AnalysisBarrier(Project(Nil, Project(Nil, testRelation)))
+    plan transformUp function
 
     assert(invocationCount === 0)
+
+    invocationCount = 0
+    plan transformDown function
+    assert(invocationCount === 0)
   }
 
-  test("resolveOperator skips partially resolved plans") {
+  test("transformUp skips partially resolved plans wrapped in analysis barrier") {
     invocationCount = 0
-    val plan1 = Project(Nil, testRelation)
+    val plan1 = AnalysisBarrier(Project(Nil, testRelation))
     val plan2 = Project(Nil, plan1)
-    plan1.foreach(_.setAnalyzed())
-    plan2 resolveOperators function
+    plan2 transformUp function
 
     assert(invocationCount === 1)
+
+    invocationCount = 0
+    plan2 transformDown function
+    assert(invocationCount === 1)
   }
 
   test("isStreaming") {

http://git-wip-us.apache.org/repos/asf/spark/blob/00d176d2/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 167c9d0..c34cf0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -191,6 +191,9 @@ class Dataset[T] private[sql](
     }
   }
 
+  // Wraps the analyzed logical plan with an analysis barrier so we won't traverse/resolve it again.
+  @transient private val planWithBarrier = AnalysisBarrier(logicalPlan)
+
   /**
    * Currently [[ExpressionEncoder]] is the only implementation of 
[[Encoder]], here we turn the
    * passed in encoder to [[ExpressionEncoder]] explicitly, and mark it 
implicit so that we can use
@@ -403,7 +406,7 @@ class Dataset[T] private[sql](
    */
   @Experimental
   @InterfaceStability.Evolving
-  def as[U : Encoder]: Dataset[U] = Dataset[U](sparkSession, logicalPlan)
+  def as[U : Encoder]: Dataset[U] = Dataset[U](sparkSession, planWithBarrier)
 
   /**
    * Converts this strongly typed collection of data to generic `DataFrame` 
with columns renamed.
@@ -604,7 +607,7 @@ class Dataset[T] private[sql](
     require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
       s"delay threshold ($delayThreshold) should not be negative.")
     EliminateEventTimeWatermark(
-      EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan))
+      EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, planWithBarrier))
   }
 
   /**
@@ -777,7 +780,7 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   def join(right: Dataset[_]): DataFrame = withPlan {
-    Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
+    Join(planWithBarrier, right.planWithBarrier, joinType = Inner, None)
   }
 
   /**
@@ -855,7 +858,7 @@ class Dataset[T] private[sql](
     // Analyze the self join. The assumption is that the analyzer will 
disambiguate left vs right
     // by creating a new instance for one of the branch.
     val joined = sparkSession.sessionState.executePlan(
-      Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None))
+      Join(planWithBarrier, right.planWithBarrier, joinType = JoinType(joinType), None))
       .analyzed.asInstanceOf[Join]
 
     withPlan {
@@ -916,7 +919,7 @@ class Dataset[T] private[sql](
     // Trigger analysis so in the case of self-join, the analyzer will clone 
the plan.
     // After the cloning, left and right side will have distinct expression 
ids.
     val plan = withPlan(
-      Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)))
+      Join(planWithBarrier, right.planWithBarrier, JoinType(joinType), Some(joinExprs.expr)))
       .queryExecution.analyzed.asInstanceOf[Join]
 
     // If auto self join alias is disabled, return the plan.
@@ -925,8 +928,8 @@ class Dataset[T] private[sql](
     }
 
     // If left/right have no output set intersection, return the plan.
-    val lanalyzed = withPlan(this.logicalPlan).queryExecution.analyzed
-    val ranalyzed = withPlan(right.logicalPlan).queryExecution.analyzed
+    val lanalyzed = withPlan(this.planWithBarrier).queryExecution.analyzed
+    val ranalyzed = withPlan(right.planWithBarrier).queryExecution.analyzed
     if (lanalyzed.outputSet.intersect(ranalyzed.outputSet).isEmpty) {
       return withPlan(plan)
     }
@@ -958,7 +961,7 @@ class Dataset[T] private[sql](
    * @since 2.1.0
    */
   def crossJoin(right: Dataset[_]): DataFrame = withPlan {
-    Join(logicalPlan, right.logicalPlan, joinType = Cross, None)
+    Join(planWithBarrier, right.planWithBarrier, joinType = Cross, None)
   }
 
   /**
@@ -990,8 +993,8 @@ class Dataset[T] private[sql](
     // etc.
     val joined = sparkSession.sessionState.executePlan(
       Join(
-        this.logicalPlan,
-        other.logicalPlan,
+        this.planWithBarrier,
+        other.planWithBarrier,
         JoinType(joinType),
         Some(condition.expr))).analyzed.asInstanceOf[Join]
 
@@ -1212,7 +1215,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def as(alias: String): Dataset[T] = withTypedPlan {
-    SubqueryAlias(alias, logicalPlan)
+    SubqueryAlias(alias, planWithBarrier)
   }
 
   /**
@@ -1250,7 +1253,7 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def select(cols: Column*): DataFrame = withPlan {
-    Project(cols.map(_.named), logicalPlan)
+    Project(cols.map(_.named), planWithBarrier)
   }
 
   /**
@@ -1305,8 +1308,8 @@ class Dataset[T] private[sql](
   @InterfaceStability.Evolving
   def select[U1](c1: TypedColumn[T, U1]): Dataset[U1] = {
     implicit val encoder = c1.encoder
-    val project = Project(c1.withInputType(exprEnc, logicalPlan.output).named :: Nil,
-      logicalPlan)
+    val project = Project(c1.withInputType(exprEnc, planWithBarrier.output).named :: Nil,
+      planWithBarrier)
 
     if (encoder.flat) {
       new Dataset[U1](sparkSession, project, encoder)
@@ -1324,8 +1327,8 @@ class Dataset[T] private[sql](
   protected def selectUntyped(columns: TypedColumn[_, _]*): Dataset[_] = {
     val encoders = columns.map(_.encoder)
     val namedColumns =
-      columns.map(_.withInputType(exprEnc, logicalPlan.output).named)
-    val execution = new QueryExecution(sparkSession, Project(namedColumns, logicalPlan))
+      columns.map(_.withInputType(exprEnc, planWithBarrier.output).named)
+    val execution = new QueryExecution(sparkSession, Project(namedColumns, planWithBarrier))
     new Dataset(sparkSession, execution, ExpressionEncoder.tuple(encoders))
   }
 
@@ -1401,7 +1404,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def filter(condition: Column): Dataset[T] = withTypedPlan {
-    Filter(condition.expr, logicalPlan)
+    Filter(condition.expr, planWithBarrier)
   }
 
   /**
@@ -1578,7 +1581,7 @@ class Dataset[T] private[sql](
   @Experimental
   @InterfaceStability.Evolving
   def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T] = {
-    val inputPlan = logicalPlan
+    val inputPlan = planWithBarrier
     val withGroupingKey = AppendColumns(func, inputPlan)
     val executed = sparkSession.sessionState.executePlan(withGroupingKey)
 
@@ -1724,7 +1727,7 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   def limit(n: Int): Dataset[T] = withTypedPlan {
-    Limit(Literal(n), logicalPlan)
+    Limit(Literal(n), planWithBarrier)
   }
 
   /**
@@ -1774,7 +1777,7 @@ class Dataset[T] private[sql](
   def union(other: Dataset[T]): Dataset[T] = withSetOperator {
     // This breaks caching, but it's usually ok because it addresses a very 
specific use case:
     // using union to union many files or partitions.
-    CombineUnions(Union(logicalPlan, other.logicalPlan))
+    CombineUnions(Union(logicalPlan, other.logicalPlan)).mapChildren(AnalysisBarrier)
   }
 
   /**
@@ -1833,7 +1836,7 @@ class Dataset[T] private[sql](
 
     // This breaks caching, but it's usually ok because it addresses a very 
specific use case:
     // using union to union many files or partitions.
-    CombineUnions(Union(logicalPlan, rightChild))
+    CombineUnions(Union(logicalPlan, rightChild)).mapChildren(AnalysisBarrier)
   }
 
   /**
@@ -1847,7 +1850,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def intersect(other: Dataset[T]): Dataset[T] = withSetOperator {
-    Intersect(logicalPlan, other.logicalPlan)
+    Intersect(planWithBarrier, other.planWithBarrier)
   }
 
   /**
@@ -1861,7 +1864,7 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   def except(other: Dataset[T]): Dataset[T] = withSetOperator {
-    Except(logicalPlan, other.logicalPlan)
+    Except(planWithBarrier, other.planWithBarrier)
   }
 
   /**
@@ -1912,7 +1915,7 @@ class Dataset[T] private[sql](
    */
   def sample(withReplacement: Boolean, fraction: Double, seed: Long): 
Dataset[T] = {
     withTypedPlan {
-      Sample(0.0, fraction, withReplacement, seed, logicalPlan)
+      Sample(0.0, fraction, withReplacement, seed, planWithBarrier)
     }
   }
 
@@ -1954,15 +1957,15 @@ class Dataset[T] private[sql](
     // overlapping splits. To prevent this, we explicitly sort each input 
partition to make the
     // ordering deterministic. Note that MapTypes cannot be sorted and are 
explicitly pruned out
     // from the sort order.
-    val sortOrder = logicalPlan.output
+    val sortOrder = planWithBarrier.output
       .filter(attr => RowOrdering.isOrderable(attr.dataType))
       .map(SortOrder(_, Ascending))
     val plan = if (sortOrder.nonEmpty) {
-      Sort(sortOrder, global = false, logicalPlan)
+      Sort(sortOrder, global = false, planWithBarrier)
     } else {
       // SPARK-12662: If sort order is empty, we materialize the dataset to 
guarantee determinism
       cache()
-      logicalPlan
+      planWithBarrier
     }
     val sum = weights.sum
     val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
@@ -2046,7 +2049,7 @@ class Dataset[T] private[sql](
 
     withPlan {
       Generate(generator, join = true, outer = false,
-        qualifier = None, generatorOutput = Nil, logicalPlan)
+        qualifier = None, generatorOutput = Nil, planWithBarrier)
     }
   }
 
@@ -2087,7 +2090,7 @@ class Dataset[T] private[sql](
 
     withPlan {
       Generate(generator, join = true, outer = false,
-        qualifier = None, generatorOutput = Nil, logicalPlan)
+        qualifier = None, generatorOutput = Nil, planWithBarrier)
     }
   }
 
@@ -2235,7 +2238,7 @@ class Dataset[T] private[sql](
           u.name, sparkSession.sessionState.analyzer.resolver).getOrElse(u)
       case Column(expr: Expression) => expr
     }
-    val attrs = this.logicalPlan.output
+    val attrs = this.planWithBarrier.output
     val colsAfterDrop = attrs.filter { attr =>
       attr != expression
     }.map(attr => Column(attr))
@@ -2283,7 +2286,7 @@ class Dataset[T] private[sql](
       }
       cols
     }
-    Deduplicate(groupCols, logicalPlan)
+    Deduplicate(groupCols, planWithBarrier)
   }
 
   /**
@@ -2465,7 +2468,7 @@ class Dataset[T] private[sql](
   @Experimental
   @InterfaceStability.Evolving
   def filter(func: T => Boolean): Dataset[T] = {
-    withTypedPlan(TypedFilter(func, logicalPlan))
+    withTypedPlan(TypedFilter(func, planWithBarrier))
   }
 
   /**
@@ -2479,7 +2482,7 @@ class Dataset[T] private[sql](
   @Experimental
   @InterfaceStability.Evolving
   def filter(func: FilterFunction[T]): Dataset[T] = {
-    withTypedPlan(TypedFilter(func, logicalPlan))
+    withTypedPlan(TypedFilter(func, planWithBarrier))
   }
 
   /**
@@ -2493,7 +2496,7 @@ class Dataset[T] private[sql](
   @Experimental
   @InterfaceStability.Evolving
   def map[U : Encoder](func: T => U): Dataset[U] = withTypedPlan {
-    MapElements[T, U](func, logicalPlan)
+    MapElements[T, U](func, planWithBarrier)
   }
 
   /**
@@ -2508,7 +2511,7 @@ class Dataset[T] private[sql](
   @InterfaceStability.Evolving
   def map[U](func: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
     implicit val uEnc = encoder
-    withTypedPlan(MapElements[T, U](func, logicalPlan))
+    withTypedPlan(MapElements[T, U](func, planWithBarrier))
   }
 
   /**
@@ -2524,7 +2527,7 @@ class Dataset[T] private[sql](
   def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] 
= {
     new Dataset[U](
       sparkSession,
-      MapPartitions[T, U](func, logicalPlan),
+      MapPartitions[T, U](func, planWithBarrier),
       implicitly[Encoder[U]])
   }
 
@@ -2555,7 +2558,7 @@ class Dataset[T] private[sql](
     val rowEncoder = encoder.asInstanceOf[ExpressionEncoder[Row]]
     Dataset.ofRows(
       sparkSession,
-      MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, logicalPlan))
+      MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, planWithBarrier))
   }
 
   /**
@@ -2719,7 +2722,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
-    Repartition(numPartitions, shuffle = true, logicalPlan)
+    Repartition(numPartitions, shuffle = true, planWithBarrier)
   }
 
   /**
@@ -2742,7 +2745,7 @@ class Dataset[T] private[sql](
          |For range partitioning use repartitionByRange(...) instead.
        """.stripMargin)
     withTypedPlan {
-      RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions)
+      RepartitionByExpression(partitionExprs.map(_.expr), planWithBarrier, numPartitions)
     }
   }
 
@@ -2779,7 +2782,7 @@ class Dataset[T] private[sql](
       case expr: Expression => SortOrder(expr, Ascending)
     })
     withTypedPlan {
-      RepartitionByExpression(sortOrder, logicalPlan, numPartitions)
+      RepartitionByExpression(sortOrder, planWithBarrier, numPartitions)
     }
   }
 
@@ -2817,7 +2820,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan {
-    Repartition(numPartitions, shuffle = false, logicalPlan)
+    Repartition(numPartitions, shuffle = false, planWithBarrier)
   }
 
   /**
@@ -2900,7 +2903,7 @@ class Dataset[T] private[sql](
 
   // Represents the `QueryExecution` used to produce the content of the 
Dataset as an `RDD`.
   @transient private lazy val rddQueryExecution: QueryExecution = {
-    val deserialized = CatalystSerde.deserialize[T](logicalPlan)
+    val deserialized = CatalystSerde.deserialize[T](planWithBarrier)
     sparkSession.sessionState.executePlan(deserialized)
   }
 
@@ -3026,7 +3029,7 @@ class Dataset[T] private[sql](
       comment = None,
       properties = Map.empty,
       originalText = None,
-      child = logicalPlan,
+      child = planWithBarrier,
       allowExisting = false,
       replace = replace,
       viewType = viewType)
@@ -3226,7 +3229,7 @@ class Dataset[T] private[sql](
       }
     }
     withTypedPlan {
-      Sort(sortOrder, global = global, logicalPlan)
+      Sort(sortOrder, global = global, planWithBarrier)
     }
   }
 

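Stepping back from the per-method changes in Dataset.scala above: every new
operator now wraps `planWithBarrier` instead of `logicalPlan`, so each analysis
round stops at the previous round's barrier. A toy cost model (illustrative
classes, not Spark code) shows the asymptotic effect on long pipelines: without
a barrier, analyzing the plan after the N-th chained operation re-visits all
earlier nodes, so total work grows quadratically; with a barrier it stays linear.

```scala
// Compare traversal work for 100 chained operations, with and without a barrier.
sealed trait Plan { def children: Seq[Plan] }
case object Leaf extends Plan { def children: Seq[Plan] = Nil }
case class Node(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
case class Barrier(hidden: Plan) extends Plan { def children: Seq[Plan] = Nil }

object BarrierCost extends App {
  // Number of nodes one full traversal (one analysis round) touches.
  def visit(p: Plan): Int = 1 + p.children.map(visit).sum

  var bare: Plan = Leaf
  var guarded: Plan = Leaf
  var bareWork, guardedWork = 0
  for (_ <- 1 to 100) {
    bare = Node(bare)
    bareWork += visit(bare)
    guarded = Node(Barrier(guarded))
    guardedWork += visit(guarded)
  }
  println(s"without barrier: $bareWork node visits") // 5150 -- quadratic growth
  println(s"with barrier: $guardedWork node visits") // 200  -- linear
}
```

In this toy model the nested barriers simply accumulate; in the actual patch
`EliminateBarriers` strips them at the end of each analysis, but the traversal
cost is the same either way, since a traversal stops at the outermost barrier.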
http://git-wip-us.apache.org/repos/asf/spark/blob/00d176d2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 6e08df7..f64e079 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -39,7 +39,7 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
     sparkSession.sessionState.conf.runSQLonFile && 
u.tableIdentifier.database.isDefined
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case u: UnresolvedRelation if maybeSQLFile(u) =>
       try {
         val dataSource = DataSource(

http://git-wip-us.apache.org/repos/asf/spark/blob/00d176d2/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index c25c90d..b50642d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -241,7 +241,7 @@ class PlannerSuite extends SharedSQLContext {
   test("collapse adjacent repartitions") {
     val doubleRepartitioned = 
testData.repartition(10).repartition(20).coalesce(5)
     def countRepartitions(plan: LogicalPlan): Int = plan.collect { case r: 
Repartition => r }.length
-    assert(countRepartitions(doubleRepartitioned.queryExecution.logical) === 3)
+    assert(countRepartitions(doubleRepartitioned.queryExecution.analyzed) === 3)
     assert(countRepartitions(doubleRepartitioned.queryExecution.optimizedPlan) 
=== 2)
     doubleRepartitioned.queryExecution.optimizedPlan match {
       case Repartition (numPartitions, shuffle, Repartition(_, shuffleChild, 
_)) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/00d176d2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 3018c06..a7961c7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -87,7 +87,7 @@ class ResolveHiveSerdeTable(session: SparkSession) extends 
Rule[LogicalPlan] {
     }
   }
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case c @ CreateTable(t, _, query) if DDLUtils.isHiveTable(t) =>
       // Finds the database name if the name does not exist.
       val dbName = 
t.identifier.database.getOrElse(session.catalog.currentDatabase)
@@ -114,7 +114,7 @@ class ResolveHiveSerdeTable(session: SparkSession) extends 
Rule[LogicalPlan] {
 }
 
 class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case relation: HiveTableRelation
         if DDLUtils.isHiveTable(relation.tableMeta) && 
relation.tableMeta.stats.isEmpty =>
       val table = relation.tableMeta
@@ -145,7 +145,7 @@ class DetermineTableStats(session: SparkSession) extends 
Rule[LogicalPlan] {
  * `PreprocessTableInsertion`.
  */
 object HiveAnalysis extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, 
ifPartitionNotExists)
         if DDLUtils.isHiveTable(r.tableMeta) =>
       InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, 
ifPartitionNotExists)

