I suppose I have other problems, as I can't get the Scala example to work either. Puzzling, as I have coded it exactly like the examples (that are purported to work), but no luck.
mn

On Sep 24, 2014, at 11:27 AM, Tim Smith <secs...@gmail.com> wrote:

> Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?
>
> On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell <matt.narr...@gmail.com> wrote:
>> The part that works is the commented-out, single-receiver stream below the
>> loop. It seems that when I call KafkaUtils.createStream more than once, I
>> don't receive any messages.
>>
>> I'll dig through the logs, but at first glance yesterday I didn't see
>> anything suspect. I'll have to look closer.
>>
>> mn
>>
>> On Sep 23, 2014, at 6:14 PM, Tim Smith <secs...@gmail.com> wrote:
>>
>>> Maybe post the before-code, i.e., what the code looked like before you
>>> added the loop (when it worked)? I have had similar situations where
>>> reviewing the code before (worked) and after (does not work) helped.
>>> The Scala REPL also helps, because I can see the object types returned
>>> by each statement.
>>>
>>> Other than code, in the driver logs you should see events that say
>>> "Registered receiver for stream 0 from
>>> akka.tcp://sp...@node5.acme.net:53135"
>>>
>>> Now, if you go to "node5" and look at the Spark or YARN container logs
>>> (depending on who is doing resource management), you should be able to
>>> see whether the receiver hits any errors when trying to talk to Kafka.
>>>
>>> On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell <matt.narr...@gmail.com> wrote:
>>>> To my eyes, these are functionally equivalent. I'll try a Scala approach,
>>>> but this may cause waves for me upstream (e.g., non-Java).
>>>>
>>>> Thanks for looking at this. If anyone else can see a glaring issue in the
>>>> Java approach, that would be appreciated.
>>>>
>>>> Thanks,
>>>> Matt
>>>>
>>>> On Sep 23, 2014, at 4:13 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>
>>>>> Sorry, I am almost Java illiterate, but here's my Scala code to do the
>>>>> equivalent (which I have tested to work):
>>>>>
>>>>> // Create 10 receivers across the cluster, potentially one per partition,
>>>>> // but active receivers number only as many as the Kafka partitions you have.
>>>>> val kInStreams = (1 to 10).map { _ =>
>>>>>   KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp",
>>>>>     Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
>>>>> }
>>>>>
>>>>> val kInMsg = ssc.union(kInStreams)
>>>>>   .persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>>>>
>>>>> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell <matt.narr...@gmail.com> wrote:
>>>>>> So, this is scrubbed some for confidentiality, but the meat of it is as
>>>>>> follows. Note that if I substitute the commented section for the loop,
>>>>>> I receive messages from the topic.
>>>>>>
>>>>>> SparkConf sparkConf = new SparkConf();
>>>>>> sparkConf.set("spark.streaming.unpersist", "true");
>>>>>> sparkConf.set("spark.logConf", "true");
>>>>>>
>>>>>> Map<String, String> kafkaProps = new HashMap<>();
>>>>>> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
>>>>>> kafkaProps.put("group.id", groupId);
>>>>>>
>>>>>> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
>>>>>> jsc.checkpoint("hdfs://<some_location>");
>>>>>>
>>>>>> List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);
>>>>>>
>>>>>> for (int i = 0; i < 5; i++) {
>>>>>>     streamList.add(KafkaUtils.createStream(jsc,
>>>>>>                                            String.class,
>>>>>>                                            ProtobufModel.class,
>>>>>>                                            StringDecoder.class,
>>>>>>                                            ProtobufModelDecoder.class,
>>>>>>                                            kafkaProps,
>>>>>>                                            Collections.singletonMap(topic, 1),
>>>>>>                                            StorageLevel.MEMORY_ONLY_SER()));
>>>>>> }
>>>>>>
>>>>>> final JavaPairDStream<String, ProtobufModel> stream =
>>>>>>         jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
>>>>>>
>>>>>> // final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
>>>>>> //         KafkaUtils.createStream(jsc,
>>>>>> //                                 String.class,
>>>>>> //                                 ProtobufModel.class,
>>>>>> //                                 StringDecoder.class,
>>>>>> //                                 ProtobufModelDecoder.class,
>>>>>> //                                 kafkaProps,
>>>>>> //                                 Collections.singletonMap(topic, 5),
>>>>>> //                                 StorageLevel.MEMORY_ONLY_SER());
>>>>>>
>>>>>> final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
>>>>>>     new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
>>>>>>         @Override
>>>>>>         public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple)
>>>>>>                 throws Exception {
>>>>>>             return new Tuple2<>(tuple._2().getDeviceId(), 1);
>>>>>>         }
>>>>>>     });
>>>>>>
>>>>>> … and further Spark functions ...
>>>>>>
>>>>>> On Sep 23, 2014, at 2:55 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>>
>>>>>>> Posting your code would be really helpful in figuring out gotchas.
>>>>>>>
>>>>>>> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell <matt.narr...@gmail.com> wrote:
>>>>>>>> Hey,
>>>>>>>>
>>>>>>>> Spark 1.1.0
>>>>>>>> Kafka 0.8.1.1
>>>>>>>> Hadoop (YARN/HDFS) 2.5.1
>>>>>>>>
>>>>>>>> I have a five-partition Kafka topic. I can create a single Kafka receiver
>>>>>>>> via KafkaUtils.createStream with five threads in the topic map and consume
>>>>>>>> messages fine. Sifting through the user list and Google, I see that it's
>>>>>>>> possible to split the Kafka receiver among the Spark workers, such that I
>>>>>>>> can have a receiver per partition, and have this distributed to workers
>>>>>>>> rather than localized to the driver. I'm following something like this:
>>>>>>>> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
>>>>>>>> but for Kafka, obviously.
>>>>>>>> From the Streaming Programming Guide: "Receiving
>>>>>>>> multiple data streams can therefore be achieved by creating multiple input
>>>>>>>> DStreams and configuring them to receive different partitions of the data
>>>>>>>> stream from the source(s)."
>>>>>>>>
>>>>>>>> However, I'm not able to consume any messages from Kafka after I perform
>>>>>>>> the union operation. Again, if I create a single, multi-threaded receiver,
>>>>>>>> I can consume messages fine. If I create 5 receivers in a loop and call
>>>>>>>> jssc.union(…), I get:
>>>>>>>>
>>>>>>>> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
>>>>>>>> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
>>>>>>>> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
>>>>>>>> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
>>>>>>>> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>>>>>>>>
>>>>>>>> Do I need to do anything to the unioned DStream? Am I going about this
>>>>>>>> incorrectly?
>>>>>>>>
>>>>>>>> Thanks in advance.
>>>>>>>>
>>>>>>>> Matt

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
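[Editor's note] One easy-to-miss detail in the Java code above is the argument shape of `jsc.union(first, rest)`: it takes the first stream separately, plus a list of the remaining streams. The head/tail split done with `streamList.get(0)` and `streamList.subList(1, size)` can be sketched with plain Java lists, no Spark required. The `headTail` helper below is hypothetical, purely for illustration:

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class UnionArgs {

    // Mimics the (first, rest) split that jsc.union(first, rest) expects:
    // the head element, plus a view of everything after it.
    static <T> Map.Entry<T, List<T>> headTail(List<T> all) {
        return new AbstractMap.SimpleEntry<>(all.get(0), all.subList(1, all.size()));
    }

    public static void main(String[] args) {
        // Stand-ins for the five receiver streams built in the loop above.
        List<String> streams = Arrays.asList("s0", "s1", "s2", "s3", "s4");
        Map.Entry<String, List<String>> split = headTail(streams);
        System.out.println(split.getKey());     // first stream
        System.out.println(split.getValue());   // remaining streams
    }
}
```

If the receivers themselves register but report "received 0 blocks" (as in the logs above), the split is probably not the culprit; the usual suspects are too few executor cores to host all receivers, or Kafka/ZooKeeper connectivity from the worker nodes.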