Sorry, I am almost Java-illiterate, but here's my Scala code to do the equivalent (which I have tested and know works):
val kInStreams = (1 to 10).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp", Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
} // Create 10 receivers across the cluster, potentially one per partition, but only as many receivers will be active as you have Kafka partitions

val kInMsg = ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
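In case a fuller picture helps, here is a rough, self-contained sketch of the same pattern as a standalone app. It is only an illustration and not tested as written: the ZooKeeper quorum, group id and topic name are the same placeholders as above, and the receiver count of 5 is just there to line up with your five partitions.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object MultiReceiverKafka {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MultiReceiverKafka")
    val ssc = new StreamingContext(conf, Seconds(1))

    // One receiver per Kafka partition (5 here, to match a five-partition topic);
    // each createStream returns a DStream of (key, message) pairs read via ZooKeeper.
    val kInStreams = (1 to 5).map { _ =>
      KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp",
        Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
    }

    // Union the per-receiver streams into a single DStream and cache the blocks.
    val kInMsg = ssc.union(kInStreams).persist(StorageLevel.MEMORY_AND_DISK_SER)

    // A trivial output operation so the unioned stream is actually computed:
    // print the number of messages received in each batch.
    kInMsg.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}

The only real difference from your Java version is that the Scala ssc.union takes the whole Seq of streams in one call, and I hang a count().print() off the unioned DStream so there is at least one output operation.
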
On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell <matt.narr...@gmail.com> wrote:
> So, this is scrubbed some for confidentiality, but the meat of it is as
> follows. Note that if I substitute the commented section for the loop, I
> receive messages from the topic.
>
> SparkConf sparkConf = new SparkConf();
> sparkConf.set("spark.streaming.unpersist", "true");
> sparkConf.set("spark.logConf", "true");
>
> Map<String, String> kafkaProps = new HashMap<>();
> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
> kafkaProps.put("group.id", groupId);
>
> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
> jsc.checkpoint("hdfs://<some_location>");
>
> List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);
>
> for (int i = 0; i < 5; i++) {
>     streamList.add(KafkaUtils.createStream(jsc,
>                                            String.class, ProtobufModel.class,
>                                            StringDecoder.class, ProtobufModelDecoder.class,
>                                            kafkaProps,
>                                            Collections.singletonMap(topic, 1),
>                                            StorageLevel.MEMORY_ONLY_SER()));
> }
>
> final JavaPairDStream<String, ProtobufModel> stream =
>     jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
>
> // final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
> //     KafkaUtils.createStream(jsc,
> //                             String.class, ProtobufModel.class,
> //                             StringDecoder.class, ProtobufModelDecoder.class,
> //                             kafkaProps,
> //                             Collections.singletonMap(topic, 5),
> //                             StorageLevel.MEMORY_ONLY_SER());
>
> final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
>     new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
>         @Override
>         public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple) throws Exception {
>             return new Tuple2<>(tuple._2().getDeviceId(), 1);
>         }
>     });
>
> … and further Spark functions ...
>
> On Sep 23, 2014, at 2:55 PM, Tim Smith <secs...@gmail.com> wrote:
>
>> Posting your code would be really helpful in figuring out gotchas.
>>
>> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell <matt.narr...@gmail.com> wrote:
>>> Hey,
>>>
>>> Spark 1.1.0
>>> Kafka 0.8.1.1
>>> Hadoop (YARN/HDFS) 2.5.1
>>>
>>> I have a five-partition Kafka topic. I can create a single Kafka receiver
>>> via KafkaUtils.createStream with five threads in the topic map and consume
>>> messages fine. Sifting through the user list and Google, I see that it's
>>> possible to split the Kafka receiving among the Spark workers such that I can
>>> have a receiver per partition, and have these distributed to the workers rather
>>> than localized to the driver. I'm following something like this:
>>> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
>>> but for Kafka, obviously. From the Streaming Programming Guide: "Receiving
>>> multiple data streams can therefore be achieved by creating multiple input
>>> DStreams and configuring them to receive different partitions of the data
>>> stream from the source(s)."
>>>
>>> However, I'm not able to consume any messages from Kafka after I perform the
>>> union operation. Again, if I create a single, multi-threaded receiver I can
>>> consume messages fine. If I create 5 receivers in a loop and call
>>> jssc.union(…) I get:
>>>
>>> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
>>> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
>>> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
>>> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
>>> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>>>
>>> Do I need to do anything to the unioned DStream? Am I going about this
>>> incorrectly?
>>>
>>> Thanks in advance.
>>>
>>> Matt