navin created KAFKA-10682:
-----------------------------

             Summary: Windows Kafka cluster not reachable via Azure Databricks
                 Key: KAFKA-10682
                 URL: https://issues.apache.org/jira/browse/KAFKA-10682
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 2.6.0
            Reporter: navin


We have a Windows Kafka cluster:
 * We enabled inbound and outbound traffic on ports 9092 and 9093.
 * The topic returns results when consumed locally from the Windows command prompt:
 ** ./kafka-console-consumer.bat --topic SIP.SIP.SHIPMENT --from-beginning --bootstrap-server 10.53.56.140:9092
 * We are trying to consume the same topic from Azure Databricks with the following notebook code:

    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "10.53.56.140:9092") \
        .option("subscribe", "SIP.SIP.SHIPMENT") \
        .option("minPartitions", "10") \
        .option("startingOffsets", "earliest") \
        .load()

    # df.isStreaming  # True for DataFrames that have streaming sources

    df.printSchema()

    display(df)
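Since the console consumer succeeds on the Windows host itself, a useful first check is whether the bootstrap address is reachable over TCP from the Databricks driver too. The following is a minimal sketch using only the Python standard library; the helper name `can_connect` and the default 5-second timeout are illustrative assumptions, not part of Kafka or Databricks. A successful TCP connection only shows that the port is open from Azure; it does not show that every later connection the Kafka client makes will succeed.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    Hypothetical helper for a notebook cell: it only tests socket reachability
    and does not speak the Kafka protocol.
    """
    try:
        # create_connection resolves the host (if needed) and opens a TCP socket;
        # the context manager closes it again immediately.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False
```

Run from a notebook cell, `can_connect("10.53.56.140", 9092)` returns True only if the driver can open a TCP connection to the bootstrap address used in the report.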

After running display(df) for some time, we got the following error:

Lost connection to cluster. The notebook may have been detached or the cluster 
may have been terminated due to an error in the driver such as an 
OutOfMemoryError.

In the logs we see the following error:

20/11/04 18:23:52 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5, groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0] Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: null)
20/11/04 18:23:52 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5, groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0] Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: null)
java.net.UnknownHostException: Navin.us.corp.tim.com
    at java.net.InetAddress.getAllByName0(InetAddress.java:1281)
    at java.net.InetAddress.getAllByName(InetAddress.java:1193)
    at java.net.InetAddress.getAllByName(InetAddress.java:1127)
    at kafkashaded.org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104)
    at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
    at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
    at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
    at kafkashaded.org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:949)
    at kafkashaded.org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
    at kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
    at kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
    at kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
    at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
    at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161)
    at kafkashaded.org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:240)
    at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444)
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:540)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:602)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:601)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:538)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:569)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReader.scala:538)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchEarliestOffsets(KafkaOffsetReader.scala:300)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:151)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:148)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:76)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:398)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:398)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:276)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:274)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:71)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:391)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:388)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:619)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:384)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:216)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:276)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:274)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:71)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:199)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:193)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:346)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:259)
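The warning shows the client failing to resolve `Navin.us.corp.tim.com`, even though `kafka.bootstrap.servers` was set to an IP address. Kafka clients use the bootstrap address only for the initial metadata request; later connections go to the host and port each broker advertises in that metadata (the broker's `advertised.listeners` setting). A broker hostname that resolves on the Windows host's network but not from Azure would therefore produce exactly this pattern. The sketch below is a diagnostic under that assumption, not a fix: it extracts the node address from a `NetworkClient` warning in the format shown above and checks whether the current machine can resolve it. The regular expression and the names `parse_failed_node` and `resolves` are hypothetical and only assume the log format quoted in this report.

```python
import re
import socket
from typing import Optional, Tuple

# Hypothetical pattern for the NetworkClient warning format quoted above, e.g.
# "Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: null)".
# Bootstrap-only nodes use negative ids, hence the optional minus sign.
_NODE_RE = re.compile(
    r"Error connecting to node (?P<host>[^\s:]+):(?P<port>\d+) "
    r"\(id: (?P<id>-?\d+)"
)


def parse_failed_node(log_line: str) -> Optional[Tuple[str, int, int]]:
    """Return (host, port, node_id) from a NetworkClient warning, or None if absent."""
    match = _NODE_RE.search(log_line)
    if match is None:
        return None
    return match.group("host"), int(match.group("port")), int(match.group("id"))


def resolves(host: str, port: int) -> bool:
    """Return True if the local resolver maps host:port to at least one address."""
    try:
        return len(socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)) > 0
    except OSError:
        # socket.gaierror (a subclass of OSError) signals a resolution failure.
        return False
```

For the warning in this report, `parse_failed_node` returns `("Navin.us.corp.tim.com", 9092, 0)`. If `resolves("Navin.us.corp.tim.com", 9092)` returns False when run in a Databricks notebook, the driver cannot resolve the name the broker advertises, which matches the `UnknownHostException` above.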

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
