cadonna commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r694676118



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1873,7 +1875,13 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time
         acquireAndEnsureOpen();
         try {
             maybeThrowInvalidGroupIdException();
-            Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
+            final Map<TopicPartition, OffsetAndMetadata> offsets;
+            long start = time.nanoseconds();
+            try {
+                offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
+            } finally {
+                kafkaConsumerMetrics.recordCommitted(time.nanoseconds() - start);
+            }

Review comment:
       Could you please add unit tests for this change?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1873,7 +1875,13 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time
         acquireAndEnsureOpen();
         try {
             maybeThrowInvalidGroupIdException();

Review comment:
       Why do you exclude this check from the measured time here but include it 
above? The same applies to `offsets.forEach(this::updateLastSeenEpochIfNewer)`.
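
       To illustrate the point about a consistent measurement scope, here is a 
minimal sketch. It does not use Kafka classes: `TimedCallSketch`, `timed`, and 
the injected `LongSupplier` clock are hypothetical names chosen for this 
example. The idea is that every step that belongs to the operation runs inside 
one window, and the duration is recorded in `finally` so it is also captured 
when a step throws.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a cumulative-time metric; not Kafka's KafkaConsumerMetrics. */
final class TimedCallSketch {
    private final LongSupplier nanoClock; // e.g. System::nanoTime in production code
    private long totalNanos = 0L;

    TimedCallSketch(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    /** Runs all steps inside a single measured window and records even if a step throws. */
    void timed(Runnable... steps) {
        long start = nanoClock.getAsLong();
        try {
            for (Runnable step : steps) {
                step.run();
            }
        } finally {
            totalNanos += nanoClock.getAsLong() - start;
        }
    }

    long totalNanos() {
        return totalNanos;
    }

    public static void main(String[] args) {
        long[] fakeNow = {0L}; // manually advanced fake clock
        TimedCallSketch metric = new TimedCallSketch(() -> fakeNow[0]);
        metric.timed(
            () -> fakeNow[0] += 5,   // e.g. a validation check
            () -> fakeNow[0] += 20,  // e.g. the blocking fetch
            () -> fakeNow[0] += 3);  // e.g. post-processing of the result
        System.out.println(metric.totalNanos()); // 5 + 20 + 3
    }
}
```

       Whether the checks should be inside or outside the window is a design 
choice; the sketch only shows one way to make the choice the same in every 
method.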

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.Map;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;

Review comment:
       ```suggestion
   import org.apache.kafka.common.MetricName;
   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Sensor;
   import org.apache.kafka.common.metrics.stats.CumulativeSum;
   
   import java.util.Map;
   ```

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -699,7 +706,9 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
         throwIfProducerClosed();
         TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
         sender.wakeup();

Review comment:
       Why are those lines not included in the measurement?

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -905,6 +971,57 @@ public void testSendTxnOffsetsWithGroupId() {
         }
     }
 
+    private double getAndAssertDuration(KafkaProducer<?, ?> producer, String name, double floor) {
+        double value = getMetricValue(producer, name);
+        assertTrue(value > floor);
+        return value;
+    }
+
+    @Test
+    public void testMeasureTransactionDurations() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+            new StringSerializer(), metadata, client, null, time)) {
+            producer.initTransactions();
+            assertTrue(getMetricValue(producer, "txn-init-time-total") > 999999);

Review comment:
       I am not sure I understand this verification. Could you elaborate?
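
       One possible reading, stated as an assumption rather than the author's 
confirmed intent: if `new MockTime(1)` advances the mock clock by 1 ms on every 
read, then any two reads differ by at least 1,000,000 ns, which would explain 
the `> 999999` bound. The sketch below uses a hypothetical `AutoTickClockSketch` 
class (not Kafka's `MockTime`) to show that arithmetic.

```java
/** Hypothetical auto-advancing clock; an assumption about how a 1 ms auto-tick behaves. */
final class AutoTickClockSketch {
    private final long autoTickNanos;
    private long nowNanos = 0L;

    AutoTickClockSketch(long autoTickMs) {
        this.autoTickNanos = autoTickMs * 1_000_000L;
    }

    /** Each read advances the clock by one tick before returning the current value. */
    long nanoseconds() {
        nowNanos += autoTickNanos;
        return nowNanos;
    }

    public static void main(String[] args) {
        AutoTickClockSketch time = new AutoTickClockSketch(1);
        long start = time.nanoseconds();
        // ... the measured call would run here ...
        long elapsed = time.nanoseconds() - start;
        // Two reads, one tick apart: at least 1 ms expressed in nanoseconds.
        System.out.println(elapsed > 999_999);
    }
}
```

       If that is the intent, a named constant or a short comment next to the 
assertion would make the bound easier to follow.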

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -67,22 +69,35 @@
     private final Map<String, Object> eosV2ProducerConfigs;
     private final KafkaClientSupplier clientSupplier;
     private final StreamThread.ProcessingMode processingMode;
+    private final Time time;
 
     private Producer<byte[], byte[]> producer;
     private boolean transactionInFlight = false;
     private boolean transactionInitialized = false;
+    private double oldProducerTotalBlockedTime = 0;
 
     public StreamsProducer(final StreamsConfig config,
                            final String threadId,
                            final KafkaClientSupplier clientSupplier,
                            final TaskId taskId,
                            final UUID processId,
                            final LogContext logContext) {
+        this(config, threadId, clientSupplier, taskId, processId, logContext, Time.SYSTEM);
+    }

Review comment:
       That is not my point. My point is that the objects that call the 
constructor, i.e. tasks and threads, have a time object that they use for 
their metrics (and probably for other purposes). Now that we also have metrics 
in the `StreamsProducer` that need a time object, it is inconsistent to create 
a new time object in the constructor instead of passing along the time object 
from tasks and threads into the `StreamsProducer`.
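
       A minimal sketch of what passing the time object along could look like. 
All names here (`ClockInjectionSketch`, `Owner`, `ProducerWrapper`) are 
hypothetical stand-ins, not the actual Streams classes, and the clock is 
modeled as a plain `LongSupplier`. The owner hands its own clock to the wrapper 
it creates, so both measure against the same source, and a test can drive both 
with one fake clock.

```java
import java.util.function.LongSupplier;

final class ClockInjectionSketch {

    /** Hypothetical stand-in for a producer wrapper that records blocked time. */
    static final class ProducerWrapper {
        private final LongSupplier clock;
        private long blockedNanos = 0L;

        ProducerWrapper(LongSupplier clock) {
            this.clock = clock;
        }

        void blockingCall(Runnable call) {
            long start = clock.getAsLong();
            try {
                call.run();
            } finally {
                blockedNanos += clock.getAsLong() - start;
            }
        }

        long blockedNanos() {
            return blockedNanos;
        }
    }

    /** Hypothetical stand-in for a task or thread that already owns a clock. */
    static final class Owner {
        private final LongSupplier clock;
        final ProducerWrapper producer;

        Owner(LongSupplier clock) {
            this.clock = clock;
            // Pass the owner's clock along instead of creating a new one.
            this.producer = new ProducerWrapper(clock);
        }

        long now() {
            return clock.getAsLong();
        }
    }

    public static void main(String[] args) {
        long[] fakeNow = {100L}; // one fake clock shared by owner and wrapper
        Owner owner = new Owner(() -> fakeNow[0]);
        owner.producer.blockingCall(() -> fakeNow[0] += 40);
        System.out.println(owner.producer.blockedNanos() + " " + owner.now());
    }
}
```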

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1493,6 +1494,7 @@ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, fin
                         "committing offsets " + offsets);
             }
         } finally {
+            kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);

Review comment:
       Could you please add unit tests for this change?

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.Map;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+
+public class KafkaProducerMetrics implements AutoCloseable {

Review comment:
       Could you please add unit tests for this class?

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.Map;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+
+public class KafkaProducerMetrics implements AutoCloseable {
+
+    public static final String GROUP = "producer-metrics";
+    private static final String FLUSH = "flush";
+    private static final String TXN_INIT = "txn-init";
+    private static final String TXN_BEGIN = "txn-begin";
+    private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
+    private static final String TXN_COMMIT = "txn-commit";
+    private static final String TXN_ABORT = "txn-abort";
+    private static final String TOTAL_TIME_SUFFIX = "-time-total";
+
+    final Map<String, String> tags;
+    final Metrics metrics;
+    final Sensor initTimeSensor;
+    final Sensor beginTxnTimeSensor;
+    final Sensor flushTimeSensor;
+    final Sensor sendOffsetsSensor;
+    final Sensor commitTxnSensor;
+    final Sensor abortTxnSensor;

Review comment:
       Could you please declare these member fields as `private`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.Map;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+
+public class KafkaProducerMetrics implements AutoCloseable {
+
+    public static final String GROUP = "producer-metrics";
+    private static final String FLUSH = "flush";
+    private static final String TXN_INIT = "txn-init";
+    private static final String TXN_BEGIN = "txn-begin";
+    private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
+    private static final String TXN_COMMIT = "txn-commit";
+    private static final String TXN_ABORT = "txn-abort";
+    private static final String TOTAL_TIME_SUFFIX = "-time-total";
+
+    final Map<String, String> tags;
+    final Metrics metrics;
+    final Sensor initTimeSensor;
+    final Sensor beginTxnTimeSensor;
+    final Sensor flushTimeSensor;
+    final Sensor sendOffsetsSensor;
+    final Sensor commitTxnSensor;
+    final Sensor abortTxnSensor;
+
+    public KafkaProducerMetrics(Metrics metrics) {
+        this.metrics = metrics;
+        this.tags = this.metrics.config().tags();
+        this.flushTimeSensor = newLatencySensor(FLUSH);
+        this.initTimeSensor = newLatencySensor(TXN_INIT);
+        this.beginTxnTimeSensor = newLatencySensor(TXN_BEGIN);
+        this.sendOffsetsSensor = newLatencySensor(TXN_SEND_OFFSETS);
+        this.commitTxnSensor = newLatencySensor(TXN_COMMIT);
+        this.abortTxnSensor = newLatencySensor(TXN_ABORT);

Review comment:
       ```suggestion
           this.tags = this.metrics.config().tags();
           this.flushTimeSensor = newLatencySensor(FLUSH);
           initTimeSensor = newLatencySensor(TXN_INIT);
           beginTxnTimeSensor = newLatencySensor(TXN_BEGIN);
           sendOffsetsSensor = newLatencySensor(TXN_SEND_OFFSETS);
           commitTxnSensor = newLatencySensor(TXN_COMMIT);
           abortTxnSensor = newLatencySensor(TXN_ABORT);
   ```

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -1124,12 +1137,16 @@ private void ensureValidRecordSize(int size) {
     @Override
     public void flush() {
         log.trace("Flushing accumulated records in producer.");
+
         this.accumulator.beginFlush();
         this.sender.wakeup();

Review comment:
       Why are those lines not included in the measurement? They do contribute 
to the flush time, right?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
##########
@@ -63,6 +66,18 @@ public KafkaConsumerMetrics(Metrics metrics, String metricGrpPrefix) {
                 metricGroupName,
                 "The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records."),
                 new Avg());
+
+        this.commitSyncSensor = metrics.sensor("commit-sync-time-total");
+        this.commitSyncSensor.add(
+            metrics.metricName("commit-sync-time-total", metricGroupName),
+            new CumulativeSum()
+        );
+
+        this.committedSensor = metrics.sensor("committed-time-total");
+        this.committedSensor.add(
+            metrics.metricName("committed-time-total", metricGroupName),
+            new CumulativeSum()
+        );

Review comment:
       I think you forgot to remove the sensors in `close()`. I know that no 
unit tests exist for this class, but maybe you should add them, perhaps in a 
separate PR so that this PR does not grow larger than needed.
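
       A minimal sketch of the cleanup I have in mind, together with the kind 
of check a unit test could make. `SensorCleanupSketch`, `Registry`, and 
`ConsumerMetrics` are hypothetical stand-ins written for this example, not the 
Kafka `Metrics` API; only the two sensor names are taken from the diff above. 
The holder remembers every sensor name it registers and removes each of them 
in `close()`.

```java
import java.util.HashMap;
import java.util.Map;

final class SensorCleanupSketch {

    /** Hypothetical stand-in for a shared metrics registry. */
    static final class Registry {
        private final Map<String, long[]> sensors = new HashMap<>();

        long[] sensor(String name) {
            return sensors.computeIfAbsent(name, n -> new long[1]);
        }

        void removeSensor(String name) {
            sensors.remove(name);
        }

        int size() {
            return sensors.size();
        }
    }

    /** Hypothetical metrics holder that removes every sensor it registered. */
    static final class ConsumerMetrics implements AutoCloseable {
        private final Registry registry;
        private final String[] sensorNames = {"commit-sync-time-total", "committed-time-total"};

        ConsumerMetrics(Registry registry) {
            this.registry = registry;
            for (String name : sensorNames) {
                registry.sensor(name);
            }
        }

        @Override
        public void close() {
            // Without this loop the sensors would outlive the client in the shared registry.
            for (String name : sensorNames) {
                registry.removeSensor(name);
            }
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        try (ConsumerMetrics metrics = new ConsumerMetrics(registry)) {
            System.out.println(registry.size()); // sensors registered while open
        }
        System.out.println(registry.size()); // nothing left after close()
    }
}
```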





