divijvaidya commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1242585330
##########
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##########
@@ -154,6 +154,11 @@ public AddPartitionsToTxnRequest normalizeRequest() {
         return new AddPartitionsToTxnRequest(new AddPartitionsToTxnRequestData().setTransactions(singletonTransaction()), version());
     }
+    public boolean verifyOnlyRequest() {
+        return version() > 3 &&
+            data.transactions().stream().filter(transaction -> transaction.verifyOnly()).toArray().length == data.transactions().size();

Review Comment:
   This could be replaced with `data.transactions().stream().allMatch(AddPartitionsToTxnTransaction::verifyOnly)`, which would let the JDK library perform this check optimally.

##########
core/src/main/scala/kafka/network/RequestChannel.scala:
##########
@@ -240,16 +240,17 @@ object RequestChannel extends Logging {
       val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
       val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
       val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
-      val fetchMetricNames =
+      val metricNames =

Review Comment:
   s/metricNames/overrideMetricNames?

##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)

Review Comment:
   We are using `VerificationFailureRate` in a couple of places, such as shutdown and tests. Please move it to a constant in the companion object.
##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)

Review Comment:
   You might be interested in adding "version" to the tags.

##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -2158,6 +2158,12 @@ object TestUtils extends Logging {
     KafkaYammerMetrics.defaultRegistry.removeMetric(metricName)
   }
+  def clearYammerMetric(metricName: String): Unit = {

Review Comment:
   Where are we using this?

##########
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##########
@@ -154,6 +154,11 @@ public AddPartitionsToTxnRequest normalizeRequest() {
         return new AddPartitionsToTxnRequest(new AddPartitionsToTxnRequestData().setTransactions(singletonTransaction()), version());
     }
+    public boolean verifyOnlyRequest() {
+        return version() > 3 &&

Review Comment:
   (optional) May I suggest adding a constant, `EARLIEST_SUPPORTED_VERSION`, to `AddPartitionsToTxnManager` and using that constant here. It greatly helps code readability.
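To make the two suggestions on `verifyOnlyRequest()` concrete, here is a minimal standalone sketch, not the PR's actual code. `Txn` is a hypothetical stand-in for the generated `AddPartitionsToTxnTransaction` class, `VerifyOnlySketch` is a made-up holder class, and the value `4` for `EARLIEST_SUPPORTED_VERSION` is an assumption derived from the `version() > 3` check in the diff. Where the constant should actually live is left open.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the generated AddPartitionsToTxnTransaction class;
// only the verifyOnly() accessor matters for this sketch.
final class Txn {
    private final boolean verifyOnly;

    Txn(boolean verifyOnly) {
        this.verifyOnly = verifyOnly;
    }

    boolean verifyOnly() {
        return verifyOnly;
    }
}

final class VerifyOnlySketch {
    // Assumed value: the diff checks `version() > 3`, so 4 is taken to be the
    // first version that carries the verifyOnly field.
    static final short EARLIEST_SUPPORTED_VERSION = 4;

    // allMatch stops at the first transaction that is not verifyOnly, instead of
    // filtering the whole list into an array and comparing lengths.
    static boolean verifyOnlyRequest(short version, List<Txn> transactions) {
        return version >= EARLIEST_SUPPORTED_VERSION
            && transactions.stream().allMatch(Txn::verifyOnly);
    }

    public static void main(String[] args) {
        List<Txn> allVerify = Arrays.asList(new Txn(true), new Txn(true));
        List<Txn> mixed = Arrays.asList(new Txn(true), new Txn(false));

        System.out.println(verifyOnlyRequest((short) 4, allVerify)); // true
        System.out.println(verifyOnlyRequest((short) 4, mixed));     // false
        System.out.println(verifyOnlyRequest((short) 3, allVerify)); // false
        // Empty list: allMatch returns true, matching the original 0 == 0 comparison.
        System.out.println(verifyOnlyRequest((short) 4, Collections.<Txn>emptyList())); // true
    }
}
```

Note that `allMatch` returns `true` for an empty stream, so the behavior for a request with no transactions is the same as with the original length comparison.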
##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)
+  val verificationTimeMs = metricsGroup.newHistogram("VerificationTimeMs")

Review Comment:
   We probably want this metric (and the failure metric) to be consistent with the metrics we have for other requests. To make it consistent, we will have to do things like:
   1. use a biased histogram
   2. add tags such as request=AddPartitionsToTxn

   Please refer to RequestChannel#RequestMetrics to see how other APIs capture metrics.

##########
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##########
@@ -154,6 +154,11 @@ public AddPartitionsToTxnRequest normalizeRequest() {
         return new AddPartitionsToTxnRequest(new AddPartitionsToTxnRequestData().setTransactions(singletonTransaction()), version());
     }
+    public boolean verifyOnlyRequest() {

Review Comment:
   It would be nice if we could add a Javadoc saying that we expect all requests from clients with version > 3 to contain the verifyOnly field. I am asking for a Javadoc because this is an unusual check for verifyOnly, since typically we don't expect all fields in a request to contain "--verifyOnly".

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org