i want to join all those logs in some manner. That's what i'm trying to do.

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Fri, Oct 30, 2015 at 4:57 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> I don't think Spark Streaming supports multiple streaming context in one
> jvm, you cannot use in such way. Instead you could run multiple streaming
> applications, since you're using Yarn.
>
> 2015年10月30日星期五,Ramkumar V <ramkumar.c...@gmail.com> 写道:
>
>> I found NPE is mainly because of im using the same JavaStreamingContext
>> for some other kafka stream. if i change the name , its running
>> successfully. how to run multiple JavaStreamingContext in a program ?  I'm
>> getting following exception if i run multiple JavaStreamingContext in
>> single file.
>>
>> 15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status: FAILED,
>> exitCode: 15, (reason: User class threw exception:
>> java.lang.IllegalStateException: Only one StreamingContext may be started
>> in this JVM. Currently running StreamingContext was started
>> atorg.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:622)
>>
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
>> On Fri, Oct 30, 2015 at 3:25 PM, Saisai Shao <sai.sai.s...@gmail.com>
>> wrote:
>>
>>> From the code, I think this field "rememberDuration" shouldn't be null,
>>> it will be verified at the start, unless some place changes it's value in
>>> the runtime that makes it null, but I cannot image how this happened. Maybe
>>> you could add some logs around the place where exception happens if you
>>> could reproduce it.
>>>
>>> On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>> wrote:
>>>
>>>> No. this is the only exception that im getting multiple times in my
>>>> log. Also i was reading some other topics earlier but im not faced this 
>>>> NPE.
>>>>
>>>> *Thanks*,
>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>
>>>>
>>>> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>> wrote:
>>>>
>>>>> I just did a local test with your code, seems everything is fine, the
>>>>> only difference is that I use the master branch, but I don't think it
>>>>> changes a lot in this part. Do you met any other exceptions or errors
>>>>> beside this one? Probably this is due to other exceptions that makes this
>>>>> system unstable.
>>>>>
>>>>> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> No, i dont have any special settings. if i keep only reading line in
>>>>>> my code, it's throwing NPE.
>>>>>>
>>>>>> *Thanks*,
>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Do you have any special settings, from your code, I don't think it
>>>>>>> will incur NPE at that place.
>>>>>>>
>>>>>>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ramkumar.c...@gmail.com
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> spark version - spark 1.4.1
>>>>>>>>
>>>>>>>> my code snippet:
>>>>>>>>
>>>>>>>> String brokers = "ip:port,ip:port";
>>>>>>>> String topics = "x,y,z";
>>>>>>>> HashSet<String> TopicsSet = new
>>>>>>>> HashSet<String>(Arrays.asList(topics.split(",")));
>>>>>>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>>>>>>> kafkaParams.put("metadata.broker.list", brokers);
>>>>>>>>
>>>>>>>> JavaPairInputDStream<String, String> messages =
>>>>>>>> KafkaUtils.createDirectStream(
>>>>>>>>            jssc,
>>>>>>>>            String.class,
>>>>>>>>            String.class,
>>>>>>>>            StringDecoder.class,
>>>>>>>>            StringDecoder.class,
>>>>>>>>            kafkaParams,
>>>>>>>>             TopicsSet
>>>>>>>>        );
>>>>>>>>
>>>>>>>> messages.foreachRDD(new Function<JavaPairRDD<String , String>,Void>
>>>>>>>> () {
>>>>>>>>             public Void call(JavaPairRDD<String , String> tuple) {
>>>>>>>>                 JavaRDD<String>rdd = tuple.values();
>>>>>>>>
>>>>>>>> rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>>>>>>>                 return null;
>>>>>>>>             }
>>>>>>>>        });
>>>>>>>>
>>>>>>>>
>>>>>>>> *Thanks*,
>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <
>>>>>>>> sai.sai.s...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> What Spark version are you using, also a small code snippet of how
>>>>>>>>> you use Spark Streaming would be greatly helpful.
>>>>>>>>>
>>>>>>>>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <
>>>>>>>>> ramkumar.c...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I can able to read and print few lines. Afterthat i'm getting
>>>>>>>>>> this exception. Any idea for this ?
>>>>>>>>>>
>>>>>>>>>> *Thanks*,
>>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <
>>>>>>>>>> ramkumar.c...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I'm trying to read from kafka stream and printing it textfile.
>>>>>>>>>>> I'm using java over spark. I dont know why i'm getting the following
>>>>>>>>>>> exception. Also exception message is very abstract.  can anyone 
>>>>>>>>>>> please help
>>>>>>>>>>> me ?
>>>>>>>>>>>
>>>>>>>>>>> Log Trace :
>>>>>>>>>>>
>>>>>>>>>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job
>>>>>>>>>>> generator
>>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.org
>>>>>>>>>>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw
>>>>>>>>>>> exception: java.lang.NullPointerException
>>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.org
>>>>>>>>>>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> *Thanks*,
>>>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>

Reply via email to