[ https://issues.apache.org/jira/browse/KAFKA-5649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17285170#comment-17285170 ]

Julian edited comment on KAFKA-5649 at 2/16/21, 12:45 PM:
----------------------------------------------------------

We have now moved to HDP 3.1.5, which ships Spark 2.3.2 and Kafka 2+, but we 
are forced to use the kafka 0.10 libraries because there is no Structured 
Streaming build for that combination of versions (see the dependency sketch 
below). On this new cluster, the issue still happens when Spark streams from 
Kafka. See https://issues.apache.org/jira/browse/SPARK-21453, which I also 
updated and linked back to this case. 
https://issues.apache.org/jira/browse/KAFKA-3702 may also be related.

Maybe Kafka 2 solves this, but unfortunately we have a long way to go until 
we get to Spark 2.4, Kafka 2+, and the Structured Streaming builds that 
support both.
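
As a side note on the version constraint above: for Spark 2.3.x the 
Structured Streaming Kafka connector is only published as the kafka-0-10 
artifact, even though the bundled client can talk to newer brokers. A minimal 
sbt sketch (coordinates are for the public Spark 2.3.2 release; the exact HDP 
build is an assumption):

{code:scala}
// build.sbt sketch: Structured Streaming Kafka connector for Spark 2.3.2.
// The artifact is named after the 0.10 client API; there is no separate
// "kafka 2.x" connector build for this Spark line.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "2.3.2" % Provided,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.2"
)
{code}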



> Producer is being closed generating ssl exception
> -------------------------------------------------
>
>                 Key: KAFKA-5649
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5649
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.10.2.1
>         Environment: Spark 2.2.0 and kafka 0.10.2.0
>            Reporter: Pablo Panero
>            Priority: Major
>
> On a streaming job using the built-in Kafka source and sink (over SSL), I am 
> getting the following exception:
> Config of the source:
> {code:scala}
> val df = spark.readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", config.bootstrapServers)
>       .option("failOnDataLoss", value = false)
>       .option("kafka.connections.max.idle.ms", 3600000)
>       // SSL: this only applies to communication between Spark and Kafka
>       // brokers; you are still responsible for separately securing Spark
>       // inter-node communication.
>       .option("kafka.security.protocol", "SASL_SSL")
>       .option("kafka.sasl.mechanism", "GSSAPI")
>       .option("kafka.sasl.kerberos.service.name", "kafka")
>       .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
>       .option("kafka.ssl.truststore.password", "changeit")
>       .option("subscribe", config.topicConfigList.keys.mkString(","))
>       .load()
> {code}
> Config of the sink:
> {code:scala}
> // assumption: `df` is the streaming DataFrame derived from the source above;
> // the original snippet omits the receiver of .writeStream
> df.writeStream
>         .option("checkpointLocation", s"${config.checkpointDir}/${topicConfig._1}/")
>         .format("kafka")
>         .option("kafka.bootstrap.servers", config.bootstrapServers)
>         .option("kafka.connections.max.idle.ms", 3600000)
>         // SSL: this only applies to communication between Spark and Kafka
>         // brokers; you are still responsible for separately securing Spark
>         // inter-node communication.
>         .option("kafka.security.protocol", "SASL_SSL")
>         .option("kafka.sasl.mechanism", "GSSAPI")
>         .option("kafka.sasl.kerberos.service.name", "kafka")
>         .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
>         .option("kafka.ssl.truststore.password", "changeit")
>         .start()
> {code}
> In some cases the exception is thrown and the Spark job gets stuck at that 
> step (see the notes following the trace). The exception stack trace is the 
> following:
> {code:java}
> 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message 
> java.io.IOException: Broken pipe
>       at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>       at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>       at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>       at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>       at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>       at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
>       at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
>       at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
>       at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
>       at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
>       at org.apache.kafka.common.network.Selector.close(Selector.java:531)
>       at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
>       at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>       at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>       at org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
>       at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
>       at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
>       at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
>       at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
>       at org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
>       at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
>       at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
>       at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148)
>       at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>       at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>       at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:47)
>       at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:91)
>       at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
>       at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
>       at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:91)
>       at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89)
>       at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
>       at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
>       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>       at org.apache.spark.scheduler.Task.run(Task.scala:108)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
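
Two notes on the report above (my additions, not the reporter's). First, the 
configs raise the client-side kafka.connections.max.idle.ms to 3600000 ms, 
while the broker's own connections.max.idle.ms defaults to 600000 ms. The 
side with the lower timeout closes idle sockets first, and a broker-side 
close can show up on the client as exactly this "Failed to send SSL Close 
message ... Broken pipe" warning. A sketch of the source config with the 
client timeout kept below the broker default (reusing the reporter's spark 
and config values; whether this helps in this case is an assumption to 
verify):

{code:scala}
// Same source config as in the report, except the client idle timeout
// stays below the broker default of 600000 ms, so the client closes
// idle connections before the broker can.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", config.bootstrapServers)
  .option("failOnDataLoss", value = false)
  .option("kafka.connections.max.idle.ms", 540000)  // was 3600000
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "GSSAPI")
  .option("kafka.sasl.kerberos.service.name", "kafka")
  .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
  .option("kafka.ssl.truststore.password", "changeit")
  .option("subscribe", config.topicConfigList.keys.mkString(","))
  .load()
{code}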
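Second, since the job hangs rather than failing, a watchdog can at least make 
the stall visible. A minimal sketch using Spark's StreamingQueryListener; the 
threshold, the check interval, and the println logging are illustrative 
assumptions:

{code:scala}
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Records the time of the last micro-batch progress event and warns from a
// daemon thread when the query has been silent for too long.
object StallWatchdog {
  private val lastProgress = new AtomicLong(System.currentTimeMillis())
  private val silenceThresholdMs = 10 * 60 * 1000L  // hypothetical 10 minutes

  def start(spark: SparkSession): Unit = {
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(e: QueryStartedEvent): Unit =
        lastProgress.set(System.currentTimeMillis())
      override def onQueryProgress(e: QueryProgressEvent): Unit =
        lastProgress.set(System.currentTimeMillis())
      override def onQueryTerminated(e: QueryTerminatedEvent): Unit =
        println(s"query ${e.id} terminated, exception: ${e.exception}")
    })
    val watchdog = new Thread(new Runnable {
      override def run(): Unit = while (true) {
        Thread.sleep(60 * 1000L)  // check once a minute
        val silentMs = System.currentTimeMillis() - lastProgress.get()
        if (silentMs > silenceThresholdMs)
          println(s"no streaming progress for ${silentMs / 1000}s -- query may be stuck")
      }
    })
    watchdog.setDaemon(true)
    watchdog.start()
  }
}
{code}

Called once after creating the session, e.g. StallWatchdog.start(spark), 
before the streaming queries are started.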



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
