This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 172a23f780ae [SPARK-48267][SS] Regression e2e test with SPARK-47305
172a23f780ae is described below

commit 172a23f780ae2a603908421b49683aff6748e419
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Tue May 14 15:40:51 2024 +0900

    [SPARK-48267][SS] Regression e2e test with SPARK-47305

### What changes were proposed in this pull request?

This PR proposes to add an end-to-end regression test for SPARK-47305.

As of commit cae2248bc13 (pre-Spark 4.0), the query in the new unit test is represented by the logical plans below:

> Batch 0

>> analyzed plan

```
WriteToMicroBatchDataSource MemorySink, 5067923b-e1d0-484c-914c-b111c9e60aac, Append, 0
+- Project [value#1]
   +- Join Inner, (cast(code#5 as bigint) = ref_code#14L)
      :- Union false, false
      :  :- Project [value#1, 1 AS code#5]
      :  :  +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
      :  +- Project [value#3, cast(code#9 as int) AS code#16]
      :     +- Project [value#3, null AS code#9]
      :        +- LocalRelation <empty>, [value#3]
      +- Project [id#12L AS ref_code#14L]
         +- Range (1, 5, step=1, splits=Some(2))
```

>> optimized plan

```
WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: ...]
+- Join Inner
   :- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
   +- Project
      +- Filter (1 = id#12L)
         +- Range (1, 5, step=1, splits=Some(2))
```

> Batch 1

>> analyzed plan

```
WriteToMicroBatchDataSource MemorySink, d1c8be66-88e7-437a-9f25-6b87db8efe17, Append, 1
+- Project [value#1]
   +- Join Inner, (cast(code#5 as bigint) = ref_code#14L)
      :- Union false, false
      :  :- Project [value#1, 1 AS code#5]
      :  :  +- LocalRelation <empty>, [value#1]
      :  +- Project [value#3, cast(code#9 as int) AS code#16]
      :     +- Project [value#3, null AS code#9]
      :        +- StreamingDataSourceV2ScanRelation[value#3] MemoryStreamDataSource
      +- Project [id#12L AS ref_code#14L]
         +- Range (1, 5, step=1, splits=Some(2))
```

>> optimized plan

```
WriteToDataSourceV2 MicroBatchWrite[epoch: 1, writer: ...]
+- Join Inner
   :- StreamingDataSourceV2ScanRelation[value#3] MemoryStreamDataSource
   +- LocalRelation <empty>
```

Notice the difference in the optimized plans between batch 0 and batch 1. In the optimized plan for batch 1, the batch side is pruned out entirely, which goes through the PruneFilters path. The sequence of optimizations is:

1) The left stream side is collapsed into an empty local relation.
2) The union is replaced with the subtree for the right stream side, since the left stream side is now simply an empty local relation.
3) The value of the 'code' column is now known to be 'null', and this is propagated into the join criteria (`null = ref_code`).
4) The join criteria are extracted from the join and pushed down to the batch side.
5) The value of the 'ref_code' column can never be null, hence the filter is optimized to `filter false`.
6) `filter false` triggers PruneFilters (where we fixed a bug in SPARK-47305).

Before SPARK-47305, the new empty local relation was incorrectly marked as streaming.

NOTE: I intentionally did not put the detail above into a code comment, as the optimization result is subject to change across Spark versions.

### Why are the changes needed?
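As background on the bug being guarded against, here is a minimal, Spark-free sketch of the invariant that SPARK-47305 restored in PruneFilters. All types and names below are illustrative stand-ins, not Spark's actual classes: when an always-false filter is collapsed into an empty local relation, the replacement must inherit the pruned child's `isStreaming` flag.

```scala
// Sketch of the PruneFilters invariant fixed by SPARK-47305.
// These case classes are hypothetical stand-ins, not Spark's real API.
object PruneFiltersSketch {
  sealed trait Plan { def isStreaming: Boolean }
  case class LocalRelation(rows: Seq[Int], isStreaming: Boolean) extends Plan
  case class Range(start: Long, end: Long) extends Plan {
    def isStreaming: Boolean = false // Range is always a batch source
  }
  case class Filter(alwaysFalse: Boolean, child: Plan) extends Plan {
    def isStreaming: Boolean = child.isStreaming
  }

  // Fixed rule: an always-false filter collapses to an empty relation that
  // inherits the child's isStreaming flag. The bug left the flag wrong, so
  // a pruned batch Range could come out marked as streaming.
  def pruneFilter(f: Filter): Plan =
    if (f.alwaysFalse) LocalRelation(Seq.empty, isStreaming = f.child.isStreaming)
    else f.child

  def main(args: Array[String]): Unit = {
    // The batch side of the join: Range(1, 5) behind a `filter false`.
    val pruned = pruneFilter(Filter(alwaysFalse = true, Range(1, 5)))
    // With the fix, the empty replacement stays a batch relation, so the
    // join remains stream-batch rather than (unsupported) stream-stream.
    println(s"pruned.isStreaming = ${pruned.isStreaming}")
  }
}
```

When the flag is preserved correctly, the planner still sees a stream-batch join in batch 1 and the query runs; with the buggy flag, it saw a stream-stream join without an equality predicate and failed.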
In the PR for SPARK-47305 we only added a unit test to verify the fix; it was not an end-to-end test of the workload where we encountered the issue. Given the complexity of the query optimizer, it is ideal to have an end-to-end reproducer (even a simplified one) as a regression test.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UT.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46569 from HeartSaVioR/SPARK-48267.

Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 ...treamingQueryOptimizationCorrectnessSuite.scala | 37 +++++++++++++++++++++-
 1 file changed, 36 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryOptimizationCorrectnessSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryOptimizationCorrectnessSuite.scala
index efc84c8e4c7c..d17da5d31edd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryOptimizationCorrectnessSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryOptimizationCorrectnessSuite.scala
@@ -21,7 +21,7 @@ import java.sql.Timestamp
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.streaming.MemoryStream
-import org.apache.spark.sql.functions.{lit, window}
+import org.apache.spark.sql.functions.{expr, lit, window}
 
 /**
  * This test ensures that any optimizations done by Spark SQL optimizer are
@@ -416,4 +416,39 @@
       )
     }
   }
+
+  test("SPARK-48267: regression test, stream-stream union followed by stream-batch join") {
+    withTempDir { dir =>
+      val input1 = MemoryStream[Int]
+      val input2 = MemoryStream[Int]
+
+      val df1 = input1.toDF().withColumn("code", lit(1))
+      val df2 = input2.toDF().withColumn("code", lit(null))
+
+      // NOTE: The column 'ref_code' is known to be non-nullable.
+      val batchDf = spark.range(1, 5).select($"id".as("ref_code"))
+
+      val unionDf = df1.union(df2)
+        .join(batchDf, expr("code = ref_code"))
+        .select("value")
+
+      testStream(unionDf)(
+        StartStream(checkpointLocation = dir.getAbsolutePath),
+
+        AddData(input1, 1, 2, 3),
+        CheckNewAnswer(1, 2, 3),
+
+        AddData(input2, 1, 2, 3),
+        // Before SPARK-47305, this test failed with the below error message:
+        // org.apache.spark.sql.streaming.StreamingQueryException: Stream-stream join without
+        // equality predicate is not supported.;
+        // Join Inner
+        // :- StreamingDataSourceV2ScanRelation[value#3] MemoryStreamDataSource
+        // +- LocalRelation <empty>
+        // Note that LocalRelation <empty> is actually a batch source (Range), but due to
+        // a bug it was incorrectly marked as streaming. SPARK-47305 fixed the bug.
+        CheckNewAnswer()
+      )
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org