[ 
https://issues.apache.org/jira/browse/SPARK-38046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-38046:
------------------------------------

    Assignee: Boyang Jerry Peng

> Fix KafkaSource/KafkaMicroBatch flaky test due to non-deterministic timing
> --------------------------------------------------------------------------
>
>                 Key: SPARK-38046
>                 URL: https://issues.apache.org/jira/browse/SPARK-38046
>             Project: Spark
>          Issue Type: Improvement
>          Components: Tests
>    Affects Versions: 3.2.0
>            Reporter: Boyang Jerry Peng
>            Assignee: Boyang Jerry Peng
>            Priority: Major
>
> There is a test called "compositeReadLimit"
>  
> [https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala#L460]
>  
> that is flaky.  The cause is that the Kafka connector always reads the 
> actual system time rather than a clock the test advances manually.  This 
> leaves room for non-deterministic behavior, especially since the source 
> determines whether "maxTriggerDelayMs" is satisfied by comparing the last 
> trigger time with the current system time.  Adding a "sleep" at various 
> points in the test is enough to produce different outcomes.
>  
> Example output when test fails:
>  
> {code:java}
> - compositeReadLimit *** FAILED *** (7 seconds, 862 milliseconds)
>   == Results ==
>   !== Correct Answer - 0 ==   == Spark Answer - 14 ==
>   !struct<>                   struct<value:int>
>   !                           [112]
>   !                           [113]
>   !                           [114]
>   !                           [115]
>   !                           [116]
>   !                           [117]
>   !                           [118]
>   !                           [119]
>   !                           [120]
>   !                           [16]
>   !                           [17]
>   !                           [18]
>   !                           [19]
>   !                           [20]
>       
>   
>   == Progress ==
>      
> StartStream(ProcessingTimeTrigger(100),org.apache.spark.sql.streaming.util.StreamManualClock@30075210,Map(),null)
>      AssertOnQuery(<condition>, )
>      CheckAnswer: 
> [1],[10],[100],[101],[102],[103],[104],[105],[106],[107],[11],[108],[109],[110],[111],[12],[13],[14],[15]
>      AdvanceManualClock(100)
>      AssertOnQuery(<condition>, )
>   => CheckNewAnswer: 
>      Assert(<condition>, )
>      AdvanceManualClock(100)
>      AssertOnQuery(<condition>, )
>      CheckAnswer: 
> [1],[10],[100],[101],[102],[103],[104],[105],[106],[107],[11],[108],[109],[110],[111],[112],[113],[114],[115],[116],[12],[117],[118],[119],[120],[121],[13],[14],[15],[16],[17],[18],[19],[2],[20],[21],[22],[23],[24]
>      AdvanceManualClock(100)
>      AssertOnQuery(<condition>, )
>      CheckNewAnswer: 
>      Assert(<condition>, )
>      AdvanceManualClock(100)
>      AssertOnQuery(<condition>, )
>      CheckAnswer: 
> [1],[10],[100],[101],[102],[103],[104],[105],[106],[107],[11],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[12],[120],[121],[122],[123],[124],[125],[126],[127],[128],[13],[14],[15],[16],[17],[18],[19],[2],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30]
>   
>   == Stream ==
>   Output Mode: Append
>   Stream state: {KafkaSourceV1[Subscribe[topic-41]]: 
> {"topic-41":{"2":1,"1":11,"0":21}}}
>   Thread state: alive
>   Thread stack trace: java.lang.Object.wait(Native Method)
>   org.apache.spark.util.ManualClock.waitTillTime(ManualClock.scala:67)
>   
> org.apache.spark.sql.streaming.util.StreamManualClock.waitTillTime(StreamManualClock.scala:34)
>   
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:76)
>   
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:222)
>   
> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:350)
>   
> org.apache.spark.sql.execution.streaming.StreamExecution$$Lambda$3081/1859014229.apply$mcV$sp(Unknown
>  Source)
>   scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:857)
>   
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:325)
>   
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:252)
>   
>   
>   == Sink ==
>   0: [100] [101] [102] [103] [104] [105] [106] [107] [108] [109] [110] [111] 
> [1] [10] [11] [12] [13] [14] [15]
>   1: [112] [113] [114] [115] [116] [117] [118] [119] [120] [16] [17] [18] 
> [19] [20]
>   
>   
>   == Plan ==
>   == Parsed Logical Plan ==
>   WriteToDataSourceV2 
> org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@48d73c40
>   +- SerializeFromObject [input[0, int, false] AS value#7455]
>      +- MapElements 
> org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$6114/1322446561@4df69693,
>  class scala.Tuple2, [StructField(_1,StringType,true), 
> StructField(_2,StringType,true)], obj#7454: int
>         +- DeserializeToObject newInstance(class scala.Tuple2), obj#7453: 
> scala.Tuple2
>            +- Project [cast(key#7429 as string) AS key#7443, cast(value#7430 
> as string) AS value#7444]
>               +- Project [key#7501 AS key#7429, value#7502 AS value#7430, 
> topic#7503 AS topic#7431, partition#7504 AS partition#7432, offset#7505L AS 
> offset#7433L, timestamp#7506 AS timestamp#7434, timestampType#7507 AS 
> timestampType#7435]
>                  +- LogicalRDD [key#7501, value#7502, topic#7503, 
> partition#7504, offset#7505L, timestamp#7506, timestampType#7507], true
>   
>   == Analyzed Logical Plan ==
>   WriteToDataSourceV2 
> org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@48d73c40
>   +- SerializeFromObject [input[0, int, false] AS value#7455]
>      +- MapElements 
> org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$6114/1322446561@4df69693,
>  class scala.Tuple2, [StructField(_1,StringType,true), 
> StructField(_2,StringType,true)], obj#7454: int
>         +- DeserializeToObject newInstance(class scala.Tuple2), obj#7453: 
> scala.Tuple2
>            +- Project [cast(key#7429 as string) AS key#7443, cast(value#7430 
> as string) AS value#7444]
>               +- Project [key#7501 AS key#7429, value#7502 AS value#7430, 
> topic#7503 AS topic#7431, partition#7504 AS partition#7432, offset#7505L AS 
> offset#7433L, timestamp#7506 AS timestamp#7434, timestampType#7507 AS 
> timestampType#7435]
>                  +- LogicalRDD [key#7501, value#7502, topic#7503, 
> partition#7504, offset#7505L, timestamp#7506, timestampType#7507], true
>   
>   == Optimized Logical Plan ==
>   WriteToDataSourceV2 
> org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@48d73c40
>   +- SerializeFromObject [input[0, int, false] AS value#7455]
>      +- MapElements 
> org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$6114/1322446561@4df69693,
>  class scala.Tuple2, [StructField(_1,StringType,true), 
> StructField(_2,StringType,true)], obj#7454: int
>         +- DeserializeToObject newInstance(class scala.Tuple2), obj#7453: 
> scala.Tuple2
>            +- Project [cast(key#7501 as string) AS key#7443, cast(value#7502 
> as string) AS value#7444]
>               +- LogicalRDD [key#7501, value#7502, topic#7503, 
> partition#7504, offset#7505L, timestamp#7506, timestampType#7507], true
>   
>   == Physical Plan ==
>   WriteToDataSourceV2 
> org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@48d73c40, 
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3704/659759030@5d8e6616
>   +- *(1) SerializeFromObject [input[0, int, false] AS value#7455]
>      +- *(1) MapElements 
> org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$6114/1322446561@4df69693,
>  obj#7454: int
>         +- *(1) DeserializeToObject newInstance(class scala.Tuple2), 
> obj#7453: scala.Tuple2
>            +- *(1) Project [cast(key#7501 as string) AS key#7443, 
> cast(value#7502 as string) AS value#7444]
>               +- *(1) Scan ExistingRDD 
> kafka[key#7501,value#7502,topic#7503,partition#7504,offset#7505L,timestamp#7506,timestampType#7507]
>  (StreamTest.scala:466)
>   org.scalatest.exceptions.TestFailedException:
>   at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
>   at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
>   at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
>   at org.scalatest.Assertions.fail(Assertions.scala:1091)
>   at org.scalatest.Assertions.fail$(Assertions.scala:1087)
>   at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
>   at 
> org.apache.spark.sql.streaming.StreamTest.failTest$1(StreamTest.scala:466)
>   at org.apache.spark.sql.streamin
> 2022-01-07 10:25:45,899 INFO  logger - 
> [Controller id=0 epoch=1] Changed partition topic-42-0 from NewPartition to 
> OnlinePartition with state LeaderAndIsr(leader=0, leaderEpoch=0, isr=List(0), 
> zkVersion=0){code}
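
The timing dependence described in the issue can be illustrated with a small, self-contained sketch. It is not the connector's code: the class names TriggerDelayCheck and FakeClock, the method names, and the 500 ms delay are all hypothetical, chosen only to show why a check that compares the last trigger time with the current time becomes deterministic once the time source is injected and advanced by the test instead of read from the system.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a "has maxTriggerDelayMs elapsed?" check. */
final class TriggerDelayCheck {
    private final LongSupplier nowMs;       // injected time source, in milliseconds
    private final long maxTriggerDelayMs;
    private long lastTriggerMs;

    TriggerDelayCheck(LongSupplier nowMs, long maxTriggerDelayMs) {
        this.nowMs = nowMs;
        this.maxTriggerDelayMs = maxTriggerDelayMs;
        this.lastTriggerMs = nowMs.getAsLong();
    }

    /** True once at least maxTriggerDelayMs has passed since the last trigger. */
    boolean delayElapsed() {
        return nowMs.getAsLong() - lastTriggerMs >= maxTriggerDelayMs;
    }

    /** Records the current time as the last trigger time. */
    void markTriggered() {
        lastTriggerMs = nowMs.getAsLong();
    }
}

/** Hypothetical clock that only moves when the test advances it. */
final class FakeClock implements LongSupplier {
    private long nowMs;

    FakeClock(long startMs) {
        this.nowMs = startMs;
    }

    void advance(long deltaMs) {
        nowMs += deltaMs;
    }

    @Override
    public long getAsLong() {
        return nowMs;
    }
}

class ClockInjectionSketch {
    public static void main(String[] args) {
        FakeClock clock = new FakeClock(0L);
        TriggerDelayCheck check = new TriggerDelayCheck(clock, 500L);

        clock.advance(100L);
        System.out.println(check.delayElapsed()); // prints "false": only 100 ms elapsed

        clock.advance(400L);
        System.out.println(check.delayElapsed()); // prints "true": 500 ms elapsed
    }
}
```

Passing System::currentTimeMillis as the time source instead would reproduce the situation the issue describes: the result of delayElapsed() would then depend on how long the test thread happened to take between calls.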



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
