After a brief check, I found KAFKA-5649 where almost identical error was
reported.

There is also KAFKA-3702 which is related but currently open.

I will dig some more to see what I can find.

Cheers

On Mon, Aug 20, 2018 at 3:53 PM Basil Hariri <basil.har...@microsoft.com>
wrote:

> I am pretty sure I got those changes with the jar I compiled (I pulled
> from master on 8/8 and it looks like SPARK-18057 was resolved on 8/3) but
> no luck, here is a copy-paste of the error I’m seeing. The semantics for
> Event Hubs’ Kafka head is highlighted for reference – we connect to port
> 9093 on a FQDN instead of port 9092 on a Kafka broker’s IP address, but I
> don’t think that should change anything.
>
>
>
>
>
> 18/08/20 22:29:13 INFO AbstractCoordinator: Discovered coordinator <broker
> FQDN>:9093 (id: 2147483647 rack: null) for group
> spark-kafka-source-1aa50598-99d1-4c53-a73c-fa6637a219b2--1338794993-driver-0.
>
> 18/08/20 22:29:13 INFO AbstractCoordinator: (Re-)joining group
> spark-kafka-source-1aa50598-99d1-4c53-a73c-fa6637a219b2--1338794993-driver-0
>
> 18/08/20 22:29:33 WARN SslTransportLayer: Failed to send SSL Close message
>
> java.io.IOException: Broken pipe
>
>                 at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>
>                 at
> sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>
>                 at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>
>                 at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>
>                 at
> sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>
>                 at
> kafkashaded.org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
>
>                 at
> kafkashaded.org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
>
>                 at
> kafkashaded.org.apache.kafka.common.utils.Utils.closeAll(Utils.java:690)
>
>                 at
> kafkashaded.org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:47)
>
>                 at
> kafkashaded.org.apache.kafka.common.network.Selector.close(Selector.java:471)
>
>                 at
> kafkashaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:348)
>
>                 at
> kafkashaded.org.apache.kafka.common.network.Selector.poll(Selector.java:283)
>
>                 at
> kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>
>                 at
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>
>                 at
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>
>                 at
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>
>                 at
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>
>                 at
> kafkashaded.org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>
>                 at
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
>
>                 at
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
>
>                 at
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
>
>                 at
> org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:214)
>
>                 at
> org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:212)
>
>                 at
> org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:303)
>
>                 at
> org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:302)
>
>                 at
> org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:302)
>
>                 at
> org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
>
>                 at org.apache.spark.sql.kafka010.KafkaOffsetReader.org
> $apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:301)
>
>                 at
> org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:212)
>
>                 at
> org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:212)
>
>                 at
> org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:270)
>
>                 at
> org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:211)
>
>                 at
> org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$getOrCreateInitialPartitionOffsets$1.apply(KafkaMicroBatchReader.scala:212)
>
>                 at
> org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$getOrCreateInitialPartitionOffsets$1.apply(KafkaMicroBatchReader.scala:207)
>
>                 at scala.Option.getOrElse(Option.scala:121)
>
>                 at
> org.apache.spark.sql.kafka010.KafkaMicroBatchReader.getOrCreateInitialPartitionOffsets(KafkaMicroBatchReader.scala:207)
>
>                 at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org
> $apache$spark$sql$kafka010$KafkaMicroBatchReader$$initialPartitionOffsets$lzycompute(KafkaMicroBatchReader.scala:82)
>
>                 at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org
> $apache$spark$sql$kafka010$KafkaMicroBatchReader$$initialPartitionOffsets(KafkaMicroBatchReader.scala:82)
>
>                 at
> org.apache.spark.sql.kafka010.KafkaMicroBatchReader.setOffsetRange(KafkaMicroBatchReader.scala:89)
>
>                 at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6$$anonfun$apply$2.apply$mcV$sp(MicroBatchExecution.scala:364)
>
>                 at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6$$anonfun$apply$2.apply(MicroBatchExecution.scala:358)
>
>                 at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6$$anonfun$apply$2.apply(MicroBatchExecution.scala:358)
>
>                 at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:379)
>
>                 at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:60)
>
>                 at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6.apply(MicroBatchExecution.scala:353)
>
>                 at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6.apply(MicroBatchExecution.scala:340)
>
>                 at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>
>                 at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>
>                 at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
>                 at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>
>                 at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>
>                 at
> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>
>                 at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:340)
>
>                 at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:336)
>
>                 at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:336)
>
>                 at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:563)
>
>                 at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org
> $apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:336)
>
>                 at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:189)
>
>                 at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:172)
>
>                 at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:172)
>
>                 at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:379)
>
>                 at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:60)
>
>                 at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:172)
>
>                 at
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>
>                 at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:166)
>
>                 at
> org.apache.spark.sql.execution.streaming.StreamExecution.org
> $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:293)
>
>                 at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:203)
>
> 18/08/20 22:29:33 INFO AbstractCoordinator: Marking the coordinator
> <broker FQDN>:9093 (id: 2147483647 rack: null) dead for group
> spark-kafka-source-1aa50598-99d1-4c53-a73c-fa6637a219b2--1338794993-driver-0
>
> 18/08/20 22:29:34 INFO AbstractCoordinator: Discovered coordinator <broker
> FQDN>:9093 (id: 2147483647 rack: null) for group
> spark-kafka-source-1aa50598-99d1-4c53-a73c-fa6637a219b2--1338794993-driver-0.
>
> <repeat this error over and over until the job is terminated>
>
>
>
> Also, I’m not sure if it’s relevant but I am running on Databricks
> (currently working on running it on a local cluster to verify that it isn’t
> a Databricks issue). The only jars I’m using are the Spark-Kafka connector
> from github master on 8/8/18 and Kafka v2.0. Thanks so much for your help,
> let me know if there’s anything else I can provide
>
>
>
> Sincerely,
>
> Basil
>
>
>
> *From:* Ted Yu <yuzhih...@gmail.com>
> *Sent:* Friday, August 17, 2018 4:20 PM
> *To:* basil.har...@microsoft.com.invalid
> *Cc:* dev <dev@spark.apache.org>
> *Subject:* Re: Spark Kafka adapter questions
>
>
>
> If you have picked up all the changes for SPARK-18057, the Kafka “broker”
> supporting v1.0+ should be compatible with Spark's Kafka adapter.
>
>
>
> Can you post more details about the “failed to send SSL close message”
> errors ?
>
>
>
> (The default Kafka version is 2.0.0 in Spark Kafka adapter
> after SPARK-18057)
>
>
>
> Thanks
>
>
>
> On Fri, Aug 17, 2018 at 3:53 PM Basil Hariri <
> basil.har...@microsoft.com.invalid> wrote:
>
> Hi all,
>
>
>
> I work on Azure Event Hubs (Microsoft’s PaaS offering similar to Apache
> Kafka) and am trying to get our new Kafka head
> <https://na01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fazure.microsoft.com%2Fen-us%2Fblog%2Fazure-event-hubs-for-kafka-ecosystems-in-public-preview%2F&data=02%7C01%7CBasil.Hariri%40microsoft.com%7C9c2387763e53418d4b4e08d6049813a9%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C636701448547293693&sdata=kNuSO1yNNJzOOyg%2FDRlyv4ZKB568f%2FKKn0zCnWQDK0A%3D&reserved=0>
> to play nice with Spark’s Kafka adapter. The goal is for our Kafka endpoint
> to be completely compatible with Spark’s Kafka adapter, but I’m running
> into some issues that I think are related to versioning. I’ve been trying
> to tinker with the kafka-0-10-sql
> <https://na01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fspark%2Ftree%2Fmaster%2Fexternal%2Fkafka-0-10-sql&data=02%7C01%7CBasil.Hariri%40microsoft.com%7C9c2387763e53418d4b4e08d6049813a9%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C636701448547293693&sdata=s5BoYXcUhrVb5uaj3Y2soxjn8Zm3LFVOyGD8bwDZkkM%3D&reserved=0>
> and kafka-0-10
> <https://na01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fspark%2Ftree%2Fmaster%2Fexternal%2Fkafka-0-10-sql&data=02%7C01%7CBasil.Hariri%40microsoft.com%7C9c2387763e53418d4b4e08d6049813a9%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C636701448547303703&sdata=5H9%2FFGxz1VsL0OfWx7mrsQU2cGIR7zB3VuMADZop9RE%3D&reserved=0>
> adapters on Github and was wondering if someone could take a second to
> point me in the right direction with:
>
>
>
>    1. What is the difference between those two adapters? My hunch is that
>    kafka-0-10-sql supports structured streaming while kafka-10-0 still uses
>    Spark streaming, but I haven’t found anything to verify that.
>    2. Event Hubs’ Kafka endpoint only supports Kafka 1.0 and later, and
>    the errors I get when trying to connect to Spark (“failed to send SSL close
>    message” / broken pipe errors) have usually shown up when using Kafka v0.10
>    applications with our endpoint. I built from source after I saw that both
>    libraries were updated for Kafka 2.0 support (late last week), but I’m
>    still running into the same issues. Do Spark’s Kafka adapters generally
>    downgrade to Kafka v0.10 protocols? If not, is there any other reason to
>    believe that a Kafka “broker” that doesn’t support v0.10 protocols but
>    supports v1.0+ would be incompatible with Spark’s Kafka adapter?
>
>
>
> Thanks in advance, please let me know if there’s a different place I
> should be posting this
>
>
>
> Sincerely,
>
> Basil
>
>
>
>

Reply via email to