Hi,
 
I have a Spark Kafka streaming application that works when I run it with a
local Spark context, but not with a remote one.
 
My code consists of:
1. A Spring Boot application that creates the context
2. A shaded jar file containing all of my Spark code
 
On my PC (Windows), I have a Spark 1.5.2 master and a worker running
(spark-1.5.2-bin-hadoop2.6).
 
The entry point for my application is the start() method.
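
For reference, the Spring Boot side does little more than hand control to this
method once the application is up. A stripped-down sketch of the wiring
(illustrative only; SparkStreamingService is a made-up name standing in for the
class that holds start()):

import org.springframework.boot.CommandLineRunner
import org.springframework.context.annotation.{Bean, Configuration}

@Configuration
class StreamingRunnerConfig {
  // Illustrative sketch: once the Spring context is up, call into the
  // Spark driver code. SparkStreamingService is a hypothetical stand-in
  // for my real bean, which is built from our config classes.
  @Bean
  def streamingRunner(service: SparkStreamingService): CommandLineRunner =
    new CommandLineRunner {
      override def run(args: String*): Unit = service.start()
    }
}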
 
The code is:
 
  @throws(classOf[Exception])
  def start {
    val ssc: StreamingContext = createStreamingContext

    val messagesRDD = createKafkaDStream(ssc, "myTopic", 2)

    def datasRDD = messagesRDD.map((line: String) => MapFunctions.lineToSparkEventData(line))

    def count = datasRDD.count()
    datasRDD.print(1)

    ssc.start()
    ssc.awaitTermination()
  }

  private def createStreamingContext: StreamingContext = {
    System.setProperty(HADOOP_HOME_DIR, configContainer.getHadoopHomeDir)
    System.setProperty("spark.streaming.concurrentJobs", String.valueOf(configContainer.getStreamingConcurrentJobs))
    def sparkConf = createSparkConf()

    val ssc = new StreamingContext(sparkConf, Durations.seconds(configContainer.getStreamingContextDurationSeconds))
    ssc.sparkContext.setJobGroup("main_streaming_job", "streaming context start")
    ssc.sparkContext.setLocalProperty("spark.scheduler.pool", "real_time_pool")
    ssc
  }

  private def createSparkConf(): SparkConf = {
    def masterString = "spark://<<my_pc>>:7077"
    def conf = new SparkConf().setMaster(masterString).setAppName("devAppRem")    // This is not working
//  def conf = new SparkConf().setMaster("local[4]").setAppName("devAppLocal")   // This IS working

    conf.set("spark.scheduler.allocation.file", "D:\\valid_path_to\\fairscheduler.xml")

    val pathToShadedApplicationJar: String = configContainer.getApplicationJarPaths.get(0)
    val jars: Array[String] = Array[String](pathToShadedApplicationJar)
    conf.setJars(jars)

    conf.set("spark.scheduler.mode", "FAIR")
  }

  private def createKafkaDStream(ssc: StreamingContext, topics: String, numThreads: Int): DStream[String] = {
    val zkQuorum: String = configContainer.getZkQuorum
    val groupId: String = configContainer.getGroupId
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    def lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
    lines
  }
}
 
 
The error I get is:
 
2015-11-18 12:58:33.755  WARN 6132 --- [result-getter-2] o.apache.spark.scheduler.TaskSetManager  : Lost task 0.0 in stage 2.0 (TID 70, 169.254.95.56): java.io.IOException: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
        at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
        at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:274)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
        ... 19 more
 

I'm building with Maven to create the shaded jar file.
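
To sanity-check the build output, I can run something like this small sketch to
confirm the Kafka receiver class was actually shaded in (the jar path is an
assumption about where my build writes it):

import java.util.jar.JarFile
import scala.collection.JavaConverters._

object ShadedJarCheck {
  def main(args: Array[String]): Unit = {
    // Assumed path; adjust to wherever the shade plugin writes the jar.
    val jar = new JarFile("target/my-spark-client.jar")
    // The class the executors fail to load, written as a jar entry name.
    val entry = "org/apache/spark/streaming/kafka/KafkaReceiver.class"
    val present = jar.entries().asScala.exists(_.getName == entry)
    println(s"$entry present: $present")
    jar.close()
  }
}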
 
Properties in the parent pom are:
 
    <properties>
        <springframework.version>3.2.13.RELEASE</springframework.version>
        <springframework.integration.version>2.2.6.RELEASE</springframework.integration.version>
        <ibm.mq.version>7.0.1.3</ibm.mq.version>
        <hibernate.version>3.5.6-Final</hibernate.version>
        <hibernate.jpa.version>1.0.0-CR-1</hibernate.jpa.version>
        <validation-api.version>1.0.0.GA</validation-api.version>
        <hibernate-validator.version>4.3.0.FINAL</hibernate-validator.version>
        <jackson.version>1.8.5</jackson.version>
        <powermock.version>1.4.12</powermock.version>
        <scala.binary.version>2.10</scala.binary.version>
        <spark.version>1.5.2</spark.version>
        <scala.version>2.10.5</scala.version>
    </properties>
 
 
The pom file for the shaded jar is:
 
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <artifactId>my-spark-client</artifactId>
...
    <build>
        <finalName>my-spark-client</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>org/datanucleus/**</exclude>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                            </transformers>
                            <artifactSet>
                                <excludes>
                                    <exclude>classworlds:classworlds</exclude>
                                    <exclude>junit:junit</exclude>
                                    <exclude>jmock:*</exclude>
                                </excludes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
 
            <plugin>
                <groupId>org.scalatest</groupId>
                <artifactId>scalatest-maven-plugin</artifactId>
                <version>1.0</version>
                <configuration>
                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
                    <junitxml>.</junitxml>
                    <filereports>WDF TestSuite.txt</filereports>
                </configuration>
                <executions>
                    <execution>
                        <id>test</id>
                        <goals>
                            <goal>test</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
 
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.0</version>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
 
                    <execution>
                        <id>compile-tests-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
 
        </plugins>
    </build>
 
    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <scope>test</scope>
        </dependency>
...
...
        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>spark-csv_2.10</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.version}</version>
            <scope>compile</scope>
        </dependency>
 
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_${scala.binary.version}</artifactId>
            <scope>test</scope>
        </dependency>
 
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>0.8.2.1</version>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
                    <artifactId>jmxri</artifactId>
                    <groupId>com.sun.jmx</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jms</artifactId>
                    <groupId>javax.jms</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jmxtools</artifactId>
                    <groupId>com.sun.jdmk</groupId>
                </exclusion>
            </exclusions>
        </dependency>
 
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jcl-over-slf4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>javax.servlet-api</artifactId>
                    <groupId>javax.servlet</groupId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jcl-over-slf4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>jcl-over-slf4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>javax.servlet-api</artifactId>
                    <groupId>javax.servlet</groupId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
 
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jcl-over-slf4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
        </dependency>
 
    </dependencies>
 
....
 
</project>
 
 
org.apache.spark.streaming.kafka.KafkaReceiver is inside the
spark-streaming-kafka jar file, so I’m not sure why I get the
ClassNotFoundException.
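
One thing I still want to verify is that the jar list really ends up on the
conf that the StreamingContext uses. As far as I know, setJars stores its list
under the "spark.jars" key, so a quick check inside start(), right after the
context is created (a sketch, not my production code), should show whether the
shaded jar is registered:

val ssc: StreamingContext = createStreamingContext
// If this prints "<not set>", the jar is never shipped to the executors,
// which would explain the ClassNotFoundException on the worker side.
println("spark.jars = " + ssc.sparkContext.getConf.get("spark.jars", "<not set>"))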
 
Please help.
 
Thanks,
Tim