KafkaCluster.scala in the spark/external/kafka project has a bunch of API code, including code for updating Kafka-managed ZK offsets. Look at setConsumerOffsets.
Unfortunately all of that code is private, but you can either write your own, copy it, or do what I do (sed out private[spark] and rebuild Spark...)

On Mon, Jun 1, 2015 at 4:51 PM, Tathagata Das <t...@databricks.com> wrote:

> In the receiver-less "direct" approach, there is no concept of a consumer
> group, as we don't use the Kafka high-level consumer (which uses ZK). Instead,
> Spark Streaming manages offsets on its own, giving tighter guarantees. If
> you want to monitor the progress of the processing of offsets, you will
> have to update ZK yourself. With the code snippet you posted, you can get
> the range of offsets that were processed in each batch, and accordingly
> update ZooKeeper using a consumer group name of your choice.
>
> TD
>
> On Mon, Jun 1, 2015 at 2:23 PM, dgoldenberg <dgoldenberg...@gmail.com> wrote:
>
>> Hi,
>>
>> What are some of the good/adopted approaches to monitoring Spark Streaming
>> from Kafka? I see that there are things like
>> http://quantifind.github.io/KafkaOffsetMonitor, for example. Do they all
>> assume that receiver-based streaming is used?
>>
>> Then: "Note that one disadvantage of this approach (Receiverless Approach,
>> #2) is that it does not update offsets in Zookeeper, hence Zookeeper-based
>> Kafka monitoring tools will not show progress. However, you can access the
>> offsets processed by this approach in each batch and update Zookeeper
>> yourself."
>>
>> The code sample, however, seems sparse. What do you need to do here?
>>
>> directKafkaStream.foreachRDD(
>>     new Function<JavaPairRDD<String, String>, Void>() {
>>         @Override
>>         public Void call(JavaPairRDD<String, String> rdd) throws IOException {
>>             OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>>             // offsetRanges.length = # of Kafka partitions being consumed
>>             ...
>>             return null;
>>         }
>>     }
>> );
>>
>> And if these are updated, will KafkaOffsetMonitor work?
>>
>> Monitoring seems to center around the notion of a consumer group.
>> But in the receiverless approach, code on the Spark consumer side doesn't
>> seem to expose a consumer group parameter. Where does it go? Can I/should I
>> just pass in group.id as part of the kafkaParams HashMap?
>>
>> Thanks
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-monitor-Spark-Streaming-from-Kafka-tp23103.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
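[Editor's note] The advice above amounts to: take each batch's OffsetRange and write its untilOffset to the ZooKeeper node that the Kafka high-level consumer would have used, so ZK-based tools like KafkaOffsetMonitor pick it up. The high-level consumer stores each partition's offset as a plain-text payload at /consumers/<group>/offsets/<topic>/<partition>. A minimal sketch of just that path/payload construction (no Spark or ZK dependencies; class and method names here are illustrative, not from the thread):

```java
// Sketch: the ZooKeeper layout that ZK-based Kafka monitoring tools read.
// Kafka's high-level consumer keeps each partition's committed offset at
//   /consumers/<group>/offsets/<topic>/<partition>
// with the offset value stored as a decimal string in the node's data.
public class ZkOffsetPath {
    // Build the ZK node path for one (group, topic, partition) triple.
    static String offsetPath(String group, String topic, int partition) {
        return String.format("/consumers/%s/offsets/%s/%d", group, topic, partition);
    }

    // Monitoring tools parse the node data as a plain decimal string,
    // so the payload is just the offset rendered as text.
    static byte[] offsetPayload(long untilOffset) {
        return Long.toString(untilOffset).getBytes();
    }

    public static void main(String[] args) {
        // e.g. after a batch, write offsetPayload(range.untilOffset())
        // to offsetPath(group, range.topic(), range.partition()) via your ZK client
        System.out.println(offsetPath("my-group", "events", 3));
        // -> /consumers/my-group/offsets/events/3
    }
}
```

In a foreachRDD callback you would loop over the offsetRanges array and write each range's untilOffset to the corresponding node with your ZooKeeper client of choice (e.g. Curator); once those nodes exist, KafkaOffsetMonitor will show the chosen group's progress.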