Are you creating a fat assembly jar with spark-streaming-kafka included and
using that to run your code? If yes, I am not sure why it is not finding
the class. If not, then make sure that your framework places the
spark-streaming-kafka jar on the runtime classpath of the executors.
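
For the second case, one option is to ship those jars to the executors
explicitly from your createSparkConf(), next to the application jar you
already pass in. This is just a sketch: the spark-streaming-kafka and kafka
jar paths below are placeholders that you would point at wherever those
artifacts live on your machine (e.g. your local Maven repository).

    // Sketch only: reuses your configContainer; the other paths are placeholders.
    val appJar    = configContainer.getApplicationJarPaths.get(0)
    val kafkaJar  = "D:\\path\\to\\spark-streaming-kafka_2.10-1.5.2.jar"
    val kafkaDeps = Seq("D:\\path\\to\\kafka_2.10-0.8.2.1.jar")  // plus zkclient / metrics-core if they are not in your shaded jar
    conf.setJars(appJar +: kafkaJar +: kafkaDeps)

Setting spark.executor.extraClassPath to the same jars (copied onto each
worker beforehand) should achieve the same thing, if you prefer not to ship
them with every job.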

On Tue, Nov 17, 2015 at 6:04 PM, tim_b123 <tim.barth...@iag.com.au> wrote:

>
> Hi,
>
> I have a spark kafka streaming application that works when I run with a
> local spark context, but not with a remote one.
>
> My code consists of:
> 1.      A spring-boot application that creates the context
> 2.      A shaded jar file containing all of my spark code
>
> On my PC (Windows), I have a Spark (1.5.2) master and worker running
> (spark-1.5.2-bin-hadoop2.6).
>
> The entry point for my application is the start() method.
>
> The code is:
>
>   @throws(classOf[Exception])
>   def start {
>     val ssc: StreamingContext = createStreamingContext
>
>     val messagesRDD = createKafkaDStream(ssc, "myTopic", 2)
>
>     def datasRDD = messagesRDD.map((line : String) => MapFunctions.lineToSparkEventData(line))
>
>     def count = datasRDD.count()
>     datasRDD.print(1)
>
>     ssc.start
>     ssc.awaitTermination
>   }
>
>
>   private def createStreamingContext: StreamingContext = {
>     System.setProperty(HADOOP_HOME_DIR, configContainer.getHadoopHomeDir)
>     System.setProperty("spark.streaming.concurrentJobs", String.valueOf(configContainer.getStreamingConcurrentJobs))
>     def sparkConf = createSparkConf()
>
>     val ssc = new StreamingContext(sparkConf, Durations.seconds(configContainer.getStreamingContextDurationSeconds))
>     ssc.sparkContext.setJobGroup("main_streaming_job", "streaming context start")
>     ssc.sparkContext.setLocalProperty("spark.scheduler.pool", "real_time_pool")
>     ssc
>   }
>
>
>   private def createSparkConf() : SparkConf = {
>     def masterString = "spark://<<my_pc>>:7077"
>     def conf = new SparkConf().setMaster(masterString).setAppName("devAppRem")  // This is not working
> //    def conf = new SparkConf().setMaster("local[4]").setAppName("devAppLocal")  // This IS working
>
>     conf.set("spark.scheduler.allocation.file", "D:\\valid_path_to\\fairscheduler.xml");
>
>
>     val pathToShadedApplicationJar: String = configContainer.getApplicationJarPaths.get(0)
>     val jars: Array[String] = Array[String](pathToShadedApplicationJar)
>     conf.setJars(jars)
>
>     conf.set("spark.scheduler.mode", "FAIR")
>
>   }
>
>
>   private def createKafkaDStream(ssc: StreamingContext, topics: String, numThreads: Int): DStream[String] = {
>     val zkQuorum: String = configContainer.getZkQuorum
>     val groupId: String = configContainer.getGroupId
>     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>     def lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
>     lines
>   }
> }
>
>
> The Error that I get is:
>
> 2015-11-18 12:58:33.755  WARN 6132 --- [result-getter-2] o.apache.spark.scheduler.TaskSetManager  : Lost task 0.0 in stage 2.0 (TID 70, 169.254.95.56): java.io.IOException: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
>         at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
>         at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
>         at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:274)
>         at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
>         at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
>         ... 19 more
>
>
> I’m building with Maven to create a shaded jar file.
>
> Properties in the parent pom are:
>
>   <properties>
>         <springframework.version>3.2.13.RELEASE</springframework.version>
>         <springframework.integration.version>2.2.6.RELEASE</springframework.integration.version>
>         <ibm.mq.version>7.0.1.3</ibm.mq.version>
>         <hibernate.version>3.5.6-Final</hibernate.version>
>         <hibernate.jpa.version>1.0.0-CR-1</hibernate.jpa.version>
>         <validation-api.version>1.0.0.GA</validation-api.version>
>         <hibernate-validator.version>4.3.0.FINAL</hibernate-validator.version>
>         <jackson.version>1.8.5</jackson.version>
>         <powermock.version>1.4.12</powermock.version>
>         <scala.binary.version>2.10</scala.binary.version>
>         <spark.version>1.5.2</spark.version>
>         <scala.version>2.10.5</scala.version>
>     </properties>
>
>
> The pom file for the shaded jar is:
>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/maven-v4_0_0.xsd">
>     <modelVersion>4.0.0</modelVersion>
>
>     <artifactId>my-spark-client</artifactId>
> ...
>     <build>
>         <finalName>my-spark-client</finalName>
>         <plugins>
>             <plugin>
>                 <groupId>org.apache.maven.plugins</groupId>
>                 <artifactId>maven-shade-plugin</artifactId>
>                 <version>2.3</version>
>                 <executions>
>                     <execution>
>                         <phase>package</phase>
>                         <goals>
>                             <goal>shade</goal>
>                         </goals>
>                         <configuration>
>                             <filters>
>                                 <filter>
>                                     <artifact>*:*</artifact>
>                                     <excludes>
>                                         <exclude>org/datanucleus/**</exclude>
>                                         <exclude>META-INF/*.SF</exclude>
>                                         <exclude>META-INF/*.DSA</exclude>
>                                         <exclude>META-INF/*.RSA</exclude>
>                                     </excludes>
>                                 </filter>
>                             </filters>
>                             <transformers>
>                                 <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
>                                     <resource>reference.conf</resource>
>                                 </transformer>
>                                 <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
>                             </transformers>
>                             <artifactSet>
>                                 <excludes>
>                                     <exclude>classworlds:classworlds</exclude>
>                                     <exclude>junit:junit</exclude>
>                                     <exclude>jmock:*</exclude>
>                                 </excludes>
>                             </artifactSet>
>                         </configuration>
>                     </execution>
>                 </executions>
>             </plugin>
>
>             <plugin>
>                 <groupId>org.scalatest</groupId>
>                 <artifactId>scalatest-maven-plugin</artifactId>
>                 <version>1.0</version>
>                 <configuration>
>                     <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
>                     <junitxml>.</junitxml>
>                     <filereports>WDF TestSuite.txt</filereports>
>                 </configuration>
>                 <executions>
>                     <execution>
>                         <id>test</id>
>                         <goals>
>                             <goal>test</goal>
>                         </goals>
>                     </execution>
>                 </executions>
>             </plugin>
>
>             <plugin>
>                 <groupId>org.scala-tools</groupId>
>                 <artifactId>maven-scala-plugin</artifactId>
>                 <version>2.15.0</version>
>                 <executions>
>                     <execution>
>                         <id>compile-scala</id>
>                         <phase>compile</phase>
>                         <goals>
>                             <goal>compile</goal>
>                         </goals>
>                     </execution>
>
>                     <execution>
>                         <id>compile-tests-scala</id>
>                         <phase>compile</phase>
>                         <goals>
>                             <goal>testCompile</goal>
>                         </goals>
>                     </execution>
>                 </executions>
>             </plugin>
>
>         </plugins>
>     </build>
>
>     <dependencies>
>         <dependency>
>             <groupId>org.slf4j</groupId>
>             <artifactId>slf4j-log4j12</artifactId>
>             <version>1.7.5</version>
>         </dependency>
>         <dependency>
>             <groupId>org.slf4j</groupId>
>             <artifactId>slf4j-api</artifactId>
>             <version>1.7.5</version>
>         </dependency>
>         <dependency>
>             <groupId>org.springframework</groupId>
>             <artifactId>spring-test</artifactId>
>             <scope>test</scope>
>         </dependency>
> ...
> ...
>         <dependency>
>             <groupId>com.databricks</groupId>
>             <artifactId>spark-csv_2.10</artifactId>
>             <version>1.2.0</version>
>         </dependency>
>         <dependency>
>             <groupId>org.scala-lang</groupId>
>             <artifactId>scala-compiler</artifactId>
>             <version>${scala.version}</version>
>             <scope>compile</scope>
>         </dependency>
>
>         <dependency>
>             <groupId>org.scala-lang</groupId>
>             <artifactId>scala-library</artifactId>
>             <version>${scala.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.scalatest</groupId>
>             <artifactId>scalatest_${scala.binary.version}</artifactId>
>             <scope>test</scope>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.kafka</groupId>
>             <artifactId>kafka_${scala.binary.version}</artifactId>
>             <version>0.8.2.1</version>
>             <scope>compile</scope>
>             <exclusions>
>                 <exclusion>
>                     <artifactId>jmxri</artifactId>
>                     <groupId>com.sun.jmx</groupId>
>                 </exclusion>
>                 <exclusion>
>                     <artifactId>jms</artifactId>
>                     <groupId>javax.jms</groupId>
>                 </exclusion>
>                 <exclusion>
>                     <artifactId>jmxtools</artifactId>
>                     <groupId>com.sun.jdmk</groupId>
>                 </exclusion>
>             </exclusions>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.spark</groupId>
>             <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
>             <version>${spark.version}</version>
>             <exclusions>
>                 <exclusion>
>                     <artifactId>jcl-over-slf4j</artifactId>
>                     <groupId>org.slf4j</groupId>
>                 </exclusion>
>                 <exclusion>
>                     <artifactId>javax.servlet-api</artifactId>
>                     <groupId>javax.servlet</groupId>
>                 </exclusion>
>                 <exclusion>
>                     <groupId>javax.servlet</groupId>
>                     <artifactId>servlet-api</artifactId>
>                 </exclusion>
>             </exclusions>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.spark</groupId>
>             <artifactId>spark-streaming_${scala.binary.version}</artifactId>
>             <version>${spark.version}</version>
>             <exclusions>
>                 <exclusion>
>                     <artifactId>jcl-over-slf4j</artifactId>
>                     <groupId>org.slf4j</groupId>
>                 </exclusion>
>             </exclusions>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.spark</groupId>
>             <artifactId>spark-core_${scala.binary.version}</artifactId>
>             <version>${spark.version}</version>
>             <scope>provided</scope>
>             <exclusions>
>                 <exclusion>
>                     <artifactId>jcl-over-slf4j</artifactId>
>                     <groupId>org.slf4j</groupId>
>                 </exclusion>
>                 <exclusion>
>                     <artifactId>javax.servlet-api</artifactId>
>                     <groupId>javax.servlet</groupId>
>                 </exclusion>
>                 <exclusion>
>                     <groupId>javax.servlet</groupId>
>                     <artifactId>servlet-api</artifactId>
>                 </exclusion>
>             </exclusions>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.spark</groupId>
>             <artifactId>spark-sql_${scala.binary.version}</artifactId>
>             <version>${spark.version}</version>
>             <exclusions>
>                 <exclusion>
>                     <artifactId>jcl-over-slf4j</artifactId>
>                     <groupId>org.slf4j</groupId>
>                 </exclusion>
>             </exclusions>
>         </dependency>
>         <dependency>
>             <groupId>org.mockito</groupId>
>             <artifactId>mockito-all</artifactId>
>             <scope>test</scope>
>         </dependency>
>         <dependency>
>             <groupId>junit</groupId>
>             <artifactId>junit</artifactId>
>             <scope>test</scope>
>         </dependency>
>         <dependency>
>             <groupId>joda-time</groupId>
>             <artifactId>joda-time</artifactId>
>         </dependency>
>
>     </dependencies>
>
> ....
>
> </project>
>
>
> org.apache.spark.streaming.kafka.KafkaReceiver is inside the
> spark-streaming-kafka jar file, so I’m not sure why I get the
> ClassNotFoundException.
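>
> A quick way to double-check that the class really ends up inside the shaded
> jar (throwaway sketch; the jar path is just a guess at my build output):
>
>     import java.util.jar.JarFile
>     import scala.collection.JavaConverters._
>
>     // List every entry of the shaded jar whose name mentions KafkaReceiver.
>     val jar = new JarFile("target/my-spark-client.jar")
>     jar.entries().asScala.map(_.getName)
>       .filter(_.contains("streaming/kafka/KafkaReceiver"))
>       .foreach(println)
>     jar.close()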
>
> Please help.
>
> Thanks,
> Tim
