MaxGekk commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r885944596


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##########
@@ -1376,6 +1376,13 @@ class KafkaMicroBatchV1SourceSuite extends 
KafkaMicroBatchSourceSuiteBase {
       classOf[KafkaSourceProvider].getCanonicalName)
   }
 
+  override def expectOffsetChange(): ExpectFailure[_] = {
+    ExpectFailure[IllegalStateException](e => {

Review Comment:
   The top level method which the exception comes from is `runStream()`:
   
https://github.com/apache/spark/blob/ea215279b0a4785d48723f5f24c96b8d7d9aa355/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L267
   
   ```java
   "stream execution thread for [id = 382fb843-79a4-42c0-be4c-4c4cb40e46a8, 
runId = 0d8b1a41-c8be-4337-99e8-f0f557c0acc1]@17862" daemon prio=5 tid=0xda 
nid=NA runnable
     java.lang.Thread.State: RUNNABLE
          at 
org.apache.spark.sql.kafka010.KafkaSource.reportDataLoss(KafkaSource.scala:346)
          at 
org.apache.spark.sql.kafka010.KafkaSource.$anonfun$getBatch$2(KafkaSource.scala:314)
          at 
org.apache.spark.sql.kafka010.KafkaSource.$anonfun$getBatch$2$adapted(KafkaSource.scala:314)
          at 
org.apache.spark.sql.kafka010.KafkaSource$$Lambda$3305.201433992.apply(Unknown 
Source:-1)
          at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$getOffsetRangesFromResolvedOffsets$6(KafkaOffsetReaderConsumer.scala:535)
          at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer$$Lambda$3311.430021093.apply(Unknown
 Source:-1)
          at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
          at 
scala.collection.TraversableLike$$Lambda$67.1881129850.apply(Unknown Source:-1)
          at scala.collection.Iterator.foreach(Iterator.scala:943)
          at scala.collection.Iterator.foreach$(Iterator.scala:943)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
          at scala.collection.IterableLike.foreach(IterableLike.scala:74)
          at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
          at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
          at scala.collection.TraversableLike.map(TraversableLike.scala:286)
          at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
          at scala.collection.AbstractTraversable.map(Traversable.scala:108)
          at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.getOffsetRangesFromResolvedOffsets(KafkaOffsetReaderConsumer.scala:530)
          at 
org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:314)
          at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:548)
          at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$3300.1940457280.apply(Unknown
 Source:-1)
          at 
scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
          at 
scala.collection.TraversableLike$$Lambda$272.176683244.apply(Unknown Source:-1)
          at scala.collection.Iterator.foreach(Iterator.scala:943)
          at scala.collection.Iterator.foreach$(Iterator.scala:943)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
          at scala.collection.IterableLike.foreach(IterableLike.scala:74)
          at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
          at 
org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
          at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
          at 
scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
          at 
org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
          at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:544)
          at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$3299.1391090515.apply(Unknown
 Source:-1)
          at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384)
          at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382)
          at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
          at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:544)
          at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:255)
          at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$3021.1812599301.apply$mcV$sp(Unknown
 Source:-1)
          at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
          at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384)
          at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382)
          at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
          at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:218)
          at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$3017.1559486070.apply$mcZ$sp(Unknown
 Source:-1)
          at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
          at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:212)
          at 
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
          at 
org.apache.spark.sql.execution.streaming.StreamExecution$$Lambda$2960.133381164.apply$mcV$sp(Unknown
 Source:-1)
          at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
          at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
          at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
          at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to