[ https://issues.apache.org/jira/browse/SPARK-30208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995532#comment-16995532 ]
Jungtaek Lim commented on SPARK-30208:
--------------------------------------

I've just run a simple test with additional logging:

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TaskCompletionListenerTesting").getOrCreate()

# Batch read of "topic1" from a local Kafka broker
df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic1") \
    .load()

# Executed in a Python worker; the rows are fed to it by PythonRunner's writer thread
def f(rows):
    for row in rows:
        print(row.key)

df.foreachPartition(f)
{code}

and no: KafkaRDD registers its task completion listener earlier than PythonRunner does. Since completion listeners are invoked in reverse order of registration, this means the callback from PythonRunner is called first. That sounds natural, as KafkaRDD is the data source and hence should be registered first (I can't imagine it happening the other way around).

So my guess seems to be wrong. There is another, less likely possibility: the completion callback of PythonRunner doesn't join the writer thread at all. But given that this is a race condition, I'm not 100% sure.

> A race condition when reading from Kafka in PySpark
> ---------------------------------------------------
>
>                 Key: SPARK-30208
>                 URL: https://issues.apache.org/jira/browse/SPARK-30208
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.4
>            Reporter: Jiawen Zhu
>            Priority: Major
>
> When using PySpark to read from Kafka, there is a race condition in which Spark
> may use a KafkaConsumer from multiple threads at the same time and throw the
> following error:
> {code}
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>     at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2215)
>     at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2104)
>     at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2059)
>     at org.apache.spark.sql.kafka010.InternalKafkaConsumer.close(KafkaDataConsumer.scala:451)
>     at org.apache.spark.sql.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.release(KafkaDataConsumer.scala:508)
>     at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.close(KafkaSourceRDD.scala:126)
>     at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
>     at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anonfun$compute$3.apply(KafkaSourceRDD.scala:131)
>     at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anonfun$compute$3.apply(KafkaSourceRDD.scala:130)
>     at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:162)
>     at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:131)
>     at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:131)
>     at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:144)
>     at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:142)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:142)
>     at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:130)
>     at org.apache.spark.scheduler.Task.doRunTask(Task.scala:155)
>     at org.apache.spark.scheduler.Task.run(Task.scala:112)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> When using PySpark, reading from Kafka actually happens in a separate
> writer thread rather than the
> task thread. When a task is terminated early
> (e.g., because of a limit operator), the task thread may stop the KafkaConsumer
> while the writer thread is still using it.
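The listener ordering and the race discussed above can be modeled in a few lines of plain Python. This is a toy sketch under stated assumptions, not Spark code: `FakeConsumer`, `TaskContext`, `run_task` and `ConcurrentAccessError` are hypothetical names, the consumer only mimics KafkaConsumer's single-thread check, and completion listeners run in reverse order of registration as in Spark's TaskContextImpl. The "KafkaRDD" listener (close the consumer) is registered first and the "PythonRunner" listener (stop the writer thread) second, so the latter runs first; whether it also joins the writer thread decides whether `close()` collides with an in-flight `poll()`.

```python
import threading
import time


class ConcurrentAccessError(RuntimeError):
    """Hypothetical stand-in for java.util.ConcurrentModificationException."""


class FakeConsumer:
    """Hypothetical stand-in for KafkaConsumer: not thread-safe, and (like
    KafkaConsumer.acquire()) it fails fast if a second thread enters it."""

    def __init__(self, hold=0.2):
        self._hold = hold                  # seconds a poll() keeps the consumer busy
        self._owner = None                 # ident of the thread currently inside
        self._guard = threading.Lock()     # protects _owner only
        self.entered = threading.Event()   # set once a poll() is in progress
        self.closed = False

    def _acquire(self):
        me = threading.get_ident()
        with self._guard:
            if self._owner is not None and self._owner != me:
                raise ConcurrentAccessError(
                    "consumer is not safe for multi-threaded access")
            self._owner = me

    def _release(self):
        with self._guard:
            self._owner = None

    def poll(self):
        self._acquire()
        try:
            self.entered.set()
            time.sleep(self._hold)         # simulate a slow fetch
            return b"record"
        finally:
            self._release()

    def close(self):
        self._acquire()
        try:
            self.closed = True
        finally:
            self._release()


class TaskContext:
    """Hypothetical miniature task context: completion listeners run in
    reverse order of registration; errors are collected and returned."""

    def __init__(self):
        self._listeners = []

    def add_task_completion_listener(self, fn):
        self._listeners.append(fn)

    def mark_task_completed(self):
        errors = []
        for fn in reversed(self._listeners):   # last registered runs first
            try:
                fn()
            except Exception as e:             # keep running the other listeners
                errors.append(e)
        return errors


def run_task(join_writer):
    ctx = TaskContext()
    consumer = FakeConsumer()
    # Registered first, like KafkaRDD: close the consumer on task completion.
    ctx.add_task_completion_listener(consumer.close)

    stop = threading.Event()

    def writer():
        # Plays the role of PythonRunner's writer thread: keeps pulling records.
        while not stop.is_set():
            consumer.poll()

    t = threading.Thread(target=writer, daemon=True)
    t.start()

    def stop_writer():
        stop.set()
        if join_writer:
            t.join()   # wait until the writer has left the consumer

    # Registered second, like PythonRunner, so it runs first on completion.
    ctx.add_task_completion_listener(stop_writer)

    consumer.entered.wait()               # the writer is now inside poll()
    errors = ctx.mark_task_completed()    # early termination, e.g. after a limit
    t.join()                              # tidy up before returning
    return errors, consumer.closed


if __name__ == "__main__":
    for join in (False, True):
        errors, closed = run_task(join_writer=join)
        print(f"join_writer={join}: errors={errors!r}, closed={closed}")
```

With `join_writer=True` the writer has left `poll()` before `close()` runs, so no error is collected and the consumer ends up closed. With `join_writer=False` the toy deliberately lets `close()` run while a 0.2 s `poll()` is still in progress, which reproduces the shape of the error in the stack trace. Whether Spark's PythonRunner actually joins its writer thread in its completion callback is the open question in the comment, so treat the `join_writer` flag as an assumption.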