[ 
https://issues.apache.org/jira/browse/SPARK-23526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16381349#comment-16381349
 ] 

Gabor Somogyi commented on SPARK-23526:
---------------------------------------

Reminds me of [this|https://github.com/apache/spark/pull/18234].
If nobody has started on it yet, I would like to work on it.


> KafkaMicroBatchV2SourceSuite.ensure stream-stream self-join generates only one offset in offset log
> ---------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-23526
>                 URL: https://issues.apache.org/jira/browse/SPARK-23526
>             Project: Spark
>          Issue Type: Test
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Wenchen Fan
>            Priority: Major
>              Labels: flaky-test
>
> Seen failing in the PR builder with the following error message:
> {code:java}
> sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: Query [id = 676a8b08-c89b-450b-8cd8-fbf9868cd240, runId = 46bb7aae-138b-420d-9b4f-44f42a2a4a0f] terminated with exception: Job aborted due to stage failure: Task 0 in stage 163.0 failed 1 times, most recent failure: Lost task 0.0 in stage 163.0 (TID 799, localhost, executor driver): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
>   at org.apache.spark.sql.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:305)
>   at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:216)
>   at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:122)
>   at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
>   at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
>   at org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
>   at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
>   at org.apache.spark.sql.kafka010.KafkaMicroBatchDataReader.next(KafkaMicroBatchReader.scala:353)
>   at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:616)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>   at org.apache.spark.scheduler.Task.run(Task.scala:109)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
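
The trace fails inside KafkaConsumer.acquire, called from KafkaConsumer.seek via CachedKafkaConsumer.seek. KafkaConsumer is documented as not thread-safe, and it detects unsynchronized use by throwing ConcurrentModificationException instead of blocking. One plausible reading (not confirmed in this issue) is that both sides of the self-join run as concurrent tasks on the local executor and end up sharing one cached consumer instance. The sketch below reproduces that failure mode deterministically. SingleThreadGuard, FakeConsumer and runDemo are hypothetical illustrations modeled loosely on the owner-thread check behind KafkaConsumer.acquire(); they are not Kafka or Spark APIs.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class SingleThreadGuardDemo {

    /** Hypothetical single-owner guard, modeled loosely on KafkaConsumer's acquire()/release(). */
    static final class SingleThreadGuard {
        private static final long NO_OWNER = -1L;
        private final AtomicLong owner = new AtomicLong(NO_OWNER); // id of the owning thread
        private final AtomicInteger depth = new AtomicInteger(0);  // re-entrancy depth of the owner

        void acquire() {
            long me = Thread.currentThread().getId();
            // Fail fast instead of blocking: a second thread must never use the object concurrently.
            if (me != owner.get() && !owner.compareAndSet(NO_OWNER, me)) {
                throw new ConcurrentModificationException(
                        "consumer is not safe for multi-threaded access");
            }
            depth.incrementAndGet();
        }

        void release() {
            if (depth.decrementAndGet() == 0) {
                owner.set(NO_OWNER); // last exit by the owner frees the guard
            }
        }
    }

    /** Hypothetical stand-in for a cached consumer that two tasks end up sharing. */
    static final class FakeConsumer {
        private final SingleThreadGuard guard = new SingleThreadGuard();
        private long position;

        void seek(long offset, Runnable whileGuarded) {
            guard.acquire(); // throws before touching state if another thread owns the guard
            try {
                position = offset;
                whileGuarded.run();
            } finally {
                guard.release();
            }
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Task A holds the consumer inside seek() while the calling thread also calls seek(). */
    static String runDemo() {
        FakeConsumer shared = new FakeConsumer();
        CountDownLatch aInside = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        Thread taskA = new Thread(() -> shared.seek(0L, () -> {
            aInside.countDown();   // A now owns the guard
            awaitQuietly(release); // keep owning it until the other call has been attempted
        }));
        taskA.start();
        awaitQuietly(aInside);

        String result;
        try {
            shared.seek(42L, () -> { }); // second thread touches the same instance
            result = "no error";
        } catch (ConcurrentModificationException e) {
            result = e.getMessage();
        } finally {
            release.countDown();
        }
        try {
            taskA.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints "consumer is not safe for multi-threaded access"
    }
}
```

Because the guard fails fast rather than waiting, sharing bugs surface as an exception instead of silently interleaving seek and fetch calls from two tasks on one consumer. Re-entrant calls from the owning thread are still allowed. So a fix would presumably need to keep two concurrent tasks from ever receiving the same cached instance, rather than only serializing calls on it.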



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
