I just did a local test with your code, seems everything is fine, the only
difference is that I use the master branch, but I don't think it changes a
lot in this part. Do you met any other exceptions or errors beside this
one? Probably this is due to other exceptions that makes this system
unstable.

On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

> No, i dont have any special settings. if i keep only reading line in my
> code, it's throwing NPE.
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>
>
>
> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sai.sai.s...@gmail.com>
> wrote:
>
>> Do you have any special settings, from your code, I don't think it will
>> incur NPE at that place.
>>
>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ramkumar.c...@gmail.com>
>> wrote:
>>
>>> spark version - spark 1.4.1
>>>
>>> my code snippet:
>>>
>>> String brokers = "ip:port,ip:port";
>>> String topics = "x,y,z";
>>> HashSet<String> TopicsSet = new
>>> HashSet<String>(Arrays.asList(topics.split(",")));
>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>> kafkaParams.put("metadata.broker.list", brokers);
>>>
>>> JavaPairInputDStream<String, String> messages =
>>> KafkaUtils.createDirectStream(
>>>            jssc,
>>>            String.class,
>>>            String.class,
>>>            StringDecoder.class,
>>>            StringDecoder.class,
>>>            kafkaParams,
>>>             TopicsSet
>>>        );
>>>
>>> messages.foreachRDD(new Function<JavaPairRDD<String , String>,Void> () {
>>>             public Void call(JavaPairRDD<String , String> tuple) {
>>>                 JavaRDD<String>rdd = tuple.values();
>>>
>>> rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>>                 return null;
>>>             }
>>>        });
>>>
>>>
>>> *Thanks*,
>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>
>>>
>>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>> wrote:
>>>
>>>> What Spark version are you using, also a small code snippet of how you
>>>> use Spark Streaming would be greatly helpful.
>>>>
>>>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>>> wrote:
>>>>
>>>>> I can able to read and print few lines. Afterthat i'm getting this
>>>>> exception. Any idea for this ?
>>>>>
>>>>> *Thanks*,
>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>
>>>>>
>>>>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm trying to read from kafka stream and printing it textfile. I'm
>>>>>> using java over spark. I dont know why i'm getting the following 
>>>>>> exception.
>>>>>> Also exception message is very abstract.  can anyone please help me ?
>>>>>>
>>>>>> Log Trace :
>>>>>>
>>>>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
>>>>>> java.lang.NullPointerException
>>>>>>         at
>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>         at
>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>         at
>>>>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>         at
>>>>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>         at
>>>>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>         at
>>>>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>         at
>>>>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>         at
>>>>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>         at
>>>>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>         at
>>>>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.org
>>>>>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>         at
>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>         at
>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>         at
>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw
>>>>>> exception: java.lang.NullPointerException
>>>>>> java.lang.NullPointerException
>>>>>>         at
>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>         at
>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>         at
>>>>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>         at
>>>>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>         at
>>>>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>         at
>>>>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>         at
>>>>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>         at
>>>>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>         at
>>>>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>         at
>>>>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.org
>>>>>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>         at
>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>         at
>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>         at
>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Thanks*,
>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to