Hi, I have a spark kafka streaming application that works when I run with a local spark context, but not with a remote one.
My code consists of: 1. A spring-boot application that creates the context 2. A shaded jar file containing all of my spark code On my pc (windows), I have a spark (1.5.1) master and worker running. The entry point for my application is the start() method. The code is: @throws(classOf[Exception]) def start { val ssc: StreamingContext = createStreamingContext val messagesRDD = createKafkaDStream(ssc, "myTopic", 2) def datasRDD = messagesRDD.map((line : String) => MapFunctions.lineToSparkEventData(line)) def count = datasRDD.count() datasRDD.print(1) ssc.start ssc.awaitTermination } private def createStreamingContext: StreamingContext = { System.setProperty(HADOOP_HOME_DIR, configContainer.getHadoopHomeDir) System.setProperty("spark.streaming.concurrentJobs", String.valueOf(configContainer.getStreamingConcurrentJobs)) def sparkConf = createSparkConf() val ssc = new StreamingContext(sparkConf, Durations.seconds(configContainer.getStreamingContextDurationSeconds)) ssc.sparkContext.setJobGroup("main_streaming_job", "streaming context start") ssc.sparkContext.setLocalProperty("spark.scheduler.pool", "real_time_pool") ssc } private def createSparkConf() : SparkConf = { def masterString = "spark://<<my_pc>>:7077" def conf = new SparkConf().setMaster(masterString).setAppName("devAppRem") // This is not working // def conf = new SparkConf().setMaster("local[4]").setAppName("devAppLocal") // This IS working conf.set("spark.scheduler.allocation.file", "D:\\valid_path_to\\fairscheduler.xml"); val pathToShadedApplicationJar: String = configContainer.getApplicationJarPaths.get(0) val jars: Array[String] = Array[String](pathToShadedApplicationJar) conf.setJars(jars) conf.set("spark.scheduler.mode", "FAIR") } private def createKafkaDStream(ssc: StreamingContext, topics: String, numThreads: Int): DStream[String] = { val zkQuorum: String = configContainer.getZkQuorum val groupId: String = configContainer.getGroupId val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap def lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2) lines } } The Error that I get is: 2015-11-18 10:41:19.191 WARN 3044 --- [result-getter-3] o.apache.spark.scheduler.TaskSetManager : Lost task 0.0 in stage 2.0 (TID 70, 169.254.95.56): java.io.IOException: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163) at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70) at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:274) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160) ... 19 more I'm building using maven - to create a shaded jar file. Properties in the parent pom are: <properties> <springframework.version>3.2.13.RELEASE</springframework.version> <springframework.integration.version>2.2.6.RELEASE</springframework.integration.version> <ibm.mq.version>7.0.1.3</ibm.mq.version> <hibernate.version>3.5.6-Final</hibernate.version> <hibernate.jpa.version>1.0.0-CR-1</hibernate.jpa.version> <validation-api.version>1.0.0.GA</validation-api.version> <hibernate-validator.version>4.3.0.FINAL</hibernate-validator.version> <jackson.version>1.8.5</jackson.version> <powermock.version>1.4.12</powermock.version> <scala.binary.version>2.10</scala.binary.version> <spark.version>1.5.1</spark.version> <scala.version>2.10.5</scala.version> </properties> The pom file for the shaded jar is: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <artifactId>my-spark-client</artifactId> ... <build> <finalName>my-spark-client</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>org/datanucleus/**</exclude> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> </transformers> <artifactSet> <excludes> <exclude>classworlds:classworlds</exclude> <exclude>junit:junit</exclude> <exclude>jmock:*</exclude> </excludes> </artifactSet> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.scalatest</groupId> <artifactId>scalatest-maven-plugin</artifactId> <version>1.0</version> <configuration> <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> <junitxml>.</junitxml> <filereports>WDF TestSuite.txt</filereports> </configuration> <executions> <execution> <id>test</id> <goals> <goal>test</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.15.0</version> <executions> <execution> <id>compile-scala</id> <phase>compile</phase> <goals> <goal>compile</goal> </goals> </execution> <execution> <id>compile-tests-scala</id> <phase>compile</phase> <goals> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <scope>test</scope> </dependency> ... ... <dependency> <groupId>com.databricks</groupId> <artifactId>spark-csv_2.10</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>${scala.version}</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.binary.version}</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.binary.version}</artifactId> <version>0.8.2.1</version> <scope>compile</scope> <exclusions> <exclusion> <artifactId>jmxri</artifactId> <groupId>com.sun.jmx</groupId> </exclusion> <exclusion> <artifactId>jms</artifactId> <groupId>javax.jms</groupId> </exclusion> <exclusion> <artifactId>jmxtools</artifactId> <groupId>com.sun.jdmk</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId> <version>${spark.version}</version> <exclusions> <exclusion> <artifactId>jcl-over-slf4j</artifactId> <groupId>org.slf4j</groupId> </exclusion> <exclusion> <artifactId>javax.servlet-api</artifactId> <groupId>javax.servlet</groupId> </exclusion> <exclusion> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.binary.version}</artifactId> <version>${spark.version}</version> <exclusions> <exclusion> <artifactId>jcl-over-slf4j</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> <exclusions> <exclusion> <artifactId>jcl-over-slf4j</artifactId> <groupId>org.slf4j</groupId> </exclusion> <exclusion> <artifactId>javax.servlet-api</artifactId> <groupId>javax.servlet</groupId> </exclusion> <exclusion> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.binary.version}</artifactId> <version>${spark.version}</version> <exclusions> <exclusion> <artifactId>jcl-over-slf4j</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> </dependency> </dependencies> .... </project> org.apache.spark.streaming.kafka.KafkaReceiver is inside the spark-streaming-kafka jar file, so I'm not sure why I get the ClassNotFoundException. Please help. Thanks, Tim _____________________________________________________________________ The information transmitted in this message and its attachments (if any) is intended only for the person or entity to which it is addressed. The message may contain confidential and/or privileged material. Any review, retransmission, dissemination or other use of, or taking of any action in reliance upon this information, by persons or entities other than the intended recipient is prohibited. If you have received this in error, please contact the sender and delete this e-mail and associated material from any computer. The intended recipient of this e-mail may only use, reproduce, disclose or distribute the information contained in this e-mail and any attached files, with the permission of the sender. This message has been scanned for viruses. _____________________________________________________________________