After a brief check, I found KAFKA-5649, where an almost identical error was reported.
There is also KAFKA-3702, which is related but still open. I will dig some more to see what I can find.

Cheers

On Mon, Aug 20, 2018 at 3:53 PM Basil Hariri <basil.har...@microsoft.com> wrote:

> I am pretty sure I got those changes with the jar I compiled (I pulled
> from master on 8/8, and it looks like SPARK-18057 was resolved on 8/3), but
> no luck. Here is a copy-paste of the error I'm seeing. The semantics of
> Event Hubs' Kafka head are highlighted for reference – we connect to port
> 9093 on an FQDN instead of port 9092 on a Kafka broker's IP address, but I
> don't think that should change anything.
>
> 18/08/20 22:29:13 INFO AbstractCoordinator: Discovered coordinator <broker FQDN>:9093 (id: 2147483647 rack: null) for group spark-kafka-source-1aa50598-99d1-4c53-a73c-fa6637a219b2--1338794993-driver-0.
> 18/08/20 22:29:13 INFO AbstractCoordinator: (Re-)joining group spark-kafka-source-1aa50598-99d1-4c53-a73c-fa6637a219b2--1338794993-driver-0
> 18/08/20 22:29:33 WARN SslTransportLayer: Failed to send SSL Close message
> java.io.IOException: Broken pipe
>         at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>         at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>         at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>         at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>         at kafkashaded.org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
>         at kafkashaded.org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
>         at kafkashaded.org.apache.kafka.common.utils.Utils.closeAll(Utils.java:690)
>         at kafkashaded.org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:47)
>         at kafkashaded.org.apache.kafka.common.network.Selector.close(Selector.java:471)
>         at kafkashaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:348)
>         at kafkashaded.org.apache.kafka.common.network.Selector.poll(Selector.java:283)
>         at kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>         at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>         at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>         at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>         at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at kafkashaded.org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>         at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
>         at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
>         at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:214)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:212)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:303)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:302)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:302)
>         at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:301)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:212)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:212)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:270)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:211)
>         at org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$getOrCreateInitialPartitionOffsets$1.apply(KafkaMicroBatchReader.scala:212)
>         at org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$getOrCreateInitialPartitionOffsets$1.apply(KafkaMicroBatchReader.scala:207)
>         at scala.Option.getOrElse(Option.scala:121)
>         at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.getOrCreateInitialPartitionOffsets(KafkaMicroBatchReader.scala:207)
>         at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org$apache$spark$sql$kafka010$KafkaMicroBatchReader$$initialPartitionOffsets$lzycompute(KafkaMicroBatchReader.scala:82)
>         at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org$apache$spark$sql$kafka010$KafkaMicroBatchReader$$initialPartitionOffsets(KafkaMicroBatchReader.scala:82)
>         at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.setOffsetRange(KafkaMicroBatchReader.scala:89)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6$$anonfun$apply$2.apply$mcV$sp(MicroBatchExecution.scala:364)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6$$anonfun$apply$2.apply(MicroBatchExecution.scala:358)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6$$anonfun$apply$2.apply(MicroBatchExecution.scala:358)
>         at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:379)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:60)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6.apply(MicroBatchExecution.scala:353)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6.apply(MicroBatchExecution.scala:340)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:340)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:336)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:336)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:563)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:336)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:189)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:172)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:172)
>         at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:379)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:60)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:172)
>         at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:166)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:293)
>         at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:203)
> 18/08/20 22:29:33 INFO AbstractCoordinator: Marking the coordinator <broker FQDN>:9093 (id: 2147483647 rack: null) dead for group spark-kafka-source-1aa50598-99d1-4c53-a73c-fa6637a219b2--1338794993-driver-0
> 18/08/20 22:29:34 INFO AbstractCoordinator: Discovered coordinator <broker FQDN>:9093 (id: 2147483647 rack: null) for group spark-kafka-source-1aa50598-99d1-4c53-a73c-fa6637a219b2--1338794993-driver-0.
> <repeat this error over and over until the job is terminated>
>
> Also, I'm not sure if it's relevant, but I am running on Databricks
> (currently working on running it on a local cluster to verify that it isn't
> a Databricks issue). The only jars I'm using are the Spark-Kafka connector
> from github master on 8/8/18 and Kafka v2.0.
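For comparison, below is roughly the read configuration I would expect for a SASL_SSL endpoint on port 9093 with the structured streaming source. This is only a sketch: the namespace, topic, and connection string are placeholders, an existing SparkSession `spark` is assumed, and the PLAIN SASL settings follow the Event Hubs Kafka documentation rather than anything verified against your setup.

    // Sketch: structured streaming "kafka" source against a Kafka 1.0+ endpoint
    // reached over SASL_SSL on port 9093. <namespace>, <topic>, and the
    // connection string are placeholders.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<namespace>.servicebus.windows.net:9093")
      .option("subscribe", "<topic>")
      .option("startingOffsets", "latest")
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.mechanism", "PLAIN")
      // On a runtime that shades the Kafka client (your trace shows kafkashaded.*
      // classes), the login module class may need the shaded package prefix.
      .option("kafka.sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
          + "username=\"$ConnectionString\" password=\"<event-hubs-connection-string>\";")
      .load()

Any "kafka."-prefixed option is passed straight through to the underlying consumer, so if the same consumer properties work with a plain Kafka client against your endpoint, they should carry over here.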
> Thanks so much for your help; let me know if there's anything else I can provide.
>
> Sincerely,
>
> Basil
>
> *From:* Ted Yu <yuzhih...@gmail.com>
> *Sent:* Friday, August 17, 2018 4:20 PM
> *To:* basil.har...@microsoft.com.invalid
> *Cc:* dev <dev@spark.apache.org>
> *Subject:* Re: Spark Kafka adapter questions
>
> If you have picked up all the changes for SPARK-18057, a Kafka "broker"
> supporting v1.0+ should be compatible with Spark's Kafka adapter.
>
> Can you post more details about the "failed to send SSL close message"
> errors?
>
> (The default Kafka version is 2.0.0 in the Spark Kafka adapter after SPARK-18057.)
>
> Thanks
>
> On Fri, Aug 17, 2018 at 3:53 PM Basil Hariri <basil.har...@microsoft.com.invalid> wrote:
>
> Hi all,
>
> I work on Azure Event Hubs (Microsoft's PaaS offering similar to Apache
> Kafka) and am trying to get our new Kafka head
> <https://azure.microsoft.com/en-us/blog/azure-event-hubs-for-kafka-ecosystems-in-public-preview/>
> to play nice with Spark's Kafka adapter. The goal is for our Kafka endpoint
> to be completely compatible with Spark's Kafka adapter, but I'm running
> into some issues that I think are related to versioning. I've been trying
> to tinker with the kafka-0-10-sql
> <https://github.com/apache/spark/tree/master/external/kafka-0-10-sql>
> and kafka-0-10
> <https://github.com/apache/spark/tree/master/external/kafka-0-10>
> adapters on GitHub and was wondering if someone could take a second to
> point me in the right direction with:
>
> 1. What is the difference between those two adapters? My hunch is that
> kafka-0-10-sql supports structured streaming while kafka-0-10 still uses
> Spark streaming, but I haven't found anything to verify that.
>
> 2. Event Hubs' Kafka endpoint only supports Kafka 1.0 and later, and
> the errors I get when trying to connect to Spark ("failed to send SSL close
> message" / broken pipe errors) have usually shown up when using Kafka v0.10
> applications with our endpoint. I built from source after I saw that both
> libraries were updated for Kafka 2.0 support (late last week), but I'm
> still running into the same issues. Do Spark's Kafka adapters generally
> downgrade to Kafka v0.10 protocols? If not, is there any other reason to
> believe that a Kafka "broker" that doesn't support v0.10 protocols but
> supports v1.0+ would be incompatible with Spark's Kafka adapter?
>
> Thanks in advance; please let me know if there's a different place I
> should be posting this.
>
> Sincerely,
>
> Basil
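On question 1 above: that hunch matches my understanding. external/kafka-0-10-sql backs the "kafka" format used by Structured Streaming (the KafkaMicroBatchReader in the stack trace), while external/kafka-0-10 is the older DStream-based integration. A rough sketch of the two entry points follows; the broker, topic, and group id are placeholders, and an existing SparkSession `spark` is assumed.

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    // external/kafka-0-10-sql: Structured Streaming source, module spark-sql-kafka-0-10
    val structured = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<broker>:9093")
      .option("subscribe", "<topic>")
      .load()

    // external/kafka-0-10: DStream-based direct stream, module spark-streaming-kafka-0-10
    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "<broker>:9093",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "<group-id>")
    val dstream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("<topic>"), kafkaParams))

Both modules embed a Kafka client rather than speaking the old v0.10 wire protocol exclusively, so which protocol versions get negotiated depends on the Kafka client jar actually on the classpath at runtime.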