Hi Favas,

The error indicates that you are using mismatched library versions.

Exception in thread "streaming-start" java.lang.NoSuchMethodError:
org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V



Keep in mind that Spark ships its own copies of most of these libraries, so
the versions on the cluster and the versions your code is built against must
be aligned.
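
One way to check which kafka-clients jar actually wins at runtime is a small
reflection probe. This is only a sketch: the object name KafkaClientCheck and
its helpers are made up for illustration, and it relies on nothing beyond the
JDK and the Scala standard library. It reports where KafkaConsumer was loaded
from and whether the subscribe(Collection) method named in the error exists.

```scala
import scala.util.Try

// Hypothetical helper, not part of Spark or Kafka.
object KafkaClientCheck {
  // Jar or directory a class was loaded from, if the JVM exposes it.
  def loadedFrom(cls: Class[_]): Option[String] =
    Option(cls.getProtectionDomain.getCodeSource)
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)

  // True if the class has a public method with this name and exactly
  // these parameter types.
  def hasMethod(cls: Class[_], name: String, params: Class[_]*): Boolean =
    Try(cls.getMethod(name, params: _*)).isSuccess

  def main(args: Array[String]): Unit = {
    val name = "org.apache.kafka.clients.consumer.KafkaConsumer"
    Try(Class.forName(name)).toOption match {
      case None =>
        println(s"$name is not on the classpath")
      case Some(cls) =>
        val hasSubscribe =
          hasMethod(cls, "subscribe", classOf[java.util.Collection[_]])
        println(s"loaded from: ${loadedFrom(cls).getOrElse("unknown")}")
        println(s"subscribe(Collection) present: $hasSubscribe")
    }
  }
}
```

Running this in the same way you submit the real job should show whether a
kafka-clients jar provided by the cluster is shadowing the one you pass in.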

Can you verify that your HDInsight cluster is indeed on Spark 3.0?

Also, how are you submitting the job?


On Wed, 6 Oct 2021 at 14:10, Muhammed Favas <
favas.muham...@expeedsoftware.com> wrote:

> Hi,
>
>
>
> I am facing a dependency issue when running a Spark Streaming job on
> Azure HDInsight. The job connects to a Kafka broker that is hosted on
> a LAN and is reachable through a public IP.
>
>
>
> Spark job pom.xml setup – Spark version 3.0.0, Scala version 2.12
>
>
>
> <dependency>
>   <groupId>org.scala-lang</groupId>
>   <artifactId>scala-library</artifactId>
>   <version>2.12.12</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-core_2.12</artifactId>
>   <version>3.0.0</version>
> </dependency>
> <dependency> <!-- Spark dependency -->
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-sql_2.12</artifactId>
>   <version>3.0.0</version>
>   <!-- <scope>provided</scope> -->
> </dependency>
> <dependency>
>   <groupId>org.apache.hadoop</groupId>
>   <artifactId>hadoop-common</artifactId>
>   <version>2.7.4</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-streaming_2.12</artifactId>
>   <version>3.0.0</version>
>   <!-- <scope>provided</scope> -->
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
>   <version>3.0.0</version>
> </dependency>
>
>
>
> HDInsight version - Spark 3.0 (HDI 4.0)
>
> I am using the Livy API to start the job in Azure remotely. Below is the
> list of files passed in the “jars” option in Livy:
>
>
>
> kafka-clients-2.7.0.jar
> <https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/kafka-clients-2.7.0.jar>,
> spark-streaming-kafka-0-10_2.12-3.0.0.jar
> <https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/spark-streaming-kafka-0-10_2.12-3.0.0.jar>,
> spark-token-provider-kafka-0-10_2.12-3.0.0.jar
> <https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/spark-token-provider-kafka-0-10_2.12-3.0.0.jar>
>
>
> The job starts in the Azure Spark cluster, but it is not receiving data
> from my Kafka broker. Here is the error I am getting:
>
>
>
> Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
>         at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:93)
>         at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:73)
>         at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:258)
>         at org.apache.spark.streaming.DStreamGraph.$anonfun$start$7(DStreamGraph.scala:55)
>         at org.apache.spark.streaming.DStreamGraph.$anonfun$start$7$adapted(DStreamGraph.scala:55)
>         at scala.collection.Iterator.foreach(Iterator.scala:941)
>         at scala.collection.Iterator.foreach$(Iterator.scala:941)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>         at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:974)
>         at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
>         at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>         at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
>         at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)
>         at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)
>         at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:971)
>         at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:153)
>         at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)
>         at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)
>         at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
>         at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>         at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>         at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>         at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
>
>
>
> Here is the Scala code used to connect to the broker.
>
>
>
> import org.apache.kafka.common.serialization.StringDeserializer
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>
>
>
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> kafkaServer,
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> classOf[StringDeserializer],
>   "group.id" -> connectionID,
>   "auto.offset.reset" -> "earliest",
>   "enable.auto.commit" -> (true: java.lang.Boolean),
>   "partition.assignment.strategy" -> "org.apache.kafka.clients.consumer.RangeAssignor"
> )
>
> val topics = Array(connectionID)
> val inputMsg = KafkaUtils.createDirectStream[String, String](
>   ssc,
>   PreferConsistent,
>   Subscribe[String, String](topics, kafkaParams)
> )
>
>
>
> The same code with above list of dependencies is working in my local
> Hadoop cluster which runs on YARN.
>
>
>
> Please help me figure out what the specific issue could be in the
> HDInsight cluster.
>
>
>
> Regards,
>
> Favas
>
>
>
