Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19467#discussion_r144152859

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---
    @@ -131,17 +132,17 @@ class IncrementalExecution(
       }

       override def preparations: Seq[Rule[SparkPlan]] =
    -    Seq(state, EnsureStatefulOpPartitioning) ++ super.preparations
    +    Seq(state, EnsureStatefulOpPartitioning(sparkSession.sessionState.conf)) ++ super.preparations

       /** No need assert supported, as this check has already been done */
       override def assertSupported(): Unit = { }
     }

    -object EnsureStatefulOpPartitioning extends Rule[SparkPlan] {
    +case class EnsureStatefulOpPartitioning(conf: SQLConf) extends Rule[SparkPlan] {
       // Needs to be transformUp to avoid extra shuffles
       override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
         case so: StatefulOperator =>
    -      val numPartitions = plan.sqlContext.sessionState.conf.numShufflePartitions
    +      val numPartitions = conf.numShufflePartitions
    --- End diff --

    why this change?
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org