Sorry, I'm almost Java illiterate, but here's my Scala code to do the
equivalent (which I have tested and it works):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Create 10 receivers across the cluster, one per partition. Only as many
// receivers as there are Kafka partitions will actually be active.
val kInStreams = (1 to 10).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp",
    Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
}

val kInMsg = ssc.union(kInStreams).persist(StorageLevel.MEMORY_AND_DISK_SER)
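
In case it helps with debugging, here's how I'd continue the above (a
sketch, untested in your setup; the repartition count of 10 and the
count-based sanity check are just my guesses, not requirements):

// Optionally repartition so processing spreads beyond the receiver
// executors, then print per-batch record counts as a sanity check.
val kInParted = kInMsg.repartition(10)
kInParted.count().print()

ssc.start()
ssc.awaitTermination()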




On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell <matt.narr...@gmail.com> wrote:
> So, this is scrubbed some for confidentiality, but the meat of it is as 
> follows.  Note that if I substitute the commented section for the loop, I 
> receive messages from the topic.
>
> SparkConf sparkConf = new SparkConf();
> sparkConf.set("spark.streaming.unpersist", "true");
> sparkConf.set("spark.logConf", "true");
>
> Map<String, String> kafkaProps = new HashMap<>();
> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
> kafkaProps.put("group.id", groupId);
>
> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
> jsc.checkpoint("hdfs://<some_location>");
>
> List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);
>
> for (int i = 0; i < 5; i++) {
>     streamList.add(KafkaUtils.createStream(jsc,
>                                            String.class, ProtobufModel.class,
>                                            StringDecoder.class, ProtobufModelDecoder.class,
>                                            kafkaProps,
>                                            Collections.singletonMap(topic, 1),
>                                            StorageLevel.MEMORY_ONLY_SER()));
> }
>
> final JavaPairDStream<String, ProtobufModel> stream =
>         jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
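
Quick note on the union call above (hedged, since I mostly write Scala):
the head-plus-tail form matches the JavaStreamingContext.union signature,
so that part looks right. In Scala the same union is a single call over
the whole sequence:

// Scala form of the same union: pass the whole Seq at once
val unioned = ssc.union(kInStreams)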
>
> //  final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
> //                  KafkaUtils.createStream(jsc,
> //                                          String.class, ProtobufModel.class,
> //                                          StringDecoder.class, ProtobufModelDecoder.class,
> //                                          kafkaProps,
> //                                          Collections.singletonMap(topic, 5),
> //                                          StorageLevel.MEMORY_ONLY_SER());
>
> final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
>         new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
>             @Override
>             public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple) throws Exception {
>                 return new Tuple2<>(tuple._2().getDeviceId(), 1);
>             }
>         });
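
For comparison, the Scala equivalent of the mapToPair above would be
roughly this (an untested sketch; it assumes the same ProtobufModel type
with its getDeviceId accessor):

// Pair each message's device id with 1, Scala version of the mapToPair above
val tuples = kInMsg.map { case (_, model) => (model.getDeviceId, 1) }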
>
> … and further Spark functions ...
>
> On Sep 23, 2014, at 2:55 PM, Tim Smith <secs...@gmail.com> wrote:
>
>> Posting your code would be really helpful in figuring out gotchas.
>>
>> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell <matt.narr...@gmail.com> wrote:
>>> Hey,
>>>
>>> Spark 1.1.0
>>> Kafka 0.8.1.1
>>> Hadoop (YARN/HDFS) 2.5.1
>>>
>>> I have a five partition Kafka topic.  I can create a single Kafka receiver
>>> via KafkaUtils.createStream with five threads in the topic map and consume
>>> messages fine.  Sifting through the user list and Google, I see that it's
>>> possible to split the Kafka receiver among the Spark workers such that I can
>>> have a receiver per partition, and have this distributed to workers rather than
>>> localized to the driver.  I’m following something like this:
>>> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
>>> But for Kafka obviously.  From the Streaming Programming Guide “ Receiving
>>> multiple data streams can therefore be achieved by creating multiple input
>>> DStreams and configuring them to receive different partitions of the data
>>> stream from the source(s)."
>>>
>>> However, I’m not able to consume any messages from Kafka after I perform the
>>> union operation.  Again, if I create a single, multi-threaded, receiver I
>>> can consume messages fine.  If I create 5 receivers in a loop, and call
>>> jssc.union(…) I get:
>>>
>>> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
>>> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
>>> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
>>> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
>>> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>>>
>>>
>>> Do I need to do anything to the unioned DStream?  Am I going about this
>>> incorrectly?
>>>
>>> Thanks in advance.
>>>
>>> Matt
>>
>
