[ https://issues.apache.org/jira/browse/KAFKA-6986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16528191#comment-16528191 ]
ASF GitHub Bot commented on KAFKA-6986: --------------------------------------- guozhangwang closed pull request #5210: KAFKA-6986: Export Admin Client metrics through Stream Threads URL: https://github.com/apache/kafka/pull/5210 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java index 0171b617e7c..75c93b63228 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java @@ -17,6 +17,8 @@ package org.apache.kafka.clients.admin; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionReplica; import org.apache.kafka.common.acl.AclBinding; @@ -768,4 +770,11 @@ public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) { public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds) { return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions()); } + + /** + * Get the metrics kept by the adminClient + * + * @return + */ + public abstract Map<MetricName, ? 
extends Metric> metrics(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 62b6b6ee752..f4a2faea3af 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -35,6 +35,8 @@ import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -2739,4 +2741,9 @@ void handleFailure(Throwable throwable) { return new DeleteConsumerGroupsResult(new HashMap<String, KafkaFuture<Void>>(futures)); } + + @Override + public Map<MetricName, ? extends Metric> metrics() { + return Collections.unmodifiableMap(this.metrics.metrics()); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 51750720240..b5131aea0ea 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -17,6 +17,8 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; @@ -47,6 +49,8 @@ private Node controller; private int timeoutNextRequests = 0; + private Map<MetricName, Metric> mockMetrics = new HashMap<>(); + /** * Creates MockAdminClient for a cluster with the given brokers. 
The Kafka cluster ID uses the default value from * DEFAULT_CLUSTER_ID. @@ -390,4 +394,13 @@ public void close(long duration, TimeUnit unit) {} this.configs = configs != null ? configs : Collections.<String, String>emptyMap(); } } + + public void setMockMetrics(MetricName name, Metric metric) { + mockMetrics.put(name, metric); + } + + @Override + public Map<MetricName, ? extends Metric> metrics() { + return mockMetrics; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 6a707ff986d..cef8116e880 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -382,12 +382,12 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState * * @return Map of all metrics. */ - // TODO: we can add metrics for admin client as well public Map<MetricName, ? extends Metric> metrics() { final Map<MetricName, Metric> result = new LinkedHashMap<>(); for (final StreamThread thread : threads) { result.putAll(thread.producerMetrics()); result.putAll(thread.consumerMetrics()); + result.putAll(thread.adminClientMetrics()); } if (globalStreamThread != null) result.putAll(globalStreamThread.consumerMetrics()); result.putAll(metrics.metrics()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index a159e7b6c7a..fbf7fb60e80 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1271,4 +1271,11 @@ TaskManager taskManager() { result.putAll(restoreConsumerMetrics); return result; } + + public Map<MetricName, Metric> adminClientMetrics() { + final Map<MetricName, ? 
extends Metric> adminClientMetrics = taskManager.getAdminClient().metrics(); + final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>(); + result.putAll(adminClientMetrics); + return result; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 44db70d8b60..9da27020c5f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -278,6 +278,10 @@ void shutdown(final boolean clean) { } } + AdminClient getAdminClient() { + return adminClient; + } + Set<TaskId> suspendedActiveTaskIds() { return active.previousTaskIds(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 513d1c01b05..2ccc89348fb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -72,6 +73,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -1303,4 +1305,53 @@ public double measure(MetricConfig config, long now) { Map<MetricName, Metric> producerMetrics = thread.producerMetrics(); assertEquals(testMetricName, producerMetrics.get(testMetricName).metricName()); } + + @Test + public void 
adminClientMetricsVerification() { + final Node broker1 = new Node(0, "dummyHost-1", 1234); + final Node broker2 = new Node(1, "dummyHost-2", 1234); + List<Node> cluster = Arrays.asList(broker1, broker2); + + final MockAdminClient adminClient = new MockAdminClient(cluster, broker1, null); + + final MockProducer<byte[], byte[]> producer = new MockProducer<>(); + final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class); + final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); + + final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, ""); + final StreamThread thread = new StreamThread( + mockTime, + config, + producer, + consumer, + consumer, + null, + taskManager, + streamsMetrics, + internalTopologyBuilder, + clientId, + new LogContext(""), + new AtomicBoolean()); + final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<String, String>()); + final Metric testMetric = new KafkaMetric( + new Object(), + testMetricName, + new Measurable() { + @Override + public double measure(MetricConfig config, long now) { + return 0; + } + }, + null, + new MockTime()); + + + EasyMock.expect(taskManager.getAdminClient()).andReturn(adminClient); + EasyMock.expectLastCall(); + EasyMock.replay(taskManager, consumer); + + adminClient.setMockMetrics(testMetricName, testMetric); + Map<MetricName, Metric> adminClientMetrics = thread.adminClientMetrics(); + assertEquals(testMetricName, adminClientMetrics.get(testMetricName).metricName()); + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Export Admin Client metrics through Stream Threads > -------------------------------------------------- > > Key: KAFKA-6986 > URL: https://issues.apache.org/jira/browse/KAFKA-6986 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Boyang Chen > Assignee: Shun Guan > Priority: Minor > Labels: Newcomer, beginner, newbie > Fix For: 2.1.0 > > > We already export producer and consumer metrics through the KafkaStreams class: > [https://github.com/apache/kafka/pull/4998] > It makes sense to also export the Admin client metrics. > If any new contributor wishes to take over this one, please let me know. I > will revisit and close this ticket in one or two months if no one > claims it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)