[ https://issues.apache.org/jira/browse/SPARK-30208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995532#comment-16995532 ]

Jungtaek Lim commented on SPARK-30208:
--------------------------------------

I've just tested it simply with additional logging:
{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TaskCompletionListenerTesting").getOrCreate()

df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "topic1") \
  .load()

def f(rows):
  for row in rows:
    print(row.key)

df.foreachPartition(f)
{code}
and no, KafkaRDD registers its task completion listener earlier than PythonRunner does, which means the callback from PythonRunner is called earlier, since completion listeners are invoked in the reverse order of registration. That ordering sounds natural: KafkaRDD is the data source, so its listener should be registered first. (I can't imagine the opposite case.)
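
To double-check the ordering assumption, here is a minimal local-mode sketch (ListenerOrderDemo is a made-up name) showing that TaskContextImpl invokes completion listeners in the reverse order of registration on Spark 2.4, which is why PythonRunner's callback, registered later, fires before KafkaRDD's:
{code:scala}
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object ListenerOrderDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[1]").setAppName("ListenerOrderDemo"))
    sc.parallelize(Seq(1), numSlices = 1).foreachPartition { _ =>
      val ctx = TaskContext.get()
      // Registered first, analogous to the Kafka source iterator's close hook.
      ctx.addTaskCompletionListener[Unit] { _ =>
        println("1st registered: KafkaRDD-like close")
      }
      // Registered second, analogous to PythonRunner's cleanup callback.
      ctx.addTaskCompletionListener[Unit] { _ =>
        println("2nd registered: PythonRunner-like cleanup")
      }
    }
    // The console shows "2nd registered" before "1st registered":
    // listeners run in reverse (LIFO) order of registration.
    sc.stop()
  }
}
{code}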

So my guess seems wrong. There is another, less likely possibility: the completion callback of PythonRunner doesn't actually join the writer thread before the consumer is closed. But since this is a race condition, I'm not 100% sure.
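
To illustrate how that would reproduce the reported error, here is a hypothetical, self-contained sketch (FakeConsumer and RaceDemo are made-up names; the real KafkaConsumer uses a refcounted CAS guard rather than this simplified one). A "writer" thread keeps polling while the "task" thread closes the consumer without joining the writer first, which can trip the same ConcurrentModificationException:
{code:scala}
import java.util.ConcurrentModificationException
import java.util.concurrent.atomic.AtomicLong

class FakeConsumer {
  // Id of the thread currently inside the consumer, or -1 if none.
  private val owner = new AtomicLong(-1L)

  private def acquire(): Unit = {
    val me = Thread.currentThread().getId
    if (!owner.compareAndSet(-1L, me)) {
      // Same message the real KafkaConsumer throws on concurrent access.
      throw new ConcurrentModificationException(
        "KafkaConsumer is not safe for multi-threaded access")
    }
  }
  private def release(): Unit = owner.set(-1L)

  def poll(): Unit = { acquire(); try Thread.sleep(1) finally release() }
  def close(): Unit = { acquire(); release() }
}

object RaceDemo extends App {
  val consumer = new FakeConsumer
  val writer = new Thread(() => (1 to 1000).foreach(_ => consumer.poll()))
  writer.start()
  // Joining the writer thread here would serialize access and avoid the race:
  // writer.join()
  consumer.close()  // races with poll() and may throw, as in the stack trace
}
{code}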

> A race condition when reading from Kafka in PySpark
> ---------------------------------------------------
>
>                 Key: SPARK-30208
>                 URL: https://issues.apache.org/jira/browse/SPARK-30208
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.4
>            Reporter: Jiawen Zhu
>            Priority: Major
>
> When using PySpark to read from Kafka, there is a race condition in which 
> Spark may use a KafkaConsumer from multiple threads at the same time and 
> throw the following error:
> {code}
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>         at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2215)
>         at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2104)
>         at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2059)
>         at org.apache.spark.sql.kafka010.InternalKafkaConsumer.close(KafkaDataConsumer.scala:451)
>         at org.apache.spark.sql.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.release(KafkaDataConsumer.scala:508)
>         at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.close(KafkaSourceRDD.scala:126)
>         at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
>         at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anonfun$compute$3.apply(KafkaSourceRDD.scala:131)
>         at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anonfun$compute$3.apply(KafkaSourceRDD.scala:130)
>         at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:162)
>         at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:131)
>         at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:131)
>         at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:144)
>         at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:142)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:142)
>         at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:130)
>         at org.apache.spark.scheduler.Task.doRunTask(Task.scala:155)
>         at org.apache.spark.scheduler.Task.run(Task.scala:112)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
> When using PySpark, reading from Kafka actually happens in a separate 
> writer thread rather than the task thread. When a task terminates early 
> (e.g., there is a limit operator), the task thread may stop the KafkaConsumer 
> while the writer thread is still using it.


