Re: Exception while reading from kafka stream

2015-11-03 Thread Ramkumar V
Thanks a lot, it worked for me. I'm using a single direct stream which
retrieves data from all the topics.

*Thanks*,



On Mon, Nov 2, 2015 at 8:13 PM, Cody Koeninger  wrote:

> combine topicsSet_1 and topicsSet_2 in a single createDirectStream call.
> Then you can use hasOffsetRanges to see what the topic for a given
> partition is.
>
> On Mon, Nov 2, 2015 at 7:26 AM, Ramkumar V 
> wrote:
>
>> if i try like below code snippet , it shows exception , how to avoid this
>> exception ? how to switch processing based on topic ?
>>
>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>> Durations.seconds(30));
>> HashSet<String> topicsSet_1 = new HashSet<String>(Arrays.asList(topics.split(",")));
>> HashSet<String> topicsSet_2 = new HashSet<String>(Arrays.asList(topics.split(",")));
>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>> kafkaParams.put("metadata.broker.list", brokers);
>> JavaPairInputDStream<String, String> messages_1 = KafkaUtils.createDirectStream(
>>     jssc,
>>     String.class,
>>     String.class,
>>     StringDecoder.class,
>>     StringDecoder.class,
>>     kafkaParams,
>>     topicsSet_1
>> );
>>
>> JavaPairInputDStream<String, String> messages_2 = KafkaUtils.createDirectStream(
>>     jssc,
>>     String.class,
>>     String.class,
>>     StringDecoder.class,
>>     StringDecoder.class,
>>     kafkaParams,
>>     topicsSet_2
>> );
>>
>> * Log Trace* :
>>
>> [ERROR] [11/02/2015 12:59:08.107] [Executor task launch worker-0]
>> [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
>> swallowing exception during message send
>> (akka.remote.RemoteTransportExceptionNoStackTrace)
>> [ERROR] [11/02/2015 12:59:08.104] [Executor task launch worker-0]
>> [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
>> swallowing exception during message send
>> (akka.remote.RemoteTransportExceptionNoStackTrace)
>> [ERROR] [11/02/2015 13:01:13.812] [Executor task launch worker-0]
>> [akka.tcp://sparkDriver@10.125.4.200:41039/user/CoarseGrainedScheduler]
>> swallowing exception during message send
>> (akka.remote.RemoteTransportExceptionNoStackTrace)
>> 15/11/02 12:59:05 ERROR yarn.ApplicationMaster: User class threw
>> exception: java.io.IOException: Failed to delete
>> somedomain/user/hdfs/spark_output/kyt_req/part-00055
>> 15/11/02 12:59:05 INFO yarn.ApplicationMaster: Final app status: FAILED,
>> exitCode: 15, (reason: User class threw exception: java.io.IOException:
>> Failed to delete somedomain/user/hdfs/spark_output/kyt_req/part-00055)
>> java.io.IOException: Failed on local exception:
>> java.io.InterruptedIOException: Interruped while waiting for IO on channel
>> java.nio.channels.SocketChannel[connected local=/10.125.4.200:40770
>> remote=somedomain]. 59994 millis timeout left.; Host Details : local host
>> is: "somedomain"; destination host is: "somedomain":8020;
>> java.io.IOException: Failed on local exception:
>> java.io.InterruptedIOException: Interruped while waiting for IO on channel
>> java.nio.channels.SocketChannel[connected local=/10.125.4.200:41898
>> remote=somedomain]. 59998 millis timeout left.; Host Details : local host
>> is: "somedomain"; destination host is: "somedomain;
>> 15/11/02 13:01:11 ERROR yarn.ApplicationMaster: User class threw
>> exception: java.lang.NullPointerException
>> 15/11/02 13:01:11 INFO yarn.ApplicationMaster: Final app status: FAILED,
>> exitCode: 15, (reason: User class threw exception:
>> java.lang.NullPointerException)
>> 15/11/02 13:01:13 INFO yarn.ApplicationMaster: Unregistering
>> ApplicationMaster with FAILED (diag message: User class threw exception:
>> java.lang.NullPointerException)
>> java.io.IOException: Failed on local exception:
>> java.io.InterruptedIOException: Interruped while waiting for IO on channel
>> java.nio.channels.SocketChannel[connected local=/10.125.4.224:40482
>> remote=somedomain]. 59991 millis timeout left.; Host Details : local host
>> is: "somedomain"; destination host is: "somedomain":8020;
>> [ERROR] [11/02/2015 12:59:08.102] [Executor task launch worker-0]
>> [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
>> swallowing exception during message send
>> (akka.remote.RemoteTransportExceptionNoStackTrace)
>>
>>
>>
>> *Thanks*,
>> 
>>
>>
>> On Fri, Oct 30, 2015 at 7:34 PM, Cody Koeninger 
>> wrote:
>>
>>> Just put them all in one stream and switch processing based on the topic
>>>
>>> On Fri, Oct 30, 2015 at 6:29 AM, Ramkumar V 
>>> wrote:
>>>
 i want to join all those logs in some manner. That's what i'm trying to
 do.

 *Thanks*,
 


 On Fri, Oct 30, 2015 at 4:57 PM, Saisai Shao 

Re: Exception while reading from kafka stream

2015-11-02 Thread Cody Koeninger
Combine topicsSet_1 and topicsSet_2 in a single createDirectStream call.
Then you can use HasOffsetRanges to see what the topic for a given
partition is.
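
A minimal sketch of this suggestion, assuming the same Spark 1.4.x Kafka direct API used in the snippets below; the app name, broker list, topic names, and batch interval are placeholders. The topic sets are merged into one set, a single direct stream is created, and each batch RDD is cast to HasOffsetRanges to recover the topic behind every partition:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

public class CombinedTopics {
    public static void main(String[] args) throws Exception {
        JavaStreamingContext jssc = new JavaStreamingContext(
                new SparkConf().setAppName("CombinedTopics"), Durations.seconds(30));

        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "ip:port,ip:port");

        // One set holding the topics previously split across topicsSet_1/topicsSet_2.
        HashSet<String> allTopics = new HashSet<String>(Arrays.asList("x", "y", "z"));

        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc, String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                kafkaParams, allTopics);

        messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
            public Void call(JavaPairRDD<String, String> rdd) {
                // Each batch RDD of a direct stream implements HasOffsetRanges;
                // the cast must be done on this RDD, before other transformations.
                OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                for (OffsetRange r : ranges) {
                    System.out.println("topic=" + r.topic() + " partition=" + r.partition());
                }
                return null;
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }
}

The direct stream creates exactly one RDD partition per Kafka topic-partition, so ranges[i] describes partition i of the batch.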

On Mon, Nov 2, 2015 at 7:26 AM, Ramkumar V  wrote:

> if i try like below code snippet , it shows exception , how to avoid this
> exception ? how to switch processing based on topic ?
>
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> Durations.seconds(30));
> HashSet<String> topicsSet_1 = new HashSet<String>(Arrays.asList(topics.split(",")));
> HashSet<String> topicsSet_2 = new HashSet<String>(Arrays.asList(topics.split(",")));
> HashMap<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("metadata.broker.list", brokers);
> JavaPairInputDStream<String, String> messages_1 = KafkaUtils.createDirectStream(
>     jssc,
>     String.class,
>     String.class,
>     StringDecoder.class,
>     StringDecoder.class,
>     kafkaParams,
>     topicsSet_1
> );
>
> JavaPairInputDStream<String, String> messages_2 = KafkaUtils.createDirectStream(
>     jssc,
>     String.class,
>     String.class,
>     StringDecoder.class,
>     StringDecoder.class,
>     kafkaParams,
>     topicsSet_2
> );
>
> * Log Trace* :
>
> [ERROR] [11/02/2015 12:59:08.107] [Executor task launch worker-0]
> [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
> swallowing exception during message send
> (akka.remote.RemoteTransportExceptionNoStackTrace)
> [ERROR] [11/02/2015 12:59:08.104] [Executor task launch worker-0]
> [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
> swallowing exception during message send
> (akka.remote.RemoteTransportExceptionNoStackTrace)
> [ERROR] [11/02/2015 13:01:13.812] [Executor task launch worker-0]
> [akka.tcp://sparkDriver@10.125.4.200:41039/user/CoarseGrainedScheduler]
> swallowing exception during message send
> (akka.remote.RemoteTransportExceptionNoStackTrace)
> 15/11/02 12:59:05 ERROR yarn.ApplicationMaster: User class threw
> exception: java.io.IOException: Failed to delete
> somedomain/user/hdfs/spark_output/kyt_req/part-00055
> 15/11/02 12:59:05 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 15, (reason: User class threw exception: java.io.IOException:
> Failed to delete somedomain/user/hdfs/spark_output/kyt_req/part-00055)
> java.io.IOException: Failed on local exception:
> java.io.InterruptedIOException: Interruped while waiting for IO on channel
> java.nio.channels.SocketChannel[connected local=/10.125.4.200:40770
> remote=somedomain]. 59994 millis timeout left.; Host Details : local host
> is: "somedomain"; destination host is: "somedomain":8020;
> java.io.IOException: Failed on local exception:
> java.io.InterruptedIOException: Interruped while waiting for IO on channel
> java.nio.channels.SocketChannel[connected local=/10.125.4.200:41898
> remote=somedomain]. 59998 millis timeout left.; Host Details : local host
> is: "somedomain"; destination host is: "somedomain;
> 15/11/02 13:01:11 ERROR yarn.ApplicationMaster: User class threw
> exception: java.lang.NullPointerException
> 15/11/02 13:01:11 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 15, (reason: User class threw exception:
> java.lang.NullPointerException)
> 15/11/02 13:01:13 INFO yarn.ApplicationMaster: Unregistering
> ApplicationMaster with FAILED (diag message: User class threw exception:
> java.lang.NullPointerException)
> java.io.IOException: Failed on local exception:
> java.io.InterruptedIOException: Interruped while waiting for IO on channel
> java.nio.channels.SocketChannel[connected local=/10.125.4.224:40482
> remote=somedomain]. 59991 millis timeout left.; Host Details : local host
> is: "somedomain"; destination host is: "somedomain":8020;
> [ERROR] [11/02/2015 12:59:08.102] [Executor task launch worker-0]
> [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
> swallowing exception during message send
> (akka.remote.RemoteTransportExceptionNoStackTrace)
>
>
>
> *Thanks*,
> 
>
>
> On Fri, Oct 30, 2015 at 7:34 PM, Cody Koeninger 
> wrote:
>
>> Just put them all in one stream and switch processing based on the topic
>>
>> On Fri, Oct 30, 2015 at 6:29 AM, Ramkumar V 
>> wrote:
>>
>>> i want to join all those logs in some manner. That's what i'm trying to
>>> do.
>>>
>>> *Thanks*,
>>> 
>>>
>>>
>>> On Fri, Oct 30, 2015 at 4:57 PM, Saisai Shao 
>>> wrote:
>>>
 I don't think Spark Streaming supports multiple streaming context in
 one jvm, you cannot use in such way. Instead you could run multiple
 streaming applications, since you're using Yarn.

 On Friday, October 30, 2015, Ramkumar V  wrote:

> I found NPE is mainly because of im using the

Re: Exception while reading from kafka stream

2015-11-02 Thread Ramkumar V
If I try the below code snippet, it throws an exception. How can I avoid
this exception, and how can I switch processing based on topic?

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(30));
HashSet<String> topicsSet_1 = new HashSet<String>(Arrays.asList(topics.split(",")));
HashSet<String> topicsSet_2 = new HashSet<String>(Arrays.asList(topics.split(",")));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokers);
JavaPairInputDStream<String, String> messages_1 = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topicsSet_1
);

JavaPairInputDStream<String, String> messages_2 = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topicsSet_2
);

*Log Trace*:

[ERROR] [11/02/2015 12:59:08.107] [Executor task launch worker-0]
[akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
swallowing exception during message send
(akka.remote.RemoteTransportExceptionNoStackTrace)
[ERROR] [11/02/2015 12:59:08.104] [Executor task launch worker-0]
[akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
swallowing exception during message send
(akka.remote.RemoteTransportExceptionNoStackTrace)
[ERROR] [11/02/2015 13:01:13.812] [Executor task launch worker-0]
[akka.tcp://sparkDriver@10.125.4.200:41039/user/CoarseGrainedScheduler]
swallowing exception during message send
(akka.remote.RemoteTransportExceptionNoStackTrace)
15/11/02 12:59:05 ERROR yarn.ApplicationMaster: User class threw exception:
java.io.IOException: Failed to delete
somedomain/user/hdfs/spark_output/kyt_req/part-00055
15/11/02 12:59:05 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 15, (reason: User class threw exception: java.io.IOException:
Failed to delete somedomain/user/hdfs/spark_output/kyt_req/part-00055)
java.io.IOException: Failed on local exception:
java.io.InterruptedIOException: Interruped while waiting for IO on channel
java.nio.channels.SocketChannel[connected local=/10.125.4.200:40770
remote=somedomain]. 59994 millis timeout left.; Host Details : local host
is: "somedomain"; destination host is: "somedomain":8020;
java.io.IOException: Failed on local exception:
java.io.InterruptedIOException: Interruped while waiting for IO on channel
java.nio.channels.SocketChannel[connected local=/10.125.4.200:41898
remote=somedomain]. 59998 millis timeout left.; Host Details : local host
is: "somedomain"; destination host is: "somedomain;
15/11/02 13:01:11 ERROR yarn.ApplicationMaster: User class threw exception:
java.lang.NullPointerException
15/11/02 13:01:11 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 15, (reason: User class threw exception:
java.lang.NullPointerException)
15/11/02 13:01:13 INFO yarn.ApplicationMaster: Unregistering
ApplicationMaster with FAILED (diag message: User class threw exception:
java.lang.NullPointerException)
java.io.IOException: Failed on local exception:
java.io.InterruptedIOException: Interruped while waiting for IO on channel
java.nio.channels.SocketChannel[connected local=/10.125.4.224:40482
remote=somedomain]. 59991 millis timeout left.; Host Details : local host
is: "somedomain"; destination host is: "somedomain":8020;
[ERROR] [11/02/2015 12:59:08.102] [Executor task launch worker-0]
[akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
swallowing exception during message send
(akka.remote.RemoteTransportExceptionNoStackTrace)



*Thanks*,



On Fri, Oct 30, 2015 at 7:34 PM, Cody Koeninger  wrote:

> Just put them all in one stream and switch processing based on the topic
>
> On Fri, Oct 30, 2015 at 6:29 AM, Ramkumar V 
> wrote:
>
>> i want to join all those logs in some manner. That's what i'm trying to
>> do.
>>
>> *Thanks*,
>> 
>>
>>
>> On Fri, Oct 30, 2015 at 4:57 PM, Saisai Shao 
>> wrote:
>>
>>> I don't think Spark Streaming supports multiple streaming context in one
>>> jvm, you cannot use in such way. Instead you could run multiple streaming
>>> applications, since you're using Yarn.
>>>
>>> On Friday, October 30, 2015, Ramkumar V  wrote:
>>>
 I found NPE is mainly because of im using the same JavaStreamingContext
 for some other kafka stream. if i change the name , its running
 successfully. how to run multiple JavaStreamingContext in a program ?  I'm
 getting following exception if i run multiple JavaStreamingContext in
 single file.

 15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status:
 FAILED, exitCode: 15, (reason: User class threw exception:
 

Re: Exception while reading from kafka stream

2015-10-30 Thread Saisai Shao
What Spark version are you using? Also, a small code snippet of how you use
Spark Streaming would be greatly helpful.

On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V  wrote:

> I can able to read and print few lines. Afterthat i'm getting this
> exception. Any idea for this ?
>
> *Thanks*,
> 
>
>
> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V 
> wrote:
>
>> Hi,
>>
>> I'm trying to read from kafka stream and printing it textfile. I'm using
>> java over spark. I dont know why i'm getting the following exception.
>> Also exception message is very abstract.  can anyone please help me ?
>>
>> Log Trace :
>>
>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
>> java.lang.NullPointerException
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>> at
>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>> at
>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>> at
>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>> at
>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>> at
>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>> at
>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>> at
>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>> at org.apache.spark.streaming.scheduler.JobGenerator.org
>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw
>> exception: java.lang.NullPointerException
>> java.lang.NullPointerException
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>> at
>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>> at
>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>> at
>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>> at
>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>> at
>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>> at
>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>> at
>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>> at org.apache.spark.streaming.scheduler.JobGenerator.org
>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>>
>>
>> *Thanks*,
>> 
>>
>>
>


Re: Exception while reading from kafka stream

2015-10-30 Thread Ramkumar V
spark version - spark 1.4.1

my code snippet:

String brokers = "ip:port,ip:port";
String topics = "x,y,z";
HashSet<String> TopicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokers);

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    TopicsSet
);

messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    public Void call(JavaPairRDD<String, String> tuple) {
        JavaRDD<String> rdd = tuple.values();
        rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
        return null;
    }
});
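
One detail worth flagging here, and a plausible contributor to the "Failed to delete .../part-00055" IOException quoted elsewhere in this thread: saveAsTextFile refuses to write to a path that already exists, yet the same HDFS path is reused on every 30-second batch. A minimal sketch of a workaround, reusing the messages stream above and the foreachRDD overload that also receives the batch Time (the per-batch path layout is hypothetical):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;

messages.foreachRDD(new Function2<JavaPairRDD<String, String>, Time, Void>() {
    public Void call(JavaPairRDD<String, String> tuple, Time batchTime) {
        // One output directory per batch, so no existing path is ever reused.
        tuple.values().saveAsTextFile(
                "hdfs://myuser:8020/user/hdfs/output/batch-" + batchTime.milliseconds());
        return null;
    }
});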


*Thanks*,



On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao  wrote:

> What Spark version are you using, also a small code snippet of how you use
> Spark Streaming would be greatly helpful.
>
> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V 
> wrote:
>
>> I can able to read and print few lines. Afterthat i'm getting this
>> exception. Any idea for this ?
>>
>> *Thanks*,
>> 
>>
>>
>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm trying to read from kafka stream and printing it textfile. I'm using
>>> java over spark. I dont know why i'm getting the following exception.
>>> Also exception message is very abstract.  can anyone please help me ?
>>>
>>> Log Trace :
>>>
>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
>>> java.lang.NullPointerException
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>> at
>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>> at
>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>> at
>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>> at
>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>> at
>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>> at org.apache.spark.streaming.scheduler.JobGenerator.org
>>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>> at
>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw
>>> exception: java.lang.NullPointerException
>>> java.lang.NullPointerException
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>> at
>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>> at
>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>> at
>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>> at
>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>> at
>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>> at org.apache.spark.streaming.scheduler.JobGenerator.org
>>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>> at
>>> 

Re: Exception while reading from kafka stream

2015-10-30 Thread Ramkumar V
I am able to read and print a few lines. After that I'm getting this
exception. Any idea about this?

*Thanks*,



On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V  wrote:

> Hi,
>
> I'm trying to read from kafka stream and printing it textfile. I'm using
> java over spark. I dont know why i'm getting the following exception.
> Also exception message is very abstract.  can anyone please help me ?
>
> Log Trace :
>
> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
> java.lang.NullPointerException
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
> at
> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
> at
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
> at
> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
> at
> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
> at
> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
> at
> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
> at
> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
> at
> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
> at org.apache.spark.streaming.scheduler.JobGenerator.org
> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw
> exception: java.lang.NullPointerException
> java.lang.NullPointerException
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
> at
> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
> at
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
> at
> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
> at
> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
> at
> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
> at
> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
> at
> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
> at
> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
> at org.apache.spark.streaming.scheduler.JobGenerator.org
> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
>
>
> *Thanks*,
> 
>
>


Re: Exception while reading from kafka stream

2015-10-30 Thread Saisai Shao
Do you have any special settings? From your code, I don't think it would
incur an NPE at that place.

On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V  wrote:

> spark version - spark 1.4.1
>
> my code snippet:
>
> String brokers = "ip:port,ip:port";
> String topics = "x,y,z";
> HashSet<String> TopicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
> HashMap<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("metadata.broker.list", brokers);
>
> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>     jssc,
>     String.class,
>     String.class,
>     StringDecoder.class,
>     StringDecoder.class,
>     kafkaParams,
>     TopicsSet
> );
>
> messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>     public Void call(JavaPairRDD<String, String> tuple) {
>         JavaRDD<String> rdd = tuple.values();
>         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>         return null;
>     }
> });
>
>
> *Thanks*,
> 
>
>
> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao 
> wrote:
>
>> What Spark version are you using, also a small code snippet of how you
>> use Spark Streaming would be greatly helpful.
>>
>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V 
>> wrote:
>>
>>> I can able to read and print few lines. Afterthat i'm getting this
>>> exception. Any idea for this ?
>>>
>>> *Thanks*,
>>> 
>>>
>>>
>>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V 
>>> wrote:
>>>
 Hi,

 I'm trying to read from kafka stream and printing it textfile. I'm
 using java over spark. I dont know why i'm getting the following exception.
 Also exception message is very abstract.  can anyone please help me ?

 Log Trace :

 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
 java.lang.NullPointerException
 at
 org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
 at
 org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
 at
 scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
 at
 scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
 at
 scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
 at
 scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
 at
 scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
 at
 scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
 at
 org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
 at
 org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
 at org.apache.spark.streaming.scheduler.JobGenerator.org
 $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
 at
 org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
 at
 org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
 at
 org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw
 exception: java.lang.NullPointerException
 java.lang.NullPointerException
 at
 org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
 at
 org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
 at
 scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
 at
 scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
 at
 scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
 at
 scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
 at
 scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
 at
 scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
 at
 org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
 at
 org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
 at org.apache.spark.streaming.scheduler.JobGenerator.org
 

Re: Exception while reading from kafka stream

2015-10-30 Thread Saisai Shao
I just did a local test with your code and everything seems fine; the only
difference is that I used the master branch, but I don't think it changes a
lot in this part. Did you meet any other exceptions or errors besides this
one? Probably this is due to other exceptions that make the system
unstable.

On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V  wrote:

> No, i dont have any special settings. if i keep only reading line in my
> code, it's throwing NPE.
>
> *Thanks*,
> 
>
>
> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao 
> wrote:
>
>> Do you have any special settings, from your code, I don't think it will
>> incur NPE at that place.
>>
>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V 
>> wrote:
>>
>>> spark version - spark 1.4.1
>>>
>>> my code snippet:
>>>
>>> String brokers = "ip:port,ip:port";
>>> String topics = "x,y,z";
>>> HashSet<String> TopicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>> kafkaParams.put("metadata.broker.list", brokers);
>>>
>>> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>>>     jssc,
>>>     String.class,
>>>     String.class,
>>>     StringDecoder.class,
>>>     StringDecoder.class,
>>>     kafkaParams,
>>>     TopicsSet
>>> );
>>>
>>> messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>>>     public Void call(JavaPairRDD<String, String> tuple) {
>>>         JavaRDD<String> rdd = tuple.values();
>>>         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>>         return null;
>>>     }
>>> });
>>>
>>>
>>> *Thanks*,
>>> 
>>>
>>>
>>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao 
>>> wrote:
>>>
 What Spark version are you using, also a small code snippet of how you
 use Spark Streaming would be greatly helpful.

 On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V 
 wrote:

> I can able to read and print few lines. Afterthat i'm getting this
> exception. Any idea for this ?
>
> *Thanks*,
> 
>
>
> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V 
> wrote:
>
>> Hi,
>>
>> I'm trying to read from kafka stream and printing it textfile. I'm
>> using java over spark. I dont know why i'm getting the following 
>> exception.
>> Also exception message is very abstract.  can anyone please help me ?
>>
>> Log Trace :
>>
>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
>> java.lang.NullPointerException
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>> at
>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>> at
>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>> at
>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>> at
>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>> at
>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>> at
>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>> at
>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>> at org.apache.spark.streaming.scheduler.JobGenerator.org
>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>> at
>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw
>> exception: java.lang.NullPointerException
>> java.lang.NullPointerException
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>> at
>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)

Re: Exception while reading from kafka stream

2015-10-30 Thread Saisai Shao
I don't think Spark Streaming supports multiple streaming contexts in one
JVM; you cannot use it that way. Instead you could run multiple streaming
applications, since you're using YARN.
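
For completeness, a minimal sketch of the single-context alternative, under the same Spark 1.4.x assumptions as the other snippets in this thread (jssc and kafkaParams as defined earlier; requestTopics and errorTopics are hypothetical topic sets): one JavaStreamingContext may own several input DStreams, so separate Kafka streams do not require separate contexts.

// Both direct streams share the one context; jssc.start() is called only once.
// JavaPairDStream is org.apache.spark.streaming.api.java.JavaPairDStream.
JavaPairInputDStream<String, String> requestLogs = KafkaUtils.createDirectStream(
        jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafkaParams, requestTopics);
JavaPairInputDStream<String, String> errorLogs = KafkaUtils.createDirectStream(
        jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafkaParams, errorTopics);

// Process the streams independently, or merge them for a joint pipeline.
JavaPairDStream<String, String> merged = requestLogs.union(errorLogs);

jssc.start();
jssc.awaitTermination();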

On Friday, October 30, 2015, Ramkumar V  wrote:

> I found NPE is mainly because of im using the same JavaStreamingContext
> for some other kafka stream. if i change the name , its running
> successfully. how to run multiple JavaStreamingContext in a program ?  I'm
> getting following exception if i run multiple JavaStreamingContext in
> single file.
>
> 15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 15, (reason: User class threw exception:
> java.lang.IllegalStateException: Only one StreamingContext may be started
> in this JVM. Currently running StreamingContext was started
> atorg.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:622)
>
>
> *Thanks*,
> 
>
>
> On Fri, Oct 30, 2015 at 3:25 PM, Saisai Shao  > wrote:
>
>> From the code, I think this field "rememberDuration" shouldn't be null,
>> it will be verified at the start, unless some place changes it's value in
>> the runtime that makes it null, but I cannot image how this happened. Maybe
>> you could add some logs around the place where exception happens if you
>> could reproduce it.
>>
>> On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V > > wrote:
>>
>>> No. this is the only exception that im getting multiple times in my log.
>>> Also i was reading some other topics earlier but im not faced this NPE.
>>>
>>> *Thanks*,
>>> 
>>>
>>>
>>> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao >> > wrote:
>>>
 I just did a local test with your code, seems everything is fine, the
 only difference is that I use the master branch, but I don't think it
 changes a lot in this part. Do you met any other exceptions or errors
 beside this one? Probably this is due to other exceptions that makes this
 system unstable.

 On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V > wrote:

> No, i dont have any special settings. if i keep only reading line in
> my code, it's throwing NPE.
>
> *Thanks*,
> 
>
>
> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao  > wrote:
>
>> Do you have any special settings, from your code, I don't think it
>> will incur NPE at that place.
>>
>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V > > wrote:
>>
>>> spark version - spark 1.4.1
>>>
>>> my code snippet:
>>>
>>> String brokers = "ip:port,ip:port";
>>> String topics = "x,y,z";
>>> HashSet<String> TopicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>> kafkaParams.put("metadata.broker.list", brokers);
>>>
>>> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>>>     jssc,
>>>     String.class,
>>>     String.class,
>>>     StringDecoder.class,
>>>     StringDecoder.class,
>>>     kafkaParams,
>>>     TopicsSet
>>> );
>>>
>>> messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>>>     public Void call(JavaPairRDD<String, String> tuple) {
>>>         JavaRDD<String> rdd = tuple.values();
>>>         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>>         return null;
>>>     }
>>> });
>>>
>>>
>>> *Thanks*,
>>> 
>>>
>>>
>>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao >> > wrote:
>>>
 What Spark version are you using, also a small code snippet of how
 you use Spark Streaming would be greatly helpful.

 On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <
 ramkumar.c...@gmail.com
 > wrote:

> I can able to read and print few lines. Afterthat i'm getting this
> exception. Any idea for this ?
>
> *Thanks*,
> 
>
>
> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <
> 

Re: Exception while reading from kafka stream

2015-10-30 Thread Ramkumar V
No, this is the only exception that I'm getting multiple times in my log.
Also, I was reading some other topics earlier and never faced this NPE.

*Thanks*,



On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao  wrote:

> I just did a local test with your code, seems everything is fine, the only
> difference is that I use the master branch, but I don't think it changes a
> lot in this part. Do you met any other exceptions or errors beside this
> one? Probably this is due to other exceptions that makes this system
> unstable.
>
> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V 
> wrote:
>
>> No, i dont have any special settings. if i keep only reading line in my
>> code, it's throwing NPE.
>>
>> *Thanks*,
>> 
>>
>>
>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao 
>> wrote:
>>
>>> Do you have any special settings, from your code, I don't think it will
>>> incur NPE at that place.
>>>
>>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V 
>>> wrote:
>>>
 spark version - spark 1.4.1

 my code snippet:

 String brokers = "ip:port,ip:port";
 String topics = "x,y,z";
 HashSet<String> TopicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
 HashMap<String, String> kafkaParams = new HashMap<String, String>();
 kafkaParams.put("metadata.broker.list", brokers);

 JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
     jssc,
     String.class,
     String.class,
     StringDecoder.class,
     StringDecoder.class,
     kafkaParams,
     TopicsSet
 );

 messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
     public Void call(JavaPairRDD<String, String> tuple) {
         JavaRDD<String> rdd = tuple.values();
         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
         return null;
     }
 });


 *Thanks*,
 


 On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao 
 wrote:

> What Spark version are you using, also a small code snippet of how you
> use Spark Streaming would be greatly helpful.
>
> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V 
> wrote:
>
>> I can able to read and print few lines. Afterthat i'm getting this
>> exception. Any idea for this ?
>>
>> *Thanks*,
>> 
>>
>>
>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm trying to read from kafka stream and printing it textfile. I'm
>>> using java over spark. I dont know why i'm getting the following 
>>> exception.
>>> Also exception message is very abstract.  can anyone please help me ?
>>>
>>> Log Trace :
>>>
>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job
>>> generator
>>> java.lang.NullPointerException
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>> at
>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>> at
>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>> at
>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>> at
>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>> at
>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>> at org.apache.spark.streaming.scheduler.JobGenerator.org
>>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>> at
>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw
>>> exception: java.lang.NullPointerException
>>> 

Re: Exception while reading from kafka stream

2015-10-30 Thread Saisai Shao
From the code, I think this field "rememberDuration" shouldn't be null; it
is verified at the start, unless some place changes its value at runtime
and makes it null, but I cannot imagine how this happened. Maybe you could
add some logs around the place where the exception happens, if you can
reproduce it.

On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V  wrote:

> No. this is the only exception that im getting multiple times in my log.
> Also i was reading some other topics earlier but im not faced this NPE.
>
> *Thanks*,
> 
>
>
> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao 
> wrote:
>
>> I just did a local test with your code, seems everything is fine, the
>> only difference is that I use the master branch, but I don't think it
>> changes a lot in this part. Do you met any other exceptions or errors
>> beside this one? Probably this is due to other exceptions that makes this
>> system unstable.
>>
>> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V 
>> wrote:
>>
>>> No, i dont have any special settings. if i keep only reading line in my
>>> code, it's throwing NPE.
>>>
>>> *Thanks*,
>>> 
>>>
>>>
>>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao 
>>> wrote:
>>>
 Do you have any special settings, from your code, I don't think it will
 incur NPE at that place.

 On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V 
 wrote:

> spark version - spark 1.4.1
>
> my code snippet:
>
> String brokers = "ip:port,ip:port";
> String topics = "x,y,z";
> HashSet<String> TopicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
> HashMap<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("metadata.broker.list", brokers);
>
> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>     jssc,
>     String.class,
>     String.class,
>     StringDecoder.class,
>     StringDecoder.class,
>     kafkaParams,
>     TopicsSet
> );
>
> messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>     public Void call(JavaPairRDD<String, String> tuple) {
>         JavaRDD<String> rdd = tuple.values();
>         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>         return null;
>     }
> });
>
>
> *Thanks*,
> 
>
>
> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao 
> wrote:
>
>> What Spark version are you using, also a small code snippet of how
>> you use Spark Streaming would be greatly helpful.
>>
>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V 
>> wrote:
>>
>>> I can able to read and print few lines. Afterthat i'm getting this
>>> exception. Any idea for this ?
>>>
>>> *Thanks*,
>>> 
>>>
>>>
>>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V >> > wrote:
>>>
 Hi,

 I'm trying to read from kafka stream and printing it textfile. I'm
 using java over spark. I dont know why i'm getting the following 
 exception.
 Also exception message is very abstract.  can anyone please help me ?

 Log Trace :

 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job
 generator
 java.lang.NullPointerException
 at
 org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
 at
 org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
 at
 scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
 at
 scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
 at
 scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
 at
 scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
 at
 scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
 at
 scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
 at
 org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
 at
 org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
 at org.apache.spark.streaming.scheduler.JobGenerator.org
 

Re: Exception while reading from kafka stream

2015-10-30 Thread Ramkumar V
I found the NPE is mainly because I'm using the same JavaStreamingContext
for some other Kafka stream; if I change the name, it runs successfully.
How do I run multiple JavaStreamingContexts in a program? I'm getting the
following exception if I run multiple JavaStreamingContexts in a single file.

15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 15, (reason: User class threw exception:
java.lang.IllegalStateException: Only one StreamingContext may be started
in this JVM. Currently running StreamingContext was started
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:622)


*Thanks*,



On Fri, Oct 30, 2015 at 3:25 PM, Saisai Shao  wrote:

> From the code, I think this field "rememberDuration" shouldn't be null,
> it will be verified at the start, unless some place changes it's value in
> the runtime that makes it null, but I cannot image how this happened. Maybe
> you could add some logs around the place where exception happens if you
> could reproduce it.
>
> On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V 
> wrote:
>
>> No. this is the only exception that im getting multiple times in my log.
>> Also i was reading some other topics earlier but im not faced this NPE.
>>
>> *Thanks*,
>> 
>>
>>
>> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao 
>> wrote:
>>
>>> I just did a local test with your code, seems everything is fine, the
>>> only difference is that I use the master branch, but I don't think it
>>> changes a lot in this part. Do you met any other exceptions or errors
>>> beside this one? Probably this is due to other exceptions that makes this
>>> system unstable.
>>>
>>> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V 
>>> wrote:
>>>
 No, i dont have any special settings. if i keep only reading line in my
 code, it's throwing NPE.

 *Thanks*,
 


 On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao 
 wrote:

> Do you have any special settings, from your code, I don't think it
> will incur NPE at that place.
>
> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V 
> wrote:
>
>> spark version - spark 1.4.1
>>
>> my code snippet:
>>
>> String brokers = "ip:port,ip:port";
>> String topics = "x,y,z";
>> HashSet<String> TopicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>> kafkaParams.put("metadata.broker.list", brokers);
>>
>> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>>     jssc,
>>     String.class,
>>     String.class,
>>     StringDecoder.class,
>>     StringDecoder.class,
>>     kafkaParams,
>>     TopicsSet
>> );
>>
>> messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>>     public Void call(JavaPairRDD<String, String> tuple) {
>>         JavaRDD<String> rdd = tuple.values();
>>         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>         return null;
>>     }
>> });
>>
>>
>> *Thanks*,
>> 
>>
>>
>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao 
>> wrote:
>>
>>> What Spark version are you using, also a small code snippet of how
>>> you use Spark Streaming would be greatly helpful.
>>>
>>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V >> > wrote:
>>>
 I can able to read and print few lines. Afterthat i'm getting this
 exception. Any idea for this ?

 *Thanks*,
 


 On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <
 ramkumar.c...@gmail.com> wrote:

> Hi,
>
> I'm trying to read from kafka stream and printing it textfile. I'm
> using java over spark. I dont know why i'm getting the following 
> exception.
> Also exception message is very abstract.  can anyone please help me ?
>
> Log Trace :
>
> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job
> generator
> java.lang.NullPointerException
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
> at
> 

Re: Exception while reading from kafka stream

2015-10-30 Thread Cody Koeninger
Just put them all in one stream and switch processing based on the topic
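
One way to do that switch, sketched under the same Spark 1.4.x assumptions as the other snippets in this thread and reusing the combined messages stream from the sketch near the top of this page: the direct stream yields one RDD partition per Kafka topic-partition, so a partition's index maps back to its OffsetRange and hence its topic. (Assumed extra imports: java.util.ArrayList, java.util.Iterator, java.util.List, org.apache.spark.api.java.function.Function2; the per-batch output path layout is hypothetical.)

messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    public Void call(JavaPairRDD<String, String> rdd) {
        // Cast first, on the RDD that came straight from the direct stream.
        final OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        rdd.values().mapPartitionsWithIndex(
                new Function2<Integer, Iterator<String>, Iterator<String>>() {
                    public Iterator<String> call(Integer partition, Iterator<String> lines) {
                        // Partition i corresponds to ranges[i]; branch on its topic here.
                        String topic = ranges[partition].topic();
                        List<String> tagged = new ArrayList<String>();
                        while (lines.hasNext()) {
                            tagged.add(topic + "\t" + lines.next());
                        }
                        return tagged.iterator();
                    }
                }, true)
            .saveAsTextFile("hdfs://myuser:8020/user/hdfs/output/batch-"
                    + System.currentTimeMillis()); // unique path per batch
        return null;
    }
});

Each record comes out prefixed with its topic, so downstream processing can branch on that tag instead of needing five separate streams.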

On Fri, Oct 30, 2015 at 6:29 AM, Ramkumar V  wrote:

> i want to join all those logs in some manner. That's what i'm trying to do.
>
> *Thanks*,
> 
>
>
> On Fri, Oct 30, 2015 at 4:57 PM, Saisai Shao 
> wrote:
>
>> I don't think Spark Streaming supports multiple streaming context in one
>> jvm, you cannot use in such way. Instead you could run multiple streaming
>> applications, since you're using Yarn.
>>
>> On Friday, October 30, 2015, Ramkumar V  wrote:
>>
>>> I found NPE is mainly because of im using the same JavaStreamingContext
>>> for some other kafka stream. if i change the name , its running
>>> successfully. how to run multiple JavaStreamingContext in a program ?  I'm
>>> getting following exception if i run multiple JavaStreamingContext in
>>> single file.
>>>
>>> 15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status: FAILED,
>>> exitCode: 15, (reason: User class threw exception:
>>> java.lang.IllegalStateException: Only one StreamingContext may be started
>>> in this JVM. Currently running StreamingContext was started
>>> atorg.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:622)
>>>
>>>
>>> *Thanks*,
>>> 
>>>
>>>
>>> On Fri, Oct 30, 2015 at 3:25 PM, Saisai Shao 
>>> wrote:
>>>
 From the code, I think this field "rememberDuration" shouldn't be
 null, it will be verified at the start, unless some place changes it's
 value in the runtime that makes it null, but I cannot image how this
 happened. Maybe you could add some logs around the place where exception
 happens if you could reproduce it.

 On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V 
 wrote:

> No. this is the only exception that im getting multiple times in my
> log. Also i was reading some other topics earlier but im not faced this 
> NPE.
>
> *Thanks*,
> 
>
>
> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao 
> wrote:
>
>> I just did a local test with your code, seems everything is fine, the
>> only difference is that I use the master branch, but I don't think it
>> changes a lot in this part. Do you met any other exceptions or errors
>> beside this one? Probably this is due to other exceptions that makes this
>> system unstable.
>>
>> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V 
>> wrote:
>>
>>> No, i dont have any special settings. if i keep only reading line in
>>> my code, it's throwing NPE.
>>>
>>> *Thanks*,
>>> 
>>>
>>>
>>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao >> > wrote:
>>>
 Do you have any special settings, from your code, I don't think it
 will incur NPE at that place.

 On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <
 ramkumar.c...@gmail.com> wrote:

> spark version - spark 1.4.1
>
> my code snippet:
>
> String brokers = "ip:port,ip:port";
> String topics = "x,y,z";
> HashSet<String> TopicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
> HashMap<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("metadata.broker.list", brokers);
>
> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>     jssc,
>     String.class,
>     String.class,
>     StringDecoder.class,
>     StringDecoder.class,
>     kafkaParams,
>     TopicsSet
> );
>
> messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>     public Void call(JavaPairRDD<String, String> tuple) {
>         JavaRDD<String> rdd = tuple.values();
>         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>         return null;
>     }
> });
>
>
> *Thanks*,
> 
>
>
> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <
> sai.sai.s...@gmail.com> wrote:
>
>> What Spark version are you using, also a small code snippet of
>> how you use Spark Streaming would be greatly helpful.
>>
>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <
>> ramkumar.c...@gmail.com> wrote:
>>
>>> I can able to read and print few lines. Afterthat i'm getting
>>> this exception. Any idea for this ?

Re: Exception while reading from kafka stream

2015-10-30 Thread Ramkumar V
No, I don't have any special settings. Even if I keep only the reading line
in my code, it throws an NPE.

*Thanks*,



On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao  wrote:

> Do you have any special settings, from your code, I don't think it will
> incur NPE at that place.
>
> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V 
> wrote:
>
>> spark version - spark 1.4.1
>>
>> my code snippet:
>>
>> String brokers = "ip:port,ip:port";
>> String topics = "x,y,z";
>> HashSet<String> TopicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>> kafkaParams.put("metadata.broker.list", brokers);
>>
>> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>>     jssc,
>>     String.class,
>>     String.class,
>>     StringDecoder.class,
>>     StringDecoder.class,
>>     kafkaParams,
>>     TopicsSet
>> );
>>
>> messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>>     public Void call(JavaPairRDD<String, String> tuple) {
>>         JavaRDD<String> rdd = tuple.values();
>>         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>         return null;
>>     }
>> });
>>
>>
>> *Thanks*,
>> 
>>
>>
>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao 
>> wrote:
>>
>>> What Spark version are you using, also a small code snippet of how you
>>> use Spark Streaming would be greatly helpful.
>>>
>>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V 
>>> wrote:
>>>
 I can able to read and print few lines. Afterthat i'm getting this
 exception. Any idea for this ?

 *Thanks*,
 


 On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V 
 wrote:

> Hi,
>
> I'm trying to read from kafka stream and printing it textfile. I'm
> using java over spark. I dont know why i'm getting the following 
> exception.
> Also exception message is very abstract.  can anyone please help me ?
>
> Log Trace :
>
> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
> java.lang.NullPointerException
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
> at
> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
> at
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
> at
> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
> at
> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
> at
> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
> at
> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
> at
> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
> at
> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
> at org.apache.spark.streaming.scheduler.JobGenerator.org
> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
> at
> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw
> exception: java.lang.NullPointerException
> java.lang.NullPointerException
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
> at
> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
> at
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
> at
> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
> at
> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
> at
> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
> at
> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>   

Re: Exception while reading from kafka stream

2015-10-30 Thread Ramkumar V
In general, I need to consume five different types of logs from Kafka in
Spark, with a different set of topics for each log type. How do I start five
different streams in Spark?

*Thanks*,

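
One way to structure this, since only one StreamingContext can be started
per JVM (see the exception below): create a single JavaStreamingContext and
open one direct stream per log type on it. A minimal sketch under that
assumption; the topic names, class name, and the 30-second batch interval
are hypothetical:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class MultiLogStreams {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("MultiLogStreams");
        // One streaming context for the whole application.
        JavaStreamingContext jssc =
                new JavaStreamingContext(conf, Durations.seconds(30));

        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "ip:port,ip:port");

        // Hypothetical topic names: one direct stream per log type,
        // all created on the same context.
        JavaPairInputDStream<String, String> accessLogs =
                createStream(jssc, kafkaParams, "access_log");
        JavaPairInputDStream<String, String> errorLogs =
                createStream(jssc, kafkaParams, "error_log");
        // ... three more streams for the remaining log types ...

        // Attach all processing to the streams, then start the single
        // context exactly once.
        jssc.start();
        jssc.awaitTermination();
    }

    private static JavaPairInputDStream<String, String> createStream(
            JavaStreamingContext jssc,
            HashMap<String, String> kafkaParams,
            String topics) {
        HashSet<String> topicsSet =
                new HashSet<String>(Arrays.asList(topics.split(",")));
        return KafkaUtils.createDirectStream(
                jssc, String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                kafkaParams, topicsSet);
    }
}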


On Fri, Oct 30, 2015 at 4:40 PM, Ramkumar V  wrote:

> I found the NPE is mainly because I was using the same JavaStreamingContext
> for some other Kafka stream; if I change the name, it runs successfully.
> How do I run multiple JavaStreamingContexts in one program? I'm getting the
> following exception if I run multiple JavaStreamingContexts in a single
> file.
>
> 15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 15, (reason: User class threw exception:
> java.lang.IllegalStateException: Only one StreamingContext may be started
> in this JVM. Currently running StreamingContext was started
> at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:622)
>
>
> *Thanks*,
> 
>
>
> On Fri, Oct 30, 2015 at 3:25 PM, Saisai Shao 
> wrote:
>
>> From the code, I think the field "rememberDuration" shouldn't be null; it
>> is verified at start, unless something changes its value at runtime and
>> makes it null, but I cannot imagine how that happened. Maybe you could add
>> some logging around the place where the exception happens, if you can
>> reproduce it.
>>
>> On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V 
>> wrote:
>>
>>> No, this is the only exception, and I'm getting it multiple times in my
>>> log. Also, I was reading some other topics earlier and never faced this
>>> NPE there.
>>>
>>> *Thanks*,
>>> 
>>>
>>>
>>> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao 
>>> wrote:
>>>
 I just did a local test with your code and everything seems fine; the only
 difference is that I used the master branch, but I don't think it changes
 much in this part. Did you hit any other exceptions or errors besides this
 one? This is probably due to other exceptions that make the system
 unstable.

 On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V 
 wrote:

> No, I don't have any special settings. Even if I keep only the reading
> line in my code, it throws the NPE.
>
> *Thanks*,
> 
>
>
> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao 
> wrote:
>
>> Do you have any special settings? From your code, I don't think it
>> should incur an NPE at that place.
>>
>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V 
>> wrote:
>>
>>> spark version - spark 1.4.1
>>>
>>> my code snippet:
>>>
>>> String brokers = "ip:port,ip:port";
>>> String topics = "x,y,z";
>>> HashSet<String> topicsSet = new
>>> HashSet<String>(Arrays.asList(topics.split(",")));
>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>> kafkaParams.put("metadata.broker.list", brokers);
>>>
>>> JavaPairInputDStream<String, String> messages =
>>> KafkaUtils.createDirectStream(
>>>         jssc,
>>>         String.class,
>>>         String.class,
>>>         StringDecoder.class,
>>>         StringDecoder.class,
>>>         kafkaParams,
>>>         topicsSet
>>> );
>>>
>>> messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>>>     public Void call(JavaPairRDD<String, String> tuple) {
>>>         JavaRDD<String> rdd = tuple.values();
>>>         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>>         return null;
>>>     }
>>> });
>>>
>>>
>>> *Thanks*,
>>> 
>>>
>>>
>>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao >> > wrote:
>>>
 What Spark version are you using? Also, a small code snippet of how you
 use Spark Streaming would be greatly helpful.

 On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <
 ramkumar.c...@gmail.com> wrote:

> I am able to read and print a few lines. After that, I'm getting this
> exception. Any ideas?
>
> *Thanks*,
> 
>
>
> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <
> ramkumar.c...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm trying to read from a Kafka stream and write it out as a text file.
>> I'm using Java with Spark. I don't know why I'm getting the following
>> exception, and the exception message is very abstract. Can anyone please
>> help me?
>>
>> Log Trace :
>>
>> 15/10/29 12:15:09 

Re: Exception while reading from kafka stream

2015-10-30 Thread Ramkumar V
I want to join all those logs in some manner; that's what I'm trying to do.

*Thanks*,

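
For the join itself, a minimal sketch, reusing the hypothetical accessLogs
and errorLogs streams from the earlier sketch (both must come from the same
context; needs scala.Tuple2 and
org.apache.spark.streaming.api.java.JavaPairDStream imported):

// Merge streams that share a schema into one DStream:
JavaPairDStream<String, String> allLogs = accessLogs.union(errorLogs);

// Or join records from two log types on their common key:
JavaPairDStream<String, Tuple2<String, String>> joined =
        accessLogs.join(errorLogs);

union is the cheaper choice when the logs only need to be processed
together; join shuffles by key, so it fits the case where records from
different logs must be matched up.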


On Fri, Oct 30, 2015 at 4:57 PM, Saisai Shao  wrote:

> I don't think Spark Streaming supports multiple streaming contexts in one
> JVM, so you cannot use it that way. Instead, you could run multiple
> streaming applications, since you're using YARN.
>
> On Friday, October 30, 2015, Ramkumar V  wrote:
>
>> I found the NPE is mainly because I was using the same
>> JavaStreamingContext for some other Kafka stream; if I change the name,
>> it runs successfully. How do I run multiple JavaStreamingContexts in one
>> program? I'm getting the following exception if I run multiple
>> JavaStreamingContexts in a single file.
>>
>> 15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status: FAILED,
>> exitCode: 15, (reason: User class threw exception:
>> java.lang.IllegalStateException: Only one StreamingContext may be started
>> in this JVM. Currently running StreamingContext was started
>> at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:622)
>>
>>
>> *Thanks*,
>> 
>>
>>
>> On Fri, Oct 30, 2015 at 3:25 PM, Saisai Shao 
>> wrote:
>>
>>> From the code, I think the field "rememberDuration" shouldn't be null;
>>> it is verified at start, unless something changes its value at runtime
>>> and makes it null, but I cannot imagine how that happened. Maybe you
>>> could add some logging around the place where the exception happens, if
>>> you can reproduce it.
>>>
>>> On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V 
>>> wrote:
>>>
 No, this is the only exception, and I'm getting it multiple times in my
 log. Also, I was reading some other topics earlier and never faced this
 NPE there.

 *Thanks*,
 


 On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao 
 wrote:

> I just did a local test with your code and everything seems fine; the only
> difference is that I used the master branch, but I don't think it changes
> much in this part. Did you hit any other exceptions or errors besides this
> one? This is probably due to other exceptions that make the system
> unstable.
>
> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V 
> wrote:
>
>> No, I don't have any special settings. Even if I keep only the reading
>> line in my code, it throws the NPE.
>>
>> *Thanks*,
>> 
>>
>>
>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao 
>> wrote:
>>
>>> Do you have any special settings? From your code, I don't think it
>>> should incur an NPE at that place.
>>>
>>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V >> > wrote:
>>>
 spark version - spark 1.4.1

 my code snippet:

 String brokers = "ip:port,ip:port";
 String topics = "x,y,z";
 HashSet<String> topicsSet = new
 HashSet<String>(Arrays.asList(topics.split(",")));
 HashMap<String, String> kafkaParams = new HashMap<String, String>();
 kafkaParams.put("metadata.broker.list", brokers);

 JavaPairInputDStream<String, String> messages =
 KafkaUtils.createDirectStream(
         jssc,
         String.class,
         String.class,
         StringDecoder.class,
         StringDecoder.class,
         kafkaParams,
         topicsSet
 );

 messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
     public Void call(JavaPairRDD<String, String> tuple) {
         JavaRDD<String> rdd = tuple.values();
         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
         return null;
     }
 });


 *Thanks*,
 


 On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <
 sai.sai.s...@gmail.com> wrote:

> What Spark version are you using? Also, a small code snippet of how you
> use Spark Streaming would be greatly helpful.
>
> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <
> ramkumar.c...@gmail.com> wrote:
>
>> I am able to read and print a few lines. After that, I'm getting this
>> exception. Any ideas?
>>
>> *Thanks*,
>> 
>>
>>
>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <
>> ramkumar.c...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm trying to read from a Kafka stream and 

Exception while reading from kafka stream

2015-10-29 Thread Ramkumar V
Hi,

I'm trying to read from a Kafka stream and write it out as a text file. I'm
using Java with Spark. I don't know why I'm getting the following exception,
and the exception message is very abstract. Can anyone please help me?

Log Trace :

15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
java.lang.NullPointerException
at
org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
at
scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at
scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
at
scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
at
scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
at
org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
at
org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
at org.apache.spark.streaming.scheduler.JobGenerator.org
$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw exception:
java.lang.NullPointerException
java.lang.NullPointerException
at
org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
at
scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at
scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
at
scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
at
scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
at
org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
at
org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
at org.apache.spark.streaming.scheduler.JobGenerator.org
$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)



*Thanks*,