[GitHub] spark pull request #19467: [SPARK-22238] Fix plan resolution bug caused by E...
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19467

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144403807

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala ---
@@ -50,7 +50,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       numPartitions: Int): Partitioning = {
     requiredDistribution match {
       case AllTuples => SinglePartition
-      case ClusteredDistribution(clustering) => HashPartitioning(clustering, numPartitions)
+      case ClusteredDistribution(clustering, desiredPartitions) =>
--- End diff --

Update the Scaladoc to say that the numPartitions param is used only if the distribution does not specify it.
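The comment above concerns the fallback rule: the `numPartitions` argument should apply only when the `ClusteredDistribution` carries no partition count of its own. A minimal sketch of that rule, using simplified stand-in types (string column names instead of Catalyst expressions; `PartitioningPlanner` is a hypothetical name), not Spark's actual classes:

```scala
// Hypothetical, simplified stand-ins for Spark's physical-plan types (not Spark's API).
sealed trait Distribution
case object AllTuples extends Distribution
final case class ClusteredDistribution(
    clustering: Seq[String],
    desiredPartitions: Option[Int] = None) extends Distribution

sealed trait Partitioning
case object SinglePartition extends Partitioning
final case class HashPartitioning(expressions: Seq[String], numPartitions: Int)
  extends Partitioning

object PartitioningPlanner {
  /**
   * Builds a partitioning that satisfies `requiredDistribution`.
   * `numPartitions` is used only if the distribution does not specify a count itself.
   */
  def createPartitioning(requiredDistribution: Distribution, numPartitions: Int): Partitioning =
    requiredDistribution match {
      case AllTuples => SinglePartition
      case ClusteredDistribution(clustering, desiredPartitions) =>
        // Prefer the distribution's own count; fall back to the caller's default.
        HashPartitioning(clustering, desiredPartitions.getOrElse(numPartitions))
    }
}
```

Using `Option.getOrElse` keeps the old behavior for distributions that leave the count unspecified.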
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144434463

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StatefulOperatorTest.scala ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.streaming._
+
+trait StatefulOperatorTest {
+  /**
+   * Check that the output partitioning of a child operator of a Stateful operator satisfies the
+   * distribution that we expect for our Stateful operator.
+   */
+  protected def checkChildOutputPartitioning[T <: StatefulOperator](
--- End diff --

Since this is only for HashPartitioning, how about naming it checkChildOutputHashPartitioning?
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144434390

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StatefulOperatorTest.scala ---
@@ -0,0 +1,49 @@
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.streaming._
+
+trait StatefulOperatorTest {
+  /**
+   * Check that the output partitioning of a child operator of a Stateful operator satisfies the
+   * distribution that we expect for our Stateful operator.
+   */
+  protected def checkChildOutputPartitioning[T <: StatefulOperator](
+      sq: StreamingQuery,
+      colNames: Seq[String],
+      numPartitions: Option[Int] = None): Boolean = {
--- End diff --

numPartitions is never used.
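One way to address both the naming and the unused parameter is to have the helper compare the partition count whenever one is supplied. The sketch below is hypothetical: it checks a simplified `HashPartitioning` model directly instead of locating the stateful operator inside a `StreamingQuery` plan, and it treats the clustering columns as an unordered set (an assumption, not necessarily what the real helper does).

```scala
// Hypothetical, simplified model of a child's output partitioning (not Spark's class).
final case class HashPartitioning(expressions: Seq[String], numPartitions: Int)

object StatefulChecks {
  /**
   * Sketch of a checkChildOutputHashPartitioning-style check that actually uses
   * `numPartitions`: the hash columns must match `colNames` (order ignored), and the
   * partition count is compared only when the caller supplies one.
   */
  def checkChildOutputHashPartitioning(
      childPartitioning: HashPartitioning,
      colNames: Seq[String],
      numPartitions: Option[Int] = None): Boolean = {
    childPartitioning.expressions.toSet == colNames.toSet &&
      numPartitions.forall(_ == childPartitioning.numPartitions)
  }
}
```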
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144433715

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala ---
@@ -214,7 +214,7 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter with BeforeAn
       path: String,
       queryRunId: UUID = UUID.randomUUID,
       version: Int = 0): StatefulOperatorStateInfo = {
-    StatefulOperatorStateInfo(path, queryRunId, operatorId = 0, version)
--- End diff --

Super nit: numPartitions = 5
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144402006

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -243,8 +246,9 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
   override def satisfies(required: Distribution): Boolean = required match {
     case UnspecifiedDistribution => true
-    case ClusteredDistribution(requiredClustering) =>
-      expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+    case ClusteredDistribution(requiredClustering, desiredPartitions) =>
+      expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) &&
+        desiredPartitions.forall(_ == numPartitions) // if desiredPartition = true, returns true
--- End diff --

// if desiredPartition**s** is **None**, return true
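The corrected comment follows from the semantics of `Option.forall`, which returns `true` for `None`. A small self-contained model of the new `satisfies` check (plain string equality stands in for `semanticEquals`; the types are simplified stand-ins, not Catalyst's):

```scala
// Hypothetical, simplified stand-ins for Catalyst's Distribution types.
sealed trait Distribution
case object UnspecifiedDistribution extends Distribution
final case class ClusteredDistribution(
    requiredClustering: Seq[String],
    desiredPartitions: Option[Int] = None) extends Distribution

final case class HashPartitioning(expressions: Seq[String], numPartitions: Int) {
  def satisfies(required: Distribution): Boolean = required match {
    case UnspecifiedDistribution => true
    case ClusteredDistribution(requiredClustering, desiredPartitions) =>
      // Every hash expression must appear in the required clustering...
      expressions.forall(x => requiredClustering.contains(x)) &&
        // ...and if desiredPartitions is None, forall returns true.
        desiredPartitions.forall(_ == numPartitions)
  }
}
```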
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144408406

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala ---
@@ -53,7 +53,7 @@ class EnsureStatefulOpPartitioningSuite extends SparkPlanTest with SharedSQLCont
   test("ClusteredDistribution with coalesce(1) generates Exchange with HashPartitioning") {
     testEnsureStatefulOpPartitioning(
       baseDf.coalesce(1).queryExecution.sparkPlan,
-      requiredDistribution = keys => ClusteredDistribution(keys),
--- End diff --

This test suite no longer makes sense, because this rule does not exist anymore. If we add tests in the related PlannerSuite for the new additions to EnsureRequirements and Partitioning, then here we only need to test whether each stateful operator specifies numPartitions in its required distribution.
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144406045

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
@@ -43,10 +43,11 @@ case class StatefulOperatorStateInfo(
     checkpointLocation: String,
     queryRunId: UUID,
     operatorId: Long,
-    storeVersion: Long) {
+    storeVersion: Long,
+    numPartitions: Int) {
--- End diff --

Nice!
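With `numPartitions` added as a fifth field, the "super nit" on `StateStoreRDDSuite` amounts to passing it by name at the call site. A sketch using a simplified copy of the fields from the diff (the checkpoint path and the `StateInfoExample` object are hypothetical, for illustration only):

```scala
import java.util.UUID

// Simplified copy of the fields shown in the diff; not Spark's full class.
final case class StatefulOperatorStateInfo(
    checkpointLocation: String,
    queryRunId: UUID,
    operatorId: Long,
    storeVersion: Long,
    numPartitions: Int)

object StateInfoExample {
  // Named arguments keep the call site readable now that the class has five fields.
  val info: StatefulOperatorStateInfo = StatefulOperatorStateInfo(
    "/tmp/checkpoint/state", // hypothetical path
    UUID.randomUUID(),
    operatorId = 0,
    storeVersion = 0,
    numPartitions = 5)
}
```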
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144406113

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
@@ -239,7 +240,7 @@ case class StateStoreRestoreExec(
     if (keyExpressions.isEmpty) {
       AllTuples :: Nil
     } else {
-      ClusteredDistribution(keyExpressions) :: Nil
+      ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
--- End diff --

Need tests for these.
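A test for these changes would assert that a stateful operator asks for a `ClusteredDistribution` carrying its state store's partition count, and for `AllTuples` when there are no keys. The sketch below models only that logic for `StateStoreRestoreExec`; `StateStoreRestoreModel` and `StateInfo` are hypothetical stand-ins, not Spark classes.

```scala
// Hypothetical, simplified stand-ins (not Spark's API).
sealed trait Distribution
case object AllTuples extends Distribution
final case class ClusteredDistribution(
    clustering: Seq[String],
    desiredPartitions: Option[Int] = None) extends Distribution

final case class StateInfo(numPartitions: Int)

// Mirrors the requiredChildDistribution logic shown in the diff.
final case class StateStoreRestoreModel(
    keyExpressions: Seq[String],
    stateInfo: Option[StateInfo]) {
  def requiredChildDistribution: Seq[Distribution] =
    if (keyExpressions.isEmpty) AllTuples :: Nil
    else ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
}
```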
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144335223

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -590,10 +590,33 @@ case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecN
   }
   protected override def doExecute(): RDD[InternalRow] = {
-    child.execute().coalesce(numPartitions, shuffle = false)
+    if (numPartitions == 1 && child.execute().getNumPartitions < 1) {
--- End diff --

The existing tests for the original problem should catch it.
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144154295

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -590,10 +590,33 @@ case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecN
   }
   protected override def doExecute(): RDD[InternalRow] = {
-    child.execute().coalesce(numPartitions, shuffle = false)
+    if (numPartitions == 1 && child.execute().getNumPartitions < 1) {
--- End diff --

Add a test in DatasetSuite for this empty-RDD case, maybe in the same test as the existing coalesce test.
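The requested test covers the case where coalescing an empty input to one partition must still produce exactly one (empty) partition. Because `coalesce` without a shuffle can only merge partitions, never create them, an input with zero partitions stays at zero unless guarded. The sketch below models that with a hypothetical in-memory `FakeRDD` (a vector of partitions) and a naive grouping strategy; it is not Spark's `RDD.coalesce` algorithm, only the same partition-count invariant.

```scala
// Hypothetical model: an "RDD" reduced to a vector of partitions, each a vector of rows.
final case class FakeRDD[T](partitions: Vector[Vector[T]]) {
  def getNumPartitions: Int = partitions.size

  /** Coalesce without shuffle: merges adjacent partitions and never creates new ones. */
  def coalesce(n: Int): FakeRDD[T] =
    if (partitions.isEmpty) this
    else {
      val groupSize = math.ceil(partitions.size.toDouble / n).toInt
      FakeRDD(partitions.grouped(groupSize).map(_.flatten).toVector)
    }
}

object CoalesceModel {
  /** Mirrors the guard in the diff: never return 0 partitions when 1 is claimed. */
  def coalesceExec[T](child: FakeRDD[T], numPartitions: Int): FakeRDD[T] =
    if (numPartitions == 1 && child.getNumPartitions < 1) {
      FakeRDD(Vector.fill(numPartitions)(Vector.empty[T]))
    } else {
      child.coalesce(numPartitions)
    }
}
```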
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144152923

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---
@@ -131,17 +132,17 @@ class IncrementalExecution(
   }
   override def preparations: Seq[Rule[SparkPlan]] =
-    Seq(state, EnsureStatefulOpPartitioning) ++ super.preparations
+    Seq(state, EnsureStatefulOpPartitioning(sparkSession.sessionState.conf)) ++ super.preparations
   /** No need assert supported, as this check has already been done */
   override def assertSupported(): Unit = { }
 }
-object EnsureStatefulOpPartitioning extends Rule[SparkPlan] {
+case class EnsureStatefulOpPartitioning(conf: SQLConf) extends Rule[SparkPlan] {
   // Needs to be transformUp to avoid extra shuffles
   override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
     case so: StatefulOperator =>
-      val numPartitions = plan.sqlContext.sessionState.conf.numShufflePartitions
+      val numPartitions = conf.numShufflePartitions
--- End diff --

Why this change? Doesn't the plan have the same context and conf?
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144152859

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---
@@ -131,17 +132,17 @@ class IncrementalExecution(
   }
   override def preparations: Seq[Rule[SparkPlan]] =
-    Seq(state, EnsureStatefulOpPartitioning) ++ super.preparations
+    Seq(state, EnsureStatefulOpPartitioning(sparkSession.sessionState.conf)) ++ super.preparations
   /** No need assert supported, as this check has already been done */
   override def assertSupported(): Unit = { }
 }
-object EnsureStatefulOpPartitioning extends Rule[SparkPlan] {
+case class EnsureStatefulOpPartitioning(conf: SQLConf) extends Rule[SparkPlan] {
   // Needs to be transformUp to avoid extra shuffles
   override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
     case so: StatefulOperator =>
-      val numPartitions = plan.sqlContext.sessionState.conf.numShufflePartitions
+      val numPartitions = conf.numShufflePartitions
--- End diff --

Why this change?
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19467#discussion_r144152254

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -590,10 +590,33 @@ case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecN
   }
   protected override def doExecute(): RDD[InternalRow] = {
-    child.execute().coalesce(numPartitions, shuffle = false)
+    if (numPartitions == 1 && child.execute().getNumPartitions < 1) {
+      // Make sure we don't output an RDD with 0 partitions, when claiming that we have a
+      // `SinglePartition`.
+      new CoalesceExec.EmptyRDDWithPartitions(sparkContext, numPartitions)
+    } else {
+      child.execute().coalesce(numPartitions, shuffle = false)
+    }
   }
 }
+
+object CoalesceExec {
+  /** A simple RDD with no data, but with the given number of partitions. */
+  class EmptyRDDWithPartitions(
+      @transient private val sc: SparkContext,
+      numPartitions: Int) extends RDD[InternalRow](sc, Nil) {
+
+    override def getPartitions: Array[Partition] =
+      Array.tabulate(numPartitions)(i => SimplePartition(i))
+
+    override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+      Iterator.empty
+    }
+  }
+
+  case class SimplePartition(index: Int) extends Partition
--- End diff --

nit: EmptyPartition? Isn't that more descriptive than "simple"?
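The class under review is an RDD with no rows but a fixed number of partitions, each described only by its index. A dependency-free sketch of that shape, using the `EmptyPartition` name the reviewer suggests; `Partition` and `EmptyDataWithPartitions` here are hypothetical stand-ins, not Spark's `Partition` trait or `RDD` class:

```scala
// Hypothetical stand-in for Spark's Partition trait; only the index is modeled.
trait Partition { def index: Int }

/** Named EmptyPartition per the review suggestion; it carries only its index. */
final case class EmptyPartition(index: Int) extends Partition

/** Models an RDD with no data but a fixed number of partitions. */
final class EmptyDataWithPartitions(numPartitions: Int) {
  def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numPartitions)(i => EmptyPartition(i))

  // Every partition yields no rows, regardless of which split is requested.
  def compute(split: Partition): Iterator[Int] = Iterator.empty
}
```

Because `compute` ignores its split and returns an empty iterator, the partition count is the only observable property, which is what a `SinglePartition` claim depends on.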
GitHub user brkyvz opened a pull request:

https://github.com/apache/spark/pull/19467

[SPARK-22238] Fix plan resolution bug caused by EnsureStatefulOpPartitioning

## What changes were proposed in this pull request?

In EnsureStatefulOpPartitioning, we check that the inputRDD to a SparkPlan has the expected partitioning for streaming stateful operators. The problem is that we are not allowed to access this information during planning.

We added that check because CoalesceExec could actually create RDDs with 0 partitions. We should fix CoalesceExec so that when it claims a SinglePartition, its inputRDD really has 1 partition instead of 0.

## How was this patch tested?

Regression test in StreamingQuerySuite

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/brkyvz/spark stateful-op

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19467.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #19467

commit 961ade120f7a179751e5ec45b24e159259de0bae
Author: Burak Yavuz
Date: 2017-10-10T22:02:48Z

Fix plan resolution bug caused by EnsureStatefulOpPartitioning