The jars I am submitting are the following:

    bin/spark-submit --class topology.SimpleProcessingTopology \
      --master spark://10.0.0.8:7077 \
      --jars /tmp/spark_streaming-1.0-SNAPSHOT.jar \
      /tmp/spark_streaming-1.0-SNAPSHOT.jar /tmp/streaming.properties
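If passing the same fat JAR twice is the issue, it presumably does not need to go through --jars at all, since it is already the application JAR and gets shipped to the executors anyway. A corrected submission would then look roughly like this (just a sketch on my side, not verified yet):

    bin/spark-submit --class topology.SimpleProcessingTopology \
      --master spark://10.0.0.8:7077 \
      /tmp/spark_streaming-1.0-SNAPSHOT.jar /tmp/streaming.properties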
I’ve even tried using the spark.executor.extraClassPath option, but unfortunately without success. What do you mean by conflicting copies of Spark classes? Could you elaborate on that?

> On 1 Mar 2017, at 14:51, Sean Owen <so...@cloudera.com> wrote:
>
> What is the --jars you are submitting? You may have conflicting copies of
> Spark classes that interfere.
>
> On Wed, Mar 1, 2017, 14:20 Dominik Safaric <dominiksafa...@gmail.com> wrote:
>
> I've been trying to submit a Spark Streaming application using spark-submit
> to a cluster of mine consisting of a master and two worker nodes. The
> application is written in Scala and built using Maven. Importantly, the
> Maven build is configured to produce a fat JAR containing all dependencies.
> Furthermore, the JAR has been distributed to all of the nodes. The streaming
> job has been submitted using the following command:
>
>     bin/spark-submit --class topology.SimpleProcessingTopology \
>       --jars /tmp/spark_streaming-1.0-SNAPSHOT.jar \
>       --master spark://10.0.0.8:7077 --verbose \
>       /tmp/spark_streaming-1.0-SNAPSHOT.jar /tmp/streaming-benchmark.properties
>
> where 10.0.0.8 is the IP address of the master node within the VNET.
>
> However, I keep getting the following exception while starting the
> streaming application:
>
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>
> Caused by: java.lang.ClassNotFoundException: topology.SimpleProcessingTopology$$anonfun$main$1$$anonfun$apply$1
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
>
> I've checked the content of the JAR using jar tvf, and as you can see in
> the output below, it does contain the class in question.
>
>     1735 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$$anonfun$main$1.class
>      702 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology.class
>     2415 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$$anonfun$main$1$$anonfun$apply$1$$anonfun$apply$2.class
>     2500 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$$anonfun$main$1$$anonfun$apply$1.class
>     7045 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$.class
>
> The exception is caused by the anonymous function passed to the
> foreachPartition call:
>
>     rdd.foreachPartition(partition => {
>       val outTopic = props.getString("application.simple.kafka.out.topic")
>       val producer = new KafkaProducer[Array[Byte], Array[Byte]](kafkaParams)
>       partition.foreach(record => {
>         val producerRecord = new ProducerRecord[Array[Byte], Array[Byte]](outTopic, record.key(), record.value())
>         producer.send(producerRecord)
>       })
>       producer.close()
>     })
>
> Unfortunately, I have not been able to find the root cause of this so far.
> Hence, I would appreciate it if anyone could help me fix this issue.
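As an additional check, I will try printing where the JVM actually loads the topology class from, both on the driver and inside a task, to see whether a stale or conflicting copy of the JAR is being picked up somewhere on the classpath. A minimal sketch (plain JVM introspection, untested on my side):

    // Resolve the class through the current classloader and print the JAR
    // (or directory) it was loaded from,
    // e.g. file:/tmp/spark_streaming-1.0-SNAPSHOT.jar
    val clazz = Class.forName("topology.SimpleProcessingTopology")
    println(clazz.getProtectionDomain.getCodeSource.getLocation)

Running the same two lines inside rdd.foreachPartition should reveal which copy of the JAR the executors see.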