Repository: kafka Updated Branches: refs/heads/trunk cd59976ee -> 2fb5664bf
KAFKA-5597: Autogenerate producer sender metrics Subtask of https://issues.apache.org/jira/browse/KAFKA-3480 The changes are very similar to what was done for the consumer in https://issues.apache.org/jira/browse/KAFKA-5191 (pull request https://github.com/apache/kafka/pull/2993) Author: James Cheng <jylch...@yahoo.com> Reviewers: Ismael Juma <ism...@juma.me.uk>, Rajini Sivaram <rajinisiva...@googlemail.com>, Guozhang Wang <wangg...@gmail.com> Closes #3535 from wushujames/producer_sender_metrics_docs Fix one minor naming bug Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2fb5664b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2fb5664b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2fb5664b Branch: refs/heads/trunk Commit: 2fb5664bf4591f3e7bdc02894b9de392bf72913c Parents: cd59976 Author: James Cheng <jylch...@yahoo.com> Authored: Tue Sep 5 17:36:53 2017 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Tue Sep 5 17:38:58 2017 -0700 ---------------------------------------------------------------------- build.gradle | 9 +- checkstyle/suppressions.xml | 2 + .../kafka/clients/producer/KafkaProducer.java | 5 +- .../producer/internals/ProducerMetrics.java | 48 +++++++ .../clients/producer/internals/Sender.java | 66 +++++----- .../internals/SenderMetricsRegistry.java | 125 +++++++++++++++++++ .../clients/producer/internals/SenderTest.java | 29 +++-- .../internals/TransactionManagerTest.java | 2 +- docs/ops.html | 114 +---------------- 9 files changed, 240 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2fb5664b/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index c56fe82..62e9f08 100644 --- a/build.gradle +++ b/build.gradle @@ -666,11 +666,18 @@ project(':core') { standardOutput = new File(generatedDocsDir, "consumer_metrics.html").newOutputStream() } + task genProducerMetricsDocs(type: JavaExec) { + classpath = sourceSets.test.runtimeClasspath + main = 'org.apache.kafka.clients.producer.internals.ProducerMetrics' + if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() } + standardOutput = new File(generatedDocsDir, "producer_metrics.html").newOutputStream() + } + task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs', 'genAdminClientConfigDocs', 'genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs', 'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs', ':connect:runtime:genConnectTransformationDocs', - ':streams:genStreamsConfigDocs', 'genConsumerMetricsDocs'], type: Tar) { + ':streams:genStreamsConfigDocs', 'genConsumerMetricsDocs', 'genProducerMetricsDocs'], type: Tar) { classifier = 'site-docs' compression = Compression.GZIP from project.file("$rootDir/docs") http://git-wip-us.apache.org/repos/asf/kafka/blob/2fb5664b/checkstyle/suppressions.xml ---------------------------------------------------------------------- diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 990e366..027d07f 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -32,6 +32,8 @@ <suppress checks="ParameterNumber" files="Fetcher.java"/> <suppress checks="ParameterNumber" + files="Sender.java"/> + <suppress checks="ParameterNumber" files="ConfigDef.java"/> <suppress checks="ParameterNumber" files="DefaultRecordBatch.java"/> http://git-wip-us.apache.org/repos/asf/kafka/blob/2fb5664b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 3fa007a..8766107 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.internals.ProducerInterceptors; +import org.apache.kafka.clients.producer.internals.ProducerMetrics; import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.Sender; import org.apache.kafka.clients.producer.internals.TransactionManager; @@ -328,6 +329,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { MetricsReporter.class); reporters.add(new JmxReporter(JMX_PREFIX)); this.metrics = new Metrics(metricConfig, reporters, time); + ProducerMetrics metricsRegistry = new ProducerMetrics(metricTags.keySet(), "producer"); this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); if (keySerializer == null) { @@ -380,7 +382,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds()); ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config); - Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics); + Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics, metricsRegistry.senderMetrics); NetworkClient client = new NetworkClient( new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder), @@ -405,6 +407,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { acks, retries, this.metrics, + metricsRegistry.senderMetrics, Time.SYSTEM, this.requestTimeoutMs, config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), http://git-wip-us.apache.org/repos/asf/kafka/blob/2fb5664b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetrics.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetrics.java new file mode 100644 index 0000000..6b8487e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetrics.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.metrics.Metrics; + +public class ProducerMetrics { + + public SenderMetricsRegistry senderMetrics; + + public ProducerMetrics(Set<String> tags, String metricGrpPrefix) { + this.senderMetrics = new SenderMetricsRegistry(tags); + } + + private List<MetricNameTemplate> getAllTemplates() { + List<MetricNameTemplate> l = new ArrayList<>(); + l.addAll(this.senderMetrics.getAllTemplates()); + return l; + } + + public static void main(String[] args) { + Set<String> tags = new HashSet<>(); + tags.add("client-id"); + ProducerMetrics metrics = new ProducerMetrics(tags, "producer"); + System.out.println(Metrics.toHtmlTable("kafka.producer", metrics.getAllTemplates())); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/2fb5664b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 9c3b4d2..bf3714e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -129,6 +129,7 @@ public class Sender implements Runnable { short acks, int retries, Metrics metrics, + SenderMetricsRegistry metricsRegistry, Time time, int requestTimeout, long retryBackoffMs, @@ -144,7 +145,7 @@ public class Sender implements Runnable { this.acks = acks; this.retries = retries; this.time = time; - this.sensors = new SenderMetrics(metrics); + this.sensors = new SenderMetrics(metrics, metricsRegistry); this.requestTimeout = requestTimeout; this.retryBackoffMs = retryBackoffMs; this.apiVersions = apiVersions; @@ -676,13 +677,12 @@ public class Sender implements Runnable { this.client.wakeup(); } - public static Sensor throttleTimeSensor(Metrics metrics) { - String metricGrpName = SenderMetrics.METRIC_GROUP_NAME; + public static Sensor throttleTimeSensor(Metrics metrics, SenderMetricsRegistry metricsRegistry) { Sensor produceThrottleTimeSensor = metrics.sensor("produce-throttle-time"); - produceThrottleTimeSensor.add(metrics.metricName("produce-throttle-time-avg", - metricGrpName, "The average throttle time in ms"), new Avg()); - produceThrottleTimeSensor.add(metrics.metricName("produce-throttle-time-max", - metricGrpName, "The maximum throttle time in ms"), new Max()); + MetricName m = metrics.metricInstance(metricsRegistry.produceThrottleTimeAvg); + produceThrottleTimeSensor.add(m, new Avg()); + m = metrics.metricInstance(metricsRegistry.produceThrottleTimeMax); + produceThrottleTimeSensor.add(m, new Max()); return produceThrottleTimeSensor; } @@ -690,8 +690,6 @@ public class Sender implements Runnable { * A collection of sensors for the sender */ private class SenderMetrics { - - private static final String METRIC_GROUP_NAME = "producer-metrics"; private final Metrics metrics; public final Sensor retrySensor; public final Sensor errorSensor; @@ -702,60 +700,61 @@ public class Sender implements Runnable { public final Sensor compressionRateSensor; public final Sensor maxRecordSizeSensor; public final Sensor batchSplitSensor; + private SenderMetricsRegistry metricsRegistry; - public SenderMetrics(Metrics metrics) { + public SenderMetrics(Metrics metrics, SenderMetricsRegistry metricsRegistry) { this.metrics = metrics; - String metricGrpName = METRIC_GROUP_NAME; + this.metricsRegistry = metricsRegistry; this.batchSizeSensor = metrics.sensor("batch-size"); - MetricName m = metrics.metricName("batch-size-avg", metricGrpName, "The average number of bytes sent per partition per-request."); + MetricName m = metrics.metricInstance(metricsRegistry.batchSizeAvg); this.batchSizeSensor.add(m, new Avg()); - m = metrics.metricName("batch-size-max", metricGrpName, "The max number of bytes sent per partition per-request."); + m = metrics.metricInstance(metricsRegistry.batchSizeMax); this.batchSizeSensor.add(m, new Max()); this.compressionRateSensor = metrics.sensor("compression-rate"); - m = metrics.metricName("compression-rate-avg", metricGrpName, "The average compression rate of record batches."); + m = metrics.metricInstance(metricsRegistry.compressionRateAvg); this.compressionRateSensor.add(m, new Avg()); this.queueTimeSensor = metrics.sensor("queue-time"); - m = metrics.metricName("record-queue-time-avg", metricGrpName, "The average time in ms record batches spent in the record accumulator."); + m = metrics.metricInstance(metricsRegistry.recordQueueTimeAvg); this.queueTimeSensor.add(m, new Avg()); - m = metrics.metricName("record-queue-time-max", metricGrpName, "The maximum time in ms record batches spent in the record accumulator."); + m = metrics.metricInstance(metricsRegistry.recordQueueTimeMax); this.queueTimeSensor.add(m, new Max()); this.requestTimeSensor = metrics.sensor("request-time"); - m = metrics.metricName("request-latency-avg", metricGrpName, "The average request latency in ms"); + m = metrics.metricInstance(metricsRegistry.requestLatencyAvg); this.requestTimeSensor.add(m, new Avg()); - m = metrics.metricName("request-latency-max", metricGrpName, "The maximum request latency in ms"); + m = metrics.metricInstance(metricsRegistry.requestLatencyMax); this.requestTimeSensor.add(m, new Max()); this.recordsPerRequestSensor = metrics.sensor("records-per-request"); - m = metrics.metricName("record-send-rate", metricGrpName, "The average number of records sent per second."); + m = metrics.metricInstance(metricsRegistry.recordSendRate); this.recordsPerRequestSensor.add(m, new Rate()); - m = metrics.metricName("records-per-request-avg", metricGrpName, "The average number of records per request."); + m = metrics.metricInstance(metricsRegistry.recordsPerRequestAvg); this.recordsPerRequestSensor.add(m, new Avg()); this.retrySensor = metrics.sensor("record-retries"); - m = metrics.metricName("record-retry-rate", metricGrpName, "The average per-second number of retried record sends"); + m = metrics.metricInstance(metricsRegistry.recordRetryRate); this.retrySensor.add(m, new Rate()); this.errorSensor = metrics.sensor("errors"); - m = metrics.metricName("record-error-rate", metricGrpName, "The average per-second number of record sends that resulted in errors"); + m = metrics.metricInstance(metricsRegistry.recordErrorRate); this.errorSensor.add(m, new Rate()); - this.maxRecordSizeSensor = metrics.sensor("record-size-max"); - m = metrics.metricName("record-size-max", metricGrpName, "The maximum record size"); + this.maxRecordSizeSensor = metrics.sensor("record-size"); + m = metrics.metricInstance(metricsRegistry.recordSizeMax); this.maxRecordSizeSensor.add(m, new Max()); - m = metrics.metricName("record-size-avg", metricGrpName, "The average record size"); + m = metrics.metricInstance(metricsRegistry.recordSizeAvg); this.maxRecordSizeSensor.add(m, new Avg()); - m = metrics.metricName("requests-in-flight", metricGrpName, "The current number of in-flight requests awaiting a response."); + m = metrics.metricInstance(metricsRegistry.requestsInFlight); this.metrics.addMetric(m, new Measurable() { public double measure(MetricConfig config, long now) { return client.inFlightRequestCount(); } }); - m = metrics.metricName("metadata-age", metricGrpName, "The age in seconds of the current producer metadata being used."); + m = metrics.metricInstance(metricsRegistry.metadataAge); metrics.addMetric(m, new Measurable() { public double measure(MetricConfig config, long now) { return (now - metadata.lastSuccessfulUpdate()) / 1000.0; @@ -763,7 +762,7 @@ public class Sender implements Runnable { }); this.batchSplitSensor = metrics.sensor("batch-split-rate"); - m = metrics.metricName("batch-split-rate", metricGrpName, "The rate of record batch split"); + m = metrics.metricInstance(metricsRegistry.batchSplitRate); this.batchSplitSensor.add(m, new Rate()); } @@ -774,30 +773,29 @@ public class Sender implements Runnable { Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName); if (topicRecordCount == null) { Map<String, String> metricTags = Collections.singletonMap("topic", topic); - String metricGrpName = "producer-topic-metrics"; topicRecordCount = this.metrics.sensor(topicRecordsCountName); - MetricName m = this.metrics.metricName("record-send-rate", metricGrpName, metricTags); + MetricName m = this.metrics.metricInstance(metricsRegistry.topicRecordSendRate, metricTags); topicRecordCount.add(m, new Rate()); String topicByteRateName = "topic." + topic + ".bytes"; Sensor topicByteRate = this.metrics.sensor(topicByteRateName); - m = this.metrics.metricName("byte-rate", metricGrpName, metricTags); + m = this.metrics.metricInstance(metricsRegistry.topicByteRate, metricTags); topicByteRate.add(m, new Rate()); String topicCompressionRateName = "topic." + topic + ".compression-rate"; Sensor topicCompressionRate = this.metrics.sensor(topicCompressionRateName); - m = this.metrics.metricName("compression-rate", metricGrpName, metricTags); + m = this.metrics.metricInstance(metricsRegistry.topicCompressionRate, metricTags); topicCompressionRate.add(m, new Avg()); String topicRetryName = "topic." + topic + ".record-retries"; Sensor topicRetrySensor = this.metrics.sensor(topicRetryName); - m = this.metrics.metricName("record-retry-rate", metricGrpName, metricTags); + m = this.metrics.metricInstance(metricsRegistry.topicRecordRetryRate, metricTags); topicRetrySensor.add(m, new Rate()); String topicErrorName = "topic." + topic + ".record-errors"; Sensor topicErrorSensor = this.metrics.sensor(topicErrorName); - m = this.metrics.metricName("record-error-rate", metricGrpName, metricTags); + m = this.metrics.metricInstance(metricsRegistry.topicRecordErrorRate, metricTags); topicErrorSensor.add(m, new Rate()); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/2fb5664b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java new file mode 100644 index 0000000..f29d319 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.kafka.common.MetricNameTemplate; + +public class SenderMetricsRegistry { + + final static String METRIC_GROUP_NAME = "producer-metrics"; + final static String TOPIC_METRIC_GROUP_NAME = "producer-topic-metrics"; + + public MetricNameTemplate batchSizeAvg; + public MetricNameTemplate batchSizeMax; + public MetricNameTemplate compressionRateAvg; + public MetricNameTemplate recordQueueTimeAvg; + public MetricNameTemplate recordQueueTimeMax; + public MetricNameTemplate requestLatencyAvg; + public MetricNameTemplate requestLatencyMax; + public MetricNameTemplate produceThrottleTimeAvg; + public MetricNameTemplate produceThrottleTimeMax; + public MetricNameTemplate recordSendRate; + public MetricNameTemplate recordsPerRequestAvg; + public MetricNameTemplate recordRetryRate; + public MetricNameTemplate recordErrorRate; + public MetricNameTemplate recordSizeMax; + public MetricNameTemplate recordSizeAvg; + public MetricNameTemplate requestsInFlight; + public MetricNameTemplate metadataAge; + public MetricNameTemplate topicRecordSendRate; + public MetricNameTemplate topicByteRate; + public MetricNameTemplate topicCompressionRate; + public MetricNameTemplate topicRecordRetryRate; + public MetricNameTemplate topicRecordErrorRate; + public MetricNameTemplate batchSplitRate; + + public SenderMetricsRegistry() { + this(new HashSet<String>()); + } + + public SenderMetricsRegistry(Set<String> tags) { + + /* ***** Client level *****/ + + this.batchSizeAvg = new MetricNameTemplate("batch-size-avg", METRIC_GROUP_NAME, "The average number of bytes sent per partition per-request.", tags); + this.batchSizeMax = new MetricNameTemplate("batch-size-max", METRIC_GROUP_NAME, "The max number of bytes sent per partition per-request.", tags); + this.compressionRateAvg = new MetricNameTemplate("compression-rate-avg", METRIC_GROUP_NAME, "The average compression rate of record batches.", tags); + this.recordQueueTimeAvg = new MetricNameTemplate("record-queue-time-avg", METRIC_GROUP_NAME, "The average time in ms record batches spent in the send buffer.", tags); + this.recordQueueTimeMax = new MetricNameTemplate("record-queue-time-max", METRIC_GROUP_NAME, "The maximum time in ms record batches spent in the send buffer.", tags); + this.requestLatencyAvg = new MetricNameTemplate("request-latency-avg", METRIC_GROUP_NAME, "The average request latency in ms", tags); + this.requestLatencyMax = new MetricNameTemplate("request-latency-max", METRIC_GROUP_NAME, "The maximum request latency in ms", tags); + this.recordSendRate = new MetricNameTemplate("record-send-rate", METRIC_GROUP_NAME, "The average number of records sent per second.", tags); + this.recordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", METRIC_GROUP_NAME, "The average number of records per request.", tags); + this.recordRetryRate = new MetricNameTemplate("record-retry-rate", METRIC_GROUP_NAME, "The average per-second number of retried record sends", tags); + this.recordErrorRate = new MetricNameTemplate("record-error-rate", METRIC_GROUP_NAME, "The average per-second number of record sends that resulted in errors", tags); + this.recordSizeMax = new MetricNameTemplate("record-size-max", METRIC_GROUP_NAME, "The maximum record size", tags); + this.recordSizeAvg = new MetricNameTemplate("record-size-avg", METRIC_GROUP_NAME, "The average record size", tags); + this.requestsInFlight = new MetricNameTemplate("requests-in-flight", METRIC_GROUP_NAME, "The current number of in-flight requests awaiting a response.", tags); + this.metadataAge = new MetricNameTemplate("metadata-age", METRIC_GROUP_NAME, "The age in seconds of the current producer metadata being used.", tags); + this.batchSplitRate = new MetricNameTemplate("batch-split-rate", METRIC_GROUP_NAME, "The average number of batch splits per second", tags); + + this.produceThrottleTimeAvg = new MetricNameTemplate("produce-throttle-time-avg", METRIC_GROUP_NAME, "The average time in ms a request was throttled by a broker", tags); + this.produceThrottleTimeMax = new MetricNameTemplate("produce-throttle-time-max", METRIC_GROUP_NAME, "The maximum time in ms a request was throttled by a broker", tags); + + /* ***** Topic level *****/ + Set<String> topicTags = new HashSet<String>(tags); + topicTags.add("topic"); + + this.topicRecordSendRate = new MetricNameTemplate("record-send-rate", TOPIC_METRIC_GROUP_NAME, "The average number of records sent per second for a topic.", topicTags); + this.topicByteRate = new MetricNameTemplate("byte-rate", TOPIC_METRIC_GROUP_NAME, "The average number of bytes sent per second for a topic.", topicTags); + this.topicCompressionRate = new MetricNameTemplate("compression-rate", TOPIC_METRIC_GROUP_NAME, "The average compression rate of record batches for a topic.", topicTags); + this.topicRecordRetryRate = new MetricNameTemplate("record-retry-rate", TOPIC_METRIC_GROUP_NAME, "The average per-second number of retried record sends for a topic", topicTags); + this.topicRecordErrorRate = new MetricNameTemplate("record-error-rate", TOPIC_METRIC_GROUP_NAME, "The average per-second number of record sends that resulted in errors for a topic", topicTags); + + } + + public List<MetricNameTemplate> getAllTemplates() { + return Arrays.asList(this.batchSizeAvg, + this.batchSizeMax, + this.compressionRateAvg, + this.recordQueueTimeAvg, + this.recordQueueTimeMax, + this.requestLatencyAvg, + this.requestLatencyMax, + this.recordSendRate, + this.recordsPerRequestAvg, + this.recordRetryRate, + this.recordErrorRate, + this.recordSizeMax, + this.recordSizeAvg, + this.requestsInFlight, + this.metadataAge, + this.batchSplitRate, + + this.produceThrottleTimeAvg, + this.produceThrottleTimeMax, + + // per-topic metrics + this.topicRecordSendRate, + this.topicByteRate, + this.topicCompressionRate, + this.topicRecordRetryRate, + this.topicRecordErrorRate + ); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/2fb5664b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index e66cce0..096e7c1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -88,7 +88,6 @@ public class SenderTest { private static final short ACKS_ALL = -1; private static final int MAX_RETRIES = 0; private static final String CLIENT_ID = "clientId"; - private static final String METRIC_GROUP = "producer-metrics"; private static final double EPS = 0.0001; private static final int MAX_BLOCK_TIMEOUT = 1000; private static final int REQUEST_TIMEOUT = 1000; @@ -104,6 +103,7 @@ public class SenderTest { private Metrics metrics = null; private RecordAccumulator accumulator = null; private Sender sender = null; + private SenderMetricsRegistry senderMetricsRegistry = null; private final LogContext loggerFactory = new LogContext(); @Before @@ -235,7 +235,7 @@ public class SenderTest { @Test public void testQuotaMetrics() throws Exception { MockSelector selector = new MockSelector(time); - Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics); + Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics, this.senderMetricsRegistry); Cluster cluster = TestUtils.singletonCluster("test", 1); Node node = cluster.nodes().get(0); NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, @@ -263,8 +263,8 @@ public class SenderTest { selector.clear(); } Map<MetricName, KafkaMetric> allMetrics = metrics.metrics(); - KafkaMetric avgMetric = allMetrics.get(metrics.metricName("produce-throttle-time-avg", METRIC_GROUP, "")); - KafkaMetric maxMetric = allMetrics.get(metrics.metricName("produce-throttle-time-max", METRIC_GROUP, "")); + KafkaMetric avgMetric = allMetrics.get(metrics.metricInstance(this.senderMetricsRegistry.produceThrottleTimeAvg)); + KafkaMetric maxMetric = allMetrics.get(metrics.metricInstance(this.senderMetricsRegistry.produceThrottleTimeMax)); // Throttle times are ApiVersions=400, Produce=(100, 200, 300) assertEquals(250, avgMetric.value(), EPS); assertEquals(400, maxMetric.value(), EPS); @@ -278,7 +278,7 @@ public class SenderTest { Metrics m = new Metrics(); try { Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, - maxRetries, m, time, REQUEST_TIMEOUT, 50, null, apiVersions); + maxRetries, m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, null, apiVersions); // do a successful retry Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // connect @@ -325,7 +325,7 @@ public class SenderTest { Metrics m = new Metrics(); try { Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, - m, time, REQUEST_TIMEOUT, 50, null, apiVersions); + m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, null, apiVersions); // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1 Cluster cluster1 = TestUtils.clusterWith(2, "test", 2); metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds()); @@ -577,7 +577,7 @@ public class SenderTest { int maxRetries = 10; Metrics m = new Metrics(); Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, - m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); + m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; client.prepareResponse(new MockClient.RequestMatcher() { @@ -617,8 +617,9 @@ public class SenderTest { int maxRetries = 10; Metrics m = new Metrics(); + SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry(); Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, - m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); + m, metricsRegistry, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // connect. @@ -637,7 +638,7 @@ public class SenderTest { sender.run(time.milliseconds()); // nothing to do, since the pid has changed. We should check the metrics for errors. assertEquals("Expected requests to be aborted after pid change", 0, client.inFlightRequestCount()); - KafkaMetric recordErrors = m.metrics().get(m.metricName("record-error-rate", METRIC_GROUP, "")); + KafkaMetric recordErrors = m.metrics().get(m.metricInstance(metricsRegistry.recordErrorRate)); assertTrue("Expected non-zero value for record send errors", recordErrors.value() > 0); assertTrue(responseFuture.isDone()); @@ -655,7 +656,7 @@ public class SenderTest { int maxRetries = 10; Metrics m = new Metrics(); Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, - m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); + m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // connect. @@ -706,8 +707,9 @@ public class SenderTest { try (Metrics m = new Metrics()) { accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time, new ApiVersions(), txnManager); + SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry(); Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, - m, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions()); + m, metricsRegistry, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions()); // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1 Cluster cluster1 = TestUtils.clusterWith(2, topic, 2); metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds()); @@ -769,7 +771,7 @@ public class SenderTest { assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp).isEmpty()); assertTrue("There should be a split", - m.metrics().get(m.metricName("batch-split-rate", "producer-metrics")).value() > 0); + m.metrics().get(m.metricInstance(metricsRegistry.batchSplitRate)).value() > 0); } } @@ -826,8 +828,9 @@ public class SenderTest { this.metrics = new Metrics(metricConfig, time); this.accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager); + this.senderMetricsRegistry = new SenderMetricsRegistry(metricTags.keySet()); this.sender = new Sender(loggerFactory, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, - MAX_RETRIES, this.metrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); + MAX_RETRIES, this.metrics, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/2fb5664b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 282d91b..1219b9c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -124,7 +124,7 @@ public class TransactionManagerTest { Metrics metrics = new Metrics(metricConfig, time); this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager); this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, - MAX_RETRIES, metrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); + MAX_RETRIES, metrics, new SenderMetricsRegistry(metricTags.keySet()), this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds()); client.setNode(brokerNode); } http://git-wip-us.apache.org/repos/asf/kafka/blob/2fb5664b/docs/ops.html ---------------------------------------------------------------------- diff --git a/docs/ops.html b/docs/ops.html index b34aba4..c55b768 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -1039,119 +1039,13 @@ <td>The fraction of time an appender waits for space allocation.</td> <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td> </tr> - <tr> - <td>batch-size-avg</td> - <td>The average number of bytes sent per partition per-request.</td> - <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td> - </tr> - <tr> - <td>batch-size-max</td> - <td>The max number of bytes sent per partition per-request.</td> - <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td> - </tr> - <tr> - <td>compression-rate-avg</td> - <td>The average compression rate of record batches.</td> - <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td> - </tr> - <tr> - <td>record-queue-time-avg</td> - <td>The average time in ms record batches spent in the record accumulator.</td> - <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td> - </tr> - <tr> - <td>record-queue-time-max</td> - <td>The maximum time in ms record batches spent in the record accumulator.</td> - <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td> - </tr> - <tr> - <td>request-latency-avg</td> - <td>The average request latency in ms.</td> - <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td> - </tr> - <tr> - <td>request-latency-max</td> - <td>The maximum request latency in ms.</td> - <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td> - </tr> - <tr> - <td>record-send-rate</td> - <td>The average number of records sent per second.</td> - <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td> - </tr> - <tr> - <td>records-per-request-avg</td> - <td>The average number of records per request.</td> - <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td> - </tr> - <tr> - <td>record-retry-rate</td> - <td>The average per-second number of retried record sends.</td> - <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td> - </tr> - <tr> - <td>record-error-rate</td> - <td>The average per-second number of record sends that resulted in errors.</td> - <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td> - </tr> - <tr> - <td>record-size-max</td> - <td>The maximum record size.</td> - <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td> - </tr> - <tr> - <td>record-size-avg</td> - <td>The average record size.</td> - <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td> - </tr> - <tr> - <td>requests-in-flight</td> - <td>The current number of in-flight requests awaiting a response.</td> - <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td> - </tr> - <tr> - <td>metadata-age</td> - <td>The age in seconds of the current producer metadata being used.</td> - <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td> - </tr> - <tr> - <td>produce-throttle-time-max</td> - <td>The maximum time in ms a request was throttled by a broker.</td> - <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td> - </tr> - <tr> - <td>produce-throttle-time-avg</td> - <td>The average time in ms a request was throttled by a broker.</td> - <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td> - </tr> - <tr> - <td>record-send-rate</td> - <td>The average number of records sent per second for a topic.</td> - <td>kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td> - </tr> - <tr> - <td>byte-rate</td> - <td>The average number of bytes sent per second for a topic.</td> - <td>kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td> - </tr> - <tr> - <td>compression-rate</td> - <td>The average compression rate of record batches for a topic.</td> - <td>kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td> - </tr> - <tr> - <td>record-retry-rate</td> - <td>The average per-second number of retried record sends for a topic.</td> - <td>kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td> - </tr> - <tr> - <td>record-error-rate</td> - <td>The average per-second number of record sends that resulted in errors for a topic.</td> - <td>kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td> - </tr> </tbody></table> + <h5><a id="producer_sender_monitoring" href="#producer_sender_monitoring">Producer Sender Metrics</a></h5> + + <!--#include virtual="generated/producer_metrics.html" --> + <h4><a id="new_consumer_monitoring" href="#new_consumer_monitoring">New consumer monitoring</a></h4>