To my eyes, these are functionally equivalent. I’ll try a Scala approach, but this may cause waves for me upstream (e.g., non-Java).
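Roughly, the Scala I have in mind — untested, and assuming the same `kafkaProps`, `topic`, and decoder classes as my Java version below:

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Untested sketch: one receiver per partition, with the same explicit
// key/value types and decoders as the Java version.
val kInStreams = (1 to 5).map { _ =>
  KafkaUtils.createStream[String, ProtobufModel, StringDecoder, ProtobufModelDecoder](
    ssc, kafkaProps, Map(topic -> 1), StorageLevel.MEMORY_ONLY_SER)
}
val unioned = ssc.union(kInStreams)
```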
Thanks for looking at this. If anyone else can see a glaring issue in the Java approach, that would be appreciated.

Thanks,
Matt

On Sep 23, 2014, at 4:13 PM, Tim Smith <secs...@gmail.com> wrote:

> Sorry, I am almost Java illiterate, but here's my Scala code to do the
> equivalent (that I have tested to work):
>
> // Create 10 receivers across the cluster, one per partition, potentially;
> // active receivers are capped at the number of Kafka partitions you have.
> val kInStreams = (1 to 10).map { _ =>
>   KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp",
>     Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
> }
>
> val kInMsg = ssc.union(kInStreams)
>   .persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>
> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell <matt.narr...@gmail.com> wrote:
>> So, this is scrubbed some for confidentiality, but the meat of it is as
>> follows. Note that if I substitute the commented section for the loop, I
>> receive messages from the topic.
>>
>> SparkConf sparkConf = new SparkConf();
>> sparkConf.set("spark.streaming.unpersist", "true");
>> sparkConf.set("spark.logConf", "true");
>>
>> Map<String, String> kafkaProps = new HashMap<>();
>> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
>> kafkaProps.put("group.id", groupId);
>>
>> JavaStreamingContext jsc =
>>     new JavaStreamingContext(sparkConf, Seconds.apply(1));
>> jsc.checkpoint("hdfs://<some_location>");
>>
>> List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);
>>
>> for (int i = 0; i < 5; i++) {
>>   streamList.add(KafkaUtils.createStream(jsc,
>>                                          String.class, ProtobufModel.class,
>>                                          StringDecoder.class,
>>                                          ProtobufModelDecoder.class,
>>                                          kafkaProps,
>>                                          Collections.singletonMap(topic, 1),
>>                                          StorageLevel.MEMORY_ONLY_SER()));
>> }
>>
>> final JavaPairDStream<String, ProtobufModel> stream =
>>     jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
>>
>> // final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
>> //     KafkaUtils.createStream(jsc,
>> //                             String.class, ProtobufModel.class,
>> //                             StringDecoder.class,
>> //                             ProtobufModelDecoder.class,
>> //                             kafkaProps,
>> //                             Collections.singletonMap(topic, 5),
>> //                             StorageLevel.MEMORY_ONLY_SER());
>>
>> final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
>>     new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
>>       @Override
>>       public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple)
>>           throws Exception {
>>         return new Tuple2<>(tuple._2().getDeviceId(), 1);
>>       }
>>     });
>>
>> … and further Spark functions ...
>>
>> On Sep 23, 2014, at 2:55 PM, Tim Smith <secs...@gmail.com> wrote:
>>
>>> Posting your code would be really helpful in figuring out gotchas.
>>>
>>> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell <matt.narr...@gmail.com> wrote:
>>>> Hey,
>>>>
>>>> Spark 1.1.0
>>>> Kafka 0.8.1.1
>>>> Hadoop (YARN/HDFS) 2.5.1
>>>>
>>>> I have a five-partition Kafka topic. I can create a single Kafka receiver
>>>> via KafkaUtils.createStream with five threads in the topic map and consume
>>>> messages fine. Sifting through the user list and Google, I see that it's
>>>> possible to split the Kafka receiver among the Spark workers such that I
>>>> can have a receiver per partition, and have this distributed to workers
>>>> rather than localized to the driver. I'm following something like this:
>>>> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
>>>> but for Kafka, obviously. From the Streaming Programming Guide: "Receiving
>>>> multiple data streams can therefore be achieved by creating multiple input
>>>> DStreams and configuring them to receive different partitions of the data
>>>> stream from the source(s)."
>>>>
>>>> However, I'm not able to consume any messages from Kafka after I perform
>>>> the union operation. Again, if I create a single, multi-threaded receiver
>>>> I can consume messages fine. If I create 5 receivers in a loop and call
>>>> jssc.union(…) I get:
>>>>
>>>> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
>>>> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
>>>> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
>>>> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
>>>> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>>>>
>>>> Do I need to do anything to the unioned DStream? Am I going about this
>>>> incorrectly?
>>>>
>>>> Thanks in advance.
>>>>
>>>> Matt

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
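As a footnote to Tim's remark that active receivers are capped by the Kafka partition count: each partition of a topic is consumed by exactly one thread in a consumer group, so receivers (or threads) beyond the partition count sit idle. This toy model illustrates the idea with a simple round-robin spread — it is not Kafka's actual assignor (which is range-based by default), just an illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of a consumer group spreading topic partitions over
// consumer threads: each partition goes to exactly one thread, so any
// threads beyond the partition count receive nothing.
public class PartitionSpread {
    static Map<Integer, List<Integer>> assign(int partitions, int threads) {
        Map<Integer, List<Integer>> byThread = new TreeMap<>();
        for (int p = 0; p < partitions; p++) {
            byThread.computeIfAbsent(p % threads, k -> new ArrayList<>()).add(p);
        }
        return byThread;
    }

    public static void main(String[] args) {
        // One receiver with 5 threads: every partition is consumed.
        System.out.println(assign(5, 5));
        // 10 receivers for 5 partitions: only 5 get a partition; 5 stay idle.
        System.out.println(assign(5, 10));
    }
}
```

With 5 partitions, both a single receiver running 5 threads and 5 one-thread receivers cover every partition; the difference is only where the consuming threads run.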