Hi,

I have a Spark Kafka streaming application that works when I run it with a local
Spark context, but not with a remote one.

My code consists of:
1.      A Spring Boot application that creates the context (roughly wired up as sketched just after this list)
2.      A shaded jar file containing all of my Spark code
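
For context, the Spring Boot side does little more than hand control to the Spark code once the application context is up. Roughly like this (names here are made up for illustration, not my actual classes; "SparkStreamingDriver" stands in for whatever class holds the start() method shown below):

  import javax.annotation.PostConstruct
  import org.springframework.beans.factory.annotation.Autowired
  import org.springframework.stereotype.Component

  // Hypothetical wiring: kick off the streaming driver once Spring is up.
  @Component
  class StreamingBootstrap @Autowired() (driver: SparkStreamingDriver) {

    @PostConstruct
    def launch(): Unit = {
      // start() blocks in awaitTermination, so run it off the container's init thread
      new Thread(new Runnable {
        override def run(): Unit = driver.start()
      }, "spark-streaming-driver").start()
    }
  }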

On my PC (Windows), I have a Spark (1.5.1) master and worker running.

The entry point for my application is the start() method.

The code is:

  @throws(classOf[Exception])
  def start(): Unit = {
    val ssc: StreamingContext = createStreamingContext

    // DStream of raw Kafka message payloads
    val messagesRDD = createKafkaDStream(ssc, "myTopic", 2)

    val datasRDD = messagesRDD.map((line: String) => MapFunctions.lineToSparkEventData(line))

    val count = datasRDD.count()  // DStream[Long]; does nothing by itself without an output op
    datasRDD.print(1)

    ssc.start()
    ssc.awaitTermination()
  }


  private def createStreamingContext: StreamingContext = {
    System.setProperty(HADOOP_HOME_DIR, configContainer.getHadoopHomeDir)
    System.setProperty("spark.streaming.concurrentJobs",
      String.valueOf(configContainer.getStreamingConcurrentJobs))
    val sparkConf = createSparkConf()

    val ssc = new StreamingContext(sparkConf,
      Durations.seconds(configContainer.getStreamingContextDurationSeconds))
    ssc.sparkContext.setJobGroup("main_streaming_job", "streaming context start")
    ssc.sparkContext.setLocalProperty("spark.scheduler.pool", "real_time_pool")
    ssc
  }


  private def createSparkConf(): SparkConf = {
    val masterString = "spark://<<my_pc>>:7077"
    val conf = new SparkConf().setMaster(masterString).setAppName("devAppRem")  // This is not working
//    val conf = new SparkConf().setMaster("local[4]").setAppName("devAppLocal")  // This IS working

    conf.set("spark.scheduler.allocation.file", "D:\\valid_path_to\\fairscheduler.xml")

    // Ship the shaded application jar to the executors
    val pathToShadedApplicationJar: String = configContainer.getApplicationJarPaths.get(0)
    val jars: Array[String] = Array[String](pathToShadedApplicationJar)
    conf.setJars(jars)

    conf.set("spark.scheduler.mode", "FAIR")  // set() returns the conf
  }


  private def createKafkaDStream(ssc: StreamingContext, topics: String,
      numThreads: Int): DStream[String] = {
    val zkQuorum: String = configContainer.getZkQuorum
    val groupId: String = configContainer.getGroupId
    val topicMap = topics.split(",").map((_, numThreads)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap,
      StorageLevel.MEMORY_AND_DISK_SER).map(_._2)  // keep only the message value
    lines
  }
}
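
For reference, the fairscheduler.xml that createSparkConf points at defines the real_time_pool named above. Mine looks roughly like this (the weight/minShare values here are illustrative):

  <?xml version="1.0"?>
  <allocations>
    <pool name="real_time_pool">
      <schedulingMode>FAIR</schedulingMode>
      <weight>1</weight>
      <minShare>2</minShare>
    </pool>
  </allocations>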


The error I get is:

2015-11-18 10:41:19.191  WARN 3044 --- [result-getter-3] o.apache.spark.scheduler.TaskSetManager  : Lost task 0.0 in stage 2.0 (TID 70, 169.254.95.56): java.io.IOException: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
        at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
        at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:274)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
        ... 19 more


I'm building with Maven to create the shaded jar file.
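
To sanity-check the build output, I can list the entries of the shaded jar and look for the Kafka receiver class, e.g. (a quick sketch; the jar path is just where my build drops it):

  import java.util.jar.JarFile
  import scala.collection.JavaConverters._

  // Check that the Kafka receiver class was actually packaged into the
  // shaded jar (path is illustrative; use the jar that setJars() ships).
  val jar = new JarFile("target/my-spark-client.jar")
  val hasReceiver = jar.entries().asScala
    .exists(_.getName == "org/apache/spark/streaming/kafka/KafkaReceiver.class")
  println("KafkaReceiver packaged: " + hasReceiver)
  jar.close()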

Properties in the parent pom are:

  <properties>
        <springframework.version>3.2.13.RELEASE</springframework.version>
        <springframework.integration.version>2.2.6.RELEASE</springframework.integration.version>
        <ibm.mq.version>7.0.1.3</ibm.mq.version>
        <hibernate.version>3.5.6-Final</hibernate.version>
        <hibernate.jpa.version>1.0.0-CR-1</hibernate.jpa.version>
        <validation-api.version>1.0.0.GA</validation-api.version>
        <hibernate-validator.version>4.3.0.FINAL</hibernate-validator.version>
        <jackson.version>1.8.5</jackson.version>
        <powermock.version>1.4.12</powermock.version>
        <scala.binary.version>2.10</scala.binary.version>
        <spark.version>1.5.1</spark.version>
        <scala.version>2.10.5</scala.version>
    </properties>


The pom file for the shaded jar is:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <artifactId>my-spark-client</artifactId>
...
    <build>
        <finalName>my-spark-client</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>org/datanucleus/**</exclude>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                            </transformers>
                            <artifactSet>
                                <excludes>
                                    <exclude>classworlds:classworlds</exclude>
                                    <exclude>junit:junit</exclude>
                                    <exclude>jmock:*</exclude>
                                </excludes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.scalatest</groupId>
                <artifactId>scalatest-maven-plugin</artifactId>
                <version>1.0</version>
                <configuration>
                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
                    <junitxml>.</junitxml>
                    <filereports>WDF TestSuite.txt</filereports>
                </configuration>
                <executions>
                    <execution>
                        <id>test</id>
                        <goals>
                            <goal>test</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.0</version>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>

                    <execution>
                        <id>compile-tests-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <scope>test</scope>
        </dependency>
...
...
        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>spark-csv_2.10</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.version}</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_${scala.binary.version}</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>0.8.2.1</version>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
                    <artifactId>jmxri</artifactId>
                    <groupId>com.sun.jmx</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jms</artifactId>
                    <groupId>javax.jms</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jmxtools</artifactId>
                    <groupId>com.sun.jdmk</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jcl-over-slf4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>javax.servlet-api</artifactId>
                    <groupId>javax.servlet</groupId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jcl-over-slf4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>jcl-over-slf4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>javax.servlet-api</artifactId>
                    <groupId>javax.servlet</groupId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jcl-over-slf4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
        </dependency>

    </dependencies>

....

</project>


org.apache.spark.streaming.kafka.KafkaReceiver lives in the
spark-streaming-kafka artifact, which is compile-scoped above and should
therefore end up inside the shaded jar, so I'm not sure why I get the
ClassNotFoundException on the worker.
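
One way I can think of to double-check is to try resolving the class from the shaded jar alone through a URLClassLoader (a sketch; the path is illustrative):

  import java.net.{URL, URLClassLoader}

  // Resolve the receiver class from the shaded jar only (null parent =
  // bootstrap classloader), roughly what an executor's loader sees.
  // The jar path is illustrative.
  val loader = new URLClassLoader(
    Array(new URL("file:///D:/path/to/my-spark-client.jar")), null)
  val clazz = Class.forName(
    "org.apache.spark.streaming.kafka.KafkaReceiver", false, loader)
  println("Resolved: " + clazz.getName)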

Please help.

Thanks,
Tim




