KafkaCluster.scala in the spark/external/kafka project has a bunch of api
code, including code for updating Kafka-managed ZK offsets.  Look at
setConsumerOffsets.

Unfortunately all of that code is private, so you can either write your
own, copy it, or do what I do (sed out private[spark] and rebuild spark...)
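If you go the write-your-own route, the main thing you need is the ZK node
where the high-level consumer keeps its offsets. A minimal sketch, assuming
the classic Kafka 0.8 ZooKeeper layout
/consumers/&lt;group&gt;/offsets/&lt;topic&gt;/&lt;partition&gt; (the class and method
names here are hypothetical, just for illustration):

```java
// Hypothetical helper showing where Kafka 0.8's high-level consumer
// stores offsets in ZooKeeper. In a real job you would take each
// OffsetRange from the direct stream and write
// String.valueOf(range.untilOffset()) to this path with a ZK client
// (e.g. ZkClient/Curator), creating parent nodes as needed.
public class ZkOffsetPath {

    // Builds the ZK path for one (group, topic, partition) triple,
    // assuming the standard /consumers layout used by ZK-based monitors.
    public static String offsetPath(String group, String topic, int partition) {
        return String.format("/consumers/%s/offsets/%s/%d", group, topic, partition);
    }

    public static void main(String[] args) {
        // Example: partition 0 of topic "events" for consumer group "my-group".
        System.out.println(offsetPath("my-group", "events", 0));
    }
}
```

Once you write batch-end offsets to those paths under a group name of your
choosing, ZK-based tools like KafkaOffsetMonitor should pick them up.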

On Mon, Jun 1, 2015 at 4:51 PM, Tathagata Das <t...@databricks.com> wrote:

> In the receiver-less "direct" approach, there is no concept of a consumer
> group, as we don't use the Kafka high-level consumer (which uses ZK). Instead,
> Spark Streaming manages offsets on its own, giving tighter guarantees. If
> you want to monitor the progress of the processing of offsets, you will
> have to update ZK yourself. With the code snippet you posted, you can get
> the range of offsets that were processed in each batch, and accordingly
> update ZooKeeper using some consumer group name of your choice.
>
> TD
>
> On Mon, Jun 1, 2015 at 2:23 PM, dgoldenberg <dgoldenberg...@gmail.com>
> wrote:
>
>> Hi,
>>
>> What are some good/adopted approaches to monitoring Spark Streaming
>> from Kafka?  I see that there are tools like
>> http://quantifind.github.io/KafkaOffsetMonitor, for example.  Do they all
>> assume that receiver-based streaming is used?
>>
>> Then "Note that one disadvantage of this approach (Receiverless Approach,
>> #2) is that it does not update offsets in Zookeeper, hence Zookeeper-based
>> Kafka monitoring tools will not show progress. However, you can access the
>> offsets processed by this approach in each batch and update Zookeeper
>> yourself".
>>
>> The code sample, however, seems sparse. What do you need to do here? -
>>  directKafkaStream.foreachRDD(
>>      new Function<JavaPairRDD<String, String>, Void>() {
>>          @Override
>>          public Void call(JavaPairRDD<String, String> rdd) throws IOException {
>>              // cast the underlying RDD; JavaPairRDD itself doesn't implement HasOffsetRanges
>>              OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>>              // offsetRanges.length = # of Kafka partitions being consumed
>>              ...
>>              return null;
>>          }
>>      }
>>  );
>>
>> and if these are updated, will KafkaOffsetMonitor work?
>>
>> Monitoring seems to center around the notion of a consumer group.  But in
>> the receiverless approach, code on the Spark consumer side doesn't seem to
>> expose a consumer group parameter.  Where does it go?  Can I/should I just
>> pass in group.id as part of the kafkaParams HashMap?
>>
>> Thanks
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-monitor-Spark-Streaming-from-Kafka-tp23103.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>