Also, just to keep it simple, I am using 1.6.0-cdh5.7.0 in the pom.xml, since the cluster I am trying to run on is CDH 5.7.0 with Spark 1.6.0.
Here is my pom setting:

    <cdh.spark.version>1.6.0-cdh5.7.0</cdh.spark.version>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${cdh.spark.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>${cdh.spark.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>${cdh.spark.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>${cdh.spark.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.1</version>
        <scope>compile</scope>
    </dependency>

But trying to execute the application throws errors like the one below:

Exception in thread "main" java.lang.NoClassDefFoundError: kafka/cluster/BrokerEndPoint
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
        at scala.Option.map(Option.scala:145)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
        at scala.util.Either$RightProjection.flatMap(Either.scala:523)
        at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
        at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
        at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
        at org.apache.spark.streaming.kafka.KafkaCluster.getLatestLeaderOffsets(KafkaCluster.scala:150)
        at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:215)
        at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
        at scala.util.Either$RightProjection.flatMap(Either.scala:523)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at com.edgecast.engine.ConcurrentOps$.createDataStreamFromKafka(ConcurrentOps.scala:68)
        at com.edgecast.engine.ConcurrentOps$.startProcessing(ConcurrentOps.scala:32)
        at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:33)
        at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.ClassNotFoundException: kafka.cluster.BrokerEndPoint
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 38 more
16/06/23 11:09:53 INFO SparkContext: Invoking stop() from shutdown hook

I've tried Kafka versions 0.8.2.0, 0.8.2.2, and 0.9.0.0. With 0.9.0.0 the processing hangs much sooner. Can someone help with this error?

regards
Sunita
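One plausible cause, for anyone hitting the same trace: kafka.cluster.BrokerEndPoint does not exist in the 0.8.2.x client (that release only has kafka.cluster.Broker; BrokerEndPoint appeared in 0.9.0.0), while the CDH 5.7 build of spark-streaming-kafka is compiled against Cloudera's own Kafka. Pinning kafka_2.10 to 0.8.2.1 therefore shadows the client the CDH artifact expects. A sketch of one way to align them is below; the 0.9.0-kafka-2.0.1 coordinate is an assumption here and should be verified against the cluster's Kafka parcel version:

    <!-- Sketch: align the Kafka client with the CDH spark-streaming-kafka
         build instead of pinning 0.8.2.1. Alternatively, drop this
         dependency entirely and rely on the transitive Kafka client that
         spark-streaming-kafka_2.10 brings in. Requires the Cloudera
         repository (https://repository.cloudera.com/artifactory/cloudera-repos/). -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.9.0-kafka-2.0.1</version>
        <scope>compile</scope>
    </dependency>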
On Wed, Jun 22, 2016 at 8:20 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:

> Hello Experts,
>
> I am getting this error repeatedly:
>
> 16/06/23 03:06:59 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
> java.lang.NullPointerException
>         at com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
>         at com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
>         at com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>         at java.lang.Throwable.writeObject(Throwable.java:985)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
>         at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:141)
>         at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>         at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
>         at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:142)
>         at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:554)
>         at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
>         at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
>         at com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:73)
>         at com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:67)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:67)
>         at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
>
> It seems to be a typical issue. All I am doing here is as below:
>
> import com.typesafe.config.ConfigFactory
> import kafka.serializer.{DefaultDecoder, StringDecoder}
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.dstream.DStream
> import org.apache.spark.streaming.kafka.KafkaUtils
> import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
>
> object ProcessingEngine {
>
>   // Hoisted to the object so that initializeSpark can reference it.
>   val AppConf = ConfigFactory.load()
>
>   def initializeSpark(customer: String): StreamingContext = {
>     LogHandler.log.info("InitializeSpark")
>     val custConf = ConfigFactory.load(customer + ".conf").getConfig(customer).withFallback(AppConf)
>     implicit val sparkConf: SparkConf = new SparkConf().setAppName(customer)
>     val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(custConf.getLong("batchDurSec")))
>     ssc.checkpoint(custConf.getString("checkpointDir"))
>     ssc
>   }
>
>   def createDataStreamFromKafka(customer: String, ssc: StreamingContext): DStream[Array[Byte]] = {
>     val custConf = ConfigFactory.load(customer + ".conf").getConfig(customer).withFallback(ConfigFactory.load())
>     LogHandler.log.info("createDataStreamFromKafka")
>     KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>       ssc,
>       Map[String, String]("metadata.broker.list" -> custConf.getString("brokers"), "group.id" -> custConf.getString("groupId")),
>       Set(custConf.getString("topics"))
>     ).map(_._2)  // createDirectStream yields (key, value) pairs; keep the payload to match the declared return type
>   }
>
>   def main(args: Array[String]): Unit = {
>     LogHandler.log.info("Starting the processing Engine")
>     getListOfCustomers().foreach { cust =>
>       implicit val ssc = initializeSpark(cust)
>       val FullStream = createDataStreamFromKafka(cust, ssc)
>       ConcurrentOps.determineAllSinks(cust, FullStream)
>       FullStream.checkpoint(Duration(AppConf.getLong("batchDurSec") * 2000))
>       ssc.start()
>       ssc.awaitTermination()  // NOTE: this blocks, so the foreach never advances past the first customer
>     }
>   }
> }
>
> I also tried putting all the initialization directly in main (not using method calls for initializeSpark and createDataStreamFromKafka), and also not putting it in a foreach and creating a single Spark and streaming context. However, the error persists.
>
> Appreciate any help.
>
> regards
> Sunita
>
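On the NullPointerException: the trace shows a com.typesafe.config ConfigException being serialized (SerializedConfigValue.writeOrigin via Throwable.writeObject) while Checkpoint.serialize writes out the DStream graph, which suggests a Config object, or an exception derived from one, is being captured somewhere in a checkpointed closure. A sketch of one way to avoid that, assuming the same config keys as in the code above, is to read plain String/Long values out of the Config on the driver before wiring up the stream, so the closures only capture serializable primitives:

import com.typesafe.config.ConfigFactory
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch only: same stream construction as above, but no
// com.typesafe.config value is referenced by anything Spark serializes.
def createDataStreamFromKafka(customer: String, ssc: StreamingContext): DStream[Array[Byte]] = {
  val custConf = ConfigFactory.load(customer + ".conf")
    .getConfig(customer)
    .withFallback(ConfigFactory.load())
  // Pull plain, serializable values out of the Config on the driver;
  // custConf itself never leaves this method.
  val brokers = custConf.getString("brokers")
  val groupId = custConf.getString("groupId")
  val topics  = Set(custConf.getString("topics"))
  KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
    ssc,
    Map("metadata.broker.list" -> brokers, "group.id" -> groupId),
    topics
  ).map(_._2) // keep only the message payload
}

The same consideration applies to whatever ConcurrentOps.determineAllSinks does with the customer config: if any stream operation's closure holds a Config or a ConfigException, checkpoint serialization will try to write it out and can fail exactly as in the trace above.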