Do you have any special settings? From your code, I don't think it should cause an NPE at that place.
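
For what it's worth, the trace shows the NPE being thrown inside the function passed to maxBy in DStreamGraph.getMaxInputStreamRememberDuration, which would be consistent with one of the values being compared there being null. I have not confirmed this against the Spark source, so treat the following only as an illustration of that failure mode. It is plain Java with made-up names (MaxByNullSketch, FakeDuration, maxUnsafe, maxSafe), not Spark code:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

public class MaxByNullSketch {

    // Hypothetical stand-in for a duration value; this is NOT Spark's Duration class.
    static final class FakeDuration {
        final long millis;

        FakeDuration(long millis) {
            this.millis = millis;
        }
    }

    // Picks the largest duration by dereferencing every element.
    // A null entry anywhere in the list makes this throw NullPointerException.
    static FakeDuration maxUnsafe(List<FakeDuration> durations) {
        FakeDuration best = durations.get(0);
        for (FakeDuration d : durations) {
            if (d.millis > best.millis) {
                best = d;
            }
        }
        return best;
    }

    // Same idea, but null entries are dropped before comparing.
    // Returns an empty Optional if nothing non-null is left.
    static Optional<FakeDuration> maxSafe(List<FakeDuration> durations) {
        return durations.stream()
                .filter(Objects::nonNull)
                .max(Comparator.comparingLong((FakeDuration d) -> d.millis));
    }

    public static void main(String[] args) {
        List<FakeDuration> durations = new ArrayList<>();
        durations.add(new FakeDuration(5000L));
        durations.add(null); // assumed: one value was never initialized

        try {
            maxUnsafe(durations);
        } catch (NullPointerException e) {
            System.out.println("maxUnsafe threw NullPointerException");
        }

        maxSafe(durations).ifPresent(d ->
                System.out.println("maxSafe picked " + d.millis + " ms"));
    }
}
```

If something like this is what is happening, the interesting question is why a value would still be null by the time the job generator runs, which is why I am asking about any special settings.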

On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

> Spark version: Spark 1.4.1
>
> My code snippet:
>
> String brokers = "ip:port,ip:port";
> String topics = "x,y,z";
> HashSet<String> TopicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
> HashMap<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("metadata.broker.list", brokers);
>
> JavaPairInputDStream<String, String> messages =
>     KafkaUtils.createDirectStream(
>         jssc,
>         String.class,
>         String.class,
>         StringDecoder.class,
>         StringDecoder.class,
>         kafkaParams,
>         TopicsSet
>     );
>
> messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>     public Void call(JavaPairRDD<String, String> tuple) {
>         JavaRDD<String> rdd = tuple.values();
>         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>         return null;
>     }
> });
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>
>
> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>
>> What Spark version are you using? Also, a small code snippet showing how
>> you use Spark Streaming would be greatly helpful.
>>
>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>
>>> I am able to read and print a few lines. After that I'm getting this
>>> exception. Any idea about this?
>>>
>>> *Thanks*,
>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>
>>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm trying to read from a Kafka stream and write it to a text file. I'm
>>>> using Java with Spark. I don't know why I'm getting the following exception.
>>>> Also, the exception message is very abstract. Can anyone please help me?
>>>>
>>>> Log Trace :
>>>>
>>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
>>>> java.lang.NullPointerException
>>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>     at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>     at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>     at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>     at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>     at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>     at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>     at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>     at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NullPointerException
>>>> java.lang.NullPointerException
>>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>     at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>     at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>     at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>     at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>     at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>     at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>     at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>     at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>
>>>> *Thanks*,
>>>> <https://in.linkedin.com/in/ramkumarcs31>