I suppose I have other problems, as I can’t get the Scala example to work
either.  Puzzling, as I have coded it exactly like the examples (which are
purported to work), but no luck.

mn

On Sep 24, 2014, at 11:27 AM, Tim Smith <secs...@gmail.com> wrote:

> Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?
> 
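On the JavaPairDStream versus JavaPairReceiverInputDStream question: in the Spark Java API the receiver input type is a subclass of JavaPairDStream, so putting the result of createStream into a List<JavaPairDStream<...>> compiles without a cast, and the types on their own would not obviously explain the missing messages. Below is a minimal sketch of that relationship in plain Java. PairStream, ReceiverPairStream, and StreamTypeSketch are hypothetical stand-ins, not Spark classes, and union here only mirrors the (first, rest) argument shape of the call in the quoted code.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the base stream type (hypothetical, NOT a Spark class).
class PairStream<K, V> {
    final String name;
    PairStream(String name) { this.name = name; }
}

// Stand-in for the receiver-backed subtype (hypothetical, NOT a Spark class).
class ReceiverPairStream<K, V> extends PairStream<K, V> {
    ReceiverPairStream(String name) { super(name); }
}

class StreamTypeSketch {
    // Mirrors only the (first, rest) argument shape of a union call;
    // it simply collects the streams into one list.
    static <K, V> List<PairStream<K, V>> union(PairStream<K, V> first,
                                               List<PairStream<K, V>> rest) {
        List<PairStream<K, V>> all = new ArrayList<>();
        all.add(first);
        all.addAll(rest);
        return all;
    }

    // Builds n subtype instances and stores them in a list of the base type.
    static List<PairStream<String, Integer>> buildStreams(int n) {
        List<PairStream<String, Integer>> streams = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            // A subclass instance is accepted where the base type is expected.
            streams.add(new ReceiverPairStream<String, Integer>("receiver-" + i));
        }
        return streams;
    }

    public static void main(String[] args) {
        List<PairStream<String, Integer>> streams = buildStreams(5);
        // Head element plus the tail sublist, as in the quoted union call.
        List<PairStream<String, Integer>> merged =
            union(streams.get(0), streams.subList(1, streams.size()));
        System.out.println(merged.size() + " streams passed to union");
    }
}
```

If this is right, the declared list type is fine as written and the cause is more likely on the receiver side than in the generics.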
> On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell <matt.narr...@gmail.com> wrote:
>> The part that works is the commented-out, single-receiver stream below the 
>> loop.  It seems that when I call KafkaUtils.createStream more than once, I 
>> don’t receive any messages.
>> 
>> I’ll dig through the logs, but at first glance yesterday I didn’t see 
>> anything suspect.  I’ll have to look closer.
>> 
>> mn
>> 
>> On Sep 23, 2014, at 6:14 PM, Tim Smith <secs...@gmail.com> wrote:
>> 
>>> Maybe post the code as it was before you added the loop (the version
>>> that worked)? I have had similar situations where comparing the code
>>> before (working) and after (not working) helped. The Scala REPL also
>>> helped, because I can see the object type returned by each
>>> statement.
>>> 
>>> Other than code, in the driver logs, you should see events that say
>>> "Registered receiver for stream 0 from
>>> akka.tcp://sp...@node5.acme.net:53135"
>>> 
>>> Now, if you go to "node5" and look at Spark or YarnContainer logs
>>> (depending on who's doing RM), you should be able to see if the
>>> receiver has any errors when trying to talk to kafka.
>>> 
>>> 
>>> 
>>> On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell <matt.narr...@gmail.com> 
>>> wrote:
>>>> To my eyes, these are functionally equivalent.  I’ll try a Scala approach, 
>>>> but this may cause waves for me upstream (e.g., non-Java)
>>>> 
>>>> Thanks for looking at this.  If anyone else can see a glaring issue in the 
>>>> Java approach that would be appreciated.
>>>> 
>>>> Thanks,
>>>> Matt
>>>> 
>>>> On Sep 23, 2014, at 4:13 PM, Tim Smith <secs...@gmail.com> wrote:
>>>> 
>>>>> Sorry, I am almost Java illiterate but here's my Scala code to do the
>>>>> equivalent (that I have tested to work):
>>>>> 
>>>>> // Create 10 receivers across the cluster, potentially one per
>>>>> // partition; only as many receivers are active as the topic has
>>>>> // Kafka partitions.
>>>>> val kInStreams = (1 to 10).map { _ =>
>>>>>   KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp",
>>>>>     Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
>>>>> }
>>>>> 
>>>>> val kInMsg = 
>>>>> ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>>>> 
>>>>> 
>>>>> 
>>>>> 
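The comment in the Scala snippet above about active receivers can be put as simple arithmetic. With the Kafka high-level consumer, a partition is read by only one consumer thread in a group at a time, so any threads beyond the partition count sit idle. Here is a rough sketch under that assumption. ReceiverPlan and its method names are made up for illustration and are not part of Spark or Kafka.

```java
// Hypothetical helper for reasoning about receiver counts; not a Spark or Kafka API.
class ReceiverPlan {
    // Total consumer threads requested by all receivers in one consumer group.
    static int requestedThreads(int receivers, int threadsPerReceiver) {
        return receivers * threadsPerReceiver;
    }

    // Threads that can own at least one partition: capped by the partition count.
    static int activeThreads(int receivers, int threadsPerReceiver, int partitions) {
        return Math.min(requestedThreads(receivers, threadsPerReceiver), partitions);
    }

    // Threads left without any partition to read from.
    static int idleThreads(int receivers, int threadsPerReceiver, int partitions) {
        return requestedThreads(receivers, threadsPerReceiver)
            - activeThreads(receivers, threadsPerReceiver, partitions);
    }

    public static void main(String[] args) {
        int partitions = 5;
        // Receiver/thread combinations mentioned in this thread.
        int[][] plans = { {10, 1}, {5, 1}, {1, 5} };
        for (int[] plan : plans) {
            System.out.println(plan[0] + " receiver(s) x " + plan[1] + " thread(s): "
                + activeThreads(plan[0], plan[1], partitions) + " active, "
                + idleThreads(plan[0], plan[1], partitions) + " idle");
        }
    }
}
```

By this count, five receivers with one thread each against a five-partition topic should all have a partition to read, so the receiver count alone would not seem to explain streams that receive 0 blocks.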
>>>>> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell <matt.narr...@gmail.com> 
>>>>> wrote:
>>>>>> So, this is scrubbed some for confidentiality, but the meat of it is as 
>>>>>> follows.  Note, that if I substitute the commented section for the loop, 
>>>>>> I receive messages from the topic.
>>>>>> 
>>>>>> SparkConf sparkConf = new SparkConf();
>>>>>> sparkConf.set("spark.streaming.unpersist", "true");
>>>>>> sparkConf.set("spark.logConf", "true");
>>>>>> 
>>>>>> Map<String, String> kafkaProps = new HashMap<>();
>>>>>> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
>>>>>> kafkaProps.put("group.id", groupId);
>>>>>> 
>>>>>> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
>>>>>> jsc.checkpoint("hdfs://<some_location>");
>>>>>> 
>>>>>> List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);
>>>>>> 
>>>>>> for (int i = 0; i < 5; i++) {
>>>>>>  streamList.add(KafkaUtils.createStream(jsc,
>>>>>>                                         String.class, ProtobufModel.class,
>>>>>>                                         StringDecoder.class, ProtobufModelDecoder.class,
>>>>>>                                         kafkaProps,
>>>>>>                                         Collections.singletonMap(topic, 1),
>>>>>>                                         StorageLevel.MEMORY_ONLY_SER()));
>>>>>> }
>>>>>> 
>>>>>> final JavaPairDStream<String, ProtobufModel> stream =
>>>>>>     jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
>>>>>> 
>>>>>> //  final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
>>>>>> //                  KafkaUtils.createStream(jsc,
>>>>>> //                                          String.class, ProtobufModel.class,
>>>>>> //                                          StringDecoder.class, ProtobufModelDecoder.class,
>>>>>> //                                          kafkaProps,
>>>>>> //                                          Collections.singletonMap(topic, 5),
>>>>>> //                                          StorageLevel.MEMORY_ONLY_SER());
>>>>>> 
>>>>>> final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
>>>>>>      new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
>>>>>>          @Override
>>>>>>          public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple) throws Exception {
>>>>>>              return new Tuple2<>(tuple._2().getDeviceId(), 1);
>>>>>>          }
>>>>>>      });
>>>>>> 
>>>>>> … and further Spark functions ...
>>>>>> 
>>>>>> On Sep 23, 2014, at 2:55 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>> 
>>>>>>> Posting your code would be really helpful in figuring out gotchas.
>>>>>>> 
>>>>>>> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell <matt.narr...@gmail.com> 
>>>>>>> wrote:
>>>>>>>> Hey,
>>>>>>>> 
>>>>>>>> Spark 1.1.0
>>>>>>>> Kafka 0.8.1.1
>>>>>>>> Hadoop (YARN/HDFS) 2.5.1
>>>>>>>> 
>>>>>>>> I have a five-partition Kafka topic.  I can create a single Kafka
>>>>>>>> receiver via KafkaUtils.createStream with five threads in the topic
>>>>>>>> map and consume messages fine.  Sifting through the user list and
>>>>>>>> Google, I see that it’s possible to split the Kafka receiver among
>>>>>>>> the Spark workers such that I can have a receiver per topic, and have
>>>>>>>> this distributed to workers rather than localized to the driver.  I’m
>>>>>>>> following something like this:
>>>>>>>> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
>>>>>>>> But for Kafka, obviously.  From the Streaming Programming Guide:
>>>>>>>> “Receiving multiple data streams can therefore be achieved by creating
>>>>>>>> multiple input DStreams and configuring them to receive different
>>>>>>>> partitions of the data stream from the source(s).”
>>>>>>>> 
>>>>>>>> However, I’m not able to consume any messages from Kafka after I
>>>>>>>> perform the union operation.  Again, if I create a single
>>>>>>>> multi-threaded receiver, I can consume messages fine.  If I create 5
>>>>>>>> receivers in a loop and call jssc.union(…), I get:
>>>>>>>> 
>>>>>>>> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
>>>>>>>> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
>>>>>>>> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
>>>>>>>> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
>>>>>>>> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Do I need to do anything to the unioned DStream?  Am I going about this
>>>>>>>> incorrectly?
>>>>>>>> 
>>>>>>>> Thanks in advance.
>>>>>>>> 
>>>>>>>> Matt
>>>>>>> 
>>>>>>> ---------------------------------------------------------------------
>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 
> 
> 

