Re: How to monitor Spark Streaming from Kafka?
Nobody mentioned CM yet? Kafka is now supported by CM/CDH 5.4:
http://www.cloudera.com/content/cloudera/en/documentation/cloudera-kafka/latest/PDF/cloudera-kafka.pdf

--
Ruslan Dautkhanov
How to monitor Spark Streaming from Kafka?
Hi,

What are some of the good/adopted approaches to monitoring Spark Streaming from Kafka? I see that there are things like http://quantifind.github.io/KafkaOffsetMonitor, for example. Do they all assume that receiver-based streaming is used?

Then:

"Note that one disadvantage of this approach (Receiverless Approach, #2) is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself."

The code sample, however, seems sparse. What do you need to do here?

    directKafkaStream.foreachRDD(
      new Function<JavaPairRDD<String, String>, Void>() {
        @Override
        public Void call(JavaPairRDD<String, String> rdd) throws IOException {
          // The cast has to be applied to the underlying RDD, not the Java wrapper.
          OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
          // offsetRanges.length = # of Kafka partitions being consumed
          ...
          return null;
        }
      }
    );

And if these are updated, will KafkaOffsetMonitor work?

Monitoring seems to center around the notion of a consumer group. But in the receiverless approach, code on the Spark consumer side doesn't seem to expose a consumer group parameter. Where does it go? Can I/should I just pass in group.id as part of the kafkaParams HashMap?

Thanks

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-monitor-Spark-Streaming-from-Kafka-tp23103.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: How to monitor Spark Streaming from Kafka?
KafkaCluster.scala in the spark/external/kafka project has a bunch of API code, including code for updating Kafka-managed ZK offsets. Look at setConsumerOffsets.

Unfortunately all of that code is private, but you can either write your own, copy it, or do what I do (sed out private[spark] and rebuild Spark...).
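If you go the write-your-own route, one step you would likely need is collapsing the offset ranges you have seen into a single "latest offset" per topic and partition before committing anything. Here is a minimal sketch of that step in plain Java. Note the assumptions: Range is a hypothetical stand-in for Spark's OffsetRange, the "topic-partition" string key is only an illustrative choice, and none of this is KafkaCluster's actual API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class OffsetsToCommit {

    /** Hypothetical stand-in for Spark's OffsetRange; untilOffset is exclusive. */
    static final class Range {
        final String topic;
        final int partition;
        final long fromOffset;
        final long untilOffset;

        Range(String topic, int partition, long fromOffset, long untilOffset) {
            this.topic = topic;
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }
    }

    /**
     * Returns the highest untilOffset seen for each topic and partition.
     * Keys have the illustrative form "topic-partition".
     */
    static Map<String, Long> latestOffsets(Range[] ranges) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Range r : ranges) {
            // Keep the larger of the stored offset and this range's end.
            latest.merge(r.topic + "-" + r.partition, r.untilOffset, Long::max);
        }
        return latest;
    }

    public static void main(String[] args) {
        Range[] seen = {
            new Range("events", 0, 0L, 100L),
            new Range("events", 0, 100L, 250L),
            new Range("events", 1, 0L, 40L)
        };
        latestOffsets(seen).forEach((key, offset) -> System.out.println(key + " -> " + offset));
    }
}
```

The resulting map is roughly the shape of input a setConsumerOffsets-style call would want alongside a group id, but check the real signature in KafkaCluster.scala before relying on that.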
Re: How to monitor Spark Streaming from Kafka?
In the receiver-less direct approach, there is no concept of a consumer group, as we don't use the Kafka high-level consumer (which uses ZK). Instead, Spark Streaming manages offsets on its own, giving tighter guarantees. If you want to monitor the progress of the processing of offsets, you will have to update ZK yourself. With the code snippet you posted, you can get the range of offsets that were processed in each batch, and accordingly update Zookeeper using some consumer group name of your choice.

TD
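As a rough sketch of what "update ZK yourself" could look like: the ZooKeeper-based Kafka consumer keeps each committed offset under /consumers/<group>/offsets/<topic>/<partition>, so a ZK-reading monitor should see progress if the end of each processed range is written there. The code below only computes the znode paths and values for one batch. It assumes the value to store is the next offset to read (untilOffset); Range is a hypothetical stand-in for OffsetRange, "spark-monitor" is a made-up group name, and the actual write is left to whichever ZooKeeper client you use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ZkOffsetPaths {

    /** Hypothetical stand-in for Spark's OffsetRange; untilOffset is exclusive. */
    static final class Range {
        final String topic;
        final int partition;
        final long fromOffset;
        final long untilOffset;

        Range(String topic, int partition, long fromOffset, long untilOffset) {
            this.topic = topic;
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }
    }

    /** Znode holding the committed offset for one group, topic, and partition. */
    static String offsetPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    /** Maps each znode path to the string value to store there for this batch. */
    static Map<String, String> plannedWrites(String group, Range[] ranges) {
        Map<String, String> writes = new LinkedHashMap<>();
        for (Range r : ranges) {
            // Assumption: the stored value is the next offset to read, as a decimal string.
            writes.put(offsetPath(group, r.topic, r.partition), Long.toString(r.untilOffset));
        }
        return writes;
    }

    public static void main(String[] args) {
        Range[] batch = {
            new Range("events", 0, 100L, 150L),
            new Range("events", 1, 40L, 90L)
        };
        // The ZooKeeper write itself is not shown; this only prints the plan.
        plannedWrites("spark-monitor", batch)
            .forEach((path, value) -> System.out.println(path + " = " + value));
    }
}
```

In a real job this would run inside foreachRDD after the batch's output has been written, so that the offsets a monitor sees never run ahead of the processed data.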
Re: How to monitor Spark Streaming from Kafka?
I think you can use SPM - http://sematext.com/spm - it will give you all Spark and all Kafka metrics, including offsets broken down by topic, etc. out of the box. I see more and more people using it to monitor various components in data processing pipelines, a la http://blog.sematext.com/2015/04/22/monitoring-stream-processing-tools-cassandra-kafka-and-spark/

Otis
Re: How to monitor Spark Streaming from Kafka?
Thank you, Tathagata, Cody, Otis.

- Dmitry