[ https://issues.apache.org/jira/browse/SPARK-47840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jungtaek Lim resolved SPARK-47840.
----------------------------------
    Fix Version/s: 3.5.2
                   4.0.0
       Resolution: Fixed

Issue resolved by pull request 46035
[https://github.com/apache/spark/pull/46035]

> Remove foldable propagation across Streaming Aggregate/Join nodes
> -----------------------------------------------------------------
>
>                 Key: SPARK-47840
>                 URL: https://issues.apache.org/jira/browse/SPARK-47840
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 4.0.0, 3.5.1
>            Reporter: Bhuwan Sahni
>            Assignee: Bhuwan Sahni
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 3.5.2, 4.0.0
>
>
> Streaming queries with a Union of two data streams followed by an Aggregate 
> (groupBy) can produce incorrect results if the grouping key is a constant 
> literal for the duration of a micro-batch; a reproduction is sketched below.
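> A minimal sketch of the failing query shape, assuming a spark-shell session 
> (so the spark variable and its implicits are available) and two MemoryStream 
> sources; the variable names and the feeding pattern in the comments are 
> illustrative, not taken from the regression test in the PR:
> {code:scala}
> import org.apache.spark.sql.execution.streaming.MemoryStream
> import org.apache.spark.sql.functions.lit
> import spark.implicits._
> 
> implicit val sqlCtx = spark.sqlContext
> 
> val input1 = MemoryStream[Int]
> val input2 = MemoryStream[Int]
> 
> // Each branch projects a constant literal as the grouping column, so the
> // grouping key is foldable within any single micro-batch.
> val df1 = input1.toDF().withColumn("name", lit("ds1"))
> val df2 = input2.toDF().withColumn("name", lit("ds2"))
> 
> val query = df1.union(df2)
>   .groupBy("name")
>   .count()
>   .writeStream
>   .format("memory")
>   .queryName("counts")
>   .outputMode("complete")
>   .start()
> 
> // Feeding only one source per trigger (input2.addData(...), then
> // input1.addData(...)) produces single-branch plans like the ones shown
> // below, where FoldablePropagation can fold the grouping key and corrupt
> // keys restored from the StateStore.
> {code}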
> The query produces incorrect results because the optimizer recognizes the 
> literal value in the grouping key as foldable and replaces the grouping key 
> expression with the actual literal value. This optimization is correct for 
> batch queries. However, streaming queries also read information from the 
> StateStore, so the output contains both results from the StateStore 
> (computed in previous micro-batches) and data from the input sources 
> (computed in the current micro-batch). The HashAggregate node after the 
> StateStore always emits the grouping key as the optimized literal (since 
> the grouping key expression has been folded into a literal by the 
> optimizer). This ends up replacing keys in the StateStore with the literal 
> value, resulting in incorrect output: a count saved under a different key 
> in a previous micro-batch is read back and re-emitted under the folded 
> literal key.
> See the example logical and physical plans below for a query performing a 
> union of two data streams followed by a groupBy. Note that the name#4 
> expression has been optimized to the literal ds1. The streaming Aggregate 
> adds a StateStoreSave node as a child of HashAggregate; however, any 
> grouping key read from the StateStore will still be emitted as ds1 due to 
> the optimization.
>  
> *Optimized Logical Plan*
> {quote}=== Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation ===
> === Old Plan ===
> WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, Complete, 0
> +- Aggregate [name#4], [name#4, count(1) AS count#31L]
>    +- Project [ds1 AS name#4]
>       +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
> === New Plan ===
> WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, Complete, 0
> +- Aggregate [ds1], [ds1 AS name#4, count(1) AS count#31L]
>    +- Project [ds1 AS name#4]
>       +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
> ====
> {quote}
> *Corresponding Physical Plan*
> {quote}WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@2b4c6242], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3143/1859075634@35709d26
> +- HashAggregate(keys=[ds1#39], functions=[finalmerge_count(merge count#38L) AS count(1)#30L], output=[name#4, count#31L])
>    +- StateStoreSave [ds1#39], state info [ checkpoint = file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, numPartitions = 5], Complete, 0, 0, 2
>       +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge count#38L) AS count#38L], output=[ds1#39, count#38L])
>          +- StateStoreRestore [ds1#39], state info [ checkpoint = file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, numPartitions = 5], 2
>             +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge count#38L) AS count#38L], output=[ds1#39, count#38L])
>                +- HashAggregate(keys=[ds1 AS ds1#39], functions=[partial_count(1) AS count#38L], output=[ds1#39, count#38L])
>                   +- Project
>                      +- MicroBatchScan[value#1] MemoryStreamDataSource
> {quote}
>  
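> As the issue title says, the fix direction is to stop FoldablePropagation 
> from propagating across streaming Aggregate/Join nodes. A conceptual sketch 
> of that guard (not the actual change merged in PR 46035; the helper name is 
> hypothetical):
> {code:scala}
> import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, LogicalPlan}
> 
> // Hypothetical helper: foldable aliases collected below `plan` must not be
> // propagated into it when the node also re-emits rows rebuilt from the
> // StateStore, i.e. when it is a streaming aggregate or join.
> def canPropagateFoldables(plan: LogicalPlan): Boolean = plan match {
>   case a: Aggregate if a.isStreaming => false // keys also come from state
>   case j: Join if j.left.isStreaming || j.right.isStreaming => false
>   case _ => true
> }
> {code}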



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
