Tim,

I think I understand this now. I had a five-node Spark cluster and a five-partition topic, and I created five receivers. I found this:

http://stackoverflow.com/questions/25785581/custom-receiver-stalls-worker-in-spark-streaming

It indicates that if I use all my workers as receivers, there are none left to do the processing. If I reduce the number of partitions/receivers while still having multiple unioned receivers, I see messages.
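For anyone hitting the same thing, here is a rough sketch of the arithmetic in plain Java. It is not part of Spark. It assumes each receiver holds one executor core for as long as the job runs, which is my reading of the Stack Overflow answer above. The one-core-per-node figure in the usage note is an assumption for illustration, since the thread does not give actual core counts. `ReceiverBudget`, `coresLeftForProcessing`, and `maxReceivers` are hypothetical names made up for this example.

```java
/**
 * Back-of-the-envelope check for receiver-based streaming jobs.
 * Assumption: every receiver pins one executor core for the life of the job.
 */
final class ReceiverBudget {
    private ReceiverBudget() {}

    /** Cores left for batch processing once every receiver has taken one core. */
    static int coresLeftForProcessing(int totalExecutorCores, int receiverCount) {
        if (totalExecutorCores < 0 || receiverCount < 0) {
            throw new IllegalArgumentException("counts must be non-negative");
        }
        return Math.max(0, totalExecutorCores - receiverCount);
    }

    /**
     * Largest receiver count that still leaves reservedCores free for processing.
     * Capped at the partition count, since extra receivers would have nothing to read.
     */
    static int maxReceivers(int totalExecutorCores, int partitions, int reservedCores) {
        if (totalExecutorCores < 0 || partitions < 0 || reservedCores < 0) {
            throw new IllegalArgumentException("counts must be non-negative");
        }
        int byCores = Math.max(0, totalExecutorCores - reservedCores);
        return Math.min(partitions, byCores);
    }
}
```

With five cores and five receivers, `coresLeftForProcessing(5, 5)` is 0, which matches the "received 0 blocks" symptom. `maxReceivers(5, 5, 1)` is 4, so dropping to four or fewer receivers leaves at least one core to process batches.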

mn

On Sep 25, 2014, at 10:18 AM, Matt Narrell <matt.narr...@gmail.com> wrote:

> I suppose I have other problems as I can’t get the Scala example to work
> either. Puzzling, as I have literally coded like the examples (that are
> purported to work), but no luck.
>
> mn
>
> On Sep 24, 2014, at 11:27 AM, Tim Smith <secs...@gmail.com> wrote:
>
>> Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?
>>
>> On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell <matt.narr...@gmail.com> wrote:
>>> The part that works is the commented-out, single-receiver stream below the
>>> loop. It seems that when I call KafkaUtils.createStream more than once, I
>>> don’t receive any messages.
>>>
>>> I’ll dig through the logs, but at first glance yesterday I didn’t see
>>> anything suspect. I’ll have to look closer.
>>>
>>> mn
>>>
>>> On Sep 23, 2014, at 6:14 PM, Tim Smith <secs...@gmail.com> wrote:
>>>
>>>> Maybe post the before-code, as in what the code was before you added the
>>>> loop (that worked)? I have had similar situations where reviewing the code
>>>> before (worked) and after (does not work) helped. The Scala REPL also
>>>> helped, because I can see which object types are being returned by each
>>>> statement.
>>>>
>>>> Other than code, in the driver logs, you should see events that say
>>>> "Registered receiver for stream 0 from
>>>> akka.tcp://sp...@node5.acme.net:53135"
>>>>
>>>> Now, if you go to "node5" and look at the Spark or YarnContainer logs
>>>> (depending on who's doing RM), you should be able to see if the
>>>> receiver has any errors when trying to talk to Kafka.
>>>>
>>>>
>>>> On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell <matt.narr...@gmail.com>
>>>> wrote:
>>>>> To my eyes, these are functionally equivalent. I’ll try a Scala
>>>>> approach, but this may cause waves for me upstream (e.g., non-Java)
>>>>>
>>>>> Thanks for looking at this. If anyone else can see a glaring issue in
>>>>> the Java approach, that would be appreciated.
>>>>>
>>>>> Thanks,
>>>>> Matt
>>>>>
>>>>> On Sep 23, 2014, at 4:13 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>
>>>>>> Sorry, I am almost Java illiterate but here's my Scala code to do the
>>>>>> equivalent (that I have tested to work):
>>>>>>
>>>>>> // Create 10 receivers across the cluster, potentially one for each
>>>>>> // partition, but only as many receivers are active as you have
>>>>>> // Kafka partitions.
>>>>>> val kInStreams = (1 to 10).map { _ =>
>>>>>>   KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp",
>>>>>>     Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
>>>>>> }
>>>>>>
>>>>>> val kInMsg =
>>>>>>   ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>>>>>
>>>>>>
>>>>>> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell <matt.narr...@gmail.com>
>>>>>> wrote:
>>>>>>> So, this is scrubbed some for confidentiality, but the meat of it is as
>>>>>>> follows. Note that if I substitute the commented section for the
>>>>>>> loop, I receive messages from the topic.
>>>>>>>
>>>>>>> SparkConf sparkConf = new SparkConf();
>>>>>>> sparkConf.set("spark.streaming.unpersist", "true");
>>>>>>> sparkConf.set("spark.logConf", "true");
>>>>>>>
>>>>>>> Map<String, String> kafkaProps = new HashMap<>();
>>>>>>> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
>>>>>>> kafkaProps.put("group.id", groupId);
>>>>>>>
>>>>>>> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
>>>>>>> jsc.checkpoint("hdfs://<some_location>");
>>>>>>>
>>>>>>> List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);
>>>>>>>
>>>>>>> // Five receivers, each reading the topic with a single thread.
>>>>>>> for (int i = 0; i < 5; i++) {
>>>>>>>     streamList.add(KafkaUtils.createStream(jsc,
>>>>>>>             String.class,
>>>>>>>             ProtobufModel.class,
>>>>>>>             StringDecoder.class,
>>>>>>>             ProtobufModelDecoder.class,
>>>>>>>             kafkaProps,
>>>>>>>             Collections.singletonMap(topic, 1),
>>>>>>>             StorageLevel.MEMORY_ONLY_SER()));
>>>>>>> }
>>>>>>>
>>>>>>> final JavaPairDStream<String, ProtobufModel> stream =
>>>>>>>         jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
>>>>>>>
>>>>>>> // Single receiver with five threads in the topic map (this variant works):
>>>>>>> // final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
>>>>>>> //         KafkaUtils.createStream(jsc,
>>>>>>> //                 String.class,
>>>>>>> //                 ProtobufModel.class,
>>>>>>> //                 StringDecoder.class,
>>>>>>> //                 ProtobufModelDecoder.class,
>>>>>>> //                 kafkaProps,
>>>>>>> //                 Collections.singletonMap(topic, 5),
>>>>>>> //                 StorageLevel.MEMORY_ONLY_SER());
>>>>>>>
>>>>>>> final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
>>>>>>>         new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
>>>>>>>             @Override
>>>>>>>             public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple)
>>>>>>>                     throws Exception {
>>>>>>>                 return new Tuple2<>(tuple._2().getDeviceId(), 1);
>>>>>>>             }
>>>>>>>         });
>>>>>>>
>>>>>>> … and further Spark functions ...
>>>>>>>
>>>>>>> On Sep 23, 2014, at 2:55 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Posting your code would be really helpful in figuring out gotchas.
>>>>>>>>
>>>>>>>> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell <matt.narr...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>> Hey,
>>>>>>>>>
>>>>>>>>> Spark 1.1.0
>>>>>>>>> Kafka 0.8.1.1
>>>>>>>>> Hadoop (YARN/HDFS) 2.5.1
>>>>>>>>>
>>>>>>>>> I have a five-partition Kafka topic. I can create a single Kafka
>>>>>>>>> receiver via KafkaUtils.createStream with five threads in the topic
>>>>>>>>> map and consume messages fine. Sifting through the user list and
>>>>>>>>> Google, I see that it’s possible to split the Kafka receiver among
>>>>>>>>> the Spark workers such that I can have a receiver per topic, and
>>>>>>>>> have this distributed to workers rather than localized to the
>>>>>>>>> driver. I’m following something like this:
>>>>>>>>> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
>>>>>>>>> But for Kafka, obviously. From the Streaming Programming Guide:
>>>>>>>>> "Receiving multiple data streams can therefore be achieved by
>>>>>>>>> creating multiple input DStreams and configuring them to receive
>>>>>>>>> different partitions of the data stream from the source(s)."
>>>>>>>>>
>>>>>>>>> However, I’m not able to consume any messages from Kafka after I
>>>>>>>>> perform the union operation. Again, if I create a single,
>>>>>>>>> multi-threaded receiver, I can consume messages fine.
>>>>>>>>> If I create 5 receivers in a loop, and call jssc.union(…) I get:
>>>>>>>>>
>>>>>>>>> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
>>>>>>>>> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
>>>>>>>>> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
>>>>>>>>> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
>>>>>>>>> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>>>>>>>>>
>>>>>>>>> Do I need to do anything to the unioned DStream? Am I going about
>>>>>>>>> this incorrectly?
>>>>>>>>>
>>>>>>>>> Thanks in advance.
>>>>>>>>>
>>>>>>>>> Matt
>>>>>>>>
>>>>>>>> ---------------------------------------------------------------------
>>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org