[ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969995#comment-16969995 ]

Julian commented on SPARK-21453:
--------------------------------

As an update, the "Failed to send SSL Close message" issue is not causing major 
problems: the Spark batches still complete despite this failure.

Importantly, the other error I was getting, 
"java.lang.NullPointerException", appears unrelated. I have seemingly managed 
to avoid it by dropping the kafka.jar / kafka-client.jar back to the latest 
0.11 Kafka release instead of 1.0. That has resolved the issue for now, so the 
NullPointerException appears to come from the Kafka client libraries; it was 
causing the batches to fail periodically.
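
For anyone wanting to try the same workaround, here is a minimal sketch, 
assuming an sbt build; the 0.11.0.2 version below is only an example, so match 
it to the latest 0.11.x available for your distribution:

{code:java}
// build.sbt -- sketch of pinning the Kafka client to the 0.11 line instead of 1.0.
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.2.0"

// Force the 0.11 client even if a transitive dependency pulls in 1.0.x.
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.11.0.2"
{code}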

The SSL Close message error still appears in the log files every few hours 
(across all our streaming flows, which are built in a similar way and run 
under continual load for many hours), so it is not hard to reproduce.
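
Since the issue title points at the consumer cache, one knob that might be 
worth experimenting with is the cached-consumer capacity in 
spark-sql-kafka-0-10 (spark.sql.kafkaConsumerCache.capacity, default 64 in 
Spark 2.2). A minimal sketch, assuming it is set when the session is built; 
256 is an arbitrary example value, not a verified fix:

{code:java}
import org.apache.spark.sql.SparkSession

// Sketch: raise the cached Kafka consumer capacity so consumers are less
// likely to be evicted (and closed) while a task is still reading from them.
val spark = SparkSession.builder()
  .appName("streaming-job")
  .config("spark.sql.kafkaConsumerCache.capacity", "256")
  .getOrCreate()
{code}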

> Cached Kafka consumer may be closed too early
> ---------------------------------------------
>
>                 Key: SPARK-21453
>                 URL: https://issues.apache.org/jira/browse/SPARK-21453
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>         Environment: Spark 2.2.0 and kafka 0.10.2.0
>            Reporter: Pablo Panero
>            Priority: Major
>
> On a streaming job using the built-in Kafka source and sink (over SSL), I am
> getting the following exception:
> Config of the source:
> {code:java}
> val df = spark.readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", config.bootstrapServers)
>       .option("failOnDataLoss", value = false)
>       .option("kafka.connections.max.idle.ms", 3600000)
>       // SSL: this only applies to communication between Spark and Kafka brokers;
>       // you are still responsible for separately securing Spark inter-node communication.
>       .option("kafka.security.protocol", "SASL_SSL")
>       .option("kafka.sasl.mechanism", "GSSAPI")
>       .option("kafka.sasl.kerberos.service.name", "kafka")
>       .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
>       .option("kafka.ssl.truststore.password", "changeit")
>       .option("subscribe", config.topicConfigList.keys.mkString(","))
>       .load()
> {code}
> Config of the sink:
> {code:java}
> .writeStream
>         .option("checkpointLocation", 
> s"${config.checkpointDir}/${topicConfig._1}/")
>         .format("kafka")
>         .option("kafka.bootstrap.servers", config.bootstrapServers)
>         .option("kafka.connections.max.idle.ms", 3600000)
>         // SSL: this only applies to communication between Spark and Kafka brokers;
>         // you are still responsible for separately securing Spark inter-node communication.
>         .option("kafka.security.protocol", "SASL_SSL")
>         .option("kafka.sasl.mechanism", "GSSAPI")
>         .option("kafka.sasl.kerberos.service.name", "kafka")
>         .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
>         .option("kafka.ssl.truststore.password", "changeit")
>         .start()
> {code}
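> (Both code blocks assume a Kerberos login is available to the Kafka client.
> As a sketch that is not part of the original config: with Kafka clients
> 0.10.2+ the JAAS entry can be supplied inline; the keytab path and principal
> below are placeholders.)
> {code:java}
>       // Hypothetical JAAS entry, passed through to the Kafka client; without
>       // a JAAS login the GSSAPI handshake fails before any data is exchanged.
>       .option("kafka.sasl.jaas.config",
>         "com.sun.security.auth.module.Krb5LoginModule required " +
>         "useKeyTab=true keyTab=\"/etc/security/keytabs/app.keytab\" " +
>         "principal=\"app@EXAMPLE.COM\";")
> {code}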
> {code:java}
> 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message
> java.io.IOException: Broken pipe
>       at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>       at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>       at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>       at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>       at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>       at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
>       at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
>       at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
>       at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
>       at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
>       at org.apache.kafka.common.network.Selector.close(Selector.java:531)
>       at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
>       at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>       at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>       at org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
>       at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
>       at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
>       at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
>       at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
>       at org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
>       at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
>       at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
>       at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148)
>       at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>       at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>       at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:47)
>       at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:91)
>       at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
>       at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
>       at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:91)
>       at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89)
>       at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
>       at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
>       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>       at org.apache.spark.scheduler.Task.run(Task.scala:108)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:748)
> {code}


