This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 8506728 [SPARK-32776][SS] Limit in streaming should not be optimized away by PropagateEmptyRelation 8506728 is described below commit 850672870faed6677d166c0029335600a15f4153 Author: liwensun <liwen....@databricks.com> AuthorDate: Wed Sep 2 18:05:06 2020 +0900 [SPARK-32776][SS] Limit in streaming should not be optimized away by PropagateEmptyRelation PropagateEmptyRelation will not be applied to LIMIT operators in streaming queries. Right now, the limit operator in a streaming query may get optimized away when the relation is empty. This can be problematic for stateful streaming, as this empty batch will not write any state store files, and the next batch will fail when trying to read these state store files and throw a file not found error. We should not let PropagateEmptyRelation optimize away the Limit operator for streaming queries. This PR is intended as a small and safe fix for PropagateEmptyRelation. A fundamental fix that can prevent this from happening again in the future and in other optimizer rules is more desirable, but that's a much larger task. No user-facing change. Tested with unit tests. Closes #29623 from liwensun/spark-32776. 
Authored-by: liwensun <liwen....@databricks.com> Signed-off-by: HyukjinKwon <gurwls...@apache.org> (cherry picked from commit f0851e95c6a2c05716a32c961bf2842e18fe1bc7) Signed-off-by: HyukjinKwon <gurwls...@apache.org> --- .../optimizer/PropagateEmptyRelation.scala | 4 +-- .../optimizer/PropagateEmptyRelationSuite.scala | 6 ++++ .../spark/sql/streaming/StreamingQuerySuite.scala | 38 ++++++++++++++++++++++ 3 files changed, 46 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index b19e138..7535f9c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -85,8 +85,8 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit case _: Filter => empty(p) case _: Sample => empty(p) case _: Sort => empty(p) - case _: GlobalLimit => empty(p) - case _: LocalLimit => empty(p) + case _: GlobalLimit if !p.isStreaming => empty(p) + case _: LocalLimit if !p.isStreaming => empty(p) case _: Repartition => empty(p) case _: RepartitionByExpression => empty(p) // An aggregate with non-empty group expression will return one output row per group when the diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala index 9c7d4c7..535be12 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala @@ -221,4 +221,10 @@ class PropagateEmptyRelationSuite extends PlanTest { val optimized = 
Optimize.execute(query.analyze) assert(optimized.resolved) } + + test("should not optimize away limit if streaming") { + val query = LocalRelation(Nil, Nil, isStreaming = true).limit(1).analyze + val optimized = Optimize.execute(query) + comparePlans(optimized, query) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 434b6a2..1f408d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -34,7 +34,9 @@ import org.scalatestplus.mockito.MockitoSugar import org.apache.spark.{SparkException, TestUtils} import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row} +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid} +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2} @@ -1141,6 +1143,42 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi } } + testQuietly("limit on empty batch should not cause state store error") { + // The source only produces two batches, the first batch is empty and the second batch has data. 
+ val source = new Source { + var batchId = 0 + override def stop(): Unit = {} + override def getOffset: Option[Offset] = { + Some(LongOffset(batchId + 1)) + } + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { + if (batchId == 0) { + batchId += 1 + Dataset.ofRows(spark, LocalRelation(schema.toAttributes, Nil, isStreaming = true)) + } else { + Dataset.ofRows(spark, + LocalRelation(schema.toAttributes, InternalRow(10) :: Nil, isStreaming = true)) + } + } + override def schema: StructType = MockSourceProvider.fakeSchema + } + + MockSourceProvider.withMockSources(source) { + val df = spark.readStream + .format("org.apache.spark.sql.streaming.util.MockSourceProvider") + .load() + .limit(1) + + testStream(df)( + StartStream(), + AssertOnQuery { q => + q.processAllAvailable() + true + }, + CheckAnswer(10)) + } + } + private def checkExceptionMessage(df: DataFrame): Unit = { withTempDir { outputDir => withTempDir { checkpointDir => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org