[jira] [Commented] (SPARK-30208) A race condition when reading from Kafka in PySpark

2019-12-11 Thread Hyukjin Kwon (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16994105#comment-16994105
 ] 

Hyukjin Kwon commented on SPARK-30208:
--

This is possibly a duplicate of SPARK-22340.

> A race condition when reading from Kafka in PySpark
> ---
>
> Key: SPARK-30208
> URL: https://issues.apache.org/jira/browse/SPARK-30208
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.4
> Reporter: Jiawen Zhu
> Priority: Major
>
> When using PySpark to read from Kafka, there is a race condition in which 
> Spark may access a KafkaConsumer from multiple threads at the same time, 
> which throws the following error:
> {code}
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2215)
> at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2104)
> at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2059)
> at org.apache.spark.sql.kafka010.InternalKafkaConsumer.close(KafkaDataConsumer.scala:451)
> at org.apache.spark.sql.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.release(KafkaDataConsumer.scala:508)
> at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.close(KafkaSourceRDD.scala:126)
> at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
> at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anonfun$compute$3.apply(KafkaSourceRDD.scala:131)
> at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anonfun$compute$3.apply(KafkaSourceRDD.scala:130)
> at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:162)
> at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:131)
> at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:131)
> at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:144)
> at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:142)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:142)
> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:130)
> at org.apache.spark.scheduler.Task.doRunTask(Task.scala:155)
> at org.apache.spark.scheduler.Task.run(Task.scala:112)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> When using PySpark, reading from Kafka actually happens in a separate 
> writer thread rather than the task thread.  When a task terminates early 
> (e.g., there is a limit operator), the task thread may close the KafkaConsumer 
> while the writer thread is still using it.
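
For readers unfamiliar with this error, here is a minimal sketch of the kind of single-thread ownership check that produces it. It is a toy model, not the Kafka client's code: `ToyConsumer`, `poll`, `close` and `simulate_race` are hypothetical names, and a plain `RuntimeError` stands in for the `ConcurrentModificationException` in the trace above. One thread plays the writer thread (inside a read) and the main thread plays the task thread (calling close).

```python
import threading


class ToyConsumer:
    """Hypothetical stand-in for a client that allows one thread at a time."""

    def __init__(self):
        self._owner = None               # ident of the thread inside a call
        self._guard = threading.Lock()   # protects _owner only

    def _acquire(self):
        me = threading.get_ident()
        with self._guard:
            if self._owner is not None and self._owner != me:
                # Another thread is in the middle of a call: refuse.
                raise RuntimeError("consumer is not safe for multi-threaded access")
            self._owner = me

    def _release(self):
        with self._guard:
            self._owner = None

    def poll(self, started, may_finish):
        self._acquire()
        try:
            started.set()         # signal that a read is in progress
            may_finish.wait(5)    # stay inside the call until told to finish
        finally:
            self._release()

    def close(self):
        self._acquire()           # raises if another thread owns the consumer
        self._release()


def simulate_race():
    consumer = ToyConsumer()
    started, may_finish = threading.Event(), threading.Event()
    writer = threading.Thread(target=consumer.poll, args=(started, may_finish))
    writer.start()
    started.wait(5)               # the "writer thread" is now mid-read
    try:
        consumer.close()          # the "task thread" closes too early
        outcome = "closed cleanly"
    except RuntimeError as exc:
        outcome = f"error: {exc}"
    finally:
        may_finish.set()
        writer.join()
    return outcome


print(simulate_race())
```

The events only make the interleaving deterministic for the demonstration; in a real task the overlap depends on timing, which is why the failure shows up intermittently.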



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30208) A race condition when reading from Kafka in PySpark

2019-12-11 Thread Jungtaek Lim (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16994288#comment-16994288
 ] 

Jungtaek Lim commented on SPARK-30208:
--

Please correct me if I'm missing anything here.

I haven't looked into the details of SPARK-22340, but the sentence here 
("reading from Kafka is actually happening in a separate writer thread rather 
than the task thread") still seems to be valid with SPARK-22340.

Please refer to KafkaSourceRDD: when we release the Kafka consumer, we assume 
that the consumer will not be used any further, because either the iterator has 
been exhausted or the task completion listener has been called. PythonRunner 
may break this assumption, as it also interrupts the writer thread in its task 
completion callback. (It doesn't even join the writer thread, which might be 
another possible problem, but for simplicity let's assume for now that the 
interruption takes effect immediately.)

So, depending on the order of registration, the callback registered in 
KafkaSourceRDD may be called earlier than the callback registered in 
PythonRunner. The task thread would then try to release the consumer while the 
writer thread in PythonRunner has not yet been told that the task is completed.
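
To make the ordering argument concrete, here is a small sketch of a listener registry. It assumes that completion listeners run in reverse order of registration; `ToyTaskContext` and `run_order` are hypothetical names, not Spark's API, and the callbacks do nothing.

```python
class ToyTaskContext:
    """Hypothetical listener registry; not Spark's TaskContext."""

    def __init__(self):
        self._listeners = []

    def add_task_completion_listener(self, name, callback):
        self._listeners.append((name, callback))

    def mark_task_completed(self):
        # Assumption: listeners run in reverse order of registration.
        order = []
        for name, callback in reversed(self._listeners):
            callback()
            order.append(name)
        return order


def run_order(registration_order):
    """Register no-op listeners in the given order; return the call order."""
    ctx = ToyTaskContext()
    for name in registration_order:
        ctx.add_task_completion_listener(name, lambda: None)
    return ctx.mark_task_completed()


# Data source registers first: the writer is stopped before the release.
print(run_order(["KafkaSourceRDD", "PythonRunner"]))
# Reverse registration: the consumer is released before the writer is stopped.
print(run_order(["PythonRunner", "KafkaSourceRDD"]))
```

Only the second registration order would produce the hazard described above, so which component registers first is the thing to check.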

 


[jira] [Commented] (SPARK-30208) A race condition when reading from Kafka in PySpark

2019-12-13 Thread Jungtaek Lim (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995532#comment-16995532
 ] 

Jungtaek Lim commented on SPARK-30208:
--

I've just tested it simply with additional logging:
{code:java}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TaskCompletionListenerTesting").getOrCreate()

# Batch read from Kafka, so the task goes through KafkaSourceRDD
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "topic1") \
  .load()

# A Python function forces the partition through PythonRunner
def f(rows):
  for row in rows:
    print(row.key)

df.foreachPartition(f)
{code}
and no: KafkaRDD registers its callback earlier than PythonRunner, which means 
the callback from PythonRunner is called first. That sounds natural, as 
KafkaRDD is a data source and hence should be placed first. (I can't imagine 
the other case.)

So my guess seems to be wrong. There is another, less likely possibility: the 
completion callback of PythonRunner doesn't even join the writer thread. But 
given that this is a race condition, I'm not 100% sure.
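
The remaining possibility (a stop request that is not followed by a join) can be sketched as below. This is a toy model under stated assumptions, not PythonRunner's code: `stop_writer` and its events are hypothetical, the stop request is a flag that the writer only checks between reads, and in the joining case the in-flight read is simply allowed to finish.

```python
import threading


def stop_writer(join_writer):
    """Return True if the writer is still mid-read when the consumer would be closed."""
    stop = threading.Event()          # stands in for the interrupt request
    reading = threading.Event()       # set once the writer is inside a read
    release_read = threading.Event()  # lets the in-flight read finish
    state = {"reading": False}

    def writer():
        while not stop.is_set():
            state["reading"] = True
            reading.set()
            release_read.wait(5)      # simulate a read that is in progress
            state["reading"] = False

    t = threading.Thread(target=writer)
    t.start()
    reading.wait(5)                   # the writer is now mid-read

    stop.set()                        # completion callback requests a stop
    if join_writer:
        release_read.set()            # the read ends...
        t.join()                      # ...and we wait for the writer to exit

    busy_at_close = state["reading"]  # the consumer would be closed here

    release_read.set()                # clean up the toy writer
    t.join()
    return busy_at_close


print(stop_writer(join_writer=False))
print(stop_writer(join_writer=True))
```

Without the join, the close step runs while the writer is still inside its read; with the join, the writer has exited first. Whether this is what actually happens in the reported failure is not confirmed here.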
