[GitHub] [kafka] divijvaidya commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-27 Thread via GitHub


divijvaidya commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1244076297


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)
+  val verificationTimeMs = metricsGroup.newHistogram("VerificationTimeMs")

Review Comment:
   oh, I missed that. Sounds good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-27 Thread via GitHub


divijvaidya commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1243570850


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -17,25 +17,37 @@
 
 package kafka.server
 
+import kafka.server.AddPartitionsToTxnManager.{verificationFailureRateMetricName, verificationTimeMsMetricName}
 import kafka.utils.Logging
 import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse}
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler}
 
 import java.util
+import java.util.concurrent.TimeUnit
 import scala.collection.mutable
 
 object AddPartitionsToTxnManager {
   type AppendCallback = Map[TopicPartition, Errors] => Unit
+
+  val verificationFailureRateMetricName = "VerificationFailureRate"

Review Comment:
   Do we want to add a tag for the error code as well? That would let us break failures down per error code.
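
A minimal sketch of what per-error-code failure counting could look like, written in plain Java rather than against the Kafka metrics classes. `FailuresByError` is a hypothetical name, and the map key stands in for the value of an "error" tag; the error names used in the usage note are only examples.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical illustration: one counter per error name, as a tag-per-error-code metric would give.
final class FailuresByError {
    private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

    // Record n failed partitions for the given error name.
    void mark(String errorName, long n) {
        counts.computeIfAbsent(errorName, k -> new LongAdder()).add(n);
    }

    // Failures recorded for a single error name (0 if never seen).
    long count(String errorName) {
        LongAdder adder = counts.get(errorName);
        return adder == null ? 0L : adder.sum();
    }

    // Sum across all error names, equivalent to the untagged failure count.
    long total() {
        return counts.values().stream().mapToLong(LongAdder::sum).sum();
    }
}
```

For example, after `mark("INVALID_TXN_STATE", 2)` and `mark("NOT_COORDINATOR", 1)`, the per-error counts can be read separately while `total()` still gives the aggregate. The trade-off is one extra metric per distinct error code.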






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-27 Thread via GitHub


divijvaidya commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1243569174


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)

Review Comment:
   When we bump the version beyond 4 in the future (say we have 5 and 6), wouldn't we want the ability to look at failure metrics for version 5 only?






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-27 Thread via GitHub


divijvaidya commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1243425245


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -90,30 +109,34 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
         topicPartitionsToError.put(new TopicPartition(topic.name, partition), error)
       }
     }
+    verificationFailureRate.mark(topicPartitionsToError.size)
     topicPartitionsToError.toMap
   }
 
+  private def sendCallback(callback: AddPartitionsToTxnManager.AppendCallback, errorMap: Map[TopicPartition, Errors], startTime: Long): Unit = {

Review Comment:
   s/startTime/startTimeMs so that it is clear what the units are



##
core/src/main/scala/kafka/network/RequestChannel.scala:
##
@@ -240,17 +240,18 @@ object RequestChannel extends Logging {
       val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
       val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
       val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
-      val fetchMetricNames =
+      val overrideMetricNames =
         if (header.apiKey == ApiKeys.FETCH) {
-          val isFromFollower = body[FetchRequest].isFromFollower
-          Seq(
-            if (isFromFollower) RequestMetrics.followFetchMetricName
+          val specifiedMetricName =
+            if (body[FetchRequest].isFromFollower) RequestMetrics.followFetchMetricName
             else RequestMetrics.consumerFetchMetricName
-          )
+          Seq(specifiedMetricName, header.apiKey.name)
+        } else if (header.apiKey == ApiKeys.ADD_PARTITIONS_TO_TXN && body[AddPartitionsToTxnRequest].allVerifyOnlyRequest()) {

Review Comment:
   nit
   
   The parentheses in `allVerifyOnlyRequest()` are optional in Scala.



##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)
+  val verificationTimeMs = metricsGroup.newHistogram("VerificationTimeMs")

Review Comment:
   Thank you for the explanation. Makes sense. 
   
   > 1. use a biased histogram
   
   Could you also please respond to the comment about using a biased histogram?



##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -17,25 +17,37 @@
 
 package kafka.server
 
+import kafka.server.AddPartitionsToTxnManager.{verificationFailureRateMetricName, verificationTimeMsMetricName}
 import kafka.utils.Logging
 import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse}
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler}
 
 import java.util
+import java.util.concurrent.TimeUnit
 import scala.collection.mutable
 
 object AddPartitionsToTxnManager {
   type AppendCallback = Map[TopicPartition, Errors] => Unit
+
+  val verificationFailureRateMetricName = "VerificationFailureRate"

Review Comment:
   nit
   
   Constants start with a capital letter (PascalCase) in the Kafka code base, from what I have observed.






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-26 Thread via GitHub


divijvaidya commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1242585330


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -154,6 +154,11 @@ public AddPartitionsToTxnRequest normalizeRequest() {
         return new AddPartitionsToTxnRequest(new AddPartitionsToTxnRequestData().setTransactions(singletonTransaction()), version());
     }
 
+    public boolean verifyOnlyRequest() {
+        return version() > 3 &&
+            data.transactions().stream().filter(transaction -> transaction.verifyOnly()).toArray().length == data.transactions().size();

Review Comment:
   This could be replaced with `data.transactions().stream().allMatch(AddPartitionsToTxnTransaction::verifyOnly)`, which lets the JDK stop at the first non-matching transaction instead of building an intermediate array.
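
To make the difference concrete, here is a small self-contained sketch comparing the two shapes. `Txn` is a hypothetical stand-in for `AddPartitionsToTxnTransaction` and `VerifyOnlyCheck` is an illustrative class name; only `Stream.filter`, `Stream.toArray` and `Stream.allMatch` are real JDK calls.

```java
import java.util.Arrays;
import java.util.List;

final class VerifyOnlyCheck {
    // Hypothetical stand-in for AddPartitionsToTxnTransaction; only the flag matters here.
    static final class Txn {
        private final boolean verifyOnly;
        Txn(boolean verifyOnly) { this.verifyOnly = verifyOnly; }
        boolean verifyOnly() { return verifyOnly; }
    }

    // Shape from the diff: collect every match into an array, then compare sizes.
    static boolean allVerifyOnlyByCount(List<Txn> txns) {
        return txns.stream().filter(Txn::verifyOnly).toArray().length == txns.size();
    }

    // Suggested shape: returns as soon as one transaction is not verify-only.
    static boolean allVerifyOnly(List<Txn> txns) {
        return txns.stream().allMatch(Txn::verifyOnly);
    }

    public static void main(String[] args) {
        List<Txn> mixed = Arrays.asList(new Txn(true), new Txn(false));
        System.out.println(allVerifyOnlyByCount(mixed) == allVerifyOnly(mixed));
    }
}
```

Both versions agree on an empty list as well: the count comparison is 0 == 0 and `allMatch` is true for an empty stream, so the swap should not change behaviour in that edge case.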



##
core/src/main/scala/kafka/network/RequestChannel.scala:
##
@@ -240,16 +240,17 @@ object RequestChannel extends Logging {
       val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
       val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
       val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
-      val fetchMetricNames =
+      val metricNames =

Review Comment:
   s/metricNames/overrideMetricNames?



##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)

Review Comment:
   We are using `VerificationFailureRate` in a couple of places, such as shutdown and the tests. Please move it to a constant in the companion object.



##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)

Review Comment:
   you might be interested in adding "version" to the tags



##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -2158,6 +2158,12 @@ object TestUtils extends Logging {
       KafkaYammerMetrics.defaultRegistry.removeMetric(metricName)
   }
 
+  def clearYammerMetric(metricName: String): Unit = {

Review Comment:
   where are we using this?



##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -154,6 +154,11 @@ public AddPartitionsToTxnRequest normalizeRequest() {
         return new AddPartitionsToTxnRequest(new AddPartitionsToTxnRequestData().setTransactions(singletonTransaction()), version());
     }
 
+    public boolean verifyOnlyRequest() {
+        return version() > 3 &&

Review Comment:
   (optional)
   May I suggest adding a constant, `EARLIEST_SUPPORTED_VERSION`, to `AddPartitionsToTxnManager` and using it here? It greatly improves code readability.
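
As a rough sketch of the readability gain, the magic number can be given a name and the comparison expressed against it. The class `VerifyOnlyVersionGate` and the method `supportsVerifyOnly` are hypothetical; the value 4 is simply the first version for which `version() > 3` holds.

```java
// Hypothetical holder for the constant; where it actually lives is a design decision for the PR.
final class VerifyOnlyVersionGate {
    // First request version for which the check `version() > 3` is true.
    static final short EARLIEST_SUPPORTED_VERSION = 4;

    // Same condition as `version > 3`, but the intent is readable at the call site.
    static boolean supportsVerifyOnly(short version) {
        return version >= EARLIEST_SUPPORTED_VERSION;
    }
}
```

With this in place, the method in the diff would read as a version gate followed by the per-transaction check, and a future version bump only needs to touch one constant.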



##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)
+  val verificationTimeMs = metricsGroup.newHistogram("VerificationTimeMs")

Review Comment:
   We probably want this metric (and the failure metric) to be consistent with the metrics we have for other requests. To make them consistent, we will have to do things like: 1. use a biased histogram, 2. add tags such as request=AddPartitionsToTxn.
   
   Please refer to RequestChannel#RequestMetrics to see how other APIs capture metrics.
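
The tagging half of this suggestion can be sketched without the Kafka classes. The helper below is hypothetical (it is not `KafkaMetricsGroup`) and only shows how a `request=AddPartitionsToTxn` tag might end up as a `key=value` suffix on an MBean-style metric name; the group and type strings in `main` are illustrative. The biased-histogram half is not modelled here; as far as I understand, in Yammer Metrics that means a sample that favours recent measurements.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class TaggedMetricName {
    // Hypothetical helper: appends non-empty tags as key=value pairs to a base metric name.
    static String mbeanName(String group, String type, String name, Map<String, String> tags) {
        String base = group + ":type=" + type + ",name=" + name;
        String suffix = tags.entrySet().stream()
            .filter(e -> !e.getValue().isEmpty())
            .map(e -> e.getKey() + "=" + e.getValue())
            .collect(Collectors.joining(","));
        return suffix.isEmpty() ? base : base + "," + suffix;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps tags in insertion order, so the name is deterministic.
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("request", "AddPartitionsToTxn");
        System.out.println(mbeanName("kafka.server", "AddPartitionsToTxnManager", "VerificationTimeMs", tags));
    }
}
```

Tags keep a single metric name while letting dashboards filter by request type, which is the consistency with other request metrics that the comment asks for.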



##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -154,6 +154,11 @@ public AddPartitionsToTxnRequest normalizeRequest() {
         return new AddPartitionsToTxnRequest(new AddPartitionsToTxnRequestData().setTransactions(singletonTransaction()), version());
     }
 
+    public boolean verifyOnlyRequest() {

Review Comment:
   It would be nice if we could add Javadoc stating that we expect all requests from clients with version > 3 to contain verifyOnly.

[GitHub] [kafka] divijvaidya commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-20 Thread via GitHub


divijvaidya commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1235924422


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -42,13 +50,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)

Review Comment:
   You probably want to unregister these metrics when the thread is shut down. Also, please add a test to validate that the metrics are removed correctly.
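
The register-on-construct, remove-on-shutdown pattern can be sketched as follows. Everything here is a toy: `REGISTRY` is a plain set standing in for the real metrics registry, and `Manager` is a hypothetical stand-in for the class under review. The point is only that whatever an instance registers, its shutdown path removes, and that a test can assert on the registry afterwards.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class MetricLifecycleSketch {
    // Toy registry of metric names (an assumption for illustration, not Kafka's API).
    static final Set<String> REGISTRY = ConcurrentHashMap.newKeySet();

    static final class Manager implements AutoCloseable {
        static final String FAILURE_RATE = "VerificationFailureRate";
        static final String TIME_MS = "VerificationTimeMs";

        // Registers this instance's metrics on construction.
        Manager() {
            REGISTRY.add(FAILURE_RATE);
            REGISTRY.add(TIME_MS);
        }

        // Shutdown hook: removes every metric this instance registered.
        @Override
        public void close() {
            REGISTRY.remove(FAILURE_RATE);
            REGISTRY.remove(TIME_MS);
        }
    }
}
```

A test in this style constructs the manager, checks that both names are present, calls `close()`, and then checks that neither name remains in the registry.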


