MaxGekk commented on code in PR #36704: URL: https://github.com/apache/spark/pull/36704#discussion_r885944596
########## connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala: ########## @@ -1376,6 +1376,13 @@ class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase { classOf[KafkaSourceProvider].getCanonicalName) } + override def expectOffsetChange(): ExpectFailure[_] = { + ExpectFailure[IllegalStateException](e => { Review Comment: The top level method which the exception comes from is `runStream()`: https://github.com/apache/spark/blob/ea215279b0a4785d48723f5f24c96b8d7d9aa355/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L267 ```java "stream execution thread for [id = 382fb843-79a4-42c0-be4c-4c4cb40e46a8, runId = 0d8b1a41-c8be-4337-99e8-f0f557c0acc1]@17862" daemon prio=5 tid=0xda nid=NA runnable java.lang.Thread.State: RUNNABLE at org.apache.spark.sql.kafka010.KafkaSource.reportDataLoss(KafkaSource.scala:346) at org.apache.spark.sql.kafka010.KafkaSource.$anonfun$getBatch$2(KafkaSource.scala:314) at org.apache.spark.sql.kafka010.KafkaSource.$anonfun$getBatch$2$adapted(KafkaSource.scala:314) at org.apache.spark.sql.kafka010.KafkaSource$$Lambda$3305.201433992.apply(Unknown Source:-1) at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$getOffsetRangesFromResolvedOffsets$6(KafkaOffsetReaderConsumer.scala:535) at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer$$Lambda$3311.430021093.apply(Unknown Source:-1) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at scala.collection.TraversableLike$$Lambda$67.1881129850.apply(Unknown Source:-1) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.getOffsetRangesFromResolvedOffsets(KafkaOffsetReaderConsumer.scala:530) at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:314) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:548) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$3300.1940457280.apply(Unknown Source:-1) at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293) at scala.collection.TraversableLike$$Lambda$272.176683244.apply(Unknown Source:-1) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27) at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293) at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290) at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:544) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$3299.1391090515.apply(Unknown Source:-1) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67) at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:544) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:255) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$3021.1812599301.apply$mcV$sp(Unknown Source:-1) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:218) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$3017.1559486070.apply$mcZ$sp(Unknown Source:-1) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:212) at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306) at org.apache.spark.sql.execution.streaming.StreamExecution$$Lambda$2960.133381164.apply$mcV$sp(Unknown Source:-1) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org. For queries about this service, please contact Infrastructure at: users@infra.apache.org. --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org. For additional commands, e-mail: reviews-help@spark.apache.org.