Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19467#discussion_r144152923
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 ---
    @@ -131,17 +132,17 @@ class IncrementalExecution(
       }
     
       override def preparations: Seq[Rule[SparkPlan]] =
    -    Seq(state, EnsureStatefulOpPartitioning) ++ super.preparations
    +    Seq(state, 
EnsureStatefulOpPartitioning(sparkSession.sessionState.conf)) ++ 
super.preparations
     
       /** No need assert supported, as this check has already been done */
       override def assertSupported(): Unit = { }
     }
     
    -object EnsureStatefulOpPartitioning extends Rule[SparkPlan] {
    +case class EnsureStatefulOpPartitioning(conf: SQLConf) extends 
Rule[SparkPlan] {
       // Needs to be transformUp to avoid extra shuffles
       override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
         case so: StatefulOperator =>
    -      val numPartitions = 
plan.sqlContext.sessionState.conf.numShufflePartitions
    +      val numPartitions = conf.numShufflePartitions
    --- End diff --
    
    Why this change? Doesnt the plan have the same context and conf?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to