This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 481e521  [SPARK-30657][SPARK-30658][SS] Fixed two bugs in streaming limits
481e521 is described below

commit 481e5211d237173ea0fb7c0b292eb7abd2b8a3fe
Author: Tathagata Das <tathagata.das1...@gmail.com>
AuthorDate: Fri Jan 31 09:26:03 2020 -0800

    [SPARK-30657][SPARK-30658][SS] Fixed two bugs in streaming limits

    This PR solves two bugs related to streaming limits.

    **Bug 1 (SPARK-30658)**: A limit before a streaming aggregate (i.e.
    `df.limit(5).groupBy().count()`) in complete mode was not being planned as a
    stateful streaming limit. The planner rule planned a logical limit with a
    stateful streaming limit plan only if the query was in append mode. As a
    result, instead of allowing a maximum of 5 rows across batches, the planned
    streaming query allowed 5 rows in every batch, thus producing incorrect
    results.

    **Solution**: Change the planner rule to plan the logical limit with a
    streaming limit plan even when the query is in complete mode, as long as the
    logical limit has no stateful operator before it.

    **Bug 2 (SPARK-30657)**: `LocalLimitExec` does not consume the iterator of
    the child plan. So if there is a limit after a stateful operator like a
    streaming dedup in append mode (e.g. `df.dropDuplicates().limit(5)`), the
    state changes of the streaming dedup may not be committed (most stateful ops
    commit state changes only after the generated iterator is fully consumed).

    **Solution**: Change the planner rule to always use a new
    `StreamingLocalLimitExec`, which always fully consumes the iterator. This is
    the safest thing to do. However, this will introduce a performance
    regression, as consuming the iterator is extra work. To minimize this
    performance impact, add an additional post-planner optimization rule to
    replace `StreamingLocalLimitExec` with `LocalLimitExec` when there is no
    stateful operator before the limit that could be affected by it.

    No user-facing changes. Updated incorrect unit tests and added new ones.

    Closes #27373 from tdas/SPARK-30657.

    Authored-by: Tathagata Das <tathagata.das1...@gmail.com>
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
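Why Bug 2 bites: most stateful operators commit their state from a completion hook that runs only once their output iterator has been fully drained, so a limit that stops pulling rows early can skip the commit entirely. A minimal plain-Scala sketch of that failure mode (toy names, not actual Spark APIs):

```scala
object UnconsumedCommitSketch extends App {
  var committed = false

  // Stand-in for a stateful operator's output: the "commit" runs only once the
  // iterator reports exhaustion, mirroring CompletionIterator-style semantics.
  val statefulOutput: Iterator[Int] = new Iterator[Int] {
    private val rows = Iterator(1, 2, 3, 4, 5)
    override def hasNext: Boolean = {
      val more = rows.hasNext
      if (!more) committed = true // the commit fires only at exhaustion
      more
    }
    override def next(): Int = rows.next()
  }

  // A LocalLimitExec-style limit stops pulling after two rows...
  statefulOutput.take(2).foreach(println)

  // ...so the commit never runs. StreamingLocalLimitExec avoids this by
  // draining the child iterator even after the limit is reached.
  println(s"state committed? $committed") // prints: state committed? false
}
```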
---
 .../spark/sql/execution/SparkStrategies.scala      |  38 ++++---
 .../execution/streaming/IncrementalExecution.scala |  34 ++++++-
 ...GlobalLimitExec.scala => streamingLimits.scala} |  55 ++++++++--
 .../apache/spark/sql/streaming/StreamSuite.scala   | 112 ++++++++++++++++++++-
 4 files changed, 211 insertions(+), 28 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 00ad4e0..bd2684d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -451,21 +451,35 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * Used to plan the streaming global limit operator for streams in append mode.
    * We need to check for either a direct Limit or a Limit wrapped in a ReturnAnswer operator,
    * following the example of the SpecialLimits Strategy above.
-   * Streams with limit in Append mode use the stateful StreamingGlobalLimitExec.
-   * Streams with limit in Complete mode use the stateless CollectLimitExec operator.
-   * Limit is unsupported for streams in Update mode.
    */
   case class StreamingGlobalLimitStrategy(outputMode: OutputMode) extends Strategy {
-    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case ReturnAnswer(rootPlan) => rootPlan match {
-        case Limit(IntegerLiteral(limit), child)
-            if plan.isStreaming && outputMode == InternalOutputModes.Append =>
-          StreamingGlobalLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
-        case _ => Nil
+
+    private def generatesStreamingAppends(plan: LogicalPlan): Boolean = {
+
+      /** Ensures that this plan does not have a streaming aggregate in it. */
+      def hasNoStreamingAgg: Boolean = {
+        plan.collectFirst { case a: Aggregate if a.isStreaming => a }.isEmpty
       }
-      case Limit(IntegerLiteral(limit), child)
-          if plan.isStreaming && outputMode == InternalOutputModes.Append =>
-        StreamingGlobalLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
+
+      // The following cases of limits on a streaming plan have to be executed with a stateful
+      // streaming plan.
+      // 1. When the query is in append mode (that is, all logical plan operators operate on
+      //    appended data).
+      // 2. When the plan does not contain any streaming aggregate (that is, the plan has only
+      //    operators that operate on appended data). This must be executed with a stateful
+      //    streaming plan even if the query is in complete mode because of a later streaming
+      //    aggregation (e.g., `streamingDf.limit(5).groupBy().count()`).
+      plan.isStreaming && (
+        outputMode == InternalOutputModes.Append ||
+        outputMode == InternalOutputModes.Complete && hasNoStreamingAgg)
+    }
+
+    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case ReturnAnswer(Limit(IntegerLiteral(limit), child)) if generatesStreamingAppends(child) =>
+        StreamingGlobalLimitExec(limit, StreamingLocalLimitExec(limit, planLater(child))) :: Nil
+
+      case Limit(IntegerLiteral(limit), child) if generatesStreamingAppends(child) =>
+        StreamingGlobalLimitExec(limit, StreamingLocalLimitExec(limit, planLater(child))) :: Nil
+
       case _ => Nil
     }
   }
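For concreteness, the two limit placements that the revised strategy now plans with a stateful `StreamingGlobalLimitExec`; a minimal sketch assuming a local SparkSession (the object and variable names are made up for illustration):

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

object LimitShapesSketch extends App {
  val spark = SparkSession.builder().master("local[2]").appName("limit-shapes").getOrCreate()
  import spark.implicits._
  implicit val sqlContext: SQLContext = spark.sqlContext

  val input = MemoryStream[Int]

  // Case 1: a limit on a query in append mode.
  val appendModeLimit = input.toDF().limit(5)

  // Case 2: a limit before the only streaming aggregate. Even in complete mode
  // the limit itself still sees appended rows, so it must be planned statefully.
  val completeModeLimit = input.toDF().limit(5).groupBy().count()
}
```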
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index bf80a0b..09ae769 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, Express
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashPartitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
+import org.apache.spark.sql.execution.{LeafExecNode, LocalLimitExec, QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.OutputMode
@@ -105,6 +105,32 @@ class IncrementalExecution(
   /** Locates save/restore pairs surrounding aggregation. */
   val state = new Rule[SparkPlan] {

+    /**
+     * Ensures that this plan DOES NOT have any stateful operation in it whose pipelined execution
+     * depends on this plan. In other words, this function returns true if this plan does
+     * not have a narrow dependency on a stateful subplan.
+     */
+    private def hasNoStatefulOp(plan: SparkPlan): Boolean = {
+      var statefulOpFound = false
+
+      def findStatefulOp(planToCheck: SparkPlan): Unit = {
+        planToCheck match {
+          case s: StatefulOperator =>
+            statefulOpFound = true
+
+          case e: ShuffleExchangeExec =>
+            // Don't search recursively any further: any stateful operator below the exchange
+            // has its iterators consumed by the exchange itself, and we are only looking for
+            // stateful subplans that this plan has narrow dependencies on.
+
+          case p: SparkPlan =>
+            p.children.foreach(findStatefulOp)
+        }
+      }
+
+      findStatefulOp(plan)
+      !statefulOpFound
+    }
+
     override def apply(plan: SparkPlan): SparkPlan = plan transform {
       case StateStoreSaveExec(keys, None, None, None, stateFormatVersion,
         UnaryExecNode(agg,
@@ -149,6 +175,12 @@ class IncrementalExecution(
           l.copy(
             stateInfo = Some(nextStatefulOperationStateInfo),
             outputMode = Some(outputMode))
+
+        case StreamingLocalLimitExec(limit, child) if hasNoStatefulOp(child) =>
+          // Optimize limit execution by replacing StreamingLocalLimitExec (consumes the
+          // iterator completely) with LocalLimitExec (does not consume the iterator) when the
+          // child plan has no stateful operator (i.e., consuming the iterator is not needed).
+          LocalLimitExec(limit, child)
     }
   }
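The post-planner rule's effect on the query shapes exercised by the tests later in this commit; a sketch under the same assumed local-session setup (names are illustrative only):

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

object LimitOptimizationShapes extends App {
  val spark = SparkSession.builder().master("local[2]").appName("limit-opt").getOrCreate()
  import spark.implicits._
  implicit val sqlContext: SQLContext = spark.sqlContext

  val df = MemoryStream[Int].toDF()

  // Kept as StreamingLocalLimitExec: the stateful dedup pipes rows straight into
  // the limit, so the limit must drain the dedup's iterators for state to commit.
  val keptStreaming = df.dropDuplicates().limit(1)

  // Optimized to LocalLimitExec: nothing stateful sits below the limit.
  val optimized = df.limit(1)

  // Also optimized: the shuffle introduced by repartition already consumes the
  // dedup's iterators, so the limit has no narrow dependency on a stateful op.
  val optimizedAcrossShuffle = df.dropDuplicates().repartition(1).limit(1)
}
```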
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingGlobalLimitExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
similarity index 68%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingGlobalLimitExec.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
index bf4af60..b195402 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingGlobalLimitExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
@@ -20,21 +20,21 @@ import java.util.concurrent.TimeUnit.NANOSECONDS

 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow, SortOrder, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.{LimitExec, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.streaming.state.StateStoreOps
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{LongType, NullType, StructField, StructType}
-import org.apache.spark.util.CompletionIterator
+import org.apache.spark.util.{CompletionIterator, NextIterator}

 /**
  * A physical operator for executing a streaming limit, which makes sure no more than streamLimit
- * rows are returned. This operator is meant for streams in Append mode only.
+ * rows are returned. This physical operator is only meant for logical limit operations that
+ * will get an input stream of rows that are effectively appends. For example,
+ * - a limit on any query in append mode
+ * - a limit before the aggregation in a streaming aggregation query in complete mode
  */
 case class StreamingGlobalLimitExec(
     streamLimit: Long,
@@ -49,9 +49,6 @@ case class StreamingGlobalLimitExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver

-    assert(outputMode.isDefined && outputMode.get == InternalOutputModes.Append,
-      "StreamingGlobalLimitExec is only valid for streams in Append output mode")
-
     child.execute().mapPartitionsWithStateStore(
       getStateInfo,
       keySchema,
@@ -100,3 +97,41 @@ case class StreamingGlobalLimitExec(
     UnsafeProjection.create(valueSchema)(new GenericInternalRow(Array[Any](value)))
   }
 }
+
+
+/**
+ * A physical operator for executing limits locally on each partition. The main difference from
+ * LocalLimitExec is that this will fully consume `child` plan's iterators to ensure that any
+ * stateful operation within `child` commits all the state changes (many stateful operations
+ * commit state changes only after the iterator is consumed).
+ */
+case class StreamingLocalLimitExec(limit: Int, child: SparkPlan)
+  extends LimitExec {
+
+  override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
+
+    var generatedCount = 0
+
+    new NextIterator[InternalRow]() {
+      override protected def getNext(): InternalRow = {
+        if (generatedCount < limit && iter.hasNext) {
+          generatedCount += 1
+          iter.next()
+        } else {
+          finished = true
+          null
+        }
+      }
+
+      override protected def close(): Unit = {
+        while (iter.hasNext) iter.next() // consume the iterator completely
+      }
+    }
+  }
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def output: Seq[Attribute] = child.output
+}
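The drain-on-close contract of `StreamingLocalLimitExec` in plain Scala; a toy analogue (Spark's `NextIterator` plays this role in the real operator):

```scala
// Emit at most `limit` elements, but exhaust the child once the limit is hit
// so that upstream completion hooks (state commits) still fire.
class DrainingLimitIterator[T](child: Iterator[T], limit: Int) extends Iterator[T] {
  private var emitted = 0
  private var drained = false

  override def hasNext: Boolean = {
    if (emitted < limit && child.hasNext) {
      true
    } else {
      if (!drained) {
        while (child.hasNext) child.next() // the close() step: consume the rest
        drained = true
      }
      false
    }
  }

  override def next(): T = {
    emitted += 1
    child.next()
  }
}
```

Wrapping the committing iterator from the earlier sketch in `new DrainingLimitIterator(statefulOutput, 2)` emits the same two rows but leaves `committed == true`.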
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index bf80962..b661882 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.SimpleMode
+import org.apache.spark.sql.execution.{LocalLimitExec, SimpleMode, SparkPlan}
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.{ContinuousMemoryStream, MemorySink}
@@ -976,24 +976,50 @@ class StreamSuite extends StreamTest {
       CheckAnswer(1 to 3: _*))
   }

-  test("streaming limit in complete mode") {
+  test("SPARK-30658: streaming limit before agg in complete mode") {
     val inputData = MemoryStream[Int]
     val limited = inputData.toDF().limit(5).groupBy("value").count()
     testStream(limited, OutputMode.Complete())(
       AddData(inputData, 1 to 3: _*),
       CheckAnswer(Row(1, 1), Row(2, 1), Row(3, 1)),
       AddData(inputData, 1 to 9: _*),
-      CheckAnswer(Row(1, 2), Row(2, 2), Row(3, 2), Row(4, 1), Row(5, 1)))
+      CheckAnswer(Row(1, 2), Row(2, 2), Row(3, 1)))
   }

-  test("streaming limits in complete mode") {
+  test("SPARK-30658: streaming limits before and after agg in complete mode " +
+    "(after limit < before limit)") {
     val inputData = MemoryStream[Int]
     val limited =
       inputData.toDF().limit(4).groupBy("value").count().orderBy("value").limit(3)
     testStream(limited, OutputMode.Complete())(
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
       AddData(inputData, 1 to 9: _*),
+      // only 1 to 4 should be allowed to aggregate, and counts for only 1 to 3 should be output
       CheckAnswer(Row(1, 1), Row(2, 1), Row(3, 1)),
       AddData(inputData, 2 to 6: _*),
-      CheckAnswer(Row(1, 1), Row(2, 2), Row(3, 2)))
+      // None of the new values should be allowed to aggregate; the same 3 counts should be output
+      CheckAnswer(Row(1, 1), Row(2, 1), Row(3, 1)))
+  }
+
+  test("SPARK-30658: streaming limits before and after agg in complete mode " +
+    "(before limit < after limit)") {
+    val inputData = MemoryStream[Int]
+    val limited = inputData.toDF().limit(2).groupBy("value").count().orderBy("value").limit(3)
+    testStream(limited, OutputMode.Complete())(
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+      AddData(inputData, 1 to 9: _*),
+      CheckAnswer(Row(1, 1), Row(2, 1)),
+      AddData(inputData, 2 to 6: _*),
+      CheckAnswer(Row(1, 1), Row(2, 1)))
+  }
+
+  test("SPARK-30657: streaming limit after streaming dedup in append mode") {
+    val inputData = MemoryStream[Int]
+    val limited = inputData.toDF().dropDuplicates().limit(1)
+    testStream(limited)(
+      AddData(inputData, 1, 2),
+      CheckAnswer(Row(1)),
+      AddData(inputData, 3, 4),
+      CheckAnswer(Row(1)))
   }

   test("streaming limit in update mode") {
@@ -1034,6 +1060,82 @@ class StreamSuite extends StreamTest {
         false))
   }

+  test("SPARK-30657: streaming limit should not apply to limits on static subplans") {
+    val streamData = MemoryStream[Int]
+    val streamingDF = streamData.toDF().toDF("value")
+    val staticDF = spark.createDataset(Seq(1)).toDF("value").orderBy("value")
+    testStream(streamingDF.join(staticDF.limit(1), "value"))(
+      AddData(streamData, 1, 2, 3),
+      CheckAnswer(Row(1)),
+      AddData(streamData, 1, 3, 5),
+      CheckAnswer(Row(1), Row(1)))
+  }
+
+  test("SPARK-30657: streaming limit optimization from StreamingLocalLimitExec to LocalLimitExec") {
+    val inputData = MemoryStream[Int]
+    val inputDF = inputData.toDF()
+
+    /** Verify whether the local limit in the plan is a streaming limit or a simple limit. */
+    def verifyLocalLimit(
+        df: DataFrame,
+        expectStreamingLimit: Boolean,
+        outputMode: OutputMode = OutputMode.Append): Unit = {
+
+      var execPlan: SparkPlan = null
+      testStream(df, outputMode)(
+        AddData(inputData, 1),
+        AssertOnQuery { q =>
+          q.processAllAvailable()
+          execPlan = q.lastExecution.executedPlan
+          true
+        }
+      )
+      require(execPlan != null)
+
+      val localLimits = execPlan.collect {
+        case l: LocalLimitExec => l
+        case l: StreamingLocalLimitExec => l
+      }
+
+      require(
+        localLimits.size == 1,
+        s"Can't verify local limit optimization with this plan:\n$execPlan")
+
+      if (expectStreamingLimit) {
+        assert(
+          localLimits.head.isInstanceOf[StreamingLocalLimitExec],
+          s"Local limit was not StreamingLocalLimitExec:\n$execPlan")
+      } else {
+        assert(
+          localLimits.head.isInstanceOf[LocalLimitExec],
+          s"Local limit was not LocalLimitExec:\n$execPlan")
+      }
+    }
+
+    // Should not be optimized, so StreamingLocalLimitExec should be present
+    verifyLocalLimit(inputDF.dropDuplicates().limit(1), expectStreamingLimit = true)
+
+    // Should be optimized from StreamingLocalLimitExec to LocalLimitExec
+    verifyLocalLimit(inputDF.limit(1), expectStreamingLimit = false)
+    verifyLocalLimit(
+      inputDF.limit(1).groupBy().count(),
+      expectStreamingLimit = false,
+      outputMode = OutputMode.Complete())
+
+    // Should be optimized, as the repartition is sufficient to ensure that the iterators of
+    // StreamingDeduplicationExec are consumed completely by the repartition exchange.
+    verifyLocalLimit(inputDF.dropDuplicates().repartition(1).limit(1), expectStreamingLimit = false)
+
+    // Should be LocalLimitExec in the first place, not the result of optimizing a
+    // StreamingLocalLimitExec
+    val staticDF = spark.range(1).toDF("value").limit(1)
+    verifyLocalLimit(inputDF.toDF("value").join(staticDF, "value"), expectStreamingLimit = false)
+
+    verifyLocalLimit(
+      inputDF.groupBy().count().limit(1),
+      expectStreamingLimit = false,
+      outputMode = OutputMode.Complete())
+  }
+
   test("is_continuous_processing property should be false for microbatch processing") {
     val input = MemoryStream[Int]
     val df = input.toDS()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org