This is an automated email from the ASF dual-hosted git repository. chetanm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 658516e Track Kafka client side metrics via Kamon (#4481) 658516e is described below commit 658516e6db1bf65ea4bd82e2b5e1ef10f69e0391 Author: Chetan Mehrotra <chet...@apache.org> AuthorDate: Wed May 29 09:44:54 2019 +0530 Track Kafka client side metrics via Kamon (#4481) Adds a configurable MetricsReporter to route Kafka metrics to Kamon once enabled. Set of metrics names which need to be captured needs to be explicitly configured --- common/scala/src/main/resources/application.conf | 15 +++ .../connector/kafka/KafkaConsumerConnector.scala | 1 + .../connector/kafka/KamonMetricsReporter.scala | 132 +++++++++++++++++++++ 3 files changed, 148 insertions(+) diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf index 9d122b7..7fd870e 100644 --- a/common/scala/src/main/resources/application.conf +++ b/common/scala/src/main/resources/application.conf @@ -95,6 +95,10 @@ whisk { common { security-protocol = PLAINTEXT ssl-endpoint-identification-algorithm = "" // restores pre-kafka 2.0.0 default + + //Enable this for reporting Kafka client metrics + //metric-reporters = "org.apache.openwhisk.connector.kafka.KamonMetricsReporter" + } producer { acks = 1 @@ -154,6 +158,17 @@ whisk { retention-ms = 3600000 } } + + metrics { + // Name of metrics which should be tracked via Kamon + names = [ + // consumer-fetch-manager-metrics + "records-lag-max", // The maximum lag in terms of number of records for any partition in this window + "records-consumed-total" // The total number of records consumed + ] + + report-interval = 10 seconds + } } # db related configuration db { diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala index 7e673d1..b75c689 100644 --- 
a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala @@ -142,6 +142,7 @@ class KafkaConsumerConnector( /** Creates a new kafka consumer and subscribes to topic list if given. */ private def createConsumer(topic: String) = { val config = Map( + ConsumerConfig.CLIENT_ID_CONFIG -> s"consumer-$topic", ConsumerConfig.GROUP_ID_CONFIG -> groupid, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkahost, ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPeek.toString) ++ diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KamonMetricsReporter.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KamonMetricsReporter.scala new file mode 100644 index 0000000..4c1d792 --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KamonMetricsReporter.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.openwhisk.connector.kafka

import java.util
import java.util.concurrent.{ScheduledFuture, TimeUnit}

import kamon.Kamon
import kamon.metric.{Counter, Gauge, Metric}
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.metrics.stats.Total
import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}
import org.apache.openwhisk.core.ConfigKeys
import pureconfig.loadConfigOrThrow

import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.FiniteDuration
import scala.util.{Success, Try}

/**
 * Kafka client [[MetricsReporter]] that mirrors a configured subset of Kafka metrics into Kamon.
 *
 * Only Kafka metrics whose name is listed under `names` in the `metrics` config section below
 * `ConfigKeys.kafka` are bridged (subject to [[KamonMetricsReporter.shouldIncludeMetric]]).
 * Metrics backed by a `Total` stat become Kamon counters; all others become Kamon gauges. Each
 * instrument is refined with the Kafka metric's tags. Values are copied into Kamon periodically,
 * every `report-interval`, on Kamon's scheduler.
 *
 * Enabled by setting `metric-reporters` to this class in the Kafka `common` client config.
 */
class KamonMetricsReporter extends MetricsReporter {
  import KamonMetricsReporter._

  /** Bridges for the Kafka metrics currently mirrored into Kamon, keyed by Kafka metric name. */
  private val metrics = new TrieMap[MetricName, MetricBridge]()
  private val metricConfig = loadConfigOrThrow[KafkaMetricConfig](s"${ConfigKeys.kafka}.metrics")

  /** Handle of the periodic task that pushes Kafka values into Kamon; set by `configure`. */
  @volatile
  private var updater: Option[ScheduledFuture[_]] = None

  override def init(metrics: util.List[KafkaMetric]): Unit = metrics.forEach(add)

  override def metricChange(metric: KafkaMetric): Unit = {
    // A changed metric replaces any bridge previously registered under the same name.
    remove(metric)
    add(metric)
  }

  override def metricRemoval(metric: KafkaMetric): Unit = remove(metric)

  /** Stops the periodic update and unregisters every Kamon instrument created by this reporter. */
  override def close(): Unit = {
    updater.foreach(_.cancel(false))
    updater = None
    // NOTE(review): assumes Kafka may close the reporter without calling metricRemoval for every
    // metric; removing here avoids leaving stale Kamon values behind and is a no-op otherwise.
    removeAll()
  }

  /**
   * Starts the periodic copy of Kafka values into Kamon.
   *
   * @throws IllegalArgumentException if the configured report interval is not positive
   */
  override def configure(configs: util.Map[String, _]): Unit = {
    // Millisecond resolution so sub-second intervals are not truncated to a zero period.
    val intervalMillis = metricConfig.reportInterval.toMillis
    require(intervalMillis > 0, s"Kafka metrics report-interval must be positive, got ${metricConfig.reportInterval}")
    // Do not leave an earlier task running unreferenced if configure is invoked more than once.
    updater.foreach(_.cancel(false))
    val task = Kamon
      .scheduler()
      .scheduleAtFixedRate(() => updateAll(), intervalMillis, intervalMillis, TimeUnit.MILLISECONDS)
    updater = Some(task)
  }

  /** Registers a Kamon bridge for `metric` if it is configured for tracking and not excluded. */
  private def add(metric: KafkaMetric): Unit = {
    val mn = metric.metricName()
    if (metricConfig.names.contains(mn.name()) && shouldIncludeMetric(mn)) {
      val tags = mn.tags()
      val metricName = kamonName(mn)
      val bridge =
        if (isCounterMetric(metric)) {
          val counter = Kamon.counter(metricName)
          new CounterBridge(metric, counter, counter.refine(tags))
        } else {
          val gauge = Kamon.gauge(metricName)
          new GaugeBridge(metric, gauge, gauge.refine(tags))
        }
      metrics.putIfAbsent(mn, bridge)
    }
  }

  /** Drops the bridge for `metric`, if any, and removes its refined Kamon instrument. */
  private def remove(metric: KafkaMetric): Unit = metrics.remove(metric.metricName()).foreach(_.remove())

  /** Drops every bridge and its Kamon instrument. TrieMap iteration works on a snapshot. */
  private def removeAll(): Unit = metrics.keys.foreach(name => metrics.remove(name).foreach(_.remove()))

  private def updateAll(): Unit = metrics.values.foreach(_.update())
}

object KamonMetricsReporter {

  /** Metrics also recorded per topic; only their non-topic-level variant is bridged. */
  private val excludedTopicAttributes = Set("records-lag-max", "records-consumed-total", "bytes-consumed-total")

  /**
   * @param names          Kafka metric names to mirror into Kamon
   * @param reportInterval how often Kafka values are copied into Kamon
   */
  case class KafkaMetricConfig(names: Set[String], reportInterval: FiniteDuration)

  /** Links one Kafka metric to the Kamon metric (and refined instrument) that mirrors it. */
  abstract class MetricBridge(val kafkaMetric: KafkaMetric, kamonMetric: Metric[_]) {

    /** Removes the refined Kamon instrument carrying this Kafka metric's tags. */
    def remove(): Unit = kamonMetric.remove(kafkaMetric.metricName().tags())

    /** Copies the current Kafka value into the Kamon instrument. */
    def update(): Unit

    /** Current Kafka value as a Long; 0 if reading it fails or it is not a usable number. */
    def metricValue: Long =
      Try(kafkaMetric.metricValue()).map(metricValueAsLong).getOrElse(0L)
  }

  class GaugeBridge(kafkaMetric: KafkaMetric, kamonMetric: Metric[_], gauge: Gauge)
      extends MetricBridge(kafkaMetric, kamonMetric) {
    override def update(): Unit = gauge.set(metricValue)
  }

  /** Forwards the growth of a cumulative Kafka total as increments of a Kamon counter. */
  class CounterBridge(kafkaMetric: KafkaMetric, kamonMetric: Metric[_], counter: Counter)
      extends MetricBridge(kafkaMetric, kamonMetric) {
    @volatile
    private var lastValue: Long = 0

    override def update(): Unit = {
      val newValue = metricValue
      // Counter bridges are only built for `Total` stats (see isCounterMetric), assumed to be
      // non-decreasing for this bridge's lifetime (a replaced metric gets a fresh bridge). A lower
      // reading is treated as a failed/invalid read: resetting the baseline on it would re-add the
      // whole total on the next good read, so it is skipped instead.
      if (newValue > lastValue) {
        counter.increment(newValue - lastValue)
        lastValue = newValue
      }
    }
  }

  /**
   * Converts a raw Kafka metric value to a Long.
   *
   * Numbers are truncated toward zero. NaN, infinities and non-numeric values map to 0.
   */
  def metricValueAsLong(value: Any): Long = value match {
    case d: java.lang.Double => if (d.isNaN || d.isInfinite) 0L else d.longValue()
    case n: java.lang.Number => n.longValue()
    case _                   => 0L
  }

  /** Kamon metric name `<group>_<name>`, without a trailing `-total`. */
  def kamonName(mn: MetricName): String = {
    // Drop the `-total` suffix as it results in prometheus metrics ending with total twice
    s"${mn.group()}_${mn.name().stripSuffix("-total")}"
  }

  /** True when the metric is backed by a `Total` stat and should therefore map to a Kamon counter. */
  def isCounterMetric(metric: KafkaMetric): Boolean = Try(metric.measurable()) match {
    case Success(_: Total) => true
    case _                 => false
  }

  /**
   * Avoids duplicate metrics for cases that are recorded at multiple levels. For example,
   * `bytes-consumed-total` is recorded at both the consumer and the topic level. As one consumer is
   * used per topic, the topic-level variant of these metrics is dropped.
   */
  def shouldIncludeMetric(m: MetricName): Boolean =
    !(excludedTopicAttributes.contains(m.name()) && m.tags().containsKey("topic"))
}