[ 
https://issues.apache.org/jira/browse/SPARK-30208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16994288#comment-16994288
 ] 

Jungtaek Lim commented on SPARK-30208:
--------------------------------------

Please correct me if I'm missing anything here.

I'm not looking into details of SPARK-22340, but the sentence here ("reading 
from Kafka is actually happening in a separate writer thread rather that the 
task thread") seems to be still valid with SPARK-22340.

Please refer KafkaSourceRDD; when we release Kafka consumer, we assume that 
there's no further usage of consumer, as iterator has been exhausted or task 
completion listener has been called. PythonRunner may break the assumption as 
it also interrupts the writer thread upon task completion callback. (It doesn't 
even join the writer thread which might be another possible problem, but let's 
assume interruption takes effect immediately for now, for simplicity.)

So depending on the orders of registration, callback being registered in 
KafkaSourceRDD may be called earlier than callback being registered in 
PythonRunner, and then task thread will try to release consumer where writer 
thread in PythonRunner still doesn't indicate whether task is completed. 

 

> A race condition when reading from Kafka in PySpark
> ---------------------------------------------------
>
>                 Key: SPARK-30208
>                 URL: https://issues.apache.org/jira/browse/SPARK-30208
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.4
>            Reporter: Jiawen Zhu
>            Priority: Major
>
> When using PySpark to read from Kafka, there is a race condition that Spark 
> may use KafkaConsumer in multiple threads at the same time and throw the 
> following error:
> {code}
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
> multi-threaded access
>         at 
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2215)
>         at 
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2104)
>         at 
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2059)
>         at 
> org.apache.spark.sql.kafka010.InternalKafkaConsumer.close(KafkaDataConsumer.scala:451)
>         at 
> org.apache.spark.sql.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.release(KafkaDataConsumer.scala:508)
>         at 
> org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.close(KafkaSourceRDD.scala:126)
>         at 
> org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
>         at 
> org.apache.spark.sql.kafka010.KafkaSourceRDD$$anonfun$compute$3.apply(KafkaSourceRDD.scala:131)
>         at 
> org.apache.spark.sql.kafka010.KafkaSourceRDD$$anonfun$compute$3.apply(KafkaSourceRDD.scala:130)
>         at 
> org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:162)
>         at 
> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:131)
>         at 
> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:131)
>         at 
> org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:144)
>         at 
> org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:142)
>         at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at 
> org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:142)
>         at 
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:130)
>         at org.apache.spark.scheduler.Task.doRunTask(Task.scala:155)
>         at org.apache.spark.scheduler.Task.run(Task.scala:112)
>         at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
> When using PySpark, reading from Kafka is actually happening in a separate 
> writer thread rather that the task thread.  When a task is early terminated 
> (e.g., there is a limit operator), the task thread may stop the KafkaConsumer 
> when the writer thread is using it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to