Hi,

I have written my own custom Spark Streaming code, which connects to a Kafka
server and fetches data. I have tested the code in local mode and it works
fine, but when I run the same code in YARN mode I get a ClassNotFoundException
for the KafkaReceiver class. I am providing the Spark Kafka jar on the
classpath and have verified that the path is correct on all the nodes in my
cluster.
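
For reference, the receiver setup in SparkStreamExample is essentially the
standard KafkaUtils pattern. Below is a simplified sketch, not the exact code;
the batch interval, the ZooKeeper port, and the table write step are
placeholders:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkStreamExample {
  def main(args: Array[String]) {
    // args: master, ZooKeeper host, consumer group, topic, output table, threads
    val Array(master, zkHost, group, topic, table, threads) = args

    // placeholder batch interval; the real code uses a different value
    val ssc = new StreamingContext(master, "SparkStreamExample", Seconds(10))

    // createStream starts a KafkaReceiver on an executor, which is the
    // class the executors fail to load in YARN mode
    val stream = KafkaUtils.createStream(ssc, zkHost + ":2181", group,
      Map(topic -> threads.toInt))

    // placeholder: the real code writes each batch to the output table
    stream.map(_._2).print()

    ssc.start()
    ssc.awaitTermination()
  }
}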

I am using the pre-built Spark 0.9.1 distribution for Hadoop 2.2, deployed on
all 10 nodes of the YARN cluster. I am using the following command to run my
code in YARN mode:

SPARK_YARN_MODE=true \
SPARK_JAR=assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar \
SPARK_YARN_APP_JAR=/usr/local/SparkStreamExample.jar \
java -cp /usr/local/SparkStreamExample.jar:assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar:external/kafka/target/spark-streaming-kafka_2.10-0.9.1.jar:/usr/local/kafka/kafka_2.10-0.8.1.1/libs/*:/usr/lib/hbase/lib/*:/etc/hadoop/conf/:/etc/hbase/conf/ \
SparkStreamExample yarn-client 10.10.5.32 myFirstGroup testTopic NewTestTable 1

Below is the error message I am getting:

14/06/05 04:29:12 INFO cluster.YarnClientClusterScheduler: Adding task set 2.0 with 1 tasks
14/06/05 04:29:12 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID 70 on executor 2: manny6.musigma.com (PROCESS_LOCAL)
14/06/05 04:29:12 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as 2971 bytes in 2 ms
14/06/05 04:29:12 WARN scheduler.TaskSetManager: Lost TID 70 (task 2.0:0)
14/06/05 04:29:12 WARN scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException
java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:247)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1574)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1495)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1731)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1666)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1322)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:479)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:72)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
    at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:145)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1791)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

What might be the problem? Can someone help me resolve this issue?

Regards,
Gaurav
