[ 
https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16771709#comment-16771709
 ] 

Jungtaek Lim commented on SPARK-21453:
--------------------------------------

[~ppanero] [~Julescs0]
It may be too late, but I missed this issue when I wrote my patch for 
renewing the consumer pool. The patch relies on a pool implementation that is 
considered stable (Apache Commons Pool) instead of maintaining a custom one.
https://github.com/apache/spark/pull/22138

We haven't tracked down the root cause, so I can't say whether my patch would 
mitigate the issue. However, I found suspicious cases in which the current 
implementation could leak idle consumers (never release them), so it may be 
worth a try if you are still affected and haven't resolved the issue.
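
To make the "leaked idle consumer" concern concrete, here is a minimal 
sketch of a keyed pool that evicts idle entries. Everything in it 
(IdleEvictingPool, borrow, giveBack, evictIdle, idleCount) is hypothetical 
and written only for illustration: it is neither Spark's current consumer 
cache nor the Apache Commons Pool API, and the clock is passed in explicitly 
to keep the example deterministic.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical keyed pool, for illustration only (not Spark or Commons Pool code). */
final class IdleEvictingPool<K, V> {
    /** A pooled object plus the time at which it was returned to the pool. */
    private static final class Idle<T> {
        final T value;
        final long idleSinceMs;

        Idle(T value, long idleSinceMs) {
            this.value = value;
            this.idleSinceMs = idleSinceMs;
        }
    }

    private final Map<K, Deque<Idle<V>>> idle = new HashMap<>();
    private final Function<K, V> factory;
    private final Consumer<V> closer;
    private final long maxIdleMs;

    IdleEvictingPool(Function<K, V> factory, Consumer<V> closer, long maxIdleMs) {
        this.factory = factory;
        this.closer = closer;
        this.maxIdleMs = maxIdleMs;
    }

    /** Reuses an idle object for the key if one exists, otherwise creates one. */
    synchronized V borrow(K key) {
        Deque<Idle<V>> queue = idle.get(key);
        if (queue != null && !queue.isEmpty()) {
            return queue.pop().value;
        }
        return factory.apply(key);
    }

    /** Returns an object to the pool and records when it became idle. */
    synchronized void giveBack(K key, V value, long nowMs) {
        idle.computeIfAbsent(key, k -> new ArrayDeque<>()).push(new Idle<>(value, nowMs));
    }

    /**
     * Closes and drops every object that has been idle for at least maxIdleMs.
     * Without a step like this, returned objects that are never borrowed again
     * would stay open forever, which is the kind of leak described above.
     */
    synchronized int evictIdle(long nowMs) {
        int closed = 0;
        Iterator<Map.Entry<K, Deque<Idle<V>>>> entries = idle.entrySet().iterator();
        while (entries.hasNext()) {
            Deque<Idle<V>> queue = entries.next().getValue();
            Iterator<Idle<V>> it = queue.iterator();
            while (it.hasNext()) {
                Idle<V> candidate = it.next();
                if (nowMs - candidate.idleSinceMs >= maxIdleMs) {
                    closer.accept(candidate.value);
                    it.remove();
                    closed++;
                }
            }
            if (queue.isEmpty()) {
                entries.remove();
            }
        }
        return closed;
    }

    /** Number of objects currently sitting idle in the pool. */
    synchronized int idleCount() {
        int count = 0;
        for (Deque<Idle<V>> queue : idle.values()) {
            count += queue.size();
        }
        return count;
    }
}
```

In this sketch a caller would invoke evictIdle periodically (for example 
from a scheduled task) with the current time; a mature pool library takes 
care of that bookkeeping, which is the motivation for relying on one.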

If the patch does resolve this issue, that would be a good justification for 
getting it reviewed and merged soon.

I would need some information first.

1) Which Spark version are you having difficulty with? I need this to 
backport the patch to that version. (For now the patch is against the master 
branch.) Note that Spark 2.2.x appears to be EOL in the Apache community 
(2.3.x will follow this year), so 2.2.x may not get support.

2) Is it easily reproducible? You may be hesitant to apply a not-yet-merged 
patch in production, so it would be even better if we could reproduce it 
easily in a staging environment, or even locally.

Thanks in advance!

> Cached Kafka consumer may be closed too early
> ---------------------------------------------
>
>                 Key: SPARK-21453
>                 URL: https://issues.apache.org/jira/browse/SPARK-21453
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>         Environment: Spark 2.2.0 and kafka 0.10.2.0
>            Reporter: Pablo Panero
>            Priority: Minor
>
> On a streaming job using built-in kafka source and sink (over SSL), with  I 
> am getting the following exception:
> Config of the source:
> {code:java}
> val df = spark.readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", config.bootstrapServers)
>       .option("failOnDataLoss", value = false)
>       .option("kafka.connections.max.idle.ms", 3600000)
>       // SSL: this only applies to communication between Spark and Kafka
>       // brokers; you are still responsible for separately securing Spark
>       // inter-node communication.
>       .option("kafka.security.protocol", "SASL_SSL")
>       .option("kafka.sasl.mechanism", "GSSAPI")
>       .option("kafka.sasl.kerberos.service.name", "kafka")
>       .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
>       .option("kafka.ssl.truststore.password", "changeit")
>       .option("subscribe", config.topicConfigList.keys.mkString(","))
>       .load()
> {code}
> Config of the sink:
> {code:java}
> .writeStream
>         .option("checkpointLocation", s"${config.checkpointDir}/${topicConfig._1}/")
>         .format("kafka")
>         .option("kafka.bootstrap.servers", config.bootstrapServers)
>         .option("kafka.connections.max.idle.ms", 3600000)
>         // SSL: this only applies to communication between Spark and Kafka
>         // brokers; you are still responsible for separately securing Spark
>         // inter-node communication.
>         .option("kafka.security.protocol", "SASL_SSL")
>         .option("kafka.sasl.mechanism", "GSSAPI")
>         .option("kafka.sasl.kerberos.service.name", "kafka")
>         .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
>         .option("kafka.ssl.truststore.password", "changeit")
>         .start()
> {code}
> {code:java}
> 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message 
> java.io.IOException: Broken pipe
>       at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>       at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>       at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>       at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>       at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>       at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
>       at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
>       at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
>       at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
>       at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
>       at org.apache.kafka.common.network.Selector.close(Selector.java:531)
>       at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
>       at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>       at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>       at org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
>       at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
>       at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
>       at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
>       at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
>       at org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
>       at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
>       at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
>       at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148)
>       at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>       at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>       at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:47)
>       at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:91)
>       at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
>       at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
>       at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:91)
>       at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89)
>       at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
>       at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
>       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>       at org.apache.spark.scheduler.Task.run(Task.scala:108)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:748)
> {code}


