Bhuwan Sahni created SPARK-47840:
------------------------------------
Summary: Remove foldable propagation across Streaming
Aggregate/Join nodes
Key: SPARK-47840
URL: https://issues.apache.org/jira/browse/SPARK-47840
Project: Spark
Issue Type: Bug
Components: Structured Streaming
Affects Versions: 3.5.1, 4.0.0
Reporter: Bhuwan Sahni
Streaming queries that union 2 data streams and then apply an Aggregate
(groupBy) can produce incorrect results if the grouping key is a constant
literal for the duration of a micro-batch.
The results are incorrect because the query optimizer recognizes the literal
value in the grouping key as foldable and replaces the grouping key expression
with the literal value itself. This optimization is correct for batch queries.
However, streaming queries also read from the StateStore, and their output
combines results from the StateStore (computed in previous micro-batches) with
data from the input sources (computed in the current micro-batch). Because the
optimizer has turned the grouping key expression into a literal, the
HashAggregate node after the StateStore always reads the grouping key value as
that literal. This ends up replacing the keys from the StateStore with the
literal value, resulting in incorrect output.
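The effect can be illustrated with a toy Python model; this is not Spark's
implementation. A dict stands in for the StateStore, counts are merged per
micro-batch, and Complete output mode emits every key held in state. The
hypothetical `folded_literal` parameter assumes the optimizer has folded the
grouping key to that literal for the current micro-batch. The model reproduces
only the visible symptom: rows restored from state are emitted under this
batch's literal instead of their own key.

```python
from collections import Counter
from typing import Dict, List, Optional, Tuple


def run_microbatch(
    state: Dict[str, int],
    batch_keys: List[str],
    folded_literal: Optional[str] = None,
) -> List[Tuple[str, int]]:
    """Toy streaming count aggregate in Complete mode (not Spark code).

    state: grouping key -> running count; stands in for the StateStore.
    batch_keys: grouping-key values produced by this micro-batch's input.
    folded_literal: hypothetical; if set, the grouping key was folded to this
        literal for the current micro-batch by the optimizer.
    """
    # Partial aggregation over the new input, merged with the restored state.
    for key, cnt in Counter(batch_keys).items():
        state[key] = state.get(key, 0) + cnt
    # Complete mode: emit one row per key in state. With folding, the output
    # name column is the literal rather than the key restored from state.
    return sorted(
        (folded_literal if folded_literal is not None else key, cnt)
        for key, cnt in state.items()
    )
```

Running a micro-batch containing only "ds2" and then one containing only "ds1",
with folding, emits `[("ds1", 1), ("ds1", 1)]`; without folding, the same two
batches emit `[("ds1", 1), ("ds2", 1)]`.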
Below are example logical and physical plans for a query that performs a union
of 2 data streams followed by a groupBy. Note that the name#4 expression has
been optimized to ds1. For the streaming Aggregate, the physical plan places a
StateStoreSave node as the child of the final HashAggregate; however, any
grouping key read from the StateStore is still read as ds1 because of the
optimization.
### Optimized Logical Plan
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation ===
=== Old Plan ===
WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, Complete, 0
+- Aggregate [name#4], [name#4, count(1) AS count#31L]
   +- Project [ds1 AS name#4]
      +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
=== New Plan ===
WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, Complete, 0
+- Aggregate [ds1], [ds1 AS name#4, count(1) AS count#31L]
   +- Project [ds1 AS name#4]
      +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
====
```
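The rewrite above, and the guard suggested by this issue's summary, can be
sketched with a toy plan model in Python. The classes `Lit`, `Attr`, `Project`
and `Aggregate` and the `is_streaming` flag are hypothetical stand-ins, not
Catalyst's API. Only the Aggregate case is modeled, and only the grouping keys
are rewritten: literal aliases from the child Project are substituted into the
grouping keys unless the Aggregate is streaming.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class Lit:
    value: str  # a constant, e.g. Lit("ds1")


@dataclass(frozen=True)
class Attr:
    name: str  # a reference to a column produced by a child node


Expr = Union[Lit, Attr]


@dataclass(frozen=True)
class Project:
    # (output name, expression) pairs, e.g. ("name", Lit("ds1")) for `ds1 AS name`
    aliases: Tuple[Tuple[str, Expr], ...]


@dataclass(frozen=True)
class Aggregate:
    grouping: Tuple[Expr, ...]
    child: Project
    is_streaming: bool  # hypothetical flag: True for a stateful streaming Aggregate


def propagate_foldables(agg: Aggregate) -> Aggregate:
    """Toy foldable propagation into an Aggregate's grouping keys."""
    if agg.is_streaming:
        # Guard in the spirit of the issue summary: keys restored from state
        # may differ from this micro-batch's literal, so do not fold them.
        return agg
    foldable = {name: e for name, e in agg.child.aliases if isinstance(e, Lit)}
    new_grouping = tuple(
        foldable.get(e.name, e) if isinstance(e, Attr) else e for e in agg.grouping
    )
    return Aggregate(new_grouping, agg.child, agg.is_streaming)
```

For a batch Aggregate grouping on `name` over `ds1 AS name`, the grouping key
becomes `Lit("ds1")`, mirroring `Aggregate [ds1]` in the new plan; with
`is_streaming=True` it stays `Attr("name")`.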
### Corresponding Physical Plan
```
WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@2b4c6242], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3143/1859075634@35709d26
+- HashAggregate(keys=[ds1#39], functions=[finalmerge_count(merge count#38L) AS count(1)#30L], output=[name#4, count#31L])
   +- StateStoreSave [ds1#39], state info [ checkpoint = file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, numPartitions = 5], Complete, 0, 0, 2
      +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge count#38L) AS count#38L], output=[ds1#39, count#38L])
         +- StateStoreRestore [ds1#39], state info [ checkpoint = file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, numPartitions = 5], 2
            +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge count#38L) AS count#38L], output=[ds1#39, count#38L])
               +- HashAggregate(keys=[ds1 AS ds1#39], functions=[partial_count(1) AS count#38L], output=[ds1#39, count#38L])
                  +- Project
                     +- MicroBatchScan[value#1] MemoryStreamDataSource
```
--
This message was sent by Atlassian Jira
(v8.20.10#820010)