Combine topicsSet_1 and topicsSet_2 in a single createDirectStream call. Then you can use HasOffsetRanges to see what the topic for a given partition is.
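A minimal sketch of that approach, reusing jssc, kafkaParams, and the two topic sets from the snippet quoted below; the per-topic branch is only a placeholder:

// also needs org.apache.spark.streaming.kafka.HasOffsetRanges and
// org.apache.spark.streaming.kafka.OffsetRange, in addition to the
// imports already used for createDirectStream
HashSet<String> allTopics = new HashSet<String>(topicsSet_1);
allTopics.addAll(topicsSet_2);

// one stream covering all topics instead of two separate streams
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    allTopics
);

messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    public Void call(JavaPairRDD<String, String> rdd) {
        // RDDs produced by the direct stream implement HasOffsetRanges;
        // each OffsetRange maps one Spark partition to its Kafka topic/partition.
        OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        for (OffsetRange o : offsets) {
            String topic = o.topic();
            // switch processing based on topic here
        }
        return null;
    }
});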
On Mon, Nov 2, 2015 at 7:26 AM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

> If I try the code snippet below, it throws an exception. How do I avoid
> this exception? How do I switch processing based on the topic?
>
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>     Durations.seconds(30));
> HashSet<String> topicsSet_1 = new HashSet<String>(Arrays.asList(topics.split(",")));
> HashSet<String> topicsSet_2 = new HashSet<String>(Arrays.asList(topics.split(",")));
> HashMap<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("metadata.broker.list", brokers);
>
> JavaPairInputDStream<String, String> messages_1 = KafkaUtils.createDirectStream(
>     jssc,
>     String.class,
>     String.class,
>     StringDecoder.class,
>     StringDecoder.class,
>     kafkaParams,
>     topicsSet_1
> );
>
> JavaPairInputDStream<String, String> messages_2 = KafkaUtils.createDirectStream(
>     jssc,
>     String.class,
>     String.class,
>     StringDecoder.class,
>     StringDecoder.class,
>     kafkaParams,
>     topicsSet_2
> );
>
> *Log Trace*:
>
> [ERROR] [11/02/2015 12:59:08.107] [Executor task launch worker-0] [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
> [ERROR] [11/02/2015 12:59:08.104] [Executor task launch worker-0] [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
> [ERROR] [11/02/2015 13:01:13.812] [Executor task launch worker-0] [akka.tcp://sparkDriver@10.125.4.200:41039/user/CoarseGrainedScheduler] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
> 15/11/02 12:59:05 ERROR yarn.ApplicationMaster: User class threw exception: java.io.IOException: Failed to delete somedomain/user/hdfs/spark_output/kyt_req/part-00055
> 15/11/02 12:59:05 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.io.IOException: Failed to delete somedomain/user/hdfs/spark_output/kyt_req/part-00055)
> java.io.IOException: Failed on local exception: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected local=/10.125.4.200:40770 remote=somedomain]. 59994 millis timeout left.; Host Details : local host is: "somedomain"; destination host is: "somedomain":8020;
> java.io.IOException: Failed on local exception: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected local=/10.125.4.200:41898 remote=somedomain]. 59998 millis timeout left.; Host Details : local host is: "somedomain"; destination host is: "somedomain;
> 15/11/02 13:01:11 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NullPointerException
> 15/11/02 13:01:11 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.NullPointerException)
> 15/11/02 13:01:13 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: java.lang.NullPointerException)
> java.io.IOException: Failed on local exception: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected local=/10.125.4.224:40482 remote=somedomain]. 59991 millis timeout left.; Host Details : local host is: "somedomain"; destination host is: "somedomain":8020;
> [ERROR] [11/02/2015 12:59:08.102] [Executor task launch worker-0] [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>
>
> On Fri, Oct 30, 2015 at 7:34 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Just put them all in one stream and switch processing based on the topic.
>>
>> On Fri, Oct 30, 2015 at 6:29 AM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>
>>> I want to join all those logs in some manner. That's what I'm trying to do.
>>>
>>> *Thanks*,
>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>
>>> On Fri, Oct 30, 2015 at 4:57 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>>
>>>> I don't think Spark Streaming supports multiple streaming contexts in one JVM, so you cannot use it that way. Instead, you could run multiple streaming applications, since you're using YARN.
>>>>
>>>> On Friday, October 30, 2015, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>
>>>>> I found that the NPE is mainly because I'm using the same JavaStreamingContext for some other Kafka stream; if I change the name, it runs successfully. How can I run multiple JavaStreamingContexts in one program? I'm getting the following exception if I run multiple JavaStreamingContexts in a single file.
>>>>>
>>>>> 15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.IllegalStateException: Only one StreamingContext may be started in this JVM. Currently running StreamingContext was started at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:622)
>>>>>
>>>>> *Thanks*,
>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>
>>>>> On Fri, Oct 30, 2015 at 3:25 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>>>>
>>>>>> From the code, the field "rememberDuration" shouldn't be null; it is verified at start. Unless some place changes its value at runtime and makes it null, but I cannot imagine how that would happen. Maybe you could add some logs around the place where the exception happens, if you can reproduce it.
>>>>>>
>>>>>> On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>>>
>>>>>>> No, this is the only exception that I'm getting, multiple times, in my log. Also, I was reading some other topics earlier and didn't face this NPE.
>>>>>>>
>>>>>>> *Thanks*,
>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>
>>>>>>> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I just did a local test with your code and everything seems fine. The only difference is that I used the master branch, but I don't think it changes a lot in this part. Did you meet any other exceptions or errors besides this one? Probably this is due to other exceptions that make the system unstable.
>>>>>>>>
>>>>>>>> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> No, I don't have any special settings. Even if I keep only the reading line in my code, it throws the NPE.
>>>>>>>>> *Thanks*,
>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>
>>>>>>>>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Do you have any special settings? From your code, I don't think it should incur an NPE at that place.
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Spark version: 1.4.1
>>>>>>>>>>>
>>>>>>>>>>> My code snippet:
>>>>>>>>>>>
>>>>>>>>>>> String brokers = "ip:port,ip:port";
>>>>>>>>>>> String topics = "x,y,z";
>>>>>>>>>>> HashSet<String> TopicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
>>>>>>>>>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>>>>>>>>>> kafkaParams.put("metadata.broker.list", brokers);
>>>>>>>>>>>
>>>>>>>>>>> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>>>>>>>>>>>     jssc,
>>>>>>>>>>>     String.class,
>>>>>>>>>>>     String.class,
>>>>>>>>>>>     StringDecoder.class,
>>>>>>>>>>>     StringDecoder.class,
>>>>>>>>>>>     kafkaParams,
>>>>>>>>>>>     TopicsSet
>>>>>>>>>>> );
>>>>>>>>>>>
>>>>>>>>>>> messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>>>>>>>>>>>     public Void call(JavaPairRDD<String, String> tuple) {
>>>>>>>>>>>         JavaRDD<String> rdd = tuple.values();
>>>>>>>>>>>         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>>>>>>>>>>         return null;
>>>>>>>>>>>     }
>>>>>>>>>>> });
>>>>>>>>>>>
>>>>>>>>>>> *Thanks*,
>>>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> What Spark version are you using? Also, a small code snippet of how you use Spark Streaming would be greatly helpful.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I am able to read and print a few lines; after that I get this exception. Any idea why?
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Thanks*,
>>>>>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm trying to read from a Kafka stream and print it to a text file, using Java over Spark. I don't know why I'm getting the following exception, and the exception message is very abstract. Can anyone please help me?
>>>>>>>>>>>>>> Log Trace:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
>>>>>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>>>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>>>>     at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>>>>>>>     at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>>>>>>>     at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>>>>>>>     at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>>>>>>>     at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>>>>>>>     at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>>>>>>>     at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>>>>>>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>>>>>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NullPointerException
>>>>>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>>>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>>>>     at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>>>>>>>     at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>>>>>>>     at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>>>>>>>     at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>>>>>>>     at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>>>>>>>     at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>>>>>>>     at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>>>>>>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *Thanks*,
>>>>>>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
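To pull the thread's conclusions together: only one StreamingContext may be started per JVM, so all topics have to be consumed through a single JavaStreamingContext and, per the reply at the top, a single direct stream. A minimal self-contained sketch under those constraints; the class name, app name, broker list, and topic names are placeholders:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class KafkaDirectApp {
    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("KafkaDirectApp");
        // Exactly one streaming context per JVM; define every stream on it
        // before start(), otherwise you hit "Only one StreamingContext may
        // be started in this JVM" as in the log above.
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(30));

        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("x,y,z".split(",")));
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "ip:port,ip:port");

        // One direct stream covering all topics; route records per topic
        // downstream via HasOffsetRanges (see the sketch at the top).
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            jssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet
        );

        messages.print(); // replace with the real per-topic processing

        jssc.start();
        jssc.awaitTermination();
    }
}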