Tim,

I think I understand this now.  I had a five-node Spark cluster and a
five-partition topic, and I created five receivers.  I found this:
http://stackoverflow.com/questions/25785581/custom-receiver-stalls-worker-in-spark-streaming
which indicates that if I use all my workers as receivers, there are none
left to do the processing.  If I reduce the number of partitions/receivers
while still using multiple unioned receivers, I see messages.
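
To make the arithmetic concrete, here is a minimal plain-Java sketch (no
Spark dependency).  ReceiverBudget and its methods are hypothetical names,
not Spark APIs, and the sketch assumes one core per worker, which is not
stated anywhere in this thread.  It relies on the rule from the Spark
Streaming guide that each receiver occupies one core for as long as the
application runs:

```java
// Hypothetical helper, not part of Spark: models the rule that every
// receiver holds one core, so only the remaining cores can process batches.
final class ReceiverBudget {
    private ReceiverBudget() {}

    /** Cores left for batch processing once each receiver holds one core. */
    static int coresLeftForProcessing(int totalCores, int receivers) {
        if (totalCores < 0 || receivers < 0) {
            throw new IllegalArgumentException("counts must be non-negative");
        }
        return Math.max(0, totalCores - receivers);
    }

    /** Largest receiver count that still leaves at least one core free. */
    static int maxReceivers(int totalCores, int partitions) {
        if (totalCores < 0 || partitions < 0) {
            throw new IllegalArgumentException("counts must be non-negative");
        }
        // More receivers than partitions would sit idle, so cap at partitions.
        return Math.max(0, Math.min(partitions, totalCores - 1));
    }

    public static void main(String[] args) {
        int cores = 5;       // assumption: five workers with one core each
        int partitions = 5;  // the five-partition topic from this thread
        System.out.println("Cores left with 5 receivers: "
                + coresLeftForProcessing(cores, 5));
        System.out.println("Max receivers that leave a core free: "
                + maxReceivers(cores, partitions));
    }
}
```

Under those assumptions, five receivers on five cores leave nothing for
processing, while capping the receiver count below the core count keeps at
least one core free.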

mn

On Sep 25, 2014, at 10:18 AM, Matt Narrell <matt.narr...@gmail.com> wrote:

> I suppose I have other problems as I can’t get the Scala example to work 
> either.  Puzzling, as I have literally coded like the examples (that are 
> purported to work), but no luck.
> 
> mn
> 
> On Sep 24, 2014, at 11:27 AM, Tim Smith <secs...@gmail.com> wrote:
> 
>> Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?
>> 
>> On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell <matt.narr...@gmail.com> wrote:
>>> The part that works is the commented out, single receiver stream below the 
>>> loop.  It seems that when I call KafkaUtils.createStream more than once, I 
>>> don’t receive any messages.
>>> 
>>> I’ll dig through the logs, but at first glance yesterday I didn’t see 
>>> anything suspect.  I’ll have to look closer.
>>> 
>>> mn
>>> 
>>> On Sep 23, 2014, at 6:14 PM, Tim Smith <secs...@gmail.com> wrote:
>>> 
>>>> Maybe post the "before" code, i.e. the code as it was before you added
>>>> the loop (the version that worked)? I have had similar situations where
>>>> comparing the code before (worked) and after (does not work) helped. The
>>>> Scala REPL also helped, because I can see the object type returned by
>>>> each statement.
>>>> 
>>>> Other than code, in the driver logs, you should see events that say
>>>> "Registered receiver for stream 0 from
>>>> akka.tcp://sp...@node5.acme.net:53135"
>>>> 
>>>> Now, if you go to "node5" and look at the Spark or YarnContainer logs
>>>> (depending on who's doing resource management), you should be able to
>>>> see whether the receiver has any errors when trying to talk to Kafka.
>>>> 
>>>> 
>>>> 
>>>> On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell <matt.narr...@gmail.com> 
>>>> wrote:
>>>>> To my eyes, these are functionally equivalent.  I’ll try a Scala 
>>>>> approach, but this may cause waves for me upstream (e.g., non-Java).
>>>>> 
>>>>> Thanks for looking at this.  If anyone else can see a glaring issue in 
>>>>> the Java approach, that would be appreciated.
>>>>> 
>>>>> Thanks,
>>>>> Matt
>>>>> 
>>>>> On Sep 23, 2014, at 4:13 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>> 
>>>>>> Sorry, I am almost Java illiterate but here's my Scala code to do the
>>>>>> equivalent (that I have tested to work):
>>>>>> 
>>>>>> // Create 10 receivers across the cluster, potentially one per
>>>>>> // partition; only as many receivers are active as the topic has
>>>>>> // Kafka partitions.
>>>>>> val kInStreams = (1 to 10).map { _ =>
>>>>>>   KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp",
>>>>>>     Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
>>>>>> }
>>>>>> 
>>>>>> val kInMsg = 
>>>>>> ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell <matt.narr...@gmail.com> 
>>>>>> wrote:
>>>>>>> So, this is scrubbed some for confidentiality, but the meat of it is as 
>>>>>>> follows.  Note that if I substitute the commented-out section for the 
>>>>>>> loop, I receive messages from the topic.
>>>>>>> 
>>>>>>> SparkConf sparkConf = new SparkConf();
>>>>>>> sparkConf.set("spark.streaming.unpersist", "true");
>>>>>>> sparkConf.set("spark.logConf", "true");
>>>>>>> 
>>>>>>> Map<String, String> kafkaProps = new HashMap<>();
>>>>>>> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
>>>>>>> kafkaProps.put("group.id", groupId);
>>>>>>> 
>>>>>>> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, 
>>>>>>> Seconds.apply(1));
>>>>>>> jsc.checkpoint("hdfs://<some_location>");
>>>>>>> 
>>>>>>> List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);
>>>>>>> 
>>>>>>> for (int i = 0; i < 5; i++) {
>>>>>>>     streamList.add(KafkaUtils.createStream(jsc,
>>>>>>>                                            String.class, ProtobufModel.class,
>>>>>>>                                            StringDecoder.class, ProtobufModelDecoder.class,
>>>>>>>                                            kafkaProps,
>>>>>>>                                            Collections.singletonMap(topic, 1),
>>>>>>>                                            StorageLevel.MEMORY_ONLY_SER()));
>>>>>>> }
>>>>>>> 
>>>>>>> final JavaPairDStream<String, ProtobufModel> stream =
>>>>>>>     jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
>>>>>>> 
>>>>>>> //  final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
>>>>>>> //                  KafkaUtils.createStream(jsc,
>>>>>>> //                                          String.class, ProtobufModel.class,
>>>>>>> //                                          StringDecoder.class, ProtobufModelDecoder.class,
>>>>>>> //                                          kafkaProps,
>>>>>>> //                                          Collections.singletonMap(topic, 5),
>>>>>>> //                                          StorageLevel.MEMORY_ONLY_SER());
>>>>>>> 
>>>>>>> final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
>>>>>>>     new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
>>>>>>>         @Override
>>>>>>>         public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple)
>>>>>>>                 throws Exception {
>>>>>>>             return new Tuple2<>(tuple._2().getDeviceId(), 1);
>>>>>>>         }
>>>>>>>     });
>>>>>>> 
>>>>>>> … and further Spark functions ...
>>>>>>> 
>>>>>>> On Sep 23, 2014, at 2:55 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Posting your code would be really helpful in figuring out gotchas.
>>>>>>>> 
>>>>>>>> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell <matt.narr...@gmail.com> 
>>>>>>>> wrote:
>>>>>>>>> Hey,
>>>>>>>>> 
>>>>>>>>> Spark 1.1.0
>>>>>>>>> Kafka 0.8.1.1
>>>>>>>>> Hadoop (YARN/HDFS) 2.5.1
>>>>>>>>> 
>>>>>>>>> I have a five-partition Kafka topic.  I can create a single Kafka
>>>>>>>>> receiver via KafkaUtils.createStream with five threads in the topic
>>>>>>>>> map and consume messages fine.  Sifting through the user list and
>>>>>>>>> Google, I see that it’s possible to split the Kafka receiver among
>>>>>>>>> the Spark workers such that I can have a receiver per topic, and
>>>>>>>>> have this distributed to workers rather than localized to the
>>>>>>>>> driver.  I’m following something like this:
>>>>>>>>> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
>>>>>>>>> But for Kafka, obviously.  From the Streaming Programming Guide:
>>>>>>>>> “Receiving multiple data streams can therefore be achieved by
>>>>>>>>> creating multiple input DStreams and configuring them to receive
>>>>>>>>> different partitions of the data stream from the source(s).”
>>>>>>>>> 
>>>>>>>>> However, I’m not able to consume any messages from Kafka after I
>>>>>>>>> perform the union operation.  Again, if I create a single,
>>>>>>>>> multi-threaded receiver, I can consume messages fine.  If I create
>>>>>>>>> 5 receivers in a loop and call jssc.union(…), I get:
>>>>>>>>> 
>>>>>>>>> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
>>>>>>>>> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
>>>>>>>>> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
>>>>>>>>> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
>>>>>>>>> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Do I need to do anything to the unioned DStream?  Am I going about
>>>>>>>>> this incorrectly?
>>>>>>>>> 
>>>>>>>>> Thanks in advance.
>>>>>>>>> 
>>>>>>>>> Matt
>>>>>>>> 
>>>>>>>> ---------------------------------------------------------------------
>>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 
