This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
     new 24d10fb  Measure kafka queue in consumer with better exactitude. (#3423)
24d10fb is described below

commit 24d10fb6825dbd6bdd8b7896b924143751c8b2e0
Author: Vadim Raskin <raskinva...@gmail.com>
AuthorDate: Tue Mar 13 11:53:45 2018 +0100

    Measure kafka queue in consumer with better exactitude. (#3423)

    Measure the kafka queue metric directly in the consumer by comparing
    offsets instead of relying on the built-in consumer metrics. This method
    provides more precision than the old approach; the built-in metrics were
    observed to lag behind during bursts. Additionally, make the Kamon flush
    interval configurable in application.conf.
---
 common/scala/src/main/resources/application.conf  |  1 +
 .../connector/kafka/KafkaConsumerConnector.scala  | 30 ++++++++++++++--------
 2 files changed, 20 insertions(+), 11 deletions(-)

diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 9470bd3..402ae5d 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -73,6 +73,7 @@ whisk {
         // A low value improves latency performance but it is important to not set it too low
         // as that will cause excessive busy-waiting.
         fetch-max-wait-ms = 20
+        metric-flush-interval-s = 60
     }
 
     topics {
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 26a0f86..fc0954e 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -21,6 +21,7 @@ import java.util.Properties
 
 import akka.actor.ActorSystem
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{RetriableException, WakeupException}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import pureconfig.loadConfigOrThrow
@@ -33,7 +34,7 @@ import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future}
 import scala.collection.JavaConverters._
 
-case class KafkaConsumerConfig(sessionTimeoutMs: Long)
+case class KafkaConsumerConfig(sessionTimeoutMs: Long, metricFlushIntervalS: Int)
 
 class KafkaConsumerConnector(
   kafkahost: String,
@@ -49,6 +50,10 @@ class KafkaConsumerConnector(
   // logic, like the wakeup timer.
   private val cfg = loadConfigOrThrow[KafkaConsumerConfig](ConfigKeys.kafkaConsumer)
 
+  // Currently consumed offset, is used to calculate the topic lag.
+  // It is updated from one thread in "peek", no concurrent data structure is necessary
+  private var offset: Long = 0
+
   /**
    * Long poll for messages. Method returns once message are available but no later than given
    * duration.
@@ -62,7 +67,11 @@ class KafkaConsumerConnector(
     val wakeUpTask = actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 1.second)(consumer.wakeup())
 
     try {
-      consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value))
+      val response = consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value))
+      response.lastOption.foreach {
+        case (_, _, newOffset, _) => offset = newOffset + 1
+      }
+      response
     } catch {
       // Happens if the peek hangs.
      case _: WakeupException if retry > 0 =>
@@ -138,14 +147,13 @@ class KafkaConsumerConnector(
   @volatile
   private var consumer = getConsumer(getProps, Some(List(topic)))
 
-  // Read current lag of the consumed topic, e.g. invoker queue and
-  // emit kamon histogram metric every 5 seconds
-  // Since we use only one partition in kafka, it is defined 0 in the metric name
-  actorSystem.scheduler.schedule(10.second, 5.second) {
-    val queueSize = consumer.metrics.asScala
-      .find(_._1.name() == s"$topic-0.records-lag")
-      .map(_._2.value().toInt)
-      .getOrElse(0)
-    MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize)
+  // Read current lag of the consumed topic, e.g. invoker queue
+  // Since we use only one partition in kafka, it is defined 0
+  actorSystem.scheduler.schedule(10.second, cfg.metricFlushIntervalS.second) {
+    val topicAndPartition = Set(new TopicPartition(topic, 0))
+    consumer.endOffsets(topicAndPartition.asJava).asScala.find(_._1.topic() == topic).map(_._2).foreach { endOffset =>
+      val queueSize = endOffset - offset
+      MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize)
+    }
   }
 }
-- 
To stop receiving notification emails like this one, please contact
markusthoem...@apache.org.
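
For context, the offset-based lag calculation introduced in this commit can be exercised
stand-alone. The sketch below is illustrative only: the topic name, group id, bootstrap
server and the lastConsumedOffset variable are assumptions made for the example; the
actual connector keeps the consumed offset inside KafkaConsumerConnector and emits it via
MetricEmitter on the configured schedule.

    // Stand-alone illustration of measuring queue size as (end offset - consumed offset).
    // Assumes a single-partition topic and a broker reachable at localhost:9092.
    import java.util.Properties

    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    import scala.collection.JavaConverters._

    object QueueLagSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "lag-sketch")
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)

        val topic = "invoker0"                       // illustrative topic name
        val partition = new TopicPartition(topic, 0) // the connector assumes a single partition
        val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
        consumer.assign(List(partition).asJava)

        // Track the next offset to be consumed, updated after every poll,
        // mirroring what the connector does in "peek".
        var lastConsumedOffset: Long = 0L
        val records = consumer.poll(100L).asScala
        records.lastOption.foreach(r => lastConsumedOffset = r.offset + 1)

        // Queue size (lag) is the broker's end offset minus the consumed offset.
        val endOffset: Long = consumer.endOffsets(List(partition).asJava).asScala(partition)
        val queueSize = endOffset - lastConsumedOffset
        println(s"queue size for $topic: $queueSize")

        consumer.close()
      }
    }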