[ 
https://issues.apache.org/jira/browse/KAFKA-6986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16528191#comment-16528191
 ] 

ASF GitHub Bot commented on KAFKA-6986:
---------------------------------------

guozhangwang closed pull request #5210: KAFKA-6986: Export Admin Client metrics 
through Stream Threads
URL: https://github.com/apache/kafka/pull/5210
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 0171b617e7c..75c93b63228 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.acl.AclBinding;
@@ -768,4 +770,11 @@ public ListConsumerGroupOffsetsResult 
listConsumerGroupOffsets(String groupId) {
     public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> 
groupIds) {
         return deleteConsumerGroups(groupIds, new 
DeleteConsumerGroupsOptions());
     }
+
+    /**
+     * Get the metrics kept by the adminClient
+     *
+     * @return
+     */
+    public abstract Map<MetricName, ? extends Metric> metrics();
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 62b6b6ee752..f4a2faea3af 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -35,6 +35,8 @@
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -2739,4 +2741,9 @@ void handleFailure(Throwable throwable) {
 
         return new DeleteConsumerGroupsResult(new HashMap<String, 
KafkaFuture<Void>>(futures));
     }
+
+    @Override
+    public Map<MetricName, ? extends Metric> metrics() {
+        return Collections.unmodifiableMap(this.metrics.metrics());
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 51750720240..b5131aea0ea 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
@@ -47,6 +49,8 @@
     private Node controller;
     private int timeoutNextRequests = 0;
 
+    private Map<MetricName, Metric> mockMetrics = new HashMap<>();
+
     /**
      * Creates MockAdminClient for a cluster with the given brokers. The Kafka 
cluster ID uses the default value from
      * DEFAULT_CLUSTER_ID.
@@ -390,4 +394,13 @@ public void close(long duration, TimeUnit unit) {}
             this.configs = configs != null ? configs : Collections.<String, 
String>emptyMap();
         }
     }
+
+    public void setMockMetrics(MetricName name, Metric metric) {
+        mockMetrics.put(name, metric);
+    }
+
+    @Override
+    public Map<MetricName, ? extends Metric> metrics() {
+        return mockMetrics;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 6a707ff986d..cef8116e880 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -382,12 +382,12 @@ public void setGlobalStateRestoreListener(final 
StateRestoreListener globalState
      *
      * @return Map of all metrics.
      */
-    // TODO: we can add metrics for admin client as well
     public Map<MetricName, ? extends Metric> metrics() {
         final Map<MetricName, Metric> result = new LinkedHashMap<>();
         for (final StreamThread thread : threads) {
             result.putAll(thread.producerMetrics());
             result.putAll(thread.consumerMetrics());
+            result.putAll(thread.adminClientMetrics());
         }
         if (globalStreamThread != null) 
result.putAll(globalStreamThread.consumerMetrics());
         result.putAll(metrics.metrics());
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a159e7b6c7a..fbf7fb60e80 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1271,4 +1271,11 @@ TaskManager taskManager() {
         result.putAll(restoreConsumerMetrics);
         return result;
     }
+
+    public Map<MetricName, Metric> adminClientMetrics() {
+        final Map<MetricName, ? extends Metric> adminClientMetrics = 
taskManager.getAdminClient().metrics();
+        final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>();
+        result.putAll(adminClientMetrics);
+        return result;
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 44db70d8b60..9da27020c5f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -278,6 +278,10 @@ void shutdown(final boolean clean) {
         }
     }
 
+    AdminClient getAdminClient() {
+        return adminClient;
+    }
+
     Set<TaskId> suspendedActiveTaskIds() {
         return active.previousTaskIds();
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 513d1c01b05..2ccc89348fb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -72,6 +73,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1303,4 +1305,53 @@ public double measure(MetricConfig config, long now) {
         Map<MetricName, Metric> producerMetrics = thread.producerMetrics();
         assertEquals(testMetricName, 
producerMetrics.get(testMetricName).metricName());
     }
+
+    @Test
+    public void adminClientMetricsVerification() {
+        final Node broker1 = new Node(0, "dummyHost-1", 1234);
+        final Node broker2 = new Node(1, "dummyHost-2", 1234);
+        List<Node> cluster = Arrays.asList(broker1, broker2);
+
+        final MockAdminClient adminClient = new MockAdminClient(cluster, 
broker1, null);
+
+        final MockProducer<byte[], byte[]> producer = new MockProducer<>();
+        final Consumer<byte[], byte[]> consumer = 
EasyMock.createNiceMock(Consumer.class);
+        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
+
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new 
StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamThread thread = new StreamThread(
+                mockTime,
+                config,
+                producer,
+                consumer,
+                consumer,
+                null,
+                taskManager,
+                streamsMetrics,
+                internalTopologyBuilder,
+                clientId,
+                new LogContext(""),
+                new AtomicBoolean());
+        final MetricName testMetricName = new MetricName("test_metric", "", 
"", new HashMap<String, String>());
+        final Metric testMetric = new KafkaMetric(
+                new Object(),
+                testMetricName,
+                new Measurable() {
+                    @Override
+                    public double measure(MetricConfig config, long now) {
+                        return 0;
+                    }
+                },
+                null,
+                new MockTime());
+
+
+        EasyMock.expect(taskManager.getAdminClient()).andReturn(adminClient);
+        EasyMock.expectLastCall();
+        EasyMock.replay(taskManager, consumer);
+
+        adminClient.setMockMetrics(testMetricName, testMetric);
+        Map<MetricName, Metric> adminClientMetrics = 
thread.adminClientMetrics();
+        assertEquals(testMetricName, 
adminClientMetrics.get(testMetricName).metricName());
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Export Admin Client metrics through Stream Threads
> --------------------------------------------------
>
>                 Key: KAFKA-6986
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6986
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Boyang Chen
>            Assignee: Shun Guan
>            Priority: Minor
>              Labels: Newcomer, beginner, newbie
>             Fix For: 2.1.0
>
>
> We already exported producer and consumer metrics through KafkaStreams class:
> [https://github.com/apache/kafka/pull/4998]
> It makes sense to also export the Admin client metrics.
> If any new contributor wishes to take over this one, please let me know. I 
> will revisit and close this ticket in one or two months later in case no one 
> claims it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to