[ https://issues.apache.org/jira/browse/SPARK-30208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16994288#comment-16994288 ]
Jungtaek Lim commented on SPARK-30208:
--------------------------------------

Please correct me if I'm missing anything here. I haven't looked into the details of SPARK-22340, but the sentence here ("reading from Kafka is actually happening in a separate writer thread rather than the task thread") still seems to be valid with SPARK-22340.

Please refer to KafkaSourceRDD: when we release the Kafka consumer, we assume there is no further use of the consumer, because either the iterator has been exhausted or the task completion listener has been called. PythonRunner may break that assumption, since it also interrupts the writer thread from its own task completion callback. (It doesn't even join the writer thread, which might be another problem, but for simplicity let's assume for now that the interruption takes effect immediately.)

So, depending on the order of registration, the callback registered in KafkaSourceRDD may be invoked before the callback registered in PythonRunner, and the task thread will then try to release the consumer while the writer thread spawned by PythonRunner has not yet been signaled that the task is completed. A minimal sketch of this interleaving follows below.
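Since the interleaving above is fairly intricate, here is a minimal, self-contained Scala sketch of it. Everything in it is a hypothetical stand-in rather than Spark's actual code: Consumer mimics KafkaConsumer's single-owner guard, and the two steps under "task completion" mimic the callbacks registered by KafkaSourceRDD and PythonRunner, with the bad ordering forced deliberately.

{code}
import java.util.ConcurrentModificationException
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

// Hypothetical stand-in for KafkaConsumer's single-owner guard: the real
// KafkaConsumer throws ConcurrentModificationException when one thread
// calls into it while another thread is already inside it.
class Consumer {
  private val owner = new AtomicLong(-1L)

  private def acquire(): Unit = {
    val me = Thread.currentThread().getId
    if (!owner.compareAndSet(-1L, me) && owner.get() != me) {
      throw new ConcurrentModificationException(
        "Consumer is not safe for multi-threaded access")
    }
  }

  private def release(): Unit = owner.set(-1L)

  def poll(): Unit = { acquire(); try Thread.sleep(5) finally release() }
  def close(): Unit = acquire() // real code would free resources here
}

object ListenerOrderRace {
  def main(args: Array[String]): Unit = {
    val consumer = new Consumer
    val running = new AtomicBoolean(true)

    // Stand-in for the writer thread spawned by PythonRunner: it keeps
    // reading from the consumer until it is told to stop.
    val writer = new Thread(() => {
      try { while (running.get()) consumer.poll() }
      catch {
        case e: ConcurrentModificationException => println(s"writer thread hit: $e")
        case _: InterruptedException => () // stopped by the interrupt below
      }
    }, "writer")
    writer.start()
    Thread.sleep(20)

    // Task completion. The KafkaSourceRDD-style listener (release the
    // consumer) fires before the PythonRunner-style listener (stop the
    // writer), so close() races with an in-flight poll().
    try consumer.close()
    catch {
      case e: ConcurrentModificationException => println(s"task thread hit: $e")
    }

    running.set(false)   // the PythonRunner-style listener runs too late
    writer.interrupt()
    writer.join()
  }
}
{code}

Running this typically prints a ConcurrentModificationException from one of the two threads, analogous to the stack trace in the quoted issue below.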
> A race condition when reading from Kafka in PySpark
> ---------------------------------------------------
>
>                 Key: SPARK-30208
>                 URL: https://issues.apache.org/jira/browse/SPARK-30208
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.4
>            Reporter: Jiawen Zhu
>            Priority: Major
>
> When using PySpark to read from Kafka, there is a race condition in which
> Spark may use a KafkaConsumer from multiple threads at the same time and
> throw the following error:
> {code}
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>   at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2215)
>   at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2104)
>   at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2059)
>   at org.apache.spark.sql.kafka010.InternalKafkaConsumer.close(KafkaDataConsumer.scala:451)
>   at org.apache.spark.sql.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.release(KafkaDataConsumer.scala:508)
>   at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.close(KafkaSourceRDD.scala:126)
>   at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
>   at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anonfun$compute$3.apply(KafkaSourceRDD.scala:131)
>   at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anonfun$compute$3.apply(KafkaSourceRDD.scala:130)
>   at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:162)
>   at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:131)
>   at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:131)
>   at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:144)
>   at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:142)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:142)
>   at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:130)
>   at org.apache.spark.scheduler.Task.doRunTask(Task.scala:155)
>   at org.apache.spark.scheduler.Task.run(Task.scala:112)
>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> When using PySpark, reading from Kafka actually happens in a separate writer
> thread rather than the task thread. When a task is terminated early (e.g.,
> there is a limit operator), the task thread may stop the KafkaConsumer while
> the writer thread is still using it.
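Picking up the comment's observation that the writer thread is interrupted but never joined: under the same hypothetical Consumer from the sketch above (and again only as an illustration, not the actual Spark patch), releasing the consumer only after the writer thread has been stopped and joined removes the race.

{code}
import java.util.concurrent.atomic.AtomicBoolean

object OrderedShutdown {
  def main(args: Array[String]): Unit = {
    val consumer = new Consumer // the Consumer class from the sketch above
    val running = new AtomicBoolean(true)

    val writer = new Thread(() => {
      try { while (running.get()) consumer.poll() }
      catch { case _: InterruptedException => () } // told to stop; exit quietly
    }, "writer")
    writer.start()
    Thread.sleep(20)

    running.set(false)
    writer.interrupt()   // stop the thread that uses the consumer first...
    writer.join()        // ...and wait until it has actually exited
    consumer.close()     // only now is single-threaded access guaranteed
  }
}
{code}

Whatever shape the real fix takes, the point of the comment stands: the consumer can only be released safely once no other thread can still be inside it.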