[ https://issues.apache.org/jira/browse/SPARK-38046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jungtaek Lim resolved SPARK-38046.
----------------------------------
    Fix Version/s: 3.3.0
                   3.2.2
       Resolution: Fixed

Issue resolved by pull request 35343
[https://github.com/apache/spark/pull/35343]

> Fix KafkaSource/KafkaMicroBatch flaky test due to non-deterministic timing
> --------------------------------------------------------------------------
>
>                 Key: SPARK-38046
>                 URL: https://issues.apache.org/jira/browse/SPARK-38046
>             Project: Spark
>          Issue Type: Improvement
>          Components: Tests
>    Affects Versions: 3.2.0
>            Reporter: Boyang Jerry Peng
>            Assignee: Boyang Jerry Peng
>            Priority: Major
>             Fix For: 3.3.0, 3.2.2
>
>
> The test "compositeReadLimit"
> [https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala#L460]
> is flaky. The problem is that the Kafka connector always reads the actual
> system time rather than a manually advanced clock, which leaves room for
> non-deterministic behavior, especially since the source decides whether
> "maxTriggerDelayMs" has been satisfied by comparing the last trigger time
> with the current system time. Simply sleeping at certain points in the test
> is enough to produce different outcomes. (A simplified sketch of this race
> appears after the failure output below.)
>
> Example output when the test fails:
>
> {code:java}
> - compositeReadLimit *** FAILED *** (7 seconds, 862 milliseconds)
> == Results ==
> !== Correct Answer - 0 ==   == Spark Answer - 14 ==
> !struct<>                   struct<value:int>
> !                           [112]
> !                           [113]
> !                           [114]
> !                           [115]
> !                           [116]
> !                           [117]
> !                           [118]
> !                           [119]
> !                           [120]
> !                           [16]
> !                           [17]
> !                           [18]
> !                           [19]
> !                           [20]
>
>
> == Progress ==
>    StartStream(ProcessingTimeTrigger(100),org.apache.spark.sql.streaming.util.StreamManualClock@30075210,Map(),null)
>    AssertOnQuery(<condition>, )
>    CheckAnswer: [1],[10],[100],[101],[102],[103],[104],[105],[106],[107],[11],[108],[109],[110],[111],[12],[13],[14],[15]
>    AdvanceManualClock(100)
>    AssertOnQuery(<condition>, )
> => CheckNewAnswer:
>    Assert(<condition>, )
>    AdvanceManualClock(100)
>    AssertOnQuery(<condition>, )
>    CheckAnswer: [1],[10],[100],[101],[102],[103],[104],[105],[106],[107],[11],[108],[109],[110],[111],[112],[113],[114],[115],[116],[12],[117],[118],[119],[120],[121],[13],[14],[15],[16],[17],[18],[19],[2],[20],[21],[22],[23],[24]
>    AdvanceManualClock(100)
>    AssertOnQuery(<condition>, )
>    CheckNewAnswer:
>    Assert(<condition>, )
>    AdvanceManualClock(100)
>    AssertOnQuery(<condition>, )
>    CheckAnswer: [1],[10],[100],[101],[102],[103],[104],[105],[106],[107],[11],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[12],[120],[121],[122],[123],[124],[125],[126],[127],[128],[13],[14],[15],[16],[17],[18],[19],[2],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30]
>
> == Stream ==
> Output Mode: Append
> Stream state: {KafkaSourceV1[Subscribe[topic-41]]: {"topic-41":{"2":1,"1":11,"0":21}}}
> Thread state: alive
> Thread stack trace: java.lang.Object.wait(Native Method)
> org.apache.spark.util.ManualClock.waitTillTime(ManualClock.scala:67)
> org.apache.spark.sql.streaming.util.StreamManualClock.waitTillTime(StreamManualClock.scala:34)
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:76)
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:222)
> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:350)
> org.apache.spark.sql.execution.streaming.StreamExecution$$Lambda$3081/1859014229.apply$mcV$sp(Unknown Source)
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:857)
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:325)
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:252)
>
>
> == Sink ==
> 0: [100] [101] [102] [103] [104] [105] [106] [107] [108] [109] [110] [111] [1] [10] [11] [12] [13] [14] [15]
> 1: [112] [113] [114] [115] [116] [117] [118] [119] [120] [16] [17] [18] [19] [20]
>
>
> == Plan ==
> == Parsed Logical Plan ==
> WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@48d73c40
> +- SerializeFromObject [input[0, int, false] AS value#7455]
>    +- MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$6114/1322446561@4df69693, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#7454: int
>       +- DeserializeToObject newInstance(class scala.Tuple2), obj#7453: scala.Tuple2
>          +- Project [cast(key#7429 as string) AS key#7443, cast(value#7430 as string) AS value#7444]
>             +- Project [key#7501 AS key#7429, value#7502 AS value#7430, topic#7503 AS topic#7431, partition#7504 AS partition#7432, offset#7505L AS offset#7433L, timestamp#7506 AS timestamp#7434, timestampType#7507 AS timestampType#7435]
>                +- LogicalRDD [key#7501, value#7502, topic#7503, partition#7504, offset#7505L, timestamp#7506, timestampType#7507], true
>
> == Analyzed Logical Plan ==
> WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@48d73c40
> +- SerializeFromObject [input[0, int, false] AS value#7455]
>    +- MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$6114/1322446561@4df69693, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#7454: int
>       +- DeserializeToObject newInstance(class scala.Tuple2), obj#7453: scala.Tuple2
>          +- Project [cast(key#7429 as string) AS key#7443, cast(value#7430 as string) AS value#7444]
>             +- Project [key#7501 AS key#7429, value#7502 AS value#7430, topic#7503 AS topic#7431, partition#7504 AS partition#7432, offset#7505L AS offset#7433L, timestamp#7506 AS timestamp#7434, timestampType#7507 AS timestampType#7435]
>                +- LogicalRDD [key#7501, value#7502, topic#7503, partition#7504, offset#7505L, timestamp#7506, timestampType#7507], true
>
> == Optimized Logical Plan ==
> WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@48d73c40
> +- SerializeFromObject [input[0, int, false] AS value#7455]
>    +- MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$6114/1322446561@4df69693, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#7454: int
>       +- DeserializeToObject newInstance(class scala.Tuple2), obj#7453: scala.Tuple2
>          +- Project [cast(key#7501 as string) AS key#7443, cast(value#7502 as string) AS value#7444]
>             +- LogicalRDD [key#7501, value#7502, topic#7503, partition#7504, offset#7505L, timestamp#7506, timestampType#7507], true
>
> == Physical Plan ==
> WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@48d73c40, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3704/659759030@5d8e6616
> +- *(1) SerializeFromObject [input[0, int, false] AS value#7455]
>    +- *(1) MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$6114/1322446561@4df69693, obj#7454: int
>       +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#7453: scala.Tuple2
>          +- *(1) Project [cast(key#7501 as string) AS key#7443, cast(value#7502 as string) AS value#7444]
>             +- *(1) Scan ExistingRDD kafka[key#7501,value#7502,topic#7503,partition#7504,offset#7505L,timestamp#7506,timestampType#7507]
> (StreamTest.scala:466)
> org.scalatest.exceptions.TestFailedException:
>   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
>   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
>   at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
>   at org.scalatest.Assertions.fail(Assertions.scala:1091)
>   at org.scalatest.Assertions.fail$(Assertions.scala:1087)
>   at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
>   at org.apache.spark.sql.streaming.StreamTest.failTest$1(StreamTest.scala:466)
>   at org.apache.spark.sql.streamin2022-01-07 10:25:45,899 INFO logger - [Controller id=0 epoch=1] Changed partition topic-42-0 from NewPartition to OnlinePartition with state LeaderAndIsr(leader=0, leaderEpoch=0, isr=List(0), zkVersion=0)
> {code}
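A minimal sketch of the race described above (illustrative only; the names and structure below are hypothetical, not the actual KafkaMicroBatchStream code): the read-limit decision effectively compares the last trigger time against the current wall-clock time, so any real time that elapses between micro-batches (a sleep in the test, GC, a slow CI machine) can satisfy maxTriggerDelayMs even though the test's StreamManualClock has barely advanced.

{code:scala}
import java.util.concurrent.TimeUnit

// Simplified, hypothetical sketch of the racy check; not the real connector code.
object ReadLimitTimingSketch {

  // True when the configured trigger delay has elapsed since the last trigger,
  // i.e. roughly the point at which the source stops holding back a batch that
  // has not yet reached its minimum read limit.
  def delayExpired(lastTriggerMillis: Long, nowMillis: Long, maxTriggerDelayMs: Long): Boolean =
    nowMillis - lastTriggerMillis >= maxTriggerDelayMs

  def main(args: Array[String]): Unit = {
    val maxTriggerDelayMs = 5000L

    // With the test's manual clock, time only moves via AdvanceManualClock(...),
    // so the decision is deterministic.
    var manualNow = 0L
    val manualLastTrigger = manualNow
    manualNow += 100 // AdvanceManualClock(100)
    println(s"manual clock: expired = ${delayExpired(manualLastTrigger, manualNow, maxTriggerDelayMs)}")

    // With the system clock (which the connector actually consults), whatever real
    // time happens to pass between batches can flip the decision.
    val wallLastTrigger = System.currentTimeMillis()
    TimeUnit.SECONDS.sleep(6) // stand-in for an unlucky pause between triggers
    val expired = delayExpired(wallLastTrigger, System.currentTimeMillis(), maxTriggerDelayMs)
    println(s"system clock: expired = $expired")
  }
}
{code}

Once the delay check fires, a micro-batch can admit more records than the per-trigger limits would otherwise allow, which is consistent with the "Correct Answer - 0" vs "Spark Answer - 14" mismatch above. See pull request 35343 (linked above) for the change that was actually merged.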