Hi, I am facing a dependency issue when running a Spark Streaming job on Azure HDInsight. The job connects to a Kafka broker that is hosted on a LAN and is reachable via a public IP.
Spark job pom.xml setup (Spark version 3.0.0, Scala version 2.12):

    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.12.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <!-- Spark dependency -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.0.0</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>

HDInsight version: Spark 3.0 (HDI 4.0)

I am using the Livy API to start the job in Azure remotely. Below is the list of files passed in the "jars" option in Livy:

    kafka-clients-2.7.0.jar <https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/kafka-clients-2.7.0.jar>
    spark-streaming-kafka-0-10_2.12-3.0.0.jar <https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/spark-streaming-kafka-0-10_2.12-3.0.0.jar>
    spark-token-provider-kafka-0-10_2.12-3.0.0.jar <https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/spark-token-provider-kafka-0-10_2.12-3.0.0.jar>

The job starts in the Azure Spark cluster, but it does not receive data from my Kafka broker. Here is the error I am getting:

    Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
        at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:93)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:73)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:258)
        at org.apache.spark.streaming.DStreamGraph.$anonfun$start$7(DStreamGraph.scala:55)
        at org.apache.spark.streaming.DStreamGraph.$anonfun$start$7$adapted(DStreamGraph.scala:55)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:974)
        at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
        at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)
        at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)
        at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:971)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:153)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)
        at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

Here is the Scala code used to connect to the broker:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> kafkaServer,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> connectionID,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (true: java.lang.Boolean),
      "partition.assignment.strategy" -> "org.apache.kafka.clients.consumer.RangeAssignor"
    )
    val topics = Array(connectionID)
    val inputMsg = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
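My understanding is that a NoSuchMethodError like this usually means KafkaConsumer is being resolved from a different (likely older) kafka-clients jar already on the cluster classpath, rather than from the 2.7.0 jar I pass through Livy. To see which jar the driver actually loads the class from, I can log its code source just before creating the stream (a minimal sketch using the standard JDK ProtectionDomain API):

    import org.apache.kafka.clients.consumer.KafkaConsumer

    // Prints the jar that KafkaConsumer was actually loaded from on the driver,
    // e.g. .../kafka-clients-2.7.0.jar vs. some cluster-provided kafka-clients jar.
    val kafkaClientsJar = classOf[KafkaConsumer[_, _]]
      .getProtectionDomain.getCodeSource.getLocation
    println(s"kafka-clients loaded from: $kafkaClientsJar")

Running the same check on the HDInsight driver and on my local YARN cluster should show whether a different kafka-clients jar is winning on the HDInsight classpath.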
The same code, with the above list of dependencies, works in my local Hadoop cluster, which runs on YARN. Please help me figure out what the specific issue in the HDInsight cluster could be.

Regards,
Favas