cadonna commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r694676118
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ########## @@ -1873,7 +1875,13 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time acquireAndEnsureOpen(); try { maybeThrowInvalidGroupIdException(); - Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout)); + final Map<TopicPartition, OffsetAndMetadata> offsets; + long start = time.nanoseconds(); + try { + offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout)); + } finally { + kafkaConsumerMetrics.recordCommitted(time.nanoseconds() - start); + } Review comment: Could you please add unit tests for this change? ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ########## @@ -1873,7 +1875,13 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time acquireAndEnsureOpen(); try { maybeThrowInvalidGroupIdException(); Review comment: Why do you exclude this check from the measured time here but include it above? The same applies to `offsets.forEach(this::updateLastSeenEpochIfNewer)`. ########## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.producer.internals; + +import java.util.Map; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.CumulativeSum; Review comment: ```suggestion import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.CumulativeSum; import java.util.Map; ``` ########## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ########## @@ -699,7 +706,9 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs throwIfProducerClosed(); TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata); sender.wakeup(); Review comment: Why are those lines not included in the measurement? 
########## File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ########## @@ -905,6 +971,57 @@ public void testSendTxnOffsetsWithGroupId() { } } + private double getAndAssertDuration(KafkaProducer<?, ?> producer, String name, double floor) { + double value = getMetricValue(producer, name); + assertTrue(value > floor); + return value; + } + + @Test + public void testMeasureTransactionDurations() { + Map<String, Object> configs = new HashMap<>(); + configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id"); + configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000); + configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + Time time = new MockTime(1); + MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1)); + ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE); + + MockClient client = new MockClient(time, metadata); + client.updateMetadata(initialUpdateResponse); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1)); + client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE)); + + try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(), + new StringSerializer(), metadata, client, null, time)) { + producer.initTransactions(); + assertTrue(getMetricValue(producer, "txn-init-time-total") > 999999); Review comment: I am not sure I understand this verification. Could you elaborate? 
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java ########## @@ -67,22 +69,35 @@ private final Map<String, Object> eosV2ProducerConfigs; private final KafkaClientSupplier clientSupplier; private final StreamThread.ProcessingMode processingMode; + private final Time time; private Producer<byte[], byte[]> producer; private boolean transactionInFlight = false; private boolean transactionInitialized = false; + private double oldProducerTotalBlockedTime = 0; public StreamsProducer(final StreamsConfig config, final String threadId, final KafkaClientSupplier clientSupplier, final TaskId taskId, final UUID processId, final LogContext logContext) { + this(config, threadId, clientSupplier, taskId, processId, logContext, Time.SYSTEM); + } Review comment: That is not my point. My point is that the objects that call the constructor, i.e. tasks and threads, have a time object that they use for their metrics (and probably for other purposes). Now that we also have metrics in the `StreamsProducer` that need a time object, it is inconsistent to create a new time object in the constructor instead of passing along the time object from tasks and threads into the `StreamsProducer`. ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ########## @@ -1493,6 +1494,7 @@ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, fin "committing offsets " + offsets); } } finally { + kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart); Review comment: Could you please add unit tests for this change? ########## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.producer.internals; + +import java.util.Map; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.CumulativeSum; + +public class KafkaProducerMetrics implements AutoCloseable { Review comment: Could you please add unit tests for this class? ########## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.producer.internals; + +import java.util.Map; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.CumulativeSum; + +public class KafkaProducerMetrics implements AutoCloseable { + + public static final String GROUP = "producer-metrics"; + private static final String FLUSH = "flush"; + private static final String TXN_INIT = "txn-init"; + private static final String TXN_BEGIN = "txn-begin"; + private static final String TXN_SEND_OFFSETS = "txn-send-offsets"; + private static final String TXN_COMMIT = "txn-commit"; + private static final String TXN_ABORT = "txn-abort"; + private static final String TOTAL_TIME_SUFFIX = "-time-total"; + + final Map<String, String> tags; + final Metrics metrics; + final Sensor initTimeSensor; + final Sensor beginTxnTimeSensor; + final Sensor flushTimeSensor; + final Sensor sendOffsetsSensor; + final Sensor commitTxnSensor; + final Sensor abortTxnSensor; Review comment: Could you please declare these member fields as `private`? ########## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.producer.internals; + +import java.util.Map; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.CumulativeSum; + +public class KafkaProducerMetrics implements AutoCloseable { + + public static final String GROUP = "producer-metrics"; + private static final String FLUSH = "flush"; + private static final String TXN_INIT = "txn-init"; + private static final String TXN_BEGIN = "txn-begin"; + private static final String TXN_SEND_OFFSETS = "txn-send-offsets"; + private static final String TXN_COMMIT = "txn-commit"; + private static final String TXN_ABORT = "txn-abort"; + private static final String TOTAL_TIME_SUFFIX = "-time-total"; + + final Map<String, String> tags; + final Metrics metrics; + final Sensor initTimeSensor; + final Sensor beginTxnTimeSensor; + final Sensor flushTimeSensor; + final Sensor sendOffsetsSensor; + final Sensor commitTxnSensor; + final Sensor abortTxnSensor; + + public KafkaProducerMetrics(Metrics metrics) { + this.metrics = metrics; + this.tags = this.metrics.config().tags(); + this.flushTimeSensor = newLatencySensor(FLUSH); + this.initTimeSensor = newLatencySensor(TXN_INIT); + this.beginTxnTimeSensor = newLatencySensor(TXN_BEGIN); + this.sendOffsetsSensor = newLatencySensor(TXN_SEND_OFFSETS); + this.commitTxnSensor = newLatencySensor(TXN_COMMIT); + this.abortTxnSensor = newLatencySensor(TXN_ABORT); Review comment: ```suggestion this.tags = 
this.metrics.config().tags(); this.flushTimeSensor = newLatencySensor(FLUSH); initTimeSensor = newLatencySensor(TXN_INIT); beginTxnTimeSensor = newLatencySensor(TXN_BEGIN); sendOffsetsSensor = newLatencySensor(TXN_SEND_OFFSETS); commitTxnSensor = newLatencySensor(TXN_COMMIT); abortTxnSensor = newLatencySensor(TXN_ABORT); ``` ########## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ########## @@ -1124,12 +1137,16 @@ private void ensureValidRecordSize(int size) { @Override public void flush() { log.trace("Flushing accumulated records in producer."); + this.accumulator.beginFlush(); this.sender.wakeup(); Review comment: Why are those lines not included in the measurement? They do contribute to the flush time, right? ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java ########## @@ -63,6 +66,18 @@ public KafkaConsumerMetrics(Metrics metrics, String metricGrpPrefix) { metricGroupName, "The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records."), new Avg()); + + this.commitSyncSensor = metrics.sensor("commit-sync-time-total"); + this.commitSyncSensor.add( + metrics.metricName("commit-sync-time-total", metricGroupName), + new CumulativeSum() + ); + + this.committedSensor = metrics.sensor("committed-time-total"); + this.committedSensor.add( + metrics.metricName("committed-time-total", metricGroupName), + new CumulativeSum() + ); Review comment: I think you forgot to remove the sensors in `close()`. I know that there are no unit tests for this class, but maybe you should add them. Maybe in a separate PR, so as not to make this PR larger than needed.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org