Good to know it worked out and thanks for the update. I didn't realize
you need to provision for receiver workers + processing workers. One
would think a worker would process multiple stages of an app/job and
receive is just a stage of the job.
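
For anyone reading this thread later, the sizing issue can be sketched as plain arithmetic. The sketch assumes each receiver holds one core for the life of the job, so only the remaining cores are free to process received blocks. The class and method names below (ReceiverCoreBudget, processingCores, canProcess) are hypothetical and not part of the Spark API, and the core counts in main are assumed numbers, not figures reported in this thread.

```java
/** Hypothetical helper, not part of Spark: estimates cores left for processing. */
public class ReceiverCoreBudget {

    /** Cores left for batch processing, assuming each receiver pins exactly one core. */
    public static int processingCores(int totalExecutorCores, int receivers) {
        if (totalExecutorCores < 0 || receivers < 0) {
            throw new IllegalArgumentException("counts must be non-negative");
        }
        return Math.max(0, totalExecutorCores - receivers);
    }

    /** True when at least one core remains to process received blocks. */
    public static boolean canProcess(int totalExecutorCores, int receivers) {
        return processingCores(totalExecutorCores, receivers) > 0;
    }

    public static void main(String[] args) {
        // Assumed scenario: five cores in total and five receivers,
        // so no core is left for processing.
        System.out.println(canProcess(5, 5));
        // Assumed scenario: ten cores in total and five receivers,
        // so five cores remain for processing.
        System.out.println(processingCores(10, 5));
    }
}
```

Under these assumptions, adding executor cores or reducing the number of receivers both leave capacity for processing, which would be consistent with what Matt reports below.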



On Thu, Sep 25, 2014 at 12:05 PM, Matt Narrell <matt.narr...@gmail.com> wrote:
> Additionally,
>
> If I dial up/down the number of executor cores, this does what I want.
> Thanks for the extra eyes!
>
> mn
>
> On Sep 25, 2014, at 12:34 PM, Matt Narrell <matt.narr...@gmail.com> wrote:
>
> Tim,
>
> I think I understand this now.  I had a five-node Spark cluster and a
> five-partition topic, and I created five receivers.  I found this:
> http://stackoverflow.com/questions/25785581/custom-receiver-stalls-worker-in-spark-streaming
> which indicates that if I use all my workers as receivers, there are none
> left to do the processing.  If I reduce the number of partitions/receivers
> while still having multiple unioned receivers, I see messages.
>
> mn
>
> On Sep 25, 2014, at 10:18 AM, Matt Narrell <matt.narr...@gmail.com> wrote:
>
> I suppose I have other problems as I can’t get the Scala example to work
> either.  Puzzling, as I have literally coded like the examples (that are
> purported to work), but no luck.
>
> mn
>
> On Sep 24, 2014, at 11:27 AM, Tim Smith <secs...@gmail.com> wrote:
>
> Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?
>
> On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell <matt.narr...@gmail.com>
> wrote:
>
> The part that works is the commented out, single receiver stream below the
> loop.  It seems that when I call KafkaUtils.createStream more than once, I
> don’t receive any messages.
>
> I’ll dig through the logs, but at first glance yesterday I didn’t see
> anything suspect.  I’ll have to look closer.
>
> mn
>
> On Sep 23, 2014, at 6:14 PM, Tim Smith <secs...@gmail.com> wrote:
>
> Maybe post the code as it was before you added the loop (the version
> that worked)? I have had similar situations where comparing the code
> before (working) and after (not working) helped. The Scala REPL also
> helped, because I can see the object type returned by each statement.
>
> Other than code, in the driver logs, you should see events that say
> "Registered receiver for stream 0 from
> akka.tcp://sp...@node5.acme.net:53135"
>
> Now, if you go to "node5" and look at the Spark or YARN container logs
> (depending on which resource manager is in use), you should be able to
> see whether the receiver hits any errors when trying to talk to Kafka.
>
>
>
> On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell <matt.narr...@gmail.com>
> wrote:
>
> To my eyes, these are functionally equivalent.  I’ll try a Scala approach,
> but this may cause waves for me upstream (e.g., non-Java)
>
> Thanks for looking at this.  If anyone else can see a glaring issue in the
> Java approach that would be appreciated.
>
> Thanks,
> Matt
>
> On Sep 23, 2014, at 4:13 PM, Tim Smith <secs...@gmail.com> wrote:
>
> Sorry, I am almost Java illiterate but here's my Scala code to do the
> equivalent (that I have tested to work):
>
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> // Create 10 receivers across the cluster, potentially one per partition;
> // only as many receivers are active as the topic has Kafka partitions.
> val kInStreams = (1 to 10).map { _ =>
>   KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp",
>     Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
> }
>
> // Union the per-receiver streams into a single DStream.
> val kInMsg = ssc.union(kInStreams).persist(StorageLevel.MEMORY_AND_DISK_SER)
>
>
>
>
> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell <matt.narr...@gmail.com>
> wrote:
>
> So, this is scrubbed some for confidentiality, but the meat of it is as
> follows.  Note, that if I substitute the commented section for the loop, I
> receive messages from the topic.
>
> SparkConf sparkConf = new SparkConf();
> sparkConf.set("spark.streaming.unpersist", "true");
> sparkConf.set("spark.logConf", "true");
>
> Map<String, String> kafkaProps = new HashMap<>();
> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
> kafkaProps.put("group.id", groupId);
>
> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf,
> Seconds.apply(1));
> jsc.checkpoint("hdfs://<some_location>");
>
> List<JavaPairDStream<String, ProtobufModel>> streamList = new
> ArrayList<>(5);
>
> for (int i = 0; i < 5; i++) {
>     streamList.add(KafkaUtils.createStream(jsc,
>                                            String.class, ProtobufModel.class,
>                                            StringDecoder.class, ProtobufModelDecoder.class,
>                                            kafkaProps,
>                                            Collections.singletonMap(topic, 1),
>                                            StorageLevel.MEMORY_ONLY_SER()));
> }
>
> final JavaPairDStream<String, ProtobufModel> stream =
> jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
>
> //  final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
> //                  KafkaUtils.createStream(jsc,
> //                                          String.class, ProtobufModel.class,
> //                                          StringDecoder.class, ProtobufModelDecoder.class,
> //                                          kafkaProps,
> //                                          Collections.singletonMap(topic, 5),
> //                                          StorageLevel.MEMORY_ONLY_SER());
>
> final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
>     new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
>         @Override
>         public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel>
> tuple) throws Exception {
>             return new Tuple2<>(tuple._2().getDeviceId(), 1);
>         }
>     });
>
> … and further Spark functions ...
>
> On Sep 23, 2014, at 2:55 PM, Tim Smith <secs...@gmail.com> wrote:
>
> Posting your code would be really helpful in figuring out gotchas.
>
> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell <matt.narr...@gmail.com>
> wrote:
>
> Hey,
>
> Spark 1.1.0
> Kafka 0.8.1.1
> Hadoop (YARN/HDFS) 2.5.1
>
> I have a five-partition Kafka topic.  I can create a single Kafka receiver
> via KafkaUtils.createStream with five threads in the topic map and consume
> messages fine.  Sifting through the user list and Google, I see that it's
> possible to split the Kafka receiver among the Spark workers such that I can
> have a receiver per topic, and have this distributed to workers rather than
> localized to the driver.  I’m following something like this:
> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
> But for Kafka obviously.  From the Streaming Programming Guide: "Receiving
> multiple data streams can therefore be achieved by creating multiple input
> DStreams and configuring them to receive different partitions of the data
> stream from the source(s)."
>
> However, I’m not able to consume any messages from Kafka after I perform the
> union operation.  Again, if I create a single, multi-threaded, receiver I
> can consume messages fine.  If I create 5 receivers in a loop, and call
> jssc.union(…) I get:
>
> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>
>
> Do I need to do anything to the unioned DStream?  Am I going about this
> incorrectly?
>
> Thanks in advance.
>
> Matt
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org

