Hi,

Yeah, but I verified the version of spark in HDInsight and the one I used in 
code and both are same.

HDInsight(4.0) has spark version 3.0 and Scala version 2.12

[cid:image001.png@01D7BAD3.81444600]

I used Livy API in HDInsight to submit the job. This is an API available in 
HDInsight to submit job remotely. I have passed all dependent jars while 
calling the submit.


Regards,
Favas

From: Stelios Philippou <stevo...@gmail.com>
Sent: Wednesday, October 6, 2021 16:51 PM
To: Muhammed Favas <favas.muham...@expeedsoftware.com>
Cc: user@spark.apache.org
Subject: Re: Running spark Kafka streaming jo in Azure HDInsight

Hi Favas,

The error states that you are using different libraries version.


Exception in thread "streaming-start" java.lang.NoSuchMethodError: 
org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V


Have in mind that Spark uses its internal libraries for the majority of this. 
So those two must be aligned between Spark and your code.

Can you verify that your HDVersion is indeed 3.0 ?

Also how are you submitting the job ?


On Wed, 6 Oct 2021 at 14:10, Muhammed Favas 
<favas.muham...@expeedsoftware.com<mailto:favas.muham...@expeedsoftware.com>> 
wrote:
Hi,

I am facing some dependency issue in running a spark streaming job in Azure 
HDInsight. The job is connecting to a kafka broker which is hosted in a LAN and 
has public IP access to it.

Spark job porn.xml set up – spark version 3.0.0, Scala version 2.12

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.12.12</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>3.0.0</version>
</dependency>
<dependency> <!-- Spark dependency -->
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>3.0.0</version>
  <!-- <scope>provided</scope> -->
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.7.4</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>3.0.0</version>
  <!-- <scope>provided</scope> -->
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  <version>3.0.0</version>
</dependency>

HDInsight version - Spark 3.0 (HDI 4.0)
I am using Livy API to start job in azure remotely. Below is the list of files 
passed in “jars” option in livy

kafka-clients-2.7.0.jar<https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/kafka-clients-2.7.0.jar>,
spark-streaming-kafka-0-10_2.12-3.0.0.jar<https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/spark-streaming-kafka-0-10_2.12-3.0.0.jar>,
spark-token-provider-kafka-0-10_2.12-3.0.0.jar<https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/spark-token-provider-kafka-0-10_2.12-3.0.0.jar>

The job is starting in azure spark cluster, but it is not receiving data from 
my kafka broker. Here is the error I am getting


Exception in thread "streaming-start" java.lang.NoSuchMethodError: 
org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V

        at 
org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:93)

        at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:73)

        at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:258)

        at 
org.apache.spark.streaming.DStreamGraph.$anonfun$start$7(DStreamGraph.scala:55)

        at 
org.apache.spark.streaming.DStreamGraph.$anonfun$start$7$adapted(DStreamGraph.scala:55)

        at scala.collection.Iterator.foreach(Iterator.scala:941)

        at scala.collection.Iterator.foreach$(Iterator.scala:941)

        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)

        at 
scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:974)

        at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)

        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)

        at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)

        at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)

        at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)

        at 
scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:971)

        at 
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:153)

        at 
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)

        at 
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)

        at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)

        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)

        at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)

        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)

        at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

Here is the scala code which used to connect to broker.


import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe



val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> kafkaServer,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id<http://group.id>" -> connectionID,
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (true: java.lang.Boolean),
    "partition.assignment.strategy" 
->"org.apache.kafka.clients.consumer.RangeAssignor"
  )

  val topics = Array(connectionID)
  val inputMsg = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )

The same code with above list of dependencies is working in my local Hadoop 
cluster which runs on YARN.

Please help me to figure out what could be the specific issue in HDInsight 
cluster.

Regards,
Favas

Reply via email to