divijvaidya commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1243425245


##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -90,30 +109,34 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
         topicPartitionsToError.put(new TopicPartition(topic.name, partition), 
error)
       }
     }
+    verificationFailureRate.mark(topicPartitionsToError.size)
     topicPartitionsToError.toMap
   }
 
+  private def sendCallback(callback: AddPartitionsToTxnManager.AppendCallback, 
errorMap: Map[TopicPartition, Errors], startTime: Long): Unit = {

Review Comment:
   s/startTime/startTimeMs so that it is clear what the units are



##########
core/src/main/scala/kafka/network/RequestChannel.scala:
##########
@@ -240,17 +240,18 @@ object RequestChannel extends Logging {
       val responseSendTimeMs = nanosToMs(endTimeNanos - 
responseDequeueTimeNanos)
       val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
       val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
-      val fetchMetricNames =
+      val overrideMetricNames =
         if (header.apiKey == ApiKeys.FETCH) {
-          val isFromFollower = body[FetchRequest].isFromFollower
-          Seq(
-            if (isFromFollower) RequestMetrics.followFetchMetricName
+          val specifiedMetricName =
+            if (body[FetchRequest].isFromFollower) 
RequestMetrics.followFetchMetricName
             else RequestMetrics.consumerFetchMetricName
-          )
+          Seq(specifiedMetricName, header.apiKey.name)
+        } else if (header.apiKey == ApiKeys.ADD_PARTITIONS_TO_TXN && 
body[AddPartitionsToTxnRequest].allVerifyOnlyRequest()) {

Review Comment:
   nit
   
   The parentheses on allVerifyOnlyRequest() are optional in Scala.



##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = 
metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)
+  val verificationTimeMs = metricsGroup.newHistogram("VerificationTimeMs")

Review Comment:
   Thank you for the explanation. Makes sense. 
   
   > 1\ use a biased histogram
   
   Could you also please respond to the comment about using a biased histogram?



##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -17,25 +17,37 @@
 
 package kafka.server
 
+import 
kafka.server.AddPartitionsToTxnManager.{verificationFailureRateMetricName, 
verificationTimeMsMetricName}
 import kafka.utils.Logging
 import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
 import org.apache.kafka.common.{Node, TopicPartition}
 import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{InterBrokerSendThread, 
RequestAndCompletionHandler}
 
 import java.util
+import java.util.concurrent.TimeUnit
 import scala.collection.mutable
 
 object AddPartitionsToTxnManager {
   type AppendCallback = Map[TopicPartition, Errors] => Unit
+
+  val verificationFailureRateMetricName = "VerificationFailureRate"

Review Comment:
   nit
   
   Constants start with a capital letter (PascalCase) in the Kafka code
base, from what I have observed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to