This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 66709c0  KAFKA-9788; Use distinct names for transaction and group load time sensors (#8784)
66709c0 is described below

commit 66709c02229714c46f8735e53d7b12156d6a4e3b
Author: Bob Barrett <[email protected]>
AuthorDate: Wed Jun 3 17:53:30 2020 -0700

    KAFKA-9788; Use distinct names for transaction and group load time sensors (#8784)
    
    Sensor objects are stored in the Kafka metrics registry and keyed by name.
    If a new sensor is created with the same name as an existing one, the
    existing one is returned rather than a new object being created. The
    partition load time sensors for the transaction and group coordinators used
    the same name, so data recorded to either was stored in the same object.
    This meant that the values of both metrics were identical and consisted of
    the combined data. This patch changes the sensor names to be distinct
    (GroupPartitionLoadTime for the group coordinator and
    TransactionsPartitionLoadTime for the transaction coordinator) so that each
    coordinator records to its own sensor.
    
    Reviewers: Jason Gustafson <[email protected]>
---
 .../scala/kafka/coordinator/group/GroupMetadataManager.scala  | 11 +++++++----
 .../coordinator/transaction/TransactionStateManager.scala     |  9 ++++++---
 2 files changed, 13 insertions(+), 7 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index aba6a5f..4898e22 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -49,9 +49,9 @@ import org.apache.kafka.common.requests.{OffsetCommitRequest, 
OffsetFetchRespons
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 
-import scala.jdk.CollectionConverters._
 import scala.collection._
 import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
 
 class GroupMetadataManager(brokerId: Int,
                            interBrokerProtocolVersion: ApiVersion,
@@ -89,13 +89,13 @@ class GroupMetadataManager(brokerId: Int,
   private val openGroupsForProducer = mutable.HashMap[Long, 
mutable.Set[String]]()
 
   /* setup metrics*/
-  val partitionLoadSensor = metrics.sensor("PartitionLoadTime")
+  private val partitionLoadSensor = 
metrics.sensor(GroupMetadataManager.LoadTimeSensor)
 
   partitionLoadSensor.add(metrics.metricName("partition-load-time-max",
-    "group-coordinator-metrics",
+    GroupMetadataManager.MetricsGroup,
     "The max time it took to load the partitions in the last 30sec"), new 
Max())
   partitionLoadSensor.add(metrics.metricName("partition-load-time-avg",
-    "group-coordinator-metrics",
+    GroupMetadataManager.MetricsGroup,
     "The avg time it took to load the partitions in the last 30sec"), new 
Avg())
 
   val offsetCommitsSensor = metrics.sensor("OffsetCommits")
@@ -991,6 +991,9 @@ class GroupMetadataManager(brokerId: Int,
  *    -> value version 0:       [protocol_type, generation, protocol, leader, 
members]
  */
 object GroupMetadataManager {
+  // Metrics names
+  val MetricsGroup: String = "group-coordinator-metrics"
+  val LoadTimeSensor: String = "GroupPartitionLoadTime"
 
   private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
   private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index a96857c..8eb65df 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -48,6 +48,9 @@ object TransactionStateManager {
   val DefaultTransactionalIdExpirationMs: Int = TimeUnit.DAYS.toMillis(7).toInt
   val DefaultAbortTimedOutTransactionsIntervalMs: Int = 
TimeUnit.SECONDS.toMillis(10).toInt
   val DefaultRemoveExpiredTransactionalIdsIntervalMs: Int = 
TimeUnit.HOURS.toMillis(1).toInt
+
+  val MetricsGroup: String = "transaction-coordinator-metrics"
+  val LoadTimeSensor: String = "TransactionsPartitionLoadTime"
 }
 
 /**
@@ -95,13 +98,13 @@ class TransactionStateManager(brokerId: Int,
   private val transactionTopicPartitionCount = 
getTransactionTopicPartitionCount
 
   /** setup metrics*/
-  private val partitionLoadSensor = metrics.sensor("PartitionLoadTime")
+  private val partitionLoadSensor = 
metrics.sensor(TransactionStateManager.LoadTimeSensor)
 
   partitionLoadSensor.add(metrics.metricName("partition-load-time-max",
-    "transaction-coordinator-metrics",
+    TransactionStateManager.MetricsGroup,
     "The max time it took to load the partitions in the last 30sec"), new 
Max())
   partitionLoadSensor.add(metrics.metricName("partition-load-time-avg",
-    "transaction-coordinator-metrics",
+    TransactionStateManager.MetricsGroup,
     "The avg time it took to load the partitions in the last 30sec"), new 
Avg())
 
   // visible for testing only

Reply via email to