Repository: spark
Updated Branches:
  refs/heads/master 52ed9b289 -> 1f5dddffa


Revert "[SPARK-20392][SQL] Set barrier to prevent re-entering a tree"

This reverts commit 8ce0d8ffb68bd9e89c23d3a026308dcc039a1b1d.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f5dddff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f5dddff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f5dddff

Branch: refs/heads/master
Commit: 1f5dddffa3f065dff2b0a6b0fe7e463edfa4a5f1
Parents: 52ed9b2
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Tue May 30 21:14:55 2017 -0700
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue May 30 21:14:55 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 75 ++++++----------
 .../catalyst/analysis/DecimalPrecision.scala    |  2 +-
 .../analysis/ResolveTableValuedFunctions.scala  |  2 +-
 .../sql/catalyst/analysis/TypeCoercion.scala    | 22 ++---
 .../catalyst/analysis/timeZoneAnalysis.scala    |  2 +-
 .../spark/sql/catalyst/analysis/view.scala      |  2 +-
 .../spark/sql/catalyst/optimizer/subquery.scala |  2 +-
 .../catalyst/plans/logical/LogicalPlan.scala    | 35 ++++++++
 .../plans/logical/basicLogicalOperators.scala   |  9 --
 .../sql/catalyst/analysis/AnalysisSuite.scala   | 14 ---
 .../sql/catalyst/plans/LogicalPlanSuite.scala   | 26 +++---
 .../scala/org/apache/spark/sql/Dataset.scala    | 92 ++++++++++----------
 .../sql/execution/datasources/DataSource.scala  |  2 +-
 .../spark/sql/execution/datasources/rules.scala |  2 +-
 .../spark/sql/execution/PlannerSuite.scala      |  2 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  |  6 +-
 16 files changed, 144 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
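
For context, a minimal sketch of the behavior this revert restores (illustrative
only, not part of the patch): analyzer rules go back to calling
LogicalPlan.resolveOperators, which skips sub-trees already marked as analyzed,
instead of relying on the AnalysisBarrier wrapper node. The sketch assumes a
spark-catalyst dependency at this revision and lives in
org.apache.spark.sql.catalyst.plans because setAnalyzed() is package-private;
the object name is hypothetical.

  package org.apache.spark.sql.catalyst.plans

  import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}

  object ResolveOperatorsSketch {
    def main(args: Array[String]): Unit = {
      var invocations = 0
      // Counts every Project node the rule is applied to and returns it unchanged.
      val countProjects: PartialFunction[LogicalPlan, LogicalPlan] = {
        case p: Project =>
          invocations += 1
          p
      }

      val plan = Project(Nil, Project(Nil, LocalRelation()))

      // On a fresh plan both Project nodes are visited, as in the restored LogicalPlanSuite.
      plan.resolveOperators(countProjects)
      println(invocations)  // 2

      // Once the sub-tree is marked as analyzed, resolveOperators skips it entirely,
      // which is the behavior AnalysisBarrier emulated by wrapping analyzed plans.
      invocations = 0
      plan.foreach(_.setAnalyzed())
      plan.resolveOperators(countProjects)
      println(invocations)  // 0
    }
  }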


http://git-wip-us.apache.org/repos/asf/spark/blob/1f5dddff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8818404..29183fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -166,15 +166,14 @@ class Analyzer(
     Batch("Subquery", Once,
       UpdateOuterReferences),
     Batch("Cleanup", fixedPoint,
-      CleanupAliases,
-      EliminateBarriers)
+      CleanupAliases)
   )
 
   /**
   * Analyze cte definitions and substitute child plan with analyzed cte definitions.
    */
   object CTESubstitution extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators  {
       case With(child, relations) =>
        substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)]) {
           case (resolved, (name, relation)) =>
@@ -202,7 +201,7 @@ class Analyzer(
    * Substitute child plan with WindowSpecDefinitions.
    */
   object WindowsSubstitution extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
      // Lookup WindowSpecDefinitions. This rule works with unresolved children.
       case WithWindowDefinition(windowDefinitions, child) =>
         child.transform {
@@ -244,7 +243,7 @@ class Analyzer(
     private def hasUnresolvedAlias(exprs: Seq[NamedExpression]) =
       exprs.exists(_.find(_.isInstanceOf[UnresolvedAlias]).isDefined)
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
      case Aggregate(groups, aggs, child) if child.resolved && hasUnresolvedAlias(aggs) =>
         Aggregate(groups, assignAliases(aggs), child)
 
@@ -634,7 +633,7 @@ class Analyzer(
       case _ => plan
     }
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
      case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
         EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
           case v: View =>
@@ -689,9 +688,7 @@ class Analyzer(
     * Generate a new logical plan for the right child with different expression IDs
      * for all conflicting attributes.
      */
-    private def dedupRight (left: LogicalPlan, oriRight: LogicalPlan): LogicalPlan = {
-      // Remove analysis barrier if any.
-      val right = EliminateBarriers(oriRight)
+    private def dedupRight (left: LogicalPlan, right: LogicalPlan): LogicalPlan = {
       val conflictingAttributes = left.outputSet.intersect(right.outputSet)
       logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} 
" +
         s"between $left and $right")
@@ -734,7 +731,7 @@ class Analyzer(
           * that this rule cannot handle. When that is the case, there must be another rule
            * that resolves these conflicts. Otherwise, the analysis will fail.
            */
-          oriRight
+          right
         case Some((oldRelation, newRelation)) =>
          val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
           val newRight = right transformUp {
@@ -747,7 +744,7 @@ class Analyzer(
                s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
             }
           }
-          AnalysisBarrier(newRight)
+          newRight
       }
     }
 
@@ -808,7 +805,7 @@ class Analyzer(
       }
     }
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case p: LogicalPlan if !p.childrenResolved => p
 
       // If the projection list contains Stars, expand it.
@@ -982,7 +979,7 @@ class Analyzer(
    * have no effect on the results.
    */
   object ResolveOrdinalInOrderByAndGroupBy extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case p if !p.childrenResolved => p
       // Replace the index with the related attribute for ORDER BY,
       // which is a 1-base position of the projection list.
@@ -1038,7 +1035,7 @@ class Analyzer(
       }}
     }
 
-    override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case agg @ Aggregate(groups, aggs, child)
          if conf.groupByAliases && child.resolved && aggs.forall(_.resolved) &&
             groups.exists(!_.resolved) =>
@@ -1062,13 +1059,11 @@ class Analyzer(
   * The HAVING clause could also used a grouping columns that is not presented in the SELECT.
    */
   object ResolveMissingReferences extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
      // Skip sort with aggregate. This will be handled in ResolveAggregateFunctions
-      case sa @ Sort(_, _, AnalysisBarrier(child: Aggregate)) => sa
       case sa @ Sort(_, _, child: Aggregate) => sa
 
-      case s @ Sort(order, _, orgChild) if !s.resolved && orgChild.resolved =>
-        val child = EliminateBarriers(orgChild)
+      case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
         try {
          val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
           val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
@@ -1089,8 +1084,7 @@ class Analyzer(
           case ae: AnalysisException => s
         }
 
-      case f @ Filter(cond, orgChild) if !f.resolved && orgChild.resolved =>
-        val child = EliminateBarriers(orgChild)
+      case f @ Filter(cond, child) if !f.resolved && child.resolved =>
         try {
           val newCond = resolveExpressionRecursively(cond, child)
           val requiredAttrs = newCond.references.filter(_.resolved)
@@ -1117,7 +1111,7 @@ class Analyzer(
      */
    private def addMissingAttr(plan: LogicalPlan, missingAttrs: AttributeSet): LogicalPlan = {
       if (missingAttrs.isEmpty) {
-        return AnalysisBarrier(plan)
+        return plan
       }
       plan match {
         case p: Project =>
@@ -1189,7 +1183,7 @@ class Analyzer(
    * Replaces [[UnresolvedFunction]]s with concrete [[Expression]]s.
    */
   object ResolveFunctions extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case q: LogicalPlan =>
         q transformExpressions {
          case u if !u.childrenResolved => u // Skip until children are resolved.
@@ -1528,7 +1522,7 @@ class Analyzer(
     /**
      * Resolve and rewrite all subqueries in an operator tree..
      */
-    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
      // In case of HAVING (a filter after an aggregate) we use both the aggregate and
       // its child for resolution.
       case f @ Filter(_, a: Aggregate) if f.childrenResolved =>
@@ -1543,7 +1537,7 @@ class Analyzer(
    * Turns projections that contain aggregate expressions into aggregations.
    */
   object GlobalAggregates extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case Project(projectList, child) if containsAggregates(projectList) =>
         Aggregate(Nil, projectList, child)
     }
@@ -1569,9 +1563,7 @@ class Analyzer(
   * underlying aggregate operator and then projected away after the original operator.
    */
   object ResolveAggregateFunctions extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
-      case filter @ Filter(havingCondition, AnalysisBarrier(aggregate: Aggregate)) =>
-        apply(Filter(havingCondition, aggregate)).mapChildren(AnalysisBarrier)
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case filter @ Filter(havingCondition,
              aggregate @ Aggregate(grouping, originalAggExprs, child))
           if aggregate.resolved =>
@@ -1631,8 +1623,6 @@ class Analyzer(
           case ae: AnalysisException => filter
         }
 
-      case sort @ Sort(sortOrder, global, AnalysisBarrier(aggregate: Aggregate)) =>
-        apply(Sort(sortOrder, global, aggregate)).mapChildren(AnalysisBarrier)
      case sort @ Sort(sortOrder, global, aggregate: Aggregate) if aggregate.resolved =>
 
         // Try resolving the ordering as though it is in the aggregate clause.
@@ -1745,7 +1735,7 @@ class Analyzer(
       }
     }
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case Project(projectList, _) if projectList.exists(hasNestedGenerator) =>
         val nestedGenerator = projectList.find(hasNestedGenerator).get
        throw new AnalysisException("Generators are not supported when it's nested in " +
@@ -1803,7 +1793,7 @@ class Analyzer(
    * that wrap the [[Generator]].
    */
   object ResolveGenerate extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case g: Generate if !g.child.resolved || !g.generator.resolved => g
       case g: Generate if !g.resolved =>
        g.copy(generatorOutput = makeGeneratorOutput(g.generator, g.generatorOutput.map(_.name)))
@@ -2120,7 +2110,7 @@ class Analyzer(
   * put them into an inner Project and finally project them away at the outer Project.
    */
   object PullOutNondeterministic extends Rule[LogicalPlan] {
-    override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case p if !p.resolved => p // Skip unresolved nodes.
       case p: Project => p
       case f: Filter => f
@@ -2165,7 +2155,7 @@ class Analyzer(
    * and we should return null if the input is null.
    */
   object HandleNullInputsForUDF extends Rule[LogicalPlan] {
-    override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case p if !p.resolved => p // Skip unresolved nodes.
 
       case p => p transformExpressionsUp {
@@ -2230,7 +2220,7 @@ class Analyzer(
    * Then apply a Project on a normal Join to eliminate natural or using join.
    */
   object ResolveNaturalAndUsingJoin extends Rule[LogicalPlan] {
-    override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case j @ Join(left, right, UsingJoin(joinType, usingCols), condition)
           if left.resolved && right.resolved && j.duplicateResolved =>
         commonNaturalJoinProcessing(left, right, joinType, usingCols, None)
@@ -2295,7 +2285,7 @@ class Analyzer(
    * to the given input attributes.
    */
   object ResolveDeserializer extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case p if !p.childrenResolved => p
       case p if p.resolved => p
 
@@ -2381,7 +2371,7 @@ class Analyzer(
    * constructed is an inner class.
    */
   object ResolveNewInstance extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case p if !p.childrenResolved => p
       case p if p.resolved => p
 
@@ -2415,7 +2405,7 @@ class Analyzer(
         "type of the field in the target object")
     }
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case p if !p.childrenResolved => p
       case p if p.resolved => p
 
@@ -2469,7 +2459,7 @@ object CleanupAliases extends Rule[LogicalPlan] {
     case other => trimAliases(other)
   }
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
     case Project(projectList, child) =>
       val cleanedProjectList =
         projectList.map(trimNonTopLevelAliases(_).asInstanceOf[NamedExpression])
@@ -2498,13 +2488,6 @@ object CleanupAliases extends Rule[LogicalPlan] {
   }
 }
 
-/** Remove the barrier nodes of analysis */
-object EliminateBarriers extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
-    case AnalysisBarrier(child) => child
-  }
-}
-
 /**
 * Ignore event time watermark in batch query, which is only supported in Structured Streaming.
  * TODO: add this rule into analyzer rule list.
@@ -2554,7 +2537,7 @@ object TimeWindowing extends Rule[LogicalPlan] {
    * @return the logical plan that will generate the time windows using the Expand operator, with
    *         the Filter operator for correctness and Project for usability.
    */
-  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
     case p: LogicalPlan if p.children.size == 1 =>
       val child = p.children.head
       val windowExpressions =

http://git-wip-us.apache.org/repos/asf/spark/blob/1f5dddff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala
index ac72bc4..9c38dd2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala
@@ -78,7 +78,7 @@ object DecimalPrecision extends Rule[LogicalPlan] {
     PromotePrecision(Cast(e, dataType))
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     // fix decimal precision for expressions
     case q => q.transformExpressions(
       decimalAndDecimal.orElse(integralAndDecimalLiteral).orElse(nondecimalAndDecimal))

http://git-wip-us.apache.org/repos/asf/spark/blob/1f5dddff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
index a214e59..7358f9e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
@@ -103,7 +103,7 @@ object ResolveTableValuedFunctions extends Rule[LogicalPlan] {
       })
   )
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) =>
       val resolvedFunc = builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match {
         case Some(tvf) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/1f5dddff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index c364517..e1dd010 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -206,7 +206,7 @@ object TypeCoercion {
    * instances higher in the query tree.
    */
   object PropagateTypes extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
 
       // No propagation required for leaf nodes.
       case q: LogicalPlan if q.children.isEmpty => q
@@ -261,7 +261,7 @@ object TypeCoercion {
    */
   object WidenSetOperationTypes extends Rule[LogicalPlan] {
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
       case p if p.analyzed => p
 
       case s @ SetOperation(left, right) if s.childrenResolved &&
@@ -335,7 +335,7 @@ object TypeCoercion {
       }
     }
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e
 
@@ -391,7 +391,7 @@ object TypeCoercion {
       }
     }
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e
 
@@ -449,7 +449,7 @@ object TypeCoercion {
     private val trueValues = Seq(1.toByte, 1.toShort, 1, 1L, Decimal.ONE)
     private val falseValues = Seq(0.toByte, 0.toShort, 0, 0L, Decimal.ZERO)
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e
 
@@ -490,7 +490,7 @@ object TypeCoercion {
    * This ensure that the types for various functions are as expected.
    */
   object FunctionArgumentConversion extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e
 
@@ -580,7 +580,7 @@ object TypeCoercion {
    * converted to fractional types.
    */
   object Division extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who has not been resolved yet,
       // as this is an extra rule which should be applied at last.
       case e if !e.childrenResolved => e
@@ -602,7 +602,7 @@ object TypeCoercion {
    * Coerces the type of different branches of a CASE WHEN statement to a common type.
    */
   object CaseWhenCoercion extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       case c: CaseWhen if c.childrenResolved && !c.valueTypesEqual =>
         val maybeCommonType = findWiderCommonType(c.valueTypes)
         maybeCommonType.map { commonType =>
@@ -632,7 +632,7 @@ object TypeCoercion {
    * Coerces the type of different branches of If statement to a common type.
    */
   object IfCoercion extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       case e if !e.childrenResolved => e
       // Find tightest common type for If, if the true value and false value have different types.
       case i @ If(pred, left, right) if left.dataType != right.dataType =>
@@ -656,7 +656,7 @@ object TypeCoercion {
 
     private val acceptedTypes = Seq(DateType, TimestampType, StringType)
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e
 
@@ -673,7 +673,7 @@ object TypeCoercion {
    * Casts types according to the expected input types for [[Expression]]s.
    */
   object ImplicitTypeCasts extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1f5dddff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala
index af1f916..a27aa84 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala
@@ -38,7 +38,7 @@ case class ResolveTimeZone(conf: SQLConf) extends Rule[LogicalPlan] {
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan =
-    plan.transformAllExpressions(transformTimeZoneExprs)
+    plan.resolveExpressions(transformTimeZoneExprs)
 
   def resolveTimeZones(e: Expression): Expression = e.transform(transformTimeZoneExprs)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1f5dddff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
index 3bbe41c..ea46dd7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
@@ -48,7 +48,7 @@ import org.apache.spark.sql.internal.SQLConf
  * completely resolved during the batch of Resolution.
  */
 case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case v @ View(desc, output, child) if child.resolved && output != child.output =>
       val resolver = conf.resolver
       val queryColumnNames = desc.viewQueryColumnNames

http://git-wip-us.apache.org/repos/asf/spark/blob/1f5dddff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
index 46d1aac..2a3e07a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
@@ -236,7 +236,7 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper
   /**
   * Pull up the correlated predicates and rewrite all subqueries in an operator tree..
    */
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case f @ Filter(_, a: Aggregate) =>
       rewriteSubQueries(f, Seq(a, a.child))
    // Only a few unary nodes (Project/Filter/Aggregate) can contain subqueries.

http://git-wip-us.apache.org/repos/asf/spark/blob/1f5dddff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 23520eb..2ebb2ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -46,6 +46,41 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   /** Returns true if this subtree contains any streaming data sources. */
   def isStreaming: Boolean = children.exists(_.isStreaming == true)
 
+  /**
+   * Returns a copy of this node where `rule` has been recursively applied first to all of its
+   * children and then itself (post-order). When `rule` does not apply to a given node, it is left
+   * unchanged.  This function is similar to `transformUp`, but skips sub-trees that have already
+   * been marked as analyzed.
+   *
+   * @param rule the function use to transform this nodes children
+   */
+  def resolveOperators(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
+    if (!analyzed) {
+      val afterRuleOnChildren = mapChildren(_.resolveOperators(rule))
+      if (this fastEquals afterRuleOnChildren) {
+        CurrentOrigin.withOrigin(origin) {
+          rule.applyOrElse(this, identity[LogicalPlan])
+        }
+      } else {
+        CurrentOrigin.withOrigin(origin) {
+          rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
+        }
+      }
+    } else {
+      this
+    }
+  }
+
+  /**
+   * Recursively transforms the expressions of a tree, skipping nodes that have already
+   * been analyzed.
+   */
+  def resolveExpressions(r: PartialFunction[Expression, Expression]): LogicalPlan = {
+    this resolveOperators  {
+      case p => p.transformExpressions(r)
+    }
+  }
+
   /** A cache for the estimated statistics, such that it will only be computed once. */
   private var statsCache: Option[Statistics] = None
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1f5dddff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index b8c2f76..6878b6b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -23,7 +23,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation._
-import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -897,11 +896,3 @@ case class Deduplicate(
 
   override def output: Seq[Attribute] = child.output
 }
-
-/** A logical plan for setting a barrier of analysis */
-case class AnalysisBarrier(child: LogicalPlan) extends LeafNode {
-  override def output: Seq[Attribute] = child.output
-  override def analyzed: Boolean = true
-  override def isStreaming: Boolean = child.isStreaming
-  override lazy val canonicalized: LogicalPlan = child.canonicalized
-}
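
With AnalysisBarrier removed above and resolveOperators restored in
LogicalPlan.scala, analyzer rules are written against resolveOperators again, as
the Analyzer changes earlier in this patch show. A minimal, hypothetical rule for
illustration (the name and the rewrite are not from this patch); because
resolveOperators skips sub-trees already marked as analyzed, such a rule can be
re-run safely by the fixed-point analyzer:

  import org.apache.spark.sql.catalyst.expressions.Literal
  import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
  import org.apache.spark.sql.catalyst.rules.Rule

  // Hypothetical rule: drops a Filter whose condition is the literal `true`.
  object RemoveTrivialFilter extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
      case Filter(Literal(true, _), child) => child
    }
  }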

http://git-wip-us.apache.org/repos/asf/spark/blob/1f5dddff/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 5393786..be26b1b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -441,20 +441,6 @@ class AnalysisSuite extends AnalysisTest with ShouldMatchers {
     checkAnalysis(SubqueryAlias("tbl", testRelation).as("tbl2"), testRelation)
   }
 
-  test("analysis barrier") {
-    // [[AnalysisBarrier]] will be removed after analysis
-    checkAnalysis(
-      Project(Seq(UnresolvedAttribute("tbl.a")),
-        AnalysisBarrier(SubqueryAlias("tbl", testRelation))),
-      Project(testRelation.output, SubqueryAlias("tbl", testRelation)))
-
-    // Verify we won't go through a plan wrapped in a barrier.
-    // Since we wrap an unresolved plan and analyzer won't go through it. It remains unresolved.
-    val barrier = AnalysisBarrier(Project(Seq(UnresolvedAttribute("tbl.b")),
-      SubqueryAlias("tbl", testRelation)))
-    assertAnalysisError(barrier, Seq("cannot resolve '`tbl.b`'"))
-  }
-
   test("SPARK-20311 range(N) as alias") {
     def rangeWithAliases(args: Seq[Int], outputNames: Seq[String]): LogicalPlan = {
       SubqueryAlias("t", UnresolvedTableValuedFunction("range", 
args.map(Literal(_)), outputNames))

http://git-wip-us.apache.org/repos/asf/spark/blob/1f5dddff/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
index 215db84..cc86f1f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
@@ -23,8 +23,8 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types.IntegerType
 
 /**
- * This suite is used to test [[LogicalPlan]]'s `transformUp` plus analysis barrier and make sure
- * it can correctly skip sub-trees that have already been marked as analyzed.
+ * This suite is used to test [[LogicalPlan]]'s `resolveOperators` and make sure it can correctly
+ * skips sub-trees that have already been marked as analyzed.
  */
 class LogicalPlanSuite extends SparkFunSuite {
   private var invocationCount = 0
@@ -36,35 +36,37 @@ class LogicalPlanSuite extends SparkFunSuite {
 
   private val testRelation = LocalRelation()
 
-  test("transformUp runs on operators") {
+  test("resolveOperator runs on operators") {
     invocationCount = 0
     val plan = Project(Nil, testRelation)
-    plan transformUp function
+    plan resolveOperators function
 
     assert(invocationCount === 1)
   }
 
-  test("transformUp runs on operators recursively") {
+  test("resolveOperator runs on operators recursively") {
     invocationCount = 0
     val plan = Project(Nil, Project(Nil, testRelation))
-    plan transformUp function
+    plan resolveOperators function
 
     assert(invocationCount === 2)
   }
 
-  test("transformUp skips all ready resolved plans wrapped in analysis 
barrier") {
+  test("resolveOperator skips all ready resolved plans") {
     invocationCount = 0
-    val plan = AnalysisBarrier(Project(Nil, Project(Nil, testRelation)))
-    plan transformUp function
+    val plan = Project(Nil, Project(Nil, testRelation))
+    plan.foreach(_.setAnalyzed())
+    plan resolveOperators function
 
     assert(invocationCount === 0)
   }
 
-  test("transformUp skips partially resolved plans wrapped in analysis 
barrier") {
+  test("resolveOperator skips partially resolved plans") {
     invocationCount = 0
-    val plan1 = AnalysisBarrier(Project(Nil, testRelation))
+    val plan1 = Project(Nil, testRelation)
     val plan2 = Project(Nil, plan1)
-    plan2 transformUp function
+    plan1.foreach(_.setAnalyzed())
+    plan2 resolveOperators function
 
     assert(invocationCount === 1)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1f5dddff/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index d5b4c82..5ffe32f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -187,9 +187,6 @@ class Dataset[T] private[sql](
     }
   }
 
-  // Wraps analyzed logical plans with an analysis barrier so we won't traverse/resolve it again.
-  @transient private val planWithBarrier = AnalysisBarrier(logicalPlan)
-
   /**
   * Currently [[ExpressionEncoder]] is the only implementation of [[Encoder]], here we turn the
   * passed in encoder to [[ExpressionEncoder]] explicitly, and mark it implicit so that we can use
@@ -421,7 +418,7 @@ class Dataset[T] private[sql](
    */
   @Experimental
   @InterfaceStability.Evolving
-  def as[U : Encoder]: Dataset[U] = Dataset[U](sparkSession, planWithBarrier)
+  def as[U : Encoder]: Dataset[U] = Dataset[U](sparkSession, logicalPlan)
 
   /**
   * Converts this strongly typed collection of data to generic `DataFrame` with columns renamed.
@@ -624,7 +621,7 @@ class Dataset[T] private[sql](
     require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
       s"delay threshold ($delayThreshold) should not be negative.")
     EliminateEventTimeWatermark(
-      EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, planWithBarrier))
+      EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan))
   }
 
   /**
@@ -810,7 +807,7 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   def join(right: Dataset[_]): DataFrame = withPlan {
-    Join(planWithBarrier, right.planWithBarrier, joinType = Inner, None)
+    Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
   }
 
   /**
@@ -888,7 +885,7 @@ class Dataset[T] private[sql](
     // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
     // by creating a new instance for one of the branch.
     val joined = sparkSession.sessionState.executePlan(
-      Join(planWithBarrier, right.planWithBarrier, joinType = JoinType(joinType), None))
+      Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None))
       .analyzed.asInstanceOf[Join]
 
     withPlan {
@@ -949,7 +946,7 @@ class Dataset[T] private[sql](
     // Trigger analysis so in the case of self-join, the analyzer will clone the plan.
     // After the cloning, left and right side will have distinct expression ids.
     val plan = withPlan(
-      Join(planWithBarrier, right.planWithBarrier, JoinType(joinType), Some(joinExprs.expr)))
+      Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)))
       .queryExecution.analyzed.asInstanceOf[Join]
 
     // If auto self join alias is disabled, return the plan.
@@ -958,8 +955,8 @@ class Dataset[T] private[sql](
     }
 
     // If left/right have no output set intersection, return the plan.
-    val lanalyzed = withPlan(this.planWithBarrier).queryExecution.analyzed
-    val ranalyzed = withPlan(right.planWithBarrier).queryExecution.analyzed
+    val lanalyzed = withPlan(this.logicalPlan).queryExecution.analyzed
+    val ranalyzed = withPlan(right.logicalPlan).queryExecution.analyzed
     if (lanalyzed.outputSet.intersect(ranalyzed.outputSet).isEmpty) {
       return withPlan(plan)
     }
@@ -991,7 +988,7 @@ class Dataset[T] private[sql](
    * @since 2.1.0
    */
   def crossJoin(right: Dataset[_]): DataFrame = withPlan {
-    Join(planWithBarrier, right.planWithBarrier, joinType = Cross, None)
+    Join(logicalPlan, right.logicalPlan, joinType = Cross, None)
   }
 
   /**
@@ -1023,8 +1020,8 @@ class Dataset[T] private[sql](
     // etc.
     val joined = sparkSession.sessionState.executePlan(
       Join(
-        this.planWithBarrier,
-        other.planWithBarrier,
+        this.logicalPlan,
+        other.logicalPlan,
         JoinType(joinType),
         Some(condition.expr))).analyzed.asInstanceOf[Join]
 
@@ -1194,7 +1191,7 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def hint(name: String, parameters: String*): Dataset[T] = withTypedPlan {
-    UnresolvedHint(name, parameters, planWithBarrier)
+    UnresolvedHint(name, parameters, logicalPlan)
   }
 
   /**
@@ -1220,7 +1217,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def as(alias: String): Dataset[T] = withTypedPlan {
-    SubqueryAlias(alias, planWithBarrier)
+    SubqueryAlias(alias, logicalPlan)
   }
 
   /**
@@ -1258,7 +1255,7 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def select(cols: Column*): DataFrame = withPlan {
-    Project(cols.map(_.named), planWithBarrier)
+    Project(cols.map(_.named), logicalPlan)
   }
 
   /**
@@ -1313,8 +1310,8 @@ class Dataset[T] private[sql](
   @InterfaceStability.Evolving
   def select[U1](c1: TypedColumn[T, U1]): Dataset[U1] = {
     implicit val encoder = c1.encoder
-    val project = Project(c1.withInputType(exprEnc, planWithBarrier.output).named :: Nil,
-      planWithBarrier)
+    val project = Project(c1.withInputType(exprEnc, logicalPlan.output).named :: Nil,
+      logicalPlan)
 
     if (encoder.flat) {
       new Dataset[U1](sparkSession, project, encoder)
@@ -1332,8 +1329,8 @@ class Dataset[T] private[sql](
   protected def selectUntyped(columns: TypedColumn[_, _]*): Dataset[_] = {
     val encoders = columns.map(_.encoder)
     val namedColumns =
-      columns.map(_.withInputType(exprEnc, planWithBarrier.output).named)
-    val execution = new QueryExecution(sparkSession, Project(namedColumns, planWithBarrier))
+      columns.map(_.withInputType(exprEnc, logicalPlan.output).named)
+    val execution = new QueryExecution(sparkSession, Project(namedColumns, logicalPlan))
     new Dataset(sparkSession, execution, ExpressionEncoder.tuple(encoders))
   }
 
@@ -1409,7 +1406,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def filter(condition: Column): Dataset[T] = withTypedPlan {
-    Filter(condition.expr, planWithBarrier)
+    Filter(condition.expr, logicalPlan)
   }
 
   /**
@@ -1586,7 +1583,7 @@ class Dataset[T] private[sql](
   @Experimental
   @InterfaceStability.Evolving
   def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T] = {
-    val inputPlan = planWithBarrier
+    val inputPlan = logicalPlan
     val withGroupingKey = AppendColumns(func, inputPlan)
     val executed = sparkSession.sessionState.executePlan(withGroupingKey)
 
@@ -1732,7 +1729,7 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   def limit(n: Int): Dataset[T] = withTypedPlan {
-    Limit(Literal(n), planWithBarrier)
+    Limit(Literal(n), logicalPlan)
   }
 
   /**
@@ -1761,7 +1758,7 @@ class Dataset[T] private[sql](
   def union(other: Dataset[T]): Dataset[T] = withSetOperator {
     // This breaks caching, but it's usually ok because it addresses a very specific use case:
     // using union to union many files or partitions.
-    CombineUnions(Union(logicalPlan, other.logicalPlan)).mapChildren(AnalysisBarrier)
+    CombineUnions(Union(logicalPlan, other.logicalPlan))
   }
 
   /**
@@ -1775,7 +1772,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def intersect(other: Dataset[T]): Dataset[T] = withSetOperator {
-    Intersect(planWithBarrier, other.planWithBarrier)
+    Intersect(logicalPlan, other.logicalPlan)
   }
 
   /**
@@ -1789,7 +1786,7 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   def except(other: Dataset[T]): Dataset[T] = withSetOperator {
-    Except(planWithBarrier, other.planWithBarrier)
+    Except(logicalPlan, other.logicalPlan)
   }
 
   /**
@@ -1810,7 +1807,7 @@ class Dataset[T] private[sql](
       s"Fraction must be nonnegative, but got ${fraction}")
 
     withTypedPlan {
-      Sample(0.0, fraction, withReplacement, seed, planWithBarrier)()
+      Sample(0.0, fraction, withReplacement, seed, logicalPlan)()
     }
   }
 
@@ -1852,15 +1849,15 @@ class Dataset[T] private[sql](
     // overlapping splits. To prevent this, we explicitly sort each input partition to make the
     // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
     // from the sort order.
-    val sortOrder = planWithBarrier.output
+    val sortOrder = logicalPlan.output
       .filter(attr => RowOrdering.isOrderable(attr.dataType))
       .map(SortOrder(_, Ascending))
     val plan = if (sortOrder.nonEmpty) {
-      Sort(sortOrder, global = false, planWithBarrier)
+      Sort(sortOrder, global = false, logicalPlan)
     } else {
       // SPARK-12662: If sort order is empty, we materialize the dataset to guarantee determinism
       cache()
-      planWithBarrier
+      logicalPlan
     }
     val sum = weights.sum
     val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
@@ -1944,7 +1941,7 @@ class Dataset[T] private[sql](
 
     withPlan {
       Generate(generator, join = true, outer = false,
-        qualifier = None, generatorOutput = Nil, planWithBarrier)
+        qualifier = None, generatorOutput = Nil, logicalPlan)
     }
   }
 
@@ -1985,7 +1982,7 @@ class Dataset[T] private[sql](
 
     withPlan {
       Generate(generator, join = true, outer = false,
-        qualifier = None, generatorOutput = Nil, planWithBarrier)
+        qualifier = None, generatorOutput = Nil, logicalPlan)
     }
   }
 
@@ -2100,7 +2097,7 @@ class Dataset[T] private[sql](
           u.name, sparkSession.sessionState.analyzer.resolver).getOrElse(u)
       case Column(expr: Expression) => expr
     }
-    val attrs = this.planWithBarrier.output
+    val attrs = this.logicalPlan.output
     val colsAfterDrop = attrs.filter { attr =>
       attr != expression
     }.map(attr => Column(attr))
@@ -2148,7 +2145,7 @@ class Dataset[T] private[sql](
       }
       cols
     }
-    Deduplicate(groupCols, planWithBarrier, isStreaming)
+    Deduplicate(groupCols, logicalPlan, isStreaming)
   }
 
   /**
@@ -2297,7 +2294,7 @@ class Dataset[T] private[sql](
   @Experimental
   @InterfaceStability.Evolving
   def filter(func: T => Boolean): Dataset[T] = {
-    withTypedPlan(TypedFilter(func, planWithBarrier))
+    withTypedPlan(TypedFilter(func, logicalPlan))
   }
 
   /**
@@ -2311,7 +2308,7 @@ class Dataset[T] private[sql](
   @Experimental
   @InterfaceStability.Evolving
   def filter(func: FilterFunction[T]): Dataset[T] = {
-    withTypedPlan(TypedFilter(func, planWithBarrier))
+    withTypedPlan(TypedFilter(func, logicalPlan))
   }
 
   /**
@@ -2325,7 +2322,7 @@ class Dataset[T] private[sql](
   @Experimental
   @InterfaceStability.Evolving
   def map[U : Encoder](func: T => U): Dataset[U] = withTypedPlan {
-    MapElements[T, U](func, planWithBarrier)
+    MapElements[T, U](func, logicalPlan)
   }
 
   /**
@@ -2340,7 +2337,7 @@ class Dataset[T] private[sql](
   @InterfaceStability.Evolving
   def map[U](func: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
     implicit val uEnc = encoder
-    withTypedPlan(MapElements[T, U](func, planWithBarrier))
+    withTypedPlan(MapElements[T, U](func, logicalPlan))
   }
 
   /**
@@ -2356,7 +2353,7 @@ class Dataset[T] private[sql](
   def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
     new Dataset[U](
       sparkSession,
-      MapPartitions[T, U](func, planWithBarrier),
+      MapPartitions[T, U](func, logicalPlan),
       implicitly[Encoder[U]])
   }
 
@@ -2387,7 +2384,7 @@ class Dataset[T] private[sql](
     val rowEncoder = encoder.asInstanceOf[ExpressionEncoder[Row]]
     Dataset.ofRows(
       sparkSession,
-      MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, planWithBarrier))
+      MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, logicalPlan))
   }
 
   /**
@@ -2557,7 +2554,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
-    Repartition(numPartitions, shuffle = true, planWithBarrier)
+    Repartition(numPartitions, shuffle = true, logicalPlan)
   }
 
   /**
@@ -2571,7 +2568,7 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    RepartitionByExpression(partitionExprs.map(_.expr), planWithBarrier, numPartitions)
+    RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions)
   }
 
   /**
@@ -2587,8 +2584,7 @@ class Dataset[T] private[sql](
   @scala.annotation.varargs
   def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
     RepartitionByExpression(
-      partitionExprs.map(_.expr), planWithBarrier,
-      sparkSession.sessionState.conf.numShufflePartitions)
+      partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
   }
 
   /**
@@ -2609,7 +2605,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan {
-    Repartition(numPartitions, shuffle = false, planWithBarrier)
+    Repartition(numPartitions, shuffle = false, logicalPlan)
   }
 
   /**
@@ -2698,7 +2694,7 @@ class Dataset[T] private[sql](
    */
   lazy val rdd: RDD[T] = {
     val objectType = exprEnc.deserializer.dataType
-    val deserialized = CatalystSerde.deserialize[T](planWithBarrier)
+    val deserialized = CatalystSerde.deserialize[T](logicalPlan)
    sparkSession.sessionState.executePlan(deserialized).toRdd.mapPartitions { rows =>
       rows.map(_.get(0, objectType).asInstanceOf[T])
     }
@@ -2812,7 +2808,7 @@ class Dataset[T] private[sql](
       comment = None,
       properties = Map.empty,
       originalText = None,
-      child = planWithBarrier,
+      child = logicalPlan,
       allowExisting = false,
       replace = replace,
       viewType = viewType)
@@ -2981,7 +2977,7 @@ class Dataset[T] private[sql](
       }
     }
     withTypedPlan {
-      Sort(sortOrder, global = global, planWithBarrier)
+      Sort(sortOrder, global = global, logicalPlan)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1f5dddff/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 958715e..08c78e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -416,7 +416,7 @@ case class DataSource(
       }.head
     }
     // For partitioned relation r, r.schema's column ordering can be different from the column
-    // ordering of data.logicalPlan (partition columns are all moved after data column). This
+    // ordering of data.logicalPlan (partition columns are all moved after data column).  This
     // will be adjusted within InsertIntoHadoopFsRelation.
     InsertIntoHadoopFsRelationCommand(
       outputPath = outputPath,

http://git-wip-us.apache.org/repos/asf/spark/blob/1f5dddff/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 5f65898..3f4a785 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -38,7 +38,7 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] {
     sparkSession.sessionState.conf.runSQLonFile && u.tableIdentifier.database.isDefined
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case u: UnresolvedRelation if maybeSQLFile(u) =>
       try {
         val dataSource = DataSource(

http://git-wip-us.apache.org/repos/asf/spark/blob/1f5dddff/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index d02c8ff..4d155d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -241,7 +241,7 @@ class PlannerSuite extends SharedSQLContext {
   test("collapse adjacent repartitions") {
     val doubleRepartitioned = testData.repartition(10).repartition(20).coalesce(5)
     def countRepartitions(plan: LogicalPlan): Int = plan.collect { case r: Repartition => r }.length
-    assert(countRepartitions(doubleRepartitioned.queryExecution.analyzed) === 3)
+    assert(countRepartitions(doubleRepartitioned.queryExecution.logical) === 3)
     assert(countRepartitions(doubleRepartitioned.queryExecution.optimizedPlan) === 2)
     doubleRepartitioned.queryExecution.optimizedPlan match {
       case Repartition (numPartitions, shuffle, Repartition(_, shuffleChild, _)) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/1f5dddff/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 662fc80..9c60d22 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -88,7 +88,7 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
     }
   }
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case c @ CreateTable(t, _, query) if DDLUtils.isHiveTable(t) =>
       // Finds the database name if the name does not exist.
       val dbName = t.identifier.database.getOrElse(session.catalog.currentDatabase)
@@ -115,7 +115,7 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
 }
 
 class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case relation: CatalogRelation
         if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
       val table = relation.tableMeta
@@ -146,7 +146,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
  * `PreprocessTableInsertion`.
  */
 object HiveAnalysis extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case InsertIntoTable(r: CatalogRelation, partSpec, query, overwrite, ifPartitionNotExists)
         if DDLUtils.isHiveTable(r.tableMeta) =>
       InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists)

