[ https://issues.apache.org/jira/browse/SPARK-37987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jungtaek Lim resolved SPARK-37987.
----------------------------------
    Fix Version/s: 3.3.0
                   3.2.1
       Resolution: Fixed

Issue resolved by pull request 35298
[https://github.com/apache/spark/pull/35298]

> Flaky Test: StreamingAggregationSuite.changing schema of state when restarting query - state format version 1
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-37987
>                 URL: https://issues.apache.org/jira/browse/SPARK-37987
>             Project: Spark
>          Issue Type: Test
>          Components: Structured Streaming, Tests
>    Affects Versions: 3.3.0
>            Reporter: Hyukjin Kwon
>            Assignee: Jungtaek Lim
>            Priority: Major
>             Fix For: 3.3.0, 3.2.1
>
>
> {code}
> StreamingAggregationSuite.changing schema of state when restarting query - state format version 1
> {code}
> {code}
> org.scalatest.exceptions.TestFailedException: Error while checking stream failure: stateSchemaExc.isDefined was false
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
> org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
> org.apache.spark.sql.streaming.StreamingAggregationSuite.$anonfun$new$82(StreamingAggregationSuite.scala:781)
> org.apache.spark.sql.streaming.StreamingAggregationSuite.$anonfun$new$82$adapted(StreamingAggregationSuite.scala:779)
> org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$33(StreamTest.scala:644)
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
> org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:239)
>
> == Progress ==
>    StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@6c245252,Map(),/home/runner/work/spark/spark/target/tmp/spark-71fa9d86-7cad-4a5d-8666-be8c2f86deb2)
>    AddData to MemoryStream[value#12734]: 21
> => ExpectFailure[org.apache.spark.SparkException, isFatalError: false]
>
> == Stream ==
> Output Mode: Update
> Stream state: {MemoryStream[value#12734]: 0}
> Thread state: dead
>
> == Sink ==
>
> == Plan ==
> == Parsed Logical Plan ==
> WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@47606c55
> +- Aggregate [id#12736], [id#12736, sum(value#12734) AS sum_value#12742L, avg(value#12734) AS avg_value#12744, collect_list(value#12734, 0, 0) AS values#12746]
>    +- Project [(value#12734 % 10) AS id#12736, value#12734]
>       +- StreamingDataSourceV2Relation [value#12734], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@17b95883, MemoryStream[value#12734], 0, 1
>
> == Analyzed Logical Plan ==
> WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@47606c55
> +- Aggregate [id#12736], [id#12736, sum(value#12734) AS sum_value#12742L, avg(value#12734) AS avg_value#12744, collect_list(value#12734, 0, 0) AS values#12746]
>    +- Project [(value#12734 % 10) AS id#12736, value#12734]
>       +- StreamingDataSourceV2Relation [value#12734], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@17b95883, MemoryStream[value#12734], 0, 1
>
> == Optimized Logical Plan ==
> WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@47606c55
> +- Aggregate [id#12736], [id#12736, sum(value#12734) AS sum_value#12742L, avg(value#12734) AS avg_value#12744, collect_list(value#12734, 0, 0) AS values#12746]
>    +- Project [(value#12734 % 10) AS id#12736, value#12734]
>       +- StreamingDataSourceV2Relation [value#12734], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@17b95883, MemoryStream[value#12734], 0, 1
>
> == Physical Plan ==
> WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@47606c55, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2001/1955131699@11177c0f
> +- ObjectHashAggregate(keys=[id#12736], functions=[sum(value#12734), avg(value#12734), collect_list(value#12734, 0, 0)], output=[id#12736, sum_value#12742L, avg_value#12744, values#12746])
>    +- StateStoreSave [id#12736], state info [ checkpoint = file:/home/runner/work/spark/spark/target/tmp/spark-71fa9d86-7cad-4a5d-8666-be8c2f86deb2/state, runId = 6ea16dfe-e274-46cf-af2f-3b8f1a532218, opId = 0, ver = 1, numPartitions = 5], Update, 0, 1
>       +- ObjectHashAggregate(keys=[id#12736], functions=[merge_sum(value#12734), merge_avg(value#12734), merge_collect_list(value#12734, 0, 0)], output=[id#12736, sum#12756L, sum#12759, count#12760L, buf#12762])
>          +- StateStoreRestore [id#12736], state info [ checkpoint = file:/home/runner/work/spark/spark/target/tmp/spark-71fa9d86-7cad-4a5d-8666-be8c2f86deb2/state, runId = 6ea16dfe-e274-46cf-af2f-3b8f1a532218, opId = 0, ver = 1, numPartitions = 5], 1
>             +- Exchange hashpartitioning(id#12736, 5), ENSURE_REQUIREMENTS, [id=#50108]
>                +- ObjectHashAggregate(keys=[id#12736], functions=[merge_sum(value#12734), merge_avg(value#12734), merge_collect_list(value#12734, 0, 0)], output=[id#12736, sum#12756L, sum#12759, count#12760L, buf#12762])
>                   +- ObjectHashAggregate(keys=[id#12736], functions=[partial_sum(value#12734), partial_avg(value#12734), partial_collect_list(value#12734, 0, 0)], output=[id#12736, sum#12756L, sum#12759, count#12760L, buf#12762])
>                      +- *(1) Project [(value#12734 % 10) AS id#12736, value#12734]
>                         +- MicroBatchScan[value#12734] MemoryStreamDataSource
> {code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
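For context, the failing scenario can be reconstructed from the query plans and the StreamTest progress log above. The following is a minimal sketch, not the suite's verbatim code: it assumes it runs inside a suite mixing in StreamTest (so testStream, StartStream, AddData, ExpectFailure, and the implicit SQLContext/encoders are in scope), and both checkpointDir and findStateSchemaNotCompatible are assumed fixtures/helpers, named here only for illustration.

{code}
import org.apache.spark.SparkException
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{avg, collect_list, sum}
import org.apache.spark.sql.streaming.OutputMode.Update

// The aggregation described by the parsed/analyzed/optimized plans above.
val inputData = MemoryStream[Int]            // MemoryStream[value#12734] in the log
val aggregated = inputData.toDF()
  .selectExpr("value % 10 AS id", "value")   // Project [(value#12734 % 10) AS id#12736, value#12734]
  .groupBy("id")
  .agg(
    sum("value").as("sum_value"),            // sum(value#12734) AS sum_value#12742L
    avg("value").as("avg_value"),            // avg(value#12734) AS avg_value#12744
    collect_list("value").as("values"))      // collect_list(value#12734, 0, 0) AS values#12746

// Restart against a checkpoint written by an earlier run with a different
// state schema; the first micro-batch after restart is expected to fail.
testStream(aggregated, Update)(
  StartStream(checkpointLocation = checkpointDir.getAbsolutePath), // checkpointDir: assumed fixture
  AddData(inputData, 21),
  ExpectFailure[SparkException] { e =>
    // The flaky assertion: the schema-incompatibility cause was expected in the
    // exception chain but came back empty ("stateSchemaExc.isDefined was false").
    val stateSchemaExc = findStateSchemaNotCompatible(e) // assumed helper returning Option[Throwable]
    assert(stateSchemaExc.isDefined)
  }
)
{code}

The "state format version 1" part of the test name refers to the internal configuration spark.sql.streaming.aggregation.stateFormatVersion, which the suite pins to the legacy value 1 when writing the original checkpoint.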