[ https://issues.apache.org/jira/browse/SPARK-47840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jungtaek Lim resolved SPARK-47840.
----------------------------------
    Fix Version/s: 3.5.2
                   4.0.0
       Resolution: Fixed

Issue resolved by pull request 46035
[https://github.com/apache/spark/pull/46035]

> Remove foldable propagation across Streaming Aggregate/Join nodes
> -----------------------------------------------------------------
>
>                 Key: SPARK-47840
>                 URL: https://issues.apache.org/jira/browse/SPARK-47840
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 4.0.0, 3.5.1
>            Reporter: Bhuwan Sahni
>            Assignee: Bhuwan Sahni
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 3.5.2, 4.0.0
>
>
> Streaming queries with a Union of two data streams followed by an Aggregate (groupBy) can produce incorrect results if the grouping key is a constant literal for the micro-batch duration.
> The query produces incorrect results because the optimizer recognizes the literal value in the grouping key as foldable and replaces the grouping key expression with the literal value. This optimization is correct for batch queries. However, streaming queries also read information from the StateStore, and the output contains both the results from the StateStore (computed in previous micro-batches) and the data from the input sources (computed in this micro-batch). The HashAggregate node above the StateStore always reads the grouping key value as the optimized literal (because the grouping key expression was folded into a literal by the optimizer). This ends up replacing keys in the StateStore with the literal value, resulting in incorrect output.
> See the example logical and physical plans below for a query performing a union of two data streams followed by a groupBy. Note that the name#4 expression has been optimized to ds1. The streaming Aggregate adds a StateStoreSave node as a child of HashAggregate, but any grouping key read from the StateStore will still be read as ds1 due to the optimization.
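The failure mode can be sketched with a toy model (plain Python, not Spark internals; the function names and key values are purely illustrative). The correct merge keeps each restored key's identity; the buggy merge models what happens after FoldablePropagation, where every key read above StateStoreRestore is treated as the folded literal:

```python
# Toy illustration of why folding a streaming grouping key to a literal
# corrupts state. The "state store" is a dict of counts from earlier
# micro-batches, keyed by the original grouping values.

def merge_batch(state, batch_counts):
    """Correct streaming merge: each restored key keeps its own identity."""
    merged = dict(state)
    for key, count in batch_counts.items():
        merged[key] = merged.get(key, 0) + count
    return merged

def merge_batch_folded(state, batch_counts, literal):
    """Buggy merge: after FoldablePropagation, the aggregate above
    StateStoreRestore reads every grouping key, including keys restored
    from previous micro-batches, as the folded literal."""
    merged = {}
    for _, count in list(state.items()) + list(batch_counts.items()):
        merged[literal] = merged.get(literal, 0) + count
    return merged

# State built in earlier micro-batches from a union of two streams,
# plus a new micro-batch that only saw rows from the first stream.
state = {"ds1": 2, "ds2": 3}
new_batch = {"ds1": 1}

print(merge_batch(state, new_batch))                # {'ds1': 3, 'ds2': 3}
print(merge_batch_folded(state, new_batch, "ds1"))  # {'ds1': 6}
```

In the folded variant all previously stored keys collapse into the single literal, which is the incorrect-output symptom described above.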
>
> *Optimized Logical Plan*
> {quote}
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation ===
>
> === Old Plan ===
> WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, Complete, 0
> +- Aggregate [name#4], [name#4, count(1) AS count#31L]
>    +- Project [ds1 AS name#4]
>       +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
>
> === New Plan ===
> WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, Complete, 0
> +- Aggregate [ds1], [ds1 AS name#4, count(1) AS count#31L]
>    +- Project [ds1 AS name#4]
>       +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
> {quote}
>
> *Corresponding Physical Plan*
> {quote}
> WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@2b4c6242], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3143/1859075634@35709d26
> +- HashAggregate(keys=[ds1#39], functions=[finalmerge_count(merge count#38L) AS count(1)#30L], output=[name#4, count#31L])
>    +- StateStoreSave [ds1#39], state info [ checkpoint = file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, numPartitions = 5], Complete, 0, 0, 2
>       +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge count#38L) AS count#38L], output=[ds1#39, count#38L])
>          +- StateStoreRestore [ds1#39], state info [ checkpoint = file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, numPartitions = 5], 2
>             +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge count#38L) AS count#38L], output=[ds1#39, count#38L])
>                +- HashAggregate(keys=[ds1 AS ds1#39], functions=[partial_count(1) AS count#38L], output=[ds1#39, count#38L])
>                   +- Project
>                      +- MicroBatchScan[value#1] MemoryStreamDataSource
> {quote}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)