[
https://issues.apache.org/jira/browse/SPARK-54058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated SPARK-54058:
-----------------------------------
Labels: pull-request-available (was: )
> Fix Flaky Test: `KafkaMicroBatchV1SourceWithConsumerSuite.Query with
> Trigger.AvailableNow should throw error when topic partitions got unavailable
> during subsequent batches`
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-54058
> URL: https://issues.apache.org/jira/browse/SPARK-54058
> Project: Spark
> Issue Type: Sub-task
> Components: SQL, Structured Streaming, Tests
> Affects Versions: 4.1.0
> Reporter: Kousuke Saruta
> Priority: Major
> Labels: pull-request-available
>
> Recently, `KafkaMicroBatchV1SourceWithConsumerSuite.Query with
> Trigger.AvailableNow should throw error when topic partitions got unavailable
> during subsequent batches` frequently fails.
> [https://github.com/apache/spark/actions/runs/18872699886/job/53854858890]
>
> {code:java}
> [info] - Query with Trigger.AvailableNow should throw error when topic
> partitions got unavailable during subsequent batches *** FAILED *** (1 minute)
> [info] java.lang.AssertionError: assertion failed: Exception tree doesn't
> contain the expected exception with message: Some of partitions in Kafka
> topic(s) have been lost during running query with Trigger.AvailableNow.
> [info] org.scalatest.exceptions.TestFailedException: isPropagated was false
> Partition [topic-41, 1] metadata not propagated after timeout
> [info] at
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
> [info] at
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
> [info] at
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
> [info] at
> org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
> [info] at
> org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$waitUntilMetadataIsPropagated$1(KafkaTestUtils.scala:614)
> [info] at
> org.scalatest.enablers.Retrying$$anon$4.makeAValiantAttempt$1(Retrying.scala:184)
> [info] at
> org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:196)
> [info] at
> org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:226)
> [info] at
> org.scalatest.concurrent.Eventually.eventually(Eventually.scala:348)
> [info] at
> org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:347)
> [info] at
> org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:457)
> [info] at
> org.apache.spark.sql.kafka010.KafkaTestUtils.waitUntilMetadataIsPropagated(KafkaTestUtils.scala:613)
> [info] at
> org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$createTopic$1(KafkaTestUtils.scala:378)
> [info] at
> scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:256)
> [info] at
> org.apache.spark.sql.kafka010.KafkaTestUtils.createTopic(KafkaTestUtils.scala:377)
> [info] at
> org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$11(KafkaMicroBatchSourceSuite.scala:352)
> [info] at
> org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$11$adapted(KafkaMicroBatchSourceSuite.scala:349)
> [info] at
> org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.callBatchWriter(ForeachBatchSink.scala:56)
> [info] at
> org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:49)
> [info] at
> org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runBatch$19(MicroBatchExecution.scala:1063)
> [info] at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:176)
> [info] at
> org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:284)
> [info] at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:138)
> [info] at
> org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
> [info] at
> org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
> [info] at
> org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
> [info] at
> org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
> [info] at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:138)
> [info] at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:307)
> [info] at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:137)
> [info] at
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
> [info] at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:91)
> [info] at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:249)
> [info] at
> org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runBatch$18(MicroBatchExecution.scala:1054)
> [info] at
> org.apache.spark.sql.execution.streaming.runtime.ProgressContext.reportTimeTaken(ProgressReporter.scala:200)
> [info] at
> org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.runBatch(MicroBatchExecution.scala:1054)
> [info] at
> org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:513)
> [info] at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
> [info] at
> org.apache.spark.sql.execution.streaming.runtime.ProgressContext.reportTimeTaken(ProgressReporter.scala:200)
> [info] at
> org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:478)
> [info] at
> org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:458)
> [info] at
> org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:458)
> [info] at
> org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch(TriggerExecutor.scala:40)
> [info] at
> org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:38)
> [info] at
> org.apache.spark.sql.execution.streaming.runtime.MultiBatchExecutor.runOneBatch(TriggerExecutor.scala:60)
> [info] at
> org.apache.spark.sql.execution.streaming.runtime.MultiBatchExecutor.execute(TriggerExecutor.scala:65)
> [info] at
> org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:458)
> [info] at
> org.apache.spark.sql.execution.streaming.runtime.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:347)
> [info] at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
> [info] at
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
> [info] at
> org.apache.spark.sql.execution.streaming.runtime.StreamExecution.org$apache$spark$sql$execution$streaming$runtime$StreamExecution$$runStream(StreamExecution.scala:307)
> [info] at
> org.apache.spark.sql.execution.streaming.runtime.StreamExecution$$anon$1.run(StreamExecution.scala:230)
> [info] at scala.Predef$.assert(Predef.scala:279)
> [info] at
> org.apache.spark.TestUtils$.assertExceptionMsg(TestUtils.scala:198)
> [info] at
> org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$9(KafkaMicroBatchSourceSuite.scala:374)
> [info] at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
> [info] at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
> [info] at
> org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
> [info] at
> org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
> [info] at
> org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
> [info] at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:68)
> [info] at
> org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:154)
> [info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
> [info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
> [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info] at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info] at org.scalatest.Transformer.apply(Transformer.scala:20)
> [info] at
> org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
> [info] at
> org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:226)
> [info] at
> org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
> [info] at
> org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
> [info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
> [info] at
> org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
> [info] at
> org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
> [info] at
> org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:68)
> [info] at
> org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
> [info] at
> org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
> [info] at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:68)
> [info] at
> org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
> [info] at
> org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
> [info] at scala.collection.immutable.List.foreach(List.scala:323)
> [info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
> [info] at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
> [info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
> [info] at
> org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
> [info] at
> org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
> [info] at
> org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
> [info] at org.scalatest.Suite.run(Suite.scala:1114)
> [info] at org.scalatest.Suite.run$(Suite.scala:1096)
> [info] at
> org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
> [info] at
> org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
> [info] at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
> [info] at
> org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
> [info] at
> org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
> [info] at
> org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:68)
> [info] at
> org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
> [info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
> [info] at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
> [info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68)
> [info] at
> org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
> [info] at
> org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
> [info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
> [info] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> [info] at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
> [info] at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> [info] at java.base/java.lang.Thread.run(Thread.java:840) {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]