guozhangwang commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r688142016
########## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ########## @@ -1369,4 +1385,67 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { this.userCallback.onCompletion(metadata, exception); } } + + private static class KafkaProducerMetrics { + private static final String FLUSH = "flush"; + private static final String TXN_INIT = "txn-init"; + private static final String TXN_BEGIN = "txn-begin"; + private static final String TXN_SEND_OFFSETS = "txn-send-offsets"; + private static final String TXN_COMMIT = "txn-commit"; + private static final String TXN_ABORT = "txn-abort"; + private static final String TOTAL_TIME_SUFFIX = "-time-total"; + + final Map<String, String> tags; + final Metrics metrics; + final Sensor initTimeSensor; + final Sensor beginTimeSensor; + final Sensor flushTimeSensor; + final Sensor sendOffsetsSensor; + final Sensor commitSensor; + final Sensor abortSensor; + + private KafkaProducerMetrics(Metrics metrics) { + this.metrics = metrics; + this.tags = this.metrics.config().tags(); + this.flushTimeSensor = newLatencySensor(FLUSH); + this.initTimeSensor = newLatencySensor(TXN_INIT); + this.beginTimeSensor = newLatencySensor(TXN_BEGIN); + this.sendOffsetsSensor = newLatencySensor(TXN_SEND_OFFSETS); + this.commitSensor = newLatencySensor(TXN_COMMIT); + this.abortSensor = newLatencySensor(TXN_ABORT); + } + + private Sensor newLatencySensor(String name) { + Sensor sensor = metrics.sensor(name + TOTAL_TIME_SUFFIX); + sensor.add( + metrics.metricName(name + TOTAL_TIME_SUFFIX, ProducerMetrics.GROUP, tags), + new CumulativeSum() + ); + return sensor; + } + + private void recordFlush(long duration) { + flushTimeSensor.record(duration); + } + + private void recordInit(long duration) { + initTimeSensor.record(duration); + } + + private void recordBegin(long duration) { Review comment: nit: better leave the full name as recordBeginTxn/AbortTxn/CommitTxn. 
########## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ########## @@ -1369,4 +1385,67 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { this.userCallback.onCompletion(metadata, exception); } } + + private static class KafkaProducerMetrics { + private static final String FLUSH = "flush"; + private static final String TXN_INIT = "txn-init"; + private static final String TXN_BEGIN = "txn-begin"; + private static final String TXN_SEND_OFFSETS = "txn-send-offsets"; + private static final String TXN_COMMIT = "txn-commit"; + private static final String TXN_ABORT = "txn-abort"; + private static final String TOTAL_TIME_SUFFIX = "-time-total"; + + final Map<String, String> tags; + final Metrics metrics; + final Sensor initTimeSensor; + final Sensor beginTimeSensor; Review comment: Ditto here; better rename it to `beginTxn` / `commitTxn` / `abortTxn`? ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java ########## @@ -178,12 +180,39 @@ public void resetProducer() { throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode); } + final long start = Time.SYSTEM.nanoseconds(); producer.close(); + final long closeTime = Time.SYSTEM.nanoseconds() - start; + + oldProducerTotalBlockedTime += closeTime + totalBlockedTime(producer); producer = clientSupplier.getProducer(eosV2ProducerConfigs); transactionInitialized = false; } + private static double getMetricValue(final Map<MetricName, ? extends Metric> metrics, + final String name) { + return metrics.keySet().stream() + .filter(n -> n.name().equals(name)) + .findFirst() Review comment: Maybe worth checking there's only one element after the filtering? It should not be expected to have more than one right? 
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import java.util.Map; +import java.util.function.Supplier; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; + +public class StreamsThreadTotalBlockedTime { + final Consumer<?, ?> consumer; + final Consumer<?, ?> restoreConsumer; + final Supplier<Double> producerTotalBlockedTime; + + StreamsThreadTotalBlockedTime( + final Consumer<?, ?> consumer, + final Consumer<?, ?> restoreConsumer, + final Supplier<Double> producerTotalBlockedTime + ) { Review comment: very nit: in the AK repo we usually do not put the closing `)` on a new line, but keep it on the same line as the last param. Ditto elsewhere. 
########## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetrics.java ########## @@ -26,6 +26,7 @@ import org.apache.kafka.common.metrics.Metrics; public class ProducerMetrics { + public static final String GROUP = "producer-metrics"; Review comment: I think it's better be in `KafkaProducerMetrics` rather than here. ########## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ########## @@ -590,9 +593,11 @@ else if (acks != -1) public void initTransactions() { throwIfNoTransactionManager(); throwIfProducerClosed(); + long now = time.nanoseconds(); Review comment: Do we want to measure them in millis or nanos? For most latency measures we are currently in ms, and if we do measure in ns we usually name the metric as e.g. "xyz-total-ns". ########## File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ########## @@ -868,6 +904,36 @@ public void testAbortTransaction() { } } + @Test + public void testMeasureAbortTransactionDuration() { + Map<String, Object> configs = new HashMap<>(); + configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id"); + configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + Time time = new MockTime(1); + MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1)); + ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE); + MockClient client = new MockClient(time, metadata); + client.updateMetadata(initialUpdateResponse); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1)); + client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE)); + + try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(), + new StringSerializer(), metadata, client, null, time)) { + producer.initTransactions(); + + client.prepareResponse(endTxnResponse(Errors.NONE)); + producer.beginTransaction(); 
+ producer.abortTransaction(); + double first = getMetricValue(producer, "txn-abort-time-total"); + assertTrue(first > 999999.0); Review comment: Ditto here and elsewhere. ########## File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ########## @@ -730,6 +732,41 @@ public void testFlushCompleteSendOfInflightBatches() { } } + private static Double getMetricValue(final KafkaProducer<?, ?> producer, final String name) { + Metrics metrics = producer.metrics; + Metric metric = metrics.metric(metrics.metricName(name, "producer-metrics")); + return (Double) metric.metricValue(); + } + + @Test + public void testFlushMeasureLatency() { + Map<String, Object> configs = new HashMap<>(); + configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + + Time time = new MockTime(1); + MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1)); + ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE); + + MockClient client = new MockClient(time, metadata); + client.updateMetadata(initialUpdateResponse); + + try (KafkaProducer<String, String> producer = kafkaProducer( + configs, + new StringSerializer(), + new StringSerializer(), + metadata, + client, + null, + time + )) { + producer.flush(); + double first = getMetricValue(producer, "flush-time-total"); + assertTrue(first > 999999.0); Review comment: Why do we want to assert that this latency is larger than `999999.0` nanoseconds? Could this result in flakiness if it is time dependent? If all we want to verify is that a non-null value was recorded, then just asserting that it is > 0 should be sufficient? 
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java ########## @@ -178,12 +180,39 @@ public void resetProducer() { throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode); } + final long start = Time.SYSTEM.nanoseconds(); Review comment: Ditto here: nano seconds seems unnecessary? ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.processor.internals; + +import java.util.Map; +import java.util.function.Supplier; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; + +public class StreamsThreadTotalBlockedTime { + final Consumer<?, ?> consumer; + final Consumer<?, ?> restoreConsumer; + final Supplier<Double> producerTotalBlockedTime; + + StreamsThreadTotalBlockedTime( + final Consumer<?, ?> consumer, + final Consumer<?, ?> restoreConsumer, + final Supplier<Double> producerTotalBlockedTime + ) { + this.consumer = consumer; + this.restoreConsumer = restoreConsumer; + this.producerTotalBlockedTime = producerTotalBlockedTime; + } + + final double getMetricValue( Review comment: What about consolidating this function and the other in `StreamsProducer` as a static in `StreamsMetricsImpl#getMetricValue`, and we can also use this in unit test e.g. https://github.com/apache/kafka/pull/11149/files#diff-599de0f96fbd5ba6b3d919881426269fc72fe8bbe8e2436fab87d9abe84e8dbaR735 as well. ########## File path: gradle.properties ########## @@ -20,7 +20,7 @@ group=org.apache.kafka # - tests/kafkatest/__init__.py # - tests/kafkatest/version.py (variable DEV_VERSION) # - kafka-merge-pr.py -version=3.1.0-SNAPSHOT +version=ROHAN-SNAPSHOT Review comment: :) ########## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ########## @@ -590,9 +593,11 @@ else if (acks != -1) public void initTransactions() { throwIfNoTransactionManager(); throwIfProducerClosed(); + long now = time.nanoseconds(); Review comment: Personally I think we can just measure in ms since it is cumulative sum as `total` anyways. 
########## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ########## @@ -1369,4 +1385,67 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { this.userCallback.onCompletion(metadata, exception); } } + + private static class KafkaProducerMetrics { Review comment: What about moving it to `org.apache.kafka.clients.producer.internals` as a separate class? ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ########## @@ -508,6 +508,21 @@ public StreamThread(final Time time, ThreadMetrics.createTaskSensor(threadId, streamsMetrics); ThreadMetrics.closeTaskSensor(threadId, streamsMetrics); + ThreadMetrics.addThreadStartTimeMetric( + threadId, + streamsMetrics, + time.nanoseconds() Review comment: I think milliseconds would be sufficient? ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java ########## @@ -200,6 +201,30 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() { } } + public <T> void addThreadLevelImmutableMetric(final String name, + final String description, + final String threadId, + final T value) { + final MetricName metricName = metrics.metricName( + name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId)); + synchronized (threadLevelMetrics) { Review comment: Why do we not want to prefix the thread id, and instead use it directly as the key in the `threadLevelMetrics` map? Ditto below. ########## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ########## @@ -1369,4 +1385,67 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { this.userCallback.onCompletion(metadata, exception); } } + + private static class KafkaProducerMetrics { Review comment: Also it seems we do not have logic to de-register the metrics from the `metrics` registry when closing the producer? 
Maybe we can follow the same approach as the consumer: declare it as `implements AutoCloseable` and then call its `close` when shutting down the producer. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java ########## @@ -178,12 +180,39 @@ public void resetProducer() { throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode); } + final long start = Time.SYSTEM.nanoseconds(); Review comment: Also, following up on my other comment: if we de-register all the metrics upon producer closure, then we'd better read them out before closing the old producer and starting a new one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org