Re: Spark Kafka streaming - ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
Thanks, Tobias, for replying. The problem was that I had to provide the dependency jars' paths to the StreamingContext within the code. Providing all the jar paths resolved my problem. See the code snippet below:

    JavaStreamingContext ssc = new JavaStreamingContext(
        args[0],
        "SparkStreamExample",
        new Duration(1000),
        System.getenv("SPARK_HOME"),
        new String[] {
            JavaStreamingContext.jarOfClass(SparkStreamExample.class)[0],
            "/usr/local/spark-0.9.1-bin-hadoop2/external/kafka/target/spark-streaming-kafka_2.10-0.9.1.jar",
            "/usr/lib/hbase/lib/zookeeper.jar",
            "/usr/local/kafka/kafka_2.10-0.8.1.1/libs/zkclient-0.3.jar",
            "/usr/local/kafka/kafka_2.10-0.8.1.1/libs/kafka_2.10-0.8.1.1.jar",
            "/usr/local/scala/lib/scala-library.jar",
            "/usr/local/shark-0.9.1-bin-hadoop2/lib_managed/jars/com.yammer.metrics/metrics-core/metrics-core-2.1.2.jar",
            "/usr/local/hbase.jar"
        });

The question is: isn't there any other way of doing this? The above approach doesn't seem good to me. For example, what if I execute the application on some other cluster where the dependency paths are different? It is also not feasible to parametrize these jar paths as user arguments. Any advice will be appreciated.

Regards,
Gaurav
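One way to address the portability concern above is to keep the jar list out of the code entirely and load it from a per-cluster properties file. This is a minimal sketch under stated assumptions: the class name, file name, and the key `spark.dependency.jars` are all hypothetical (it is not a real Spark setting), chosen only to illustrate the approach.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

// Hypothetical helper: read cluster-specific dependency jar paths from a
// properties file instead of hardcoding them in the application source.
public class JarConfig {
    // Reads a comma-separated list of jar paths stored under the
    // (illustrative) key "spark.dependency.jars" in the given file.
    public static String[] loadJars(String propertiesFile) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(propertiesFile)) {
            props.load(in);
        }
        String jars = props.getProperty("spark.dependency.jars", "");
        return jars.isEmpty() ? new String[0] : jars.split(",");
    }
}
```

The resulting array could then be passed as the jars argument of the JavaStreamingContext constructor, with one properties file maintained per cluster, so moving the application only means editing a config file.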
Re: Spark Kafka streaming - ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
Gaurav,

I am not sure that the * expands to what you expect it to do. Normally, bash expands * to a space-separated list, not a colon-separated one. Try specifying all the jars manually, maybe?

Tobias
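The suggestion of listing the jars manually can also be automated in code rather than relying on shell globbing. A minimal sketch (a hypothetical helper, not Spark API) that builds the colon-separated jar list the `libs/*` glob was presumably meant to produce:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Collect every .jar file directly under a directory and join the paths
// with the platform's classpath separator (':' on Linux).
public class ClasspathBuilder {
    public static String jarClasspath(String dir) {
        File[] entries = new File(dir).listFiles();
        List<String> jars = new ArrayList<>();
        if (entries != null) {
            for (File f : entries) {
                if (f.isFile() && f.getName().endsWith(".jar")) {
                    jars.add(f.getAbsolutePath());
                }
            }
        }
        Collections.sort(jars); // deterministic ordering across runs
        return String.join(File.pathSeparator, jars);
    }
}
```

The output could be spliced into the -cp argument in place of a wildcard such as /usr/local/kafka/kafka_2.10-0.8.1.1/libs/*, making the jar list explicit and independent of how the shell or JVM interprets the glob.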
Spark Kafka streaming - ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
Hi,

I have written my own custom Spark streaming code which connects to a Kafka server and fetches data. I have tested the code in local mode and it is working fine. But when I execute the same code in YARN mode, I get a KafkaReceiver class not found exception.

I am providing the Spark Kafka jar in the classpath and have ensured that the path is correct for all the nodes in my cluster. I am using the Spark 0.9.1 Hadoop pre-built, deployed on all the nodes (10-node cluster) of the YARN cluster. I am using the following command to run my code in YARN mode:

    SPARK_YARN_MODE=true SPARK_JAR=assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar SPARK_YARN_APP_JAR=/usr/local/SparkStreamExample.jar java -cp /usr/local/SparkStreamExample.jar:assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar:external/kafka/target/spark-streaming-kafka_2.10-0.9.1.jar:/usr/local/kafka/kafka_2.10-0.8.1.1/libs/*:/usr/lib/hbase/lib/*:/etc/hadoop/conf/:/etc/hbase/conf/ SparkStreamExample yarn-client 10.10.5.32 myFirstGroup testTopic NewTestTable 1

Below is the error message I am getting:

    14/06/05 04:29:12 INFO cluster.YarnClientClusterScheduler: Adding task set 2.0 with 1 tasks
    14/06/05 04:29:12 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID 70 on executor 2: manny6.musigma.com (PROCESS_LOCAL)
    14/06/05 04:29:12 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as 2971 bytes in 2 ms
    14/06/05 04:29:12 WARN scheduler.TaskSetManager: Lost TID 70 (task 2.0:0)
    14/06/05 04:29:12 WARN scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException
    java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
        at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:247)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1574)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1495)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1731)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1666)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1322)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:479)
        at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:72)
        at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
        at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:145)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1791)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
        at
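When a ClassNotFoundException like the one above appears only in cluster mode, one quick way to narrow it down is to probe the classpath from the driver JVM. The helper below is a hypothetical debugging aid, not part of Spark; the class name it would be used with comes from the stack trace.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative preflight check: report which of the given classes cannot
// be loaded by the current JVM's classpath.
public class ClasspathProbe {
    public static List<String> missing(String... classNames) {
        List<String> notFound = new ArrayList<>();
        for (String name : classNames) {
            try {
                Class.forName(name); // throws if the class is not visible
            } catch (ClassNotFoundException e) {
                notFound.add(name);
            }
        }
        return notFound;
    }
}
```

If `missing("org.apache.spark.streaming.kafka.KafkaReceiver")` comes back empty on the driver while executors still throw ClassNotFoundException, that is itself a diagnostic: the -cp entries apply only to the launching JVM, and the jars are evidently not being shipped to the YARN executors, which matches the fix described in the reply above.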