[ https://issues.apache.org/jira/browse/SPARK-37987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jungtaek Lim resolved SPARK-37987.
----------------------------------
    Fix Version/s: 3.3.0
                   3.2.1
       Resolution: Fixed

Issue resolved by pull request 35298
[https://github.com/apache/spark/pull/35298]

> Flaky Test: StreamingAggregationSuite.changing schema of state when restarting query - state format version 1
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-37987
>                 URL: https://issues.apache.org/jira/browse/SPARK-37987
>             Project: Spark
>          Issue Type: Test
>          Components: Structured Streaming, Tests
>    Affects Versions: 3.3.0
>            Reporter: Hyukjin Kwon
>            Assignee: Jungtaek Lim
>            Priority: Major
>             Fix For: 3.3.0, 3.2.1
>
>
> {code}
> StreamingAggregationSuite.changing schema of state when restarting query - state format version 1
> {code}
> {code}
> org.scalatest.exceptions.TestFailedException: Error while checking stream failure: stateSchemaExc.isDefined was false
>       org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
>       org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
>       org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
>       org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
>       org.apache.spark.sql.streaming.StreamingAggregationSuite.$anonfun$new$82(StreamingAggregationSuite.scala:781)
>       org.apache.spark.sql.streaming.StreamingAggregationSuite.$anonfun$new$82$adapted(StreamingAggregationSuite.scala:779)
>       org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$33(StreamTest.scala:644)
>       scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>       org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
>       org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:239)
> == Progress ==
>    StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@6c245252,Map(),/home/runner/work/spark/spark/target/tmp/spark-71fa9d86-7cad-4a5d-8666-be8c2f86deb2)
>    AddData to MemoryStream[value#12734]: 21
> => ExpectFailure[org.apache.spark.SparkException, isFatalError: false]
> == Stream ==
> Output Mode: Update
> Stream state: {MemoryStream[value#12734]: 0}
> Thread state: dead
> == Sink ==
> == Plan ==
> == Parsed Logical Plan ==
> WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@47606c55
> +- Aggregate [id#12736], [id#12736, sum(value#12734) AS sum_value#12742L, avg(value#12734) AS avg_value#12744, collect_list(value#12734, 0, 0) AS values#12746]
>    +- Project [(value#12734 % 10) AS id#12736, value#12734]
>       +- StreamingDataSourceV2Relation [value#12734], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@17b95883, MemoryStream[value#12734], 0, 1
> == Analyzed Logical Plan ==
> WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@47606c55
> +- Aggregate [id#12736], [id#12736, sum(value#12734) AS sum_value#12742L, avg(value#12734) AS avg_value#12744, collect_list(value#12734, 0, 0) AS values#12746]
>    +- Project [(value#12734 % 10) AS id#12736, value#12734]
>       +- StreamingDataSourceV2Relation [value#12734], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@17b95883, MemoryStream[value#12734], 0, 1
> == Optimized Logical Plan ==
> WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@47606c55
> +- Aggregate [id#12736], [id#12736, sum(value#12734) AS sum_value#12742L, avg(value#12734) AS avg_value#12744, collect_list(value#12734, 0, 0) AS values#12746]
>    +- Project [(value#12734 % 10) AS id#12736, value#12734]
>       +- StreamingDataSourceV2Relation [value#12734], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@17b95883, MemoryStream[value#12734], 0, 1
> == Physical Plan ==
> WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@47606c55, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2001/1955131699@11177c0f
> +- ObjectHashAggregate(keys=[id#12736], functions=[sum(value#12734), avg(value#12734), collect_list(value#12734, 0, 0)], output=[id#12736, sum_value#12742L, avg_value#12744, values#12746])
>    +- StateStoreSave [id#12736], state info [ checkpoint = file:/home/runner/work/spark/spark/target/tmp/spark-71fa9d86-7cad-4a5d-8666-be8c2f86deb2/state, runId = 6ea16dfe-e274-46cf-af2f-3b8f1a532218, opId = 0, ver = 1, numPartitions = 5], Update, 0, 1
>       +- ObjectHashAggregate(keys=[id#12736], functions=[merge_sum(value#12734), merge_avg(value#12734), merge_collect_list(value#12734, 0, 0)], output=[id#12736, sum#12756L, sum#12759, count#12760L, buf#12762])
>          +- StateStoreRestore [id#12736], state info [ checkpoint = file:/home/runner/work/spark/spark/target/tmp/spark-71fa9d86-7cad-4a5d-8666-be8c2f86deb2/state, runId = 6ea16dfe-e274-46cf-af2f-3b8f1a532218, opId = 0, ver = 1, numPartitions = 5], 1
>             +- Exchange hashpartitioning(id#12736, 5), ENSURE_REQUIREMENTS, [id=#50108]
>                +- ObjectHashAggregate(keys=[id#12736], functions=[merge_sum(value#12734), merge_avg(value#12734), merge_collect_list(value#12734, 0, 0)], output=[id#12736, sum#12756L, sum#12759, count#12760L, buf#12762])
>                   +- ObjectHashAggregate(keys=[id#12736], functions=[partial_sum(value#12734), partial_avg(value#12734), partial_collect_list(value#12734, 0, 0)], output=[id#12736, sum#12756L, sum#12759, count#12760L, buf#12762])
>                      +- *(1) Project [(value#12734 % 10) AS id#12736, value#12734]
>                         +- MicroBatchScan[value#12734] MemoryStreamDataSource
> {code}
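
For context on the failure above: the trace corresponds to the restart phase of the test, where the query is resumed from a checkpoint whose state was written with a different aggregation schema (here, state format version 1) and is expected to fail with a state-schema error; the flaky runs failed because stateSchemaExc came back empty. Below is a minimal sketch of that pattern on Spark's StreamTest harness; the checkpoint path and the findStateSchemaError helper are illustrative stand-ins, not the suite's actual code.

{code}
// Sketch only: mirrors the aggregation in the plan above and the
// ExpectFailure step in the progress log.
import scala.annotation.tailrec

import org.apache.spark.SparkException
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{avg, collect_list, sum}
import org.apache.spark.sql.streaming.{OutputMode, StreamTest}

class StateSchemaRestartSketch extends StreamTest {
  import testImplicits._

  // Hypothetical helper: walk the cause chain looking for a state-schema
  // error. The real test binds this to stateSchemaExc before asserting.
  @tailrec
  private def findStateSchemaError(t: Throwable): Option[Throwable] =
    if (t == null) None
    else if (t.getMessage != null && t.getMessage.contains("schema")) Some(t)
    else findStateSchemaError(t.getCause)

  test("sketch: restart with a changed state schema - state format version 1") {
    withSQLConf("spark.sql.streaming.aggregation.stateFormatVersion" -> "1") {
      val inputData = MemoryStream[Int]
      // Same shape as the plan above: sum, avg, collect_list keyed by value % 10.
      val aggregated = inputData.toDF()
        .selectExpr("value % 10 AS id", "value")
        .groupBy("id")
        .agg(
          sum("value").as("sum_value"),
          avg("value").as("avg_value"),
          collect_list("value").as("values"))

      // Hypothetical: a checkpoint left behind by an earlier run of the query
      // whose aggregation wrote state with a different schema.
      val checkpointDir = "/tmp/checkpoint-from-previous-run"

      testStream(aggregated, OutputMode.Update)(
        StartStream(checkpointLocation = checkpointDir),
        AddData(inputData, 21),
        ExpectFailure[SparkException] { e =>
          val stateSchemaExc = findStateSchemaError(e)
          assert(stateSchemaExc.isDefined) // the assertion that was flakily false
        }
      )
    }
  }
}
{code}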



