Hi,

I am facing a dependency issue when running a Spark Streaming job on Azure 
HDInsight. The job connects to a Kafka broker that is hosted on a LAN and is 
reachable through a public IP.

Spark job pom.xml setup (Spark version 3.0.0, Scala version 2.12):

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.12.12</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>3.0.0</version>
</dependency>
<dependency> <!-- Spark dependency -->
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>3.0.0</version>
  <!-- <scope>provided</scope> -->
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.7.4</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>3.0.0</version>
  <!-- <scope>provided</scope> -->
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  <version>3.0.0</version>
</dependency>

HDInsight version: Spark 3.0 (HDI 4.0)
I am using the Livy API to start the job in Azure remotely. Below is the list 
of files passed in the "jars" option in Livy:

kafka-clients-2.7.0.jar<https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/kafka-clients-2.7.0.jar>,
spark-streaming-kafka-0-10_2.12-3.0.0.jar<https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/spark-streaming-kafka-0-10_2.12-3.0.0.jar>,
spark-token-provider-kafka-0-10_2.12-3.0.0.jar<https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/spark-token-provider-kafka-0-10_2.12-3.0.0.jar>

The job starts on the Azure Spark cluster, but it does not receive any data 
from my Kafka broker. Here is the error I am getting:


Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
        at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:93)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:73)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:258)
        at org.apache.spark.streaming.DStreamGraph.$anonfun$start$7(DStreamGraph.scala:55)
        at org.apache.spark.streaming.DStreamGraph.$anonfun$start$7$adapted(DStreamGraph.scala:55)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:974)
        at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
        at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)
        at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)
        at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:971)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:153)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)
        at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
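
A NoSuchMethodError generally means the class was found at runtime, but in a 
different version from the one the calling code was compiled against. One way 
to narrow this down might be to log, from the driver, which jar KafkaConsumer 
is loaded from and whether it exposes subscribe(java.util.Collection). The 
following is only a minimal sketch under that assumption; the object name 
WhichJar and the helpers locationOf and hasMethod are illustrative and not part 
of the job, and I have not confirmed what it prints on the HDInsight cluster.

```scala
import scala.util.Try

// Hypothetical helper object (not part of the original job).
object WhichJar {

  // Location (jar or directory) a class was loaded from, if the JVM exposes it.
  // Returns None when the class is not on the classpath or has no code source.
  def locationOf(className: String): Option[String] =
    Try(Class.forName(className)).toOption
      .flatMap(c => Option(c.getProtectionDomain.getCodeSource))
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)

  // True if the class is loadable and has a public method with this exact
  // name and parameter types.
  def hasMethod(className: String, methodName: String, paramTypes: Class[_]*): Boolean =
    Try(Class.forName(className).getMethod(methodName, paramTypes: _*)).isSuccess

  def main(args: Array[String]): Unit = {
    val consumer = "org.apache.kafka.clients.consumer.KafkaConsumer"
    val location = locationOf(consumer).getOrElse("<not found or no code source>")
    println(s"$consumer loaded from: $location")
    println(s"has subscribe(Collection): " +
      hasMethod(consumer, "subscribe", classOf[java.util.Collection[_]]))
  }
}
```

Calling the same two helpers at the start of the streaming job, instead of from 
main, would report what the driver actually sees on the cluster.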

Here is the Scala code used to connect to the broker:


import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// kafkaServer, connectionID and ssc (the StreamingContext) are not shown here.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> kafkaServer,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> connectionID,
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (true: java.lang.Boolean),
  "partition.assignment.strategy" -> "org.apache.kafka.clients.consumer.RangeAssignor"
)

// connectionID is used both as the consumer group ID and as the topic name.
val topics = Array(connectionID)
val inputMsg = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

The same code, with the dependencies listed above, works on my local Hadoop 
cluster, which runs on YARN.

Please help me figure out what the specific issue could be on the HDInsight 
cluster.

Regards,
Favas
