Hi, I have written my own custom Spark Streaming code that connects to a Kafka server and fetches data. I have tested the code in local mode and it works fine. But when I execute the same code in YARN mode, I get a KafkaReceiver ClassNotFoundException. I am providing the Spark Kafka jar on the classpath and have verified that the path is correct on all the nodes in my cluster.
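For reference, the job is wired up roughly like this (a simplified sketch, not my exact code; the two-second batch interval and the jars passed to the StreamingContext are illustrative assumptions, and the HBase write is omitted):

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object SparkStreamExample {
      def main(args: Array[String]) {
        // args: master, zkQuorum, consumer group, topic, HBase table, numThreads
        val Array(master, zkQuorum, group, topic, table, numThreads) = args

        // Batch interval of 2 seconds is illustrative; the jars Seq is one way
        // the application and Kafka jars can be shipped to the executors.
        val ssc = new StreamingContext(master, "SparkStreamExample", Seconds(2),
          System.getenv("SPARK_HOME"),
          Seq("/usr/local/SparkStreamExample.jar",
              "external/kafka/target/spark-streaming-kafka_2.10-0.9.1.jar"))

        val stream = KafkaUtils.createStream(ssc, zkQuorum, group,
          Map(topic -> numThreads.toInt))
        stream.print() // in the real job the records are written to the HBase table

        ssc.start()
        ssc.awaitTermination()
      }
    }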
I am using the Spark 0.9.1 pre-built-for-Hadoop-2.2.0 distribution, deployed on all the nodes of my 10-node YARN cluster. I am using the following command to run my code in YARN mode:

SPARK_YARN_MODE=true \
SPARK_JAR=assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar \
SPARK_YARN_APP_JAR=/usr/local/SparkStreamExample.jar \
java -cp /usr/local/SparkStreamExample.jar:assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar:external/kafka/target/spark-streaming-kafka_2.10-0.9.1.jar:/usr/local/kafka/kafka_2.10-0.8.1.1/libs/*:/usr/lib/hbase/lib/*:/etc/hadoop/conf/:/etc/hbase/conf/ \
SparkStreamExample yarn-client 10.10.5.32 myFirstGroup testTopic NewTestTable 1

Below is the error message I am getting:

14/06/05 04:29:12 INFO cluster.YarnClientClusterScheduler: Adding task set 2.0 with 1 tasks
14/06/05 04:29:12 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID 70 on executor 2: manny6.musigma.com (PROCESS_LOCAL)
14/06/05 04:29:12 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as 2971 bytes in 2 ms
14/06/05 04:29:12 WARN scheduler.TaskSetManager: Lost TID 70 (task 2.0:0)
14/06/05 04:29:12 WARN scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException
java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:247)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1574)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1495)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1731)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1666)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1322)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:479)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:72)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
    at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:145)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1791)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

What might be the problem? Can someone help me solve this issue?

Regards,
Gaurav