This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 24d10fb  Measure kafka queue in consumer with better exactitude. (#3423)
24d10fb is described below

commit 24d10fb6825dbd6bdd8b7896b924143751c8b2e0
Author: Vadim Raskin <raskinva...@gmail.com>
AuthorDate: Tue Mar 13 11:53:45 2018 +0100

    Measure kafka queue in consumer with better exactitude. (#3423)
    
    Measure the kafka queue metric directly in the consumer by comparing offsets
    instead of relying on the built-in consumer metrics. This approach is more
    precise than the old one: the built-in metrics were observed to lag behind
    during message bursts.
    Additionally, make the kamon flush cadence configurable in application.conf.
---
 common/scala/src/main/resources/application.conf   |  1 +
 .../connector/kafka/KafkaConsumerConnector.scala   | 30 ++++++++++++++--------
 2 files changed, 20 insertions(+), 11 deletions(-)
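
For readers skimming the change, here is a minimal, standalone sketch of the offset-based lag measurement the commit describes: the consumer remembers the offset of the last record it consumed and subtracts it from the broker's end offset. This is an illustration only, not code from the commit; the broker address, topic name, and group id below are placeholders.

    import java.util.Properties

    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    import scala.collection.JavaConverters._

    object LagSketch {
      def main(args: Array[String]): Unit = {
        // Placeholder connection settings, for illustration only.
        val props = new Properties()
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "lag-sketch")
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)

        val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
        // The commit assumes a single partition per topic, hence partition 0.
        val partition = new TopicPartition("invoker0", 0)
        consumer.assign(List(partition).asJava)

        // Next offset to be consumed; updated after every poll, as "peek" does in the diff.
        var consumedOffset: Long = 0L
        consumer.poll(500L).asScala.lastOption.foreach(r => consumedOffset = r.offset() + 1)

        // Queue size (lag) = broker end offset minus the offset this consumer has reached.
        val endOffset = consumer.endOffsets(List(partition).asJava).asScala
          .get(partition).map(_.longValue()).getOrElse(0L)
        println(s"queue size for ${partition.topic}: ${endOffset - consumedOffset}")

        consumer.close()
      }
    }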

diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 9470bd3..402ae5d 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -73,6 +73,7 @@ whisk {
             // A low value improves latency performance but it is important to not set it too low
             // as that will cause excessive busy-waiting.
             fetch-max-wait-ms = 20
+            metric-flush-interval-s = 60
         }
 
         topics {
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 26a0f86..fc0954e 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -21,6 +21,7 @@ import java.util.Properties
 
 import akka.actor.ActorSystem
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{RetriableException, WakeupException}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import pureconfig.loadConfigOrThrow
@@ -33,7 +34,7 @@ import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future}
 import scala.collection.JavaConverters._
 
-case class KafkaConsumerConfig(sessionTimeoutMs: Long)
+case class KafkaConsumerConfig(sessionTimeoutMs: Long, metricFlushIntervalS: Int)
 
 class KafkaConsumerConnector(
   kafkahost: String,
@@ -49,6 +50,10 @@ class KafkaConsumerConnector(
   // logic, like the wakeup timer.
   private val cfg = loadConfigOrThrow[KafkaConsumerConfig](ConfigKeys.kafkaConsumer)
 
+  // Currently consumed offset, is used to calculate the topic lag.
+  // It is updated from one thread in "peek", no concurrent data structure is necessary
+  private var offset: Long = 0
+
   /**
   * Long poll for messages. Method returns once message are available but no later than given
    * duration.
@@ -62,7 +67,11 @@ class KafkaConsumerConnector(
     val wakeUpTask = actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 1.second)(consumer.wakeup())
 
     try {
-      consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value))
+      val response = consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value))
+      response.lastOption.foreach {
+        case (_, _, newOffset, _) => offset = newOffset + 1
+      }
+      response
     } catch {
       // Happens if the peek hangs.
       case _: WakeupException if retry > 0 =>
@@ -138,14 +147,13 @@ class KafkaConsumerConnector(
 
   @volatile private var consumer = getConsumer(getProps, Some(List(topic)))
 
-//  Read current lag of the consumed topic, e.g. invoker queue and
-//  emit kamon histogram metric every 5 seconds
-//  Since we use only one partition in kafka, it is defined 0 in the metric name
-  actorSystem.scheduler.schedule(10.second, 5.second) {
-    val queueSize = consumer.metrics.asScala
-      .find(_._1.name() == s"$topic-0.records-lag")
-      .map(_._2.value().toInt)
-      .getOrElse(0)
-    MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize)
+  //  Read current lag of the consumed topic, e.g. invoker queue
+  //  Since we use only one partition in kafka, it is defined 0
+  actorSystem.scheduler.schedule(10.second, cfg.metricFlushIntervalS.second) {
+    val topicAndPartition = Set(new TopicPartition(topic, 0))
+    consumer.endOffsets(topicAndPartition.asJava).asScala.find(_._1.topic() == topic).map(_._2).foreach { endOffset =>
+      val queueSize = endOffset - offset
+      MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize)
+    }
   }
 }
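
The second half of the change wires the new metric-flush-interval-s key into the scheduler period. A rough, hypothetical illustration of that wiring is below; the hard-coded KafkaConsumerConfig value stands in for what loadConfigOrThrow[KafkaConsumerConfig](ConfigKeys.kafkaConsumer) would read (pureconfig maps the kebab-case key metric-flush-interval-s to the metricFlushIntervalS field).

    import akka.actor.ActorSystem

    import scala.concurrent.duration._

    // Mirrors the case class in the diff.
    case class KafkaConsumerConfig(sessionTimeoutMs: Long, metricFlushIntervalS: Int)

    object FlushIntervalSketch extends App {
      implicit val system: ActorSystem = ActorSystem("flush-interval-sketch")
      import system.dispatcher

      // Placeholder values; metric-flush-interval-s defaults to 60 in the config change above.
      val cfg = KafkaConsumerConfig(sessionTimeoutMs = 30000, metricFlushIntervalS = 60)

      // Same shape as the scheduling in the diff: 10s initial delay, then one
      // metric flush per configured interval.
      system.scheduler.schedule(10.seconds, cfg.metricFlushIntervalS.seconds) {
        println(s"emit queue-size metric (every ${cfg.metricFlushIntervalS}s)")
      }
    }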

-- 
To stop receiving notification emails like this one, please contact markusthoem...@apache.org.
