junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391737904


##########
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##########
@@ -60,17 +62,39 @@ public PushTelemetryRequest(PushTelemetryRequestData data, short version) {
 
     @Override
     public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        PushTelemetryResponseData responseData = new PushTelemetryResponseData()
-                .setErrorCode(Errors.forException(e).code())
-                .setThrottleTimeMs(throttleTimeMs);
-        return new PushTelemetryResponse(responseData);
+        return errorResponse(throttleTimeMs, Errors.forException(e));

Review Comment:
   This is an existing issue, but `getErrorResponse` can just be `errorResponse`.



##########
core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceTest {
+
+    private Uuid uuid;
+    private ClientMetricsInstanceMetadata instanceMetadata;
+    private ClientMetricsInstance clientInstance;
+
+    @BeforeEach
+    public void setUp() throws UnknownHostException {
+        uuid = Uuid.randomUuid();
+        instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+            ClientMetricsTestUtils.requestContext());

Review Comment:
   merge with previous line?



##########
core/src/test/java/kafka/server/ClientMetricsManagerTest.java:
##########
@@ -0,0 +1,921 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsTestUtils;
+import kafka.server.ClientMetricsManager.SubscriptionInfo;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryRequest.Builder;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsManagerTest {
+
+    private ClientMetricsManager clientMetricsManager;
+
+    @BeforeEach
+    public void setUp() {
+        clientMetricsManager = new ClientMetricsManager();
+    }
+
+    @Test
+    public void testUpdateSubscription() {
+        assertTrue(clientMetricsManager.subscriptions().isEmpty());
+
+        long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch();
+        clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
+
+        assertEquals(1, clientMetricsManager.subscriptions().size());
+        assertNotNull(clientMetricsManager.subscriptionInfo("sub-1"));
+
+        SubscriptionInfo subscriptionInfo = clientMetricsManager.subscriptionInfo("sub-1");
+        Set<String> metrics = subscriptionInfo.metrics();
+
+        // Validate metrics.
+        assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, metrics.size());
+        Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric ->
+            assertTrue(metrics.contains(metric)));
+        // Validate push interval.
+        assertEquals(ClientMetricsTestUtils.defaultProperties().getProperty(ClientMetricsConfigs.PUSH_INTERVAL_MS),
+            String.valueOf(subscriptionInfo.intervalMs()));
+
+        // Validate match patterns.
+        assertEquals(ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.size(),
+            subscriptionInfo.matchPattern().size());
+        ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.forEach(pattern -> {
+            String[] split = pattern.split("=");
+            assertTrue(subscriptionInfo.matchPattern().containsKey(split[0]));
+            assertEquals(split[1], subscriptionInfo.matchPattern().get(split[0]).pattern());
+        });
+        assertTrue(clientMetricsManager.lastSubscriptionUpdateEpoch() > previousEpoch);
+    }
+
+    @Test
+    public void testUpdateSubscriptionWithEmptyProperties() {
+        assertTrue(clientMetricsManager.subscriptions().isEmpty());
+        long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch();
+        clientMetricsManager.updateSubscription("sub-1", new Properties());
+        // No subscription should be added as the properties are empty.
+        assertEquals(0, clientMetricsManager.subscriptions().size());
+        assertEquals(previousEpoch, clientMetricsManager.lastSubscriptionUpdateEpoch());

Review Comment:
   `clientMetricsManager.lastSubscriptionUpdateEpoch()` could change during the test, right? Could we use MockTime to make the test more reliable?
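
A minimal sketch of what the MockTime suggestion could look like, assuming the manager receives its clock through the constructor instead of calling `System.currentTimeMillis()` directly. `TimeSource`, `ManualClock` and `SubscriptionTracker` are hypothetical stand-ins for Kafka's `org.apache.kafka.common.utils.Time`, the `MockTime` test utility, and a trimmed-down `ClientMetricsManager`; only the epoch rules of `updateSubscription` in this PR are modeled.

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for org.apache.kafka.common.utils.Time.
interface TimeSource {
    long milliseconds();
}

// Hypothetical test clock that only advances when told to, in the spirit of MockTime.
final class ManualClock implements TimeSource {
    private long nowMs;

    ManualClock(long startMs) {
        this.nowMs = startMs;
    }

    @Override
    public long milliseconds() {
        return nowMs;
    }

    void sleep(long ms) {
        nowMs += ms;
    }
}

// Hypothetical, trimmed-down manager: same epoch rules as updateSubscription in
// this PR, but the clock is injected instead of read from System.currentTimeMillis().
final class SubscriptionTracker {
    private final TimeSource time;
    private final Map<String, Properties> subscriptions = new ConcurrentHashMap<>();
    private volatile long lastSubscriptionUpdateEpoch;

    SubscriptionTracker(TimeSource time) {
        this.time = time;
    }

    void updateSubscription(String name, Properties properties) {
        if (properties.isEmpty()) {
            // Empty configs mean "delete"; only an actual removal bumps the epoch.
            if (subscriptions.remove(name) != null) {
                lastSubscriptionUpdateEpoch = time.milliseconds();
            }
            return;
        }
        subscriptions.put(name, properties);
        lastSubscriptionUpdateEpoch = time.milliseconds();
    }

    long lastSubscriptionUpdateEpoch() {
        return lastSubscriptionUpdateEpoch;
    }

    int subscriptionCount() {
        return subscriptions.size();
    }
}
```

With the clock injected, a test can advance time explicitly (`clock.sleep(10)`) and assert exact epoch values instead of depending on when the wall clock ticks.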



##########
core/src/main/java/kafka/server/ClientMetricsManager.java:
##########
@@ -30,17 +68,366 @@ public class ClientMetricsManager implements Closeable {
 
     private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class);
     private static final ClientMetricsManager INSTANCE = new ClientMetricsManager();
+    private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList(
+        Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id));
 
     public static ClientMetricsManager instance() {
         return INSTANCE;
     }
+    // Max cache size (16k active client connections per broker)
+    private static final int CM_CACHE_MAX_SIZE = 16384;
+    private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+    private final Map<String, SubscriptionInfo> subscriptionMap;
+
+    // The last subscription updated time is used to determine if the next telemetry request needs
+    // to re-evaluate the subscription id as per changes subscriptions.
+    private volatile long lastSubscriptionUpdateEpoch;
+
+    // Visible for testing
+    ClientMetricsManager() {
+        subscriptionMap = new ConcurrentHashMap<>();
+        clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE));
+    }
 
     public void updateSubscription(String subscriptionName, Properties properties) {
-        // TODO: Implement the update logic to manage subscriptions.
+        // IncrementalAlterConfigs API will send empty configs when all the configs are deleted
+        // for respective subscription. In that case, we need to remove the subscription from the map.
+        if (properties.isEmpty()) {
+            // Remove the subscription from the map if it exists, else ignore the config update.
+            if (subscriptionMap.containsKey(subscriptionName)) {
+                log.info("Removing subscription [{}] from the subscription map", subscriptionName);
+                subscriptionMap.remove(subscriptionName);
+                updateLastSubscriptionUpdateEpoch();
+            }
+            return;
+        }
+
+        ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+        updateClientSubscription(subscriptionName, configs);
+        /*
+         Update last subscription updated time to current time to indicate that there is a change
+         in the subscription. This will be used to determine if the next telemetry request needs
+         to re-evaluate the subscription id as per changes subscriptions.
+        */
+        updateLastSubscriptionUpdateEpoch();
+    }
+
+    public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest(
+        GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+        long now = System.currentTimeMillis();
+        Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId())
+            .filter(id -> !id.equals(Uuid.ZERO_UUID))
+            .orElse(generateNewClientId());
+
+        /*
+         Get the client instance from the cache or create a new one. If subscription has changed
+         since the last request, then the client instance will be re-evaluated. Validation of the
+         request will be done after the client instance is created. If client issued get telemetry
+         request prior to push interval, then the client should get a throttle error but if the
+         subscription has changed since the last request then the client should get the updated
+         subscription immediately.
+        */
+        ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext, now);
+
+        try {
+            // Validate the get request parameters for the client instance.
+            validateGetRequest(request, clientInstance, now);
+        } catch (ApiException exception) {
+            return request.getErrorResponse(throttleMs, exception);
+        }
+
+        clientInstance.lastKnownError(Errors.NONE);
+        return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs);
+    }
+
+    public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request,
+        int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+        Uuid clientInstanceId = request.data().clientInstanceId();
+        if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+            String msg = String.format("Invalid request from the client [%s], invalid client instance id",
+                clientInstanceId);
+            return request.getErrorResponse(throttleMs, new InvalidRequestException(msg));
+        }
+
+        long now = System.currentTimeMillis();
+        ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext, now);
+
+        try {
+            // Validate the push request parameters for the client instance.
+            validatePushRequest(request, telemetryMaxBytes, clientInstance, now);
+        } catch (ApiException exception) {
+            log.debug("Error validating push telemetry request from client [{}]", clientInstanceId, exception);
+            clientInstance.lastKnownError(Errors.forException(exception));
+            return request.getErrorResponse(throttleMs, exception);
+        } finally {
+            // Update the client instance with the latest push request parameters.
+            clientInstance.terminating(request.data().terminating());
+        }
+
+        // Push the metrics to the external client receiver plugin.
+        byte[] metrics = request.data().metrics();
+        if (metrics != null && metrics.length > 0) {
+            try {
+                ClientMetricsReceiverPlugin.instance().exportMetrics(requestContext, request);
+            } catch (Exception exception) {
+                clientInstance.lastKnownError(Errors.INVALID_RECORD);
+                return request.errorResponse(throttleMs, Errors.INVALID_RECORD);
+            }
+        }
+
+        clientInstance.lastKnownError(Errors.NONE);
+        return new PushTelemetryResponse(new PushTelemetryResponseData().setThrottleTimeMs(throttleMs));
     }
 
     @Override
     public void close() throws IOException {
-        // TODO: Implement the close logic to close the client metrics manager.
+        subscriptionMap.clear();
+    }
+
+    private synchronized void updateLastSubscriptionUpdateEpoch() {
+        this.lastSubscriptionUpdateEpoch = System.currentTimeMillis();
+    }
+
+    private void updateClientSubscription(String subscriptionName, ClientMetricsConfigs configs) {
+        List<String> metrics = configs.getList(ClientMetricsConfigs.SUBSCRIPTION_METRICS);
+        int pushInterval = configs.getInt(ClientMetricsConfigs.PUSH_INTERVAL_MS);
+        List<String> clientMatchPattern = configs.getList(ClientMetricsConfigs.CLIENT_MATCH_PATTERN);
+
+        SubscriptionInfo newSubscription =
+            new SubscriptionInfo(subscriptionName, metrics, pushInterval,
+                ClientMetricsConfigs.parseMatchingPatterns(clientMatchPattern));
+
+        subscriptionMap.put(subscriptionName, newSubscription);
+    }
+
+    private Uuid generateNewClientId() {
+        Uuid id = Uuid.randomUuid();
+        while (clientInstanceCache.get(id) != null) {
+            id = Uuid.randomUuid();
+        }
+        return id;
+    }
+
+    private ClientMetricsInstance clientInstance(Uuid clientInstanceId, RequestContext requestContext,
+        long timestamp) {
+        ClientMetricsInstance clientInstance = clientInstanceCache.get(clientInstanceId);
+
+        if (clientInstance == null) {
+            /*
+             If the client instance is not present in the cache, then create a new client instance
+             and update the cache. This can also happen when the telemetry request is received by
+             the separate broker instance. Though cache is synchronized, but it is possible that
+             concurrent calls can create the same client instance. Hence, safeguard the client instance
+             creation with a double-checked lock to ensure that only one instance is created.
+            */
+            synchronized (this) {
+                clientInstance = clientInstanceCache.get(clientInstanceId);
+                if (clientInstance != null) {
+                    return clientInstance;
+                }
+
+                ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(
+                    clientInstanceId, requestContext);
+                clientInstance = createClientInstanceAndUpdateCache(clientInstanceId, instanceMetadata, timestamp);
+            }
+        } else if (clientInstance.subscriptionUpdateEpoch() < lastSubscriptionUpdateEpoch) {
+            /*
+             If the last subscription update time for client instance is older than the subscription
+             updated time, then re-evaluate the subscription information for the client as per the
+             updated subscriptions. This is to ensure that the client instance is always in sync with
+             the latest subscription information. Though cache is synchronized, but it is possible that
+             concurrent calls can create the same client instance. Hence, safeguard the client instance
+             update with a double-checked lock to ensure that only one instance is created.
+            */
+            synchronized (this) {
+                clientInstance = clientInstanceCache.get(clientInstanceId);
+                if (clientInstance.subscriptionUpdateEpoch() >= lastSubscriptionUpdateEpoch) {
+                    return clientInstance;
+                }
+                clientInstance = createClientInstanceAndUpdateCache(clientInstanceId, clientInstance.instanceMetadata(), timestamp);
+            }
+        }
+
+        return clientInstance;
+    }
+
+    private ClientMetricsInstance createClientInstanceAndUpdateCache(Uuid clientInstanceId,
+        ClientMetricsInstanceMetadata instanceMetadata, long timestamp) {
+
+        ClientMetricsInstance clientInstance = createClientInstance(clientInstanceId, instanceMetadata,
+            timestamp);
+        clientInstanceCache.put(clientInstanceId, clientInstance);
+        return clientInstance;
+    }
+
+    private ClientMetricsInstance createClientInstance(Uuid clientInstanceId,
+        ClientMetricsInstanceMetadata instanceMetadata, long timestamp) {
+
+        int pushIntervalMs = ClientMetricsConfigs.DEFAULT_INTERVAL_MS;
+        // Keep a set of metrics to avoid duplicates in case of overlapping subscriptions.
+        Set<String> subscribedMetrics = new HashSet<>();
+        boolean allMetricsSubscribed = false;
+
+        for (SubscriptionInfo info : subscriptions()) {
+            if (instanceMetadata.isMatch(info.matchPattern())) {
+                allMetricsSubscribed = allMetricsSubscribed || info.metrics().contains(
+                    ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+                subscribedMetrics.addAll(info.metrics());
+                pushIntervalMs = Math.min(pushIntervalMs, info.intervalMs());
+            }
+        }
+
+        /*
+         If client matches with any subscription that has empty metrics string, then it means that client
+         is subscribed to all the metrics, so just send the empty string as the subscribed metrics.
+        */
+        if (allMetricsSubscribed) {
+            subscribedMetrics.clear();
+            // Add an empty string to indicate that all metrics are subscribed.
+            subscribedMetrics.add(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS);
+        }
+
+        int subscriptionId = computeSubscriptionId(subscribedMetrics, pushIntervalMs, clientInstanceId);
+
+        return new ClientMetricsInstance(clientInstanceId, instanceMetadata, subscriptionId, timestamp,
+            subscribedMetrics, pushIntervalMs);
+    }
+
+    /**
+     * Computes the SubscriptionId as a unique identifier for a client instance's subscription set,
+     * the id is generated by calculating a CRC32 of the configured metrics subscriptions including
+     * the PushIntervalMs, XORed with the ClientInstanceId.
+     */
+    private int computeSubscriptionId(Set<String> metrics, int pushIntervalMs, Uuid clientInstanceId) {
+        CRC32 crc = new CRC32();

Review Comment:
   We have moved to CRC32C since it's more efficient (https://issues.apache.org/jira/browse/KAFKA-1449). Should we use CRC32C here too?
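
A sketch of the subscription id computed with CRC32C. It uses the JDK's `java.util.zip.CRC32C` (Java 9+) to stay self-contained; inside Kafka the existing `org.apache.kafka.common.utils.Crc32C` utility would be the natural choice. The Javadoc above does not say how the 128-bit `ClientInstanceId` is XORed into a 32-bit checksum, so folding its four 32-bit words together is an assumption, as is the sorted, comma-joined encoding of the metric names. `SubscriptionIds` is a hypothetical helper, and `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.zip.CRC32C;

final class SubscriptionIds {
    private SubscriptionIds() { }

    // Hypothetical helper: CRC32C over the sorted metric names and the push
    // interval, XORed with a 32-bit fold of the client instance id.
    static int computeSubscriptionId(Set<String> metrics, int pushIntervalMs, UUID clientInstanceId) {
        CRC32C crc = new CRC32C();
        // Sort first so the checksum does not depend on Set iteration order;
        // the comma separator keeps {"ab", "c"} distinct from {"a", "bc"}.
        String joined = String.join(",", new TreeSet<>(metrics));
        crc.update(joined.getBytes(StandardCharsets.UTF_8));
        crc.update(ByteBuffer.allocate(Integer.BYTES).putInt(pushIntervalMs).array());

        // Assumption: reduce the 128-bit id to 32 bits by XORing its four words.
        long msb = clientInstanceId.getMostSignificantBits();
        long lsb = clientInstanceId.getLeastSignificantBits();
        int foldedId = (int) (msb >>> 32) ^ (int) msb ^ (int) (lsb >>> 32) ^ (int) lsb;

        // getValue() holds the unsigned 32-bit checksum in a long.
        return (int) crc.getValue() ^ foldedId;
    }
}
```

Sorting before hashing keeps the id independent of the order in which the `Set` happens to iterate its elements.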



##########
core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceTest {
+
+    private Uuid uuid;
+    private ClientMetricsInstanceMetadata instanceMetadata;
+    private ClientMetricsInstance clientInstance;
+
+    @BeforeEach
+    public void setUp() throws UnknownHostException {
+        uuid = Uuid.randomUuid();
+        instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+            ClientMetricsTestUtils.requestContext());
+        clientInstance = new ClientMetricsInstance(Uuid.randomUuid(), instanceMetadata, 0, 0,
+            null, ClientMetricsConfigs.DEFAULT_INTERVAL_MS);
+    }
+
+    @Test
+    public void testMaybeUpdateRequestEpochValid() {
+        // First request should be accepted.
+        assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+        assertTrue(clientInstance.maybeUpdatePushRequestEpoch(System.currentTimeMillis()));
+    }
+
+    @Test
+    public void testMaybeUpdateGetRequestAfterElapsedTimeValid() throws InterruptedException {
+        ClientMetricsInstance clientInstance = new ClientMetricsInstance(uuid, instanceMetadata, 0, 0,
+            null, 2);
+        assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+        // sleep for 3 ms to ensure that the next request is accepted.
+        Thread.sleep(3);
+        // Second request should be accepted as time since last request is greater than the retry interval.

Review Comment:
   retry interval => push interval
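
Since `maybeUpdateGetRequestEpoch` already takes the timestamp as an argument, the elapsed-time test could pass explicit values (`now`, then `now + 3`) instead of calling `Thread.sleep(3)`. The sketch below models the check with a hypothetical `PushIntervalGate` class rather than `ClientMetricsInstance`; accepting a request that arrives exactly `pushIntervalMs` after the last accepted one is an assumption about the boundary.

```java
// Hypothetical model of the per-client rate check exercised by
// maybeUpdateGetRequestEpoch: the first request is always accepted, later ones
// only once pushIntervalMs have elapsed since the last accepted request.
final class PushIntervalGate {
    private final long pushIntervalMs;
    private boolean accepted;       // has any request been accepted yet?
    private long lastAcceptedMs;

    PushIntervalGate(long pushIntervalMs) {
        this.pushIntervalMs = pushIntervalMs;
    }

    synchronized boolean maybeUpdateRequestEpoch(long nowMs) {
        if (accepted && nowMs - lastAcceptedMs < pushIntervalMs) {
            return false; // too soon: still inside the push interval
        }
        accepted = true;
        lastAcceptedMs = nowMs;
        return true;
    }
}
```

In the PR's test the same idea would read `assertTrue(clientInstance.maybeUpdateGetRequestEpoch(now))` followed by `assertTrue(clientInstance.maybeUpdateGetRequestEpoch(now + 3))`, with no sleep.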



##########
core/src/test/java/kafka/metrics/ClientMetricsInstanceMetadataTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceMetadataTest {
+
+    @Test
+    public void testIsMatchValid() throws UnknownHostException {
+        Uuid uuid = Uuid.randomUuid();
+        ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+            ClientMetricsTestUtils.requestContext());
+        // We consider empty/missing client matching patterns as valid
+        assertTrue(instanceMetadata.isMatch(Collections.emptyMap()));
+        assertTrue(instanceMetadata.isMatch(null));

Review Comment:
   How could a user create a null pattern?
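
One way to make the null case unreachable, sketched under assumptions: parse the configured patterns into a map that is empty (never `null`) when no patterns are set, so `isMatch` only has to treat the empty map as "match all clients". `MatchPatterns`, its methods, and the attribute names in the check below are hypothetical; the `key=regex` entry format follows the `pattern.split("=")` usage in `ClientMetricsManagerTest`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

final class MatchPatterns {
    private MatchPatterns() { }

    // Hypothetical parser: "key=regex" entries become a map; an absent or empty
    // config yields an empty map, so callers never see null.
    static Map<String, Pattern> parse(List<String> entries) {
        if (entries == null || entries.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<String, Pattern> patterns = new HashMap<>();
        for (String entry : entries) {
            int eq = entry.indexOf('=');
            if (eq <= 0 || eq == entry.length() - 1) {
                throw new IllegalArgumentException("Illegal client matching pattern: " + entry);
            }
            patterns.put(entry.substring(0, eq), Pattern.compile(entry.substring(eq + 1)));
        }
        return patterns;
    }

    // Every pattern must fully match the corresponding client attribute;
    // an empty pattern map matches all clients.
    static boolean isMatch(Map<String, String> attributes, Map<String, Pattern> patterns) {
        for (Map.Entry<String, Pattern> e : patterns.entrySet()) {
            String value = attributes.get(e.getKey());
            if (value == null || !e.getValue().matcher(value).matches()) {
                return false;
            }
        }
        return true;
    }
}
```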



##########
core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceTest {
+
+    private Uuid uuid;
+    private ClientMetricsInstanceMetadata instanceMetadata;
+    private ClientMetricsInstance clientInstance;
+
+    @BeforeEach
+    public void setUp() throws UnknownHostException {
+        uuid = Uuid.randomUuid();
+        instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+            ClientMetricsTestUtils.requestContext());
+        clientInstance = new ClientMetricsInstance(Uuid.randomUuid(), instanceMetadata, 0, 0,
+            null, ClientMetricsConfigs.DEFAULT_INTERVAL_MS);
+    }
+
+    @Test
+    public void testMaybeUpdateRequestEpochValid() {
+        // First request should be accepted.
+        assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+        assertTrue(clientInstance.maybeUpdatePushRequestEpoch(System.currentTimeMillis()));
+    }
+
+    @Test
+    public void testMaybeUpdateGetRequestAfterElapsedTimeValid() throws InterruptedException {
+        ClientMetricsInstance clientInstance = new ClientMetricsInstance(uuid, instanceMetadata, 0, 0,
+            null, 2);
+        assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+        // sleep for 3 ms to ensure that the next request is accepted.
+        Thread.sleep(3);
+        // Second request should be accepted as time since last request is greater than the retry interval.
+        assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+    }
+
+    @Test
+    public void testMaybeUpdateGetRequestWithImmediateRetryFail() {
+        assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+        // Second request should be rejected as time since last request is less than the retry interval.
+        assertFalse(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+    }
+
+    @Test
+    public void testMaybeUpdatePushRequestAfterElapsedTimeValid() throws InterruptedException {

Review Comment:
   How is this test different from `testMaybeUpdateGetRequestAfterElapsedTimeValid`?



##########
core/src/test/java/kafka/metrics/ClientMetricsReceiverPluginTest.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import kafka.metrics.ClientMetricsTestUtils.TestClientMetricsReceiver;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsReceiverPluginTest {
+
+    TestClientMetricsReceiver telemetryReceiver = new TestClientMetricsReceiver();
+
+    @Test
+    public void testExportMetrics() throws UnknownHostException {
+        assertTrue(ClientMetricsReceiverPlugin.instance().isEmpty());
+
+        ClientMetricsReceiverPlugin.instance().add(telemetryReceiver);
+        assertFalse(ClientMetricsReceiverPlugin.instance().isEmpty());
+
+        assertEquals(0, telemetryReceiver.exportMetricsInvokedCount);
+        assertTrue(telemetryReceiver.metricsData.isEmpty());
+
+        ClientMetricsReceiverPlugin.instance().exportMetrics(ClientMetricsTestUtils.requestContext(),
+            new PushTelemetryRequest.Builder(
+                new PushTelemetryRequestData()
+                    .setMetrics("test-metrics".getBytes(StandardCharsets.UTF_8)))
+                .build());
+
+        assertEquals(1, telemetryReceiver.exportMetricsInvokedCount);
+        assertEquals(1, telemetryReceiver.metricsData.size());
+        assertEquals("test-metrics", new String(

Review Comment:
   Could we just directly compare the bytes?
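
A sketch of comparing the payload as bytes rather than decoding it to a `String`. The element type of `metricsData` is not visible in the hunk, so both a `byte[]` and a `ByteBuffer` variant are shown; `PayloadComparison` is a hypothetical helper. In the JUnit test itself, `assertArrayEquals(expected, actual)` from `org.junit.jupiter.api.Assertions` performs the `byte[]` comparison directly and reports the first differing index on failure.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class PayloadComparison {
    private PayloadComparison() { }

    // Element-by-element comparison of two byte arrays (null-safe).
    static boolean sameBytes(byte[] expected, byte[] actual) {
        return Arrays.equals(expected, actual);
    }

    // ByteBuffer.equals compares the remaining bytes (position to limit) of both
    // buffers, so a buffer payload can also be checked without decoding it.
    static boolean sameBytes(byte[] expected, ByteBuffer actual) {
        return ByteBuffer.wrap(expected).equals(actual);
    }
}
```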



##########
core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Plugin to register client telemetry receivers and export metrics. This class is used by the Kafka
+ * server to export client metrics to the registered receivers.
+ */
+public class ClientMetricsReceiverPlugin {
+
+    private static final ClientMetricsReceiverPlugin INSTANCE = new ClientMetricsReceiverPlugin();

Review Comment:
   Static fields like this could make it more difficult to test multiple broker instances in the same JVM. Could we create an instance when creating a broker and pass that instance along? Ditto for `ClientMetricsManager.INSTANCE`.
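One way to act on this is plain constructor injection. Each broker builds its own plugin and hands that same object to whatever needs it, so two brokers in one JVM never share receiver state. Below is a minimal sketch. `ReceiverRegistry`, `MetricsManager`, and `BrokerServer` are hypothetical names, and the receiver is modeled as a `Consumer<byte[]>` rather than Kafka's `ClientTelemetryReceiver`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical per-broker replacement for a static INSTANCE: the state lives in
// an ordinary object whose lifetime matches the broker that created it.
class ReceiverRegistry {
    private final List<Consumer<byte[]>> receivers = new ArrayList<>();

    synchronized void add(Consumer<byte[]> receiver) {
        receivers.add(Objects.requireNonNull(receiver));
    }

    synchronized boolean isEmpty() {
        return receivers.isEmpty();
    }

    synchronized void exportMetrics(byte[] payload) {
        for (Consumer<byte[]> receiver : receivers) {
            receiver.accept(payload);
        }
    }
}

// The manager receives the registry through its constructor instead of
// reaching for a global.
class MetricsManager {
    private final ReceiverRegistry registry;

    MetricsManager(ReceiverRegistry registry) {
        this.registry = Objects.requireNonNull(registry);
    }

    void onPush(byte[] payload) {
        if (payload != null && payload.length > 0) {
            registry.exportMetrics(payload);
        }
    }
}

// Each broker wires up its own registry and manager at startup.
class BrokerServer {
    final ReceiverRegistry registry = new ReceiverRegistry();
    final MetricsManager metricsManager = new MetricsManager(registry);
}
```

With this shape, a test can start two `BrokerServer` objects side by side. Registering a receiver on one has no effect on the other.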



##########
core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceTest {
+
+    private Uuid uuid;
+    private ClientMetricsInstanceMetadata instanceMetadata;
+    private ClientMetricsInstance clientInstance;
+
+    @BeforeEach
+    public void setUp() throws UnknownHostException {
+        uuid = Uuid.randomUuid();
+        instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+            ClientMetricsTestUtils.requestContext());
+        clientInstance = new ClientMetricsInstance(Uuid.randomUuid(), instanceMetadata, 0, 0,
+            null, ClientMetricsConfigs.DEFAULT_INTERVAL_MS);
+    }
+
+    @Test
+    public void testMaybeUpdateRequestEpochValid() {
+        // First request should be accepted.
+        assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+        assertTrue(clientInstance.maybeUpdatePushRequestEpoch(System.currentTimeMillis()));
+    }
+
+    @Test
+    public void testMaybeUpdateGetRequestAfterElapsedTimeValid() throws InterruptedException {
+        ClientMetricsInstance clientInstance = new ClientMetricsInstance(uuid, instanceMetadata, 0, 0,
+            null, 2);
+        assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+        // sleep for 3 ms to ensure that the next request is accepted.
+        Thread.sleep(3);

Review Comment:
   Could we use MockTime to avoid actual sleeping during tests?
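Kafka's test utilities already provide `MockTime`, an implementation of `org.apache.kafka.common.utils.Time` whose `sleep` advances a virtual clock instead of blocking. To stay self-contained, the sketch below uses a tiny hand-rolled clock in the same spirit. `MillisClock`, `FakeClock`, and `RequestGate` are hypothetical. `RequestGate` only approximates the accept-after-interval rule this test exercises, because the real `maybeUpdateGetRequestEpoch` logic is not shown in this diff.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical clock abstraction, similar in spirit to Kafka's Time interface.
interface MillisClock {
    long milliseconds();
}

// Virtual clock for tests: sleep() advances time instantly instead of blocking.
class FakeClock implements MillisClock {
    private final AtomicLong nowMs;

    FakeClock(long startMs) {
        this.nowMs = new AtomicLong(startMs);
    }

    @Override
    public long milliseconds() {
        return nowMs.get();
    }

    void sleep(long ms) {
        nowMs.addAndGet(ms);
    }
}

// Hypothetical gate: the first request is always accepted. A later request is
// accepted only once at least intervalMs has elapsed since the last accepted one.
class RequestGate {
    private final MillisClock clock;
    private final long intervalMs;
    private long lastAcceptedMs = -1;

    RequestGate(MillisClock clock, long intervalMs) {
        this.clock = clock;
        this.intervalMs = intervalMs;
    }

    synchronized boolean tryAccept() {
        long now = clock.milliseconds();
        if (lastAcceptedMs >= 0 && now - lastAcceptedMs < intervalMs) {
            return false;
        }
        lastAcceptedMs = now;
        return true;
    }
}
```

`maybeUpdateGetRequestEpoch` already takes the timestamp as an argument. So the real test could create a `MockTime`, pass `time.milliseconds()`, and call `time.sleep(3)` in place of `Thread.sleep(3)`.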



##########
core/src/test/java/kafka/server/ClientMetricsManagerTest.java:
##########
@@ -0,0 +1,921 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsTestUtils;
+import kafka.server.ClientMetricsManager.SubscriptionInfo;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryRequest.Builder;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsManagerTest {
+
+    private ClientMetricsManager clientMetricsManager;
+
+    @BeforeEach
+    public void setUp() {
+        clientMetricsManager = new ClientMetricsManager();
+    }
+
+    @Test
+    public void testUpdateSubscription() {
+        assertTrue(clientMetricsManager.subscriptions().isEmpty());
+
+        long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch();
+        clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
+
+        assertEquals(1, clientMetricsManager.subscriptions().size());
+        assertNotNull(clientMetricsManager.subscriptionInfo("sub-1"));
+
+        SubscriptionInfo subscriptionInfo = clientMetricsManager.subscriptionInfo("sub-1");
+        Set<String> metrics = subscriptionInfo.metrics();
+
+        // Validate metrics.
+        assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, metrics.size());
+        Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric ->
+            assertTrue(metrics.contains(metric)));
+        // Validate push interval.
+        assertEquals(ClientMetricsTestUtils.defaultProperties().getProperty(ClientMetricsConfigs.PUSH_INTERVAL_MS),
+            String.valueOf(subscriptionInfo.intervalMs()));
+
+        // Validate match patterns.
+        assertEquals(ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.size(),
+            subscriptionInfo.matchPattern().size());
+        ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.forEach(pattern -> {
+            String[] split = pattern.split("=");
+            assertTrue(subscriptionInfo.matchPattern().containsKey(split[0]));
+            assertEquals(split[1], subscriptionInfo.matchPattern().get(split[0]).pattern());
+        });
+        assertTrue(clientMetricsManager.lastSubscriptionUpdateEpoch() > previousEpoch);
+    }
+
+    @Test
+    public void testUpdateSubscriptionWithEmptyProperties() {
+        assertTrue(clientMetricsManager.subscriptions().isEmpty());
+        long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch();
+        clientMetricsManager.updateSubscription("sub-1", new Properties());
+        // No subscription should be added as the properties are empty.
+        assertEquals(0, clientMetricsManager.subscriptions().size());
+        assertEquals(previousEpoch, clientMetricsManager.lastSubscriptionUpdateEpoch());
+    }
+
+    @Test
+    public void testUpdateSubscriptionWithNullProperties() {
+        assertTrue(clientMetricsManager.subscriptions().isEmpty());
+        long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch();
+        // Properties shouldn't be passed as null.
+        assertThrows(NullPointerException.class, () ->
+            clientMetricsManager.updateSubscription("sub-1", null));
+        assertEquals(0, clientMetricsManager.subscriptions().size());
+        assertEquals(previousEpoch, clientMetricsManager.lastSubscriptionUpdateEpoch());
+    }
+
+    @Test
+    public void testUpdateSubscriptionWithInvalidMetricsProperties() {
+        assertTrue(clientMetricsManager.subscriptions().isEmpty());
+
+        long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch();
+        Properties properties = new Properties();
+        properties.put("random", "random");
+        clientMetricsManager.updateSubscription("sub-1", properties);
+
+        // Undefined keys are ignored.

Review Comment:
   Hmm, this is kind of inconsistent. If we remove all properties in a config, 
we end up removing the subscription. However, if we create a subscription with 
a non-existing property, we end up with a subscription with the default 
properties.
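One way to make the two paths agree is to drop unknown keys before deciding anything, and then treat "no recognized keys left" the same as an empty config, that is, as a deletion. Below is a minimal sketch. The key names `metrics`, `interval.ms`, and `match` are assumed to match `ClientMetricsConfigs`; only `interval.ms` appears in the tests above. `SubscriptionStore` is a hypothetical simplification that stores raw properties rather than a parsed `SubscriptionInfo`. Rejecting unknown keys with an error at config-validation time would be an equally consistent alternative.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical store illustrating one consistent policy for subscription updates.
class SubscriptionStore {
    // Assumed names of the recognized subscription keys.
    static final Set<String> KNOWN_KEYS = Collections.unmodifiableSet(
        new HashSet<>(Arrays.asList("metrics", "interval.ms", "match")));

    private final Map<String, Properties> subscriptions = new ConcurrentHashMap<>();

    void updateSubscription(String name, Properties properties) {
        // Keep only the keys this broker understands.
        Properties recognized = new Properties();
        for (String key : properties.stringPropertyNames()) {
            if (KNOWN_KEYS.contains(key)) {
                recognized.setProperty(key, properties.getProperty(key));
            }
        }
        // An empty config and a config with only unknown keys are treated alike:
        // both remove the subscription (a no-op if it does not exist).
        if (recognized.isEmpty()) {
            subscriptions.remove(name);
            return;
        }
        subscriptions.put(name, recognized);
    }

    boolean contains(String name) {
        return subscriptions.containsKey(name);
    }

    int size() {
        return subscriptions.size();
    }
}
```

Under this policy, the expectation in `testUpdateSubscriptionWithInvalidMetricsProperties` would flip. An update containing only `random=random` would leave no subscription behind.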



##########
core/src/main/java/kafka/server/ClientMetricsManager.java:
##########
@@ -30,17 +68,366 @@ public class ClientMetricsManager implements Closeable {
 
     private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class);
     private static final ClientMetricsManager INSTANCE = new ClientMetricsManager();
+    private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList(
+        Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id));
 
     public static ClientMetricsManager instance() {
         return INSTANCE;
     }
+    // Max cache size (16k active client connections per broker)
+    private static final int CM_CACHE_MAX_SIZE = 16384;
+    private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+    private final Map<String, SubscriptionInfo> subscriptionMap;
+
+    // The last subscription updated time is used to determine if the next telemetry request needs
+    // to re-evaluate the subscription id as per changes subscriptions.
+    private volatile long lastSubscriptionUpdateEpoch;
+
+    // Visible for testing
+    ClientMetricsManager() {
+        subscriptionMap = new ConcurrentHashMap<>();
+        clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE));
+    }
 
     public void updateSubscription(String subscriptionName, Properties properties) {
-        // TODO: Implement the update logic to manage subscriptions.
+        // IncrementalAlterConfigs API will send empty configs when all the configs are deleted
+        // for respective subscription. In that case, we need to remove the subscription from the map.
+        if (properties.isEmpty()) {
+            // Remove the subscription from the map if it exists, else ignore the config update.
+            if (subscriptionMap.containsKey(subscriptionName)) {
+                log.info("Removing subscription [{}] from the subscription map", subscriptionName);
+                subscriptionMap.remove(subscriptionName);
+                updateLastSubscriptionUpdateEpoch();
+            }
+            return;
+        }
+
+        ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+        updateClientSubscription(subscriptionName, configs);
+        /*
+         Update last subscription updated time to current time to indicate that there is a change
+         in the subscription. This will be used to determine if the next telemetry request needs
+         to re-evaluate the subscription id as per changes subscriptions.
+        */
+        updateLastSubscriptionUpdateEpoch();
+    }
+
+    public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest(
+        GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+        long now = System.currentTimeMillis();
+        Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId())
+            .filter(id -> !id.equals(Uuid.ZERO_UUID))
+            .orElse(generateNewClientId());
+
+        /*
+         Get the client instance from the cache or create a new one. If subscription has changed
+         since the last request, then the client instance will be re-evaluated. Validation of the
+         request will be done after the client instance is created. If client issued get telemetry
+         request prior to push interval, then the client should get a throttle error but if the
+         subscription has changed since the last request then the client should get the updated
+         subscription immediately.
+        */
+        ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext, now);
+
+        try {
+            // Validate the get request parameters for the client instance.
+            validateGetRequest(request, clientInstance, now);
+        } catch (ApiException exception) {
+            return request.getErrorResponse(throttleMs, exception);
+        }
+
+        clientInstance.lastKnownError(Errors.NONE);
+        return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs);
+    }
+
+    public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request,
+        int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+        Uuid clientInstanceId = request.data().clientInstanceId();
+        if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {

Review Comment:
   It seems that `clientInstanceId` can't be any of the IDs in `Uuid.RESERVED`, not just `Uuid.ZERO_UUID`?
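If so, the check should consult the whole reserved set and compare by value. The quoted condition `clientInstanceId == Uuid.ZERO_UUID` compares references, so it only matches when the request carries that exact constant object. A zero ID decoded into a new object would not be caught. Below is a self-contained sketch using `java.util.UUID`. The `RESERVED` set here, containing the all-zero ID and `(0, 1)`, is an assumed stand-in for Kafka's `Uuid.RESERVED`. `ClientIds` is a hypothetical helper.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Predicate;

class ClientIds {
    // Assumed stand-in for Kafka's Uuid.RESERVED.
    static final Set<UUID> RESERVED = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
        new UUID(0L, 0L),
        new UUID(0L, 1L))));

    // Value-based check: Set.contains uses equals(), so a freshly constructed
    // all-zero UUID is rejected just like the shared constant.
    static boolean isValidClientInstanceId(UUID id) {
        return id != null && !RESERVED.contains(id);
    }

    // Draws random IDs until one is neither reserved nor already in use.
    static UUID generate(Predicate<UUID> inUse) {
        UUID id = UUID.randomUUID();
        while (RESERVED.contains(id) || inUse.test(id)) {
            id = UUID.randomUUID();
        }
        return id;
    }
}
```

For `java.util.UUID`, the reserved check in `generate` is defensive only. Version-4 IDs always have non-zero high bits, so they can never equal either reserved value.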



##########
core/src/test/java/kafka/server/ClientMetricsManagerTest.java:
##########
@@ -0,0 +1,921 @@
+        // Undefined keys are ignored.
+        assertEquals(1, clientMetricsManager.subscriptions().size());
+        assertNotNull(clientMetricsManager.subscriptionInfo("sub-1"));
+        // Config def default properties.
+        assertTrue(clientMetricsManager.subscriptionInfo("sub-1").metrics().isEmpty());
+        assertEquals(ClientMetricsConfigs.DEFAULT_INTERVAL_MS, clientMetricsManager.subscriptionInfo("sub-1").intervalMs());
+        assertTrue(clientMetricsManager.subscriptionInfo("sub-1").matchPattern().isEmpty());
+        assertTrue(clientMetricsManager.lastSubscriptionUpdateEpoch() > previousEpoch);
+    }
+
+    @Test
+    public void testUpdateSubscriptionWithPropertiesDeletion() throws InterruptedException {
+        assertTrue(clientMetricsManager.subscriptions().isEmpty());
+
+        long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch();
+        Properties properties = new Properties();
+        properties.put("interval.ms", "100");
+        clientMetricsManager.updateSubscription("sub-1", properties);
+        assertEquals(1, clientMetricsManager.subscriptions().size());
+        assertNotNull(clientMetricsManager.subscriptionInfo("sub-1"));
+        assertTrue(clientMetricsManager.lastSubscriptionUpdateEpoch() > previousEpoch);
+
+        previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch();
+        Thread.sleep(1);
+        clientMetricsManager.updateSubscription("sub-1", new Properties());
+        // Subscription should be removed as all properties are removed.
+        assertEquals(0, clientMetricsManager.subscriptions().size());
+        assertTrue(clientMetricsManager.lastSubscriptionUpdateEpoch() > previousEpoch);
+    }
+
+    @Test
+    public void testGetTelemetry() throws UnknownHostException {
+        clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
+        assertEquals(1, clientMetricsManager.subscriptions().size());
+
+        GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder(
+            new GetTelemetrySubscriptionsRequestData()).build();
+
+        GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest(
+            request, 100, ClientMetricsTestUtils.requestContext(), 0);
+
+        assertNotNull(response.data().clientInstanceId());
+        assertTrue(response.data().subscriptionId() != 0);
+
+        assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, response.data().requestedMetrics().size());
+        Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric ->
+            assertTrue(response.data().requestedMetrics().contains(metric)));
+
+        assertEquals(4, response.data().acceptedCompressionTypes().size());
+        // validate compression types order.
+        assertEquals(CompressionType.ZSTD.id, response.data().acceptedCompressionTypes().get(0));
+        assertEquals(CompressionType.LZ4.id, response.data().acceptedCompressionTypes().get(1));
+        assertEquals(CompressionType.GZIP.id, response.data().acceptedCompressionTypes().get(2));
+        assertEquals(CompressionType.SNAPPY.id, response.data().acceptedCompressionTypes().get(3));
+        assertEquals(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS, response.data().pushIntervalMs());
+        assertTrue(response.data().deltaTemporality());
+        assertEquals(100, response.data().telemetryMaxBytes());
+        assertEquals(Errors.NONE, response.error());
+
+        ClientMetricsInstance instance = clientMetricsManager.clientInstance(response.data().clientInstanceId());
+        assertNotNull(instance);
+        assertEquals(Errors.NONE, instance.lastKnownError());
+    }
+
+    @Test
+    public void testGetTelemetryWithoutSubscription() throws UnknownHostException {

Review Comment:
   Does this test provide more coverage than `testGetTelemetry()`?



##########
core/src/main/java/kafka/metrics/ClientMetricsInstance.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Contains the metrics instance metadata and the state of the client instance.
+ */
+public class ClientMetricsInstance {
+
+    private final Uuid clientInstanceId;
+    private final ClientMetricsInstanceMetadata instanceMetadata;
+    private final int subscriptionId;
+    private final long subscriptionUpdateEpoch;
+    private final Set<String> metrics;
+    private final int pushIntervalMs;
+
+    private long lastGetRequestEpoch;
+    private long lastPushRequestEpoch;
+    private volatile boolean terminating;
+    private volatile Errors lastKnownError;
+
+    public ClientMetricsInstance(Uuid clientInstanceId, ClientMetricsInstanceMetadata instanceMetadata,
+        int subscriptionId, long subscriptionUpdateEpoch, Set<String> metrics, int pushIntervalMs) {
+        this.clientInstanceId = Objects.requireNonNull(clientInstanceId);
+        this.instanceMetadata = Objects.requireNonNull(instanceMetadata);
+        this.subscriptionId = subscriptionId;
+        this.subscriptionUpdateEpoch = subscriptionUpdateEpoch;
+        this.metrics = metrics;
+        this.terminating = false;
+        this.pushIntervalMs = pushIntervalMs;
+        this.lastKnownError = Errors.NONE;
+    }
+
+    public Uuid clientInstanceId() {
+        return clientInstanceId;
+    }
+
+    public ClientMetricsInstanceMetadata instanceMetadata() {
+        return instanceMetadata;
+    }
+
+    public int pushIntervalMs() {
+        return pushIntervalMs;
+    }
+
+    public long subscriptionUpdateEpoch() {
+        return subscriptionUpdateEpoch;
+    }
+
+    public int subscriptionId() {
+        return subscriptionId;
+    }
+
+    public Set<String> metrics() {
+        return metrics;
+    }
+
+    public boolean terminating() {
+        return terminating;

Review Comment:
   After the instance is terminated, when do we remove the instance from the in-memory state?
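One possible answer is to evict eagerly. Once a push with `terminating = true` has been handled, the instance is dropped from the cache instead of waiting for LRU eviction at `CM_CACHE_MAX_SIZE`. Below is a minimal sketch, with an access-ordered `LinkedHashMap` standing in for the `SynchronizedCache`/`LRUCache` pair in the diff. `InstanceCache` and `onPush` are hypothetical, and instance state is reduced to a string.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical bounded LRU cache of client instances keyed by instance id.
class InstanceCache {
    private final Map<UUID, String> instances;

    InstanceCache(int maxSize) {
        // accessOrder = true iterates least-recently-used first, so
        // removeEldestEntry evicts the LRU entry once maxSize is exceeded.
        this.instances = new LinkedHashMap<UUID, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<UUID, String> eldest) {
                return size() > maxSize;
            }
        };
    }

    synchronized void put(UUID id, String state) {
        instances.put(id, state);
    }

    synchronized boolean contains(UUID id) {
        return instances.containsKey(id);
    }

    synchronized int size() {
        return instances.size();
    }

    // Called after a push has been handled: a terminating client is not
    // expected to push again, so its entry is removed right away rather than
    // lingering until LRU eviction.
    synchronized void onPush(UUID id, boolean terminating) {
        if (terminating) {
            instances.remove(id);
        }
    }
}
```

Relying on the LRU bound alone would also reclaim the entry eventually, but only under cache pressure. Until then, a terminated client keeps occupying one of the cache slots.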



##########
core/src/main/java/kafka/server/ClientMetricsManager.java:
##########
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
     public static ClientMetricsManager instance() {
         return INSTANCE;
     }
+    // Max cache size (16k active client connections per broker)
+    private static final int CM_CACHE_MAX_SIZE = 16384;
+    private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+    private final Map<String, SubscriptionInfo> subscriptionMap;
+
+    // The last subscription updated time is used to determine if the next telemetry request needs
+    // to re-evaluate the subscription id as per changes subscriptions.
+    private long lastSubscriptionUpdateEpoch;
+
+    // Visible for testing
+    ClientMetricsManager() {
+        subscriptionMap = new ConcurrentHashMap<>();
+        clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE));
+    }
 
     public void updateSubscription(String subscriptionName, Properties properties) {
-        // TODO: Implement the update logic to manage subscriptions.
+        // IncrementalAlterConfigs API will send empty configs when all the configs are deleted
+        // for respective subscription. In that case, we need to remove the subscription from the map.
+        if (properties.isEmpty()) {
+            // Remove the subscription from the map if it exists, else ignore the config update.
+            if (subscriptionMap.containsKey(subscriptionName)) {
+                log.info("Removing subscription [{}] from the subscription map", subscriptionName);
+                subscriptionMap.remove(subscriptionName);
+                updateLastSubscriptionUpdateEpoch();
+            }
+            return;
+        }
+
+        ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+        updateClientSubscription(subscriptionName, configs);
+        /*
+         Update last subscription updated time to current time to indicate that there is a change
+         in the subscription. This will be used to determine if the next telemetry request needs
+         to re-evaluate the subscription id as per changes subscriptions.
+        */
+        updateLastSubscriptionUpdateEpoch();
+    }
+
+    public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest(
+        GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+        long now = System.currentTimeMillis();
+        Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId())
+            .filter(id -> !id.equals(Uuid.ZERO_UUID))
+            .orElse(generateNewClientId());
+
+        /*
+         Get the client instance from the cache or create a new one. If subscription has changed
+         since the last request, then the client instance will be re-evaluated. Validation of the
+         request will be done after the client instance is created. If client issued get telemetry
+         request prior to push interval, then the client should get a throttle error but if the
+         subscription has changed since the last request then the client should get the updated
+         subscription immediately.
+        */
+        ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now);
+
+        try {
+            // Validate the get request parameters for the client instance.
+            validateGetRequest(request, clientInstance);
+        } catch (ApiException exception) {
+            return request.getErrorResponse(throttleMs, exception);
+        } finally {
+            clientInstance.lastGetRequestEpoch(now);
+        }
+
+        clientInstance.lastKnownError(Errors.NONE);
+        return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs);
+    }
+
+    public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request,
+        int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+        Uuid clientInstanceId = request.data().clientInstanceId();
+        if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+            String msg = String.format("Invalid request from the client [%s], invalid client instance id",
+                clientInstanceId);
+            return request.getErrorResponse(throttleMs, new InvalidRequestException(msg));
+        }
+
+        long now = System.currentTimeMillis();
+        ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now);
+
+        try {
+            // Validate the push request parameters for the client instance.
+            validatePushRequest(request, telemetryMaxBytes, clientInstance);
+        } catch (ApiException exception) {
+            clientInstance.lastKnownError(Errors.forException(exception));
+            return request.getErrorResponse(throttleMs, exception);
+        } finally {
+            // Update the client instance with the latest push request parameters.
+            clientInstance.terminating(request.data().terminating());
+            clientInstance.lastPushRequestEpoch(now);
+        }
+
+        // Push the metrics to the external client receiver plugin.
+        byte[] metrics = request.data().metrics();
+        if (metrics != null && metrics.length > 0) {
+            ClientMetricsReceiverPlugin.exportMetrics(requestContext, request);
+        }
+
+        clientInstance.lastKnownError(Errors.NONE);
+        return request.createResponse(throttleMs, Errors.NONE);
     }
 
     @Override
     public void close() throws IOException {
-        // TODO: Implement the close logic to close the client metrics manager.
+        // Do nothing for now.
+    }
+
+    private void updateLastSubscriptionUpdateEpoch() {
+        this.lastSubscriptionUpdateEpoch = System.currentTimeMillis();
+    }
+
+    private void updateClientSubscription(String subscriptionName, ClientMetricsConfigs configs) {
+        List<String> metrics = configs.getList(ClientMetricsConfigs.SUBSCRIPTION_METRICS);
+        int pushInterval = configs.getInt(ClientMetricsConfigs.PUSH_INTERVAL_MS);
+        List<String> clientMatchPattern = configs.getList(ClientMetricsConfigs.CLIENT_MATCH_PATTERN);
+
+        SubscriptionInfo newSubscription =
+            new SubscriptionInfo(subscriptionName, metrics, pushInterval,
+                ClientMetricsConfigs.parseMatchingPatterns(clientMatchPattern));
+
+        subscriptionMap.put(subscriptionName, newSubscription);
+    }
+
+    private Uuid generateNewClientId() {
+        Uuid id = Uuid.randomUuid();
+        while (clientInstanceCache.get(id) != null) {
+            id = Uuid.randomUuid();
+        }
+        return id;
+    }
+
+    private ClientMetricsInstance getClientInstance(Uuid clientInstanceId, RequestContext requestContext,
+        long timestamp) {
+        // Check if null can be called on the cache. if can then we can avoid the method call.
+        ClientMetricsInstance clientInstance = clientInstanceCache.get(clientInstanceId);
+
+        if (clientInstance == null) {
+            // If the client instance is not present in the cache, then create a new client instance
+            // and update the cache. This can also happen when the telemetry request is received by
+            // the separate broker instance.
+            ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(
+                clientInstanceId, requestContext);
+            clientInstance = createClientInstanceAndUpdateCache(clientInstanceId, instanceMetadata, timestamp);
+        } else if (clientInstance.subscriptionUpdateEpoch() < lastSubscriptionUpdateEpoch) {

Review Comment:
   Hmm, is skipping the `=` check enough? It seems that it's possible that a 
client picks up the first subscription, but misses the second subscription. If 
that happens, it's possible that a client won't pick up a newly matched 
subscription for a long time, right? Could we just use a counter and bump it up 
each time when there is a subscription update?
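   A minimal sketch of the counter approach, assuming one monotonically increasing version shared by all subscriptions. `SubscriptionVersionSketch`, `ClientState` and their methods are hypothetical names for illustration, not part of the PR. Because every add, update or removal bumps the version by exactly one, two updates that land in the same millisecond still produce distinct values, so a client evaluated after the first update is still seen as stale after the second.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: a counter-based subscription version instead of a wall-clock epoch.
public class SubscriptionVersionSketch {

    // Incremented on every subscription add, update, or removal.
    private final AtomicLong subscriptionVersion = new AtomicLong();

    // Hypothetical per-client record of the version its subscription was computed from.
    static final class ClientState {
        final long evaluatedVersion;

        ClientState(long evaluatedVersion) {
            this.evaluatedVersion = evaluatedVersion;
        }
    }

    // Would be called from updateSubscription(...) for every change, including removals.
    long onSubscriptionChange() {
        return subscriptionVersion.incrementAndGet();
    }

    // Read the version before matching subscriptions, so a change that races with
    // evaluation leaves the client stale and it is re-evaluated on its next request.
    ClientState evaluateClient() {
        long version = subscriptionVersion.get();
        // ... match the client against the current subscriptions here ...
        return new ClientState(version);
    }

    // A strict comparison is safe because distinct updates always get distinct versions.
    boolean needsReevaluation(ClientState client) {
        return client.evaluatedVersion < subscriptionVersion.get();
    }

    public static void main(String[] args) {
        SubscriptionVersionSketch manager = new SubscriptionVersionSketch();
        manager.onSubscriptionChange();                // first subscription
        ClientState client = manager.evaluateClient(); // client picks it up
        manager.onSubscriptionChange();                // second update, possibly in the same millisecond
        System.out.println(manager.needsReevaluation(client)); // prints "true"
    }
}
```

   With `System.currentTimeMillis()`, both updates in `main` could record the same epoch and the `<` check would miss the second one; the counter removes that case without needing a `<=` comparison that would force re-evaluation on every request within the same millisecond.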



##########
core/src/main/java/kafka/server/ClientMetricsManager.java:
##########
@@ -30,17 +68,366 @@ public class ClientMetricsManager implements Closeable {
 
     private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class);
     private static final ClientMetricsManager INSTANCE = new ClientMetricsManager();
+    private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList(
+        Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id));
 
     public static ClientMetricsManager instance() {
         return INSTANCE;
     }
+    // Max cache size (16k active client connections per broker)
+    private static final int CM_CACHE_MAX_SIZE = 16384;
+    private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+    private final Map<String, SubscriptionInfo> subscriptionMap;
+
+    // The last subscription updated time is used to determine if the next telemetry request needs
+    // to re-evaluate the subscription id as per changes subscriptions.
+    private volatile long lastSubscriptionUpdateEpoch;
+
+    // Visible for testing
+    ClientMetricsManager() {
+        subscriptionMap = new ConcurrentHashMap<>();
+        clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE));
+    }
 
     public void updateSubscription(String subscriptionName, Properties properties) {
-        // TODO: Implement the update logic to manage subscriptions.
+        // IncrementalAlterConfigs API will send empty configs when all the configs are deleted
+        // for respective subscription. In that case, we need to remove the subscription from the map.
+        if (properties.isEmpty()) {
+            // Remove the subscription from the map if it exists, else ignore the config update.
+            if (subscriptionMap.containsKey(subscriptionName)) {
+                log.info("Removing subscription [{}] from the subscription map", subscriptionName);
+                subscriptionMap.remove(subscriptionName);
+                updateLastSubscriptionUpdateEpoch();
+            }
+            return;
+        }
+
+        ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+        updateClientSubscription(subscriptionName, configs);
+        /*
+         Update last subscription updated time to current time to indicate that there is a change
+         in the subscription. This will be used to determine if the next telemetry request needs
+         to re-evaluate the subscription id as per changes subscriptions.
+        */
+        updateLastSubscriptionUpdateEpoch();
+    }
+
+    public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest(
+        GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+        long now = System.currentTimeMillis();
+        Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId())
+            .filter(id -> !id.equals(Uuid.ZERO_UUID))
+            .orElse(generateNewClientId());
+
+        /*
+         Get the client instance from the cache or create a new one. If subscription has changed
+         since the last request, then the client instance will be re-evaluated. Validation of the
+         request will be done after the client instance is created. If client issued get telemetry
+         request prior to push interval, then the client should get a throttle error but if the
+         subscription has changed since the last request then the client should get the updated
+         subscription immediately.
+        */
+        ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext, now);
+
+        try {
+            // Validate the get request parameters for the client instance.
+            validateGetRequest(request, clientInstance, now);
+        } catch (ApiException exception) {
+            return request.getErrorResponse(throttleMs, exception);
+        }
+
+        clientInstance.lastKnownError(Errors.NONE);
+        return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs);
+    }
+
+    public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request,
+        int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+        Uuid clientInstanceId = request.data().clientInstanceId();
+        if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+            String msg = String.format("Invalid request from the client [%s], invalid client instance id",
+                clientInstanceId);
+            return request.getErrorResponse(throttleMs, new InvalidRequestException(msg));
+        }
+
+        long now = System.currentTimeMillis();
+        ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext, now);
+
+        try {
+            // Validate the push request parameters for the client instance.
+            validatePushRequest(request, telemetryMaxBytes, clientInstance, now);
+        } catch (ApiException exception) {
+            log.debug("Error validating push telemetry request from client [{}]", clientInstanceId, exception);
+            clientInstance.lastKnownError(Errors.forException(exception));
+            return request.getErrorResponse(throttleMs, exception);
+        } finally {
+            // Update the client instance with the latest push request parameters.
+            clientInstance.terminating(request.data().terminating());
+        }
+
+        // Push the metrics to the external client receiver plugin.
+        byte[] metrics = request.data().metrics();
+        if (metrics != null && metrics.length > 0) {
+            try {
+                ClientMetricsReceiverPlugin.instance().exportMetrics(requestContext, request);
+            } catch (Exception exception) {
+                clientInstance.lastKnownError(Errors.INVALID_RECORD);
+                return request.errorResponse(throttleMs, Errors.INVALID_RECORD);
+            }
+        }
+
+        clientInstance.lastKnownError(Errors.NONE);
+        return new PushTelemetryResponse(new PushTelemetryResponseData().setThrottleTimeMs(throttleMs));
     }
 
     @Override
     public void close() throws IOException {
-        // TODO: Implement the close logic to close the client metrics manager.
+        subscriptionMap.clear();
+    }
+
+    private synchronized void updateLastSubscriptionUpdateEpoch() {
+        this.lastSubscriptionUpdateEpoch = System.currentTimeMillis();
+    }
+
+    private void updateClientSubscription(String subscriptionName, ClientMetricsConfigs configs) {
+        List<String> metrics = configs.getList(ClientMetricsConfigs.SUBSCRIPTION_METRICS);
+        int pushInterval = configs.getInt(ClientMetricsConfigs.PUSH_INTERVAL_MS);
+        List<String> clientMatchPattern = configs.getList(ClientMetricsConfigs.CLIENT_MATCH_PATTERN);
+
+        SubscriptionInfo newSubscription =
+            new SubscriptionInfo(subscriptionName, metrics, pushInterval,
+                ClientMetricsConfigs.parseMatchingPatterns(clientMatchPattern));
+
+        subscriptionMap.put(subscriptionName, newSubscription);
+    }
+
+    private Uuid generateNewClientId() {
+        Uuid id = Uuid.randomUuid();
+        while (clientInstanceCache.get(id) != null) {
+            id = Uuid.randomUuid();
+        }
+        return id;
+    }
+
+    private ClientMetricsInstance clientInstance(Uuid clientInstanceId, RequestContext requestContext,
+        long timestamp) {
+        ClientMetricsInstance clientInstance = clientInstanceCache.get(clientInstanceId);
+
+        if (clientInstance == null) {
+            /*
+             If the client instance is not present in the cache, then create a new client instance
+             and update the cache. This can also happen when the telemetry request is received by
+             the separate broker instance. Though cache is synchronized, but it is possible that
+             concurrent calls can create the same client instance. Hence, safeguard the client instance
+             creation with a double-checked lock to ensure that only one instance is created.
+            */
+            synchronized (this) {
+                clientInstance = clientInstanceCache.get(clientInstanceId);
+                if (clientInstance != null) {
+                    return clientInstance;
+                }
+
+                ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(
+                    clientInstanceId, requestContext);
+                clientInstance = createClientInstanceAndUpdateCache(clientInstanceId, instanceMetadata, timestamp);
+            }
+        } else if (clientInstance.subscriptionUpdateEpoch() < lastSubscriptionUpdateEpoch) {
+            /*
+             If the last subscription update time for client instance is older than the subscription
+             updated time, then re-evaluate the subscription information for the client as per the
+             updated subscriptions. This is to ensure that the client instance is always in sync with
+             the latest subscription information. Though cache is synchronized, but it is possible that
+             concurrent calls can create the same client instance. Hence, safeguard the client instance
+             update with a double-checked lock to ensure that only one instance is created.
+            */
+            synchronized (this) {
+                clientInstance = clientInstanceCache.get(clientInstanceId);
+                if (clientInstance.subscriptionUpdateEpoch() >= lastSubscriptionUpdateEpoch) {
+                    return clientInstance;
+                }
+                clientInstance = createClientInstanceAndUpdateCache(clientInstanceId, clientInstance.instanceMetadata(), timestamp);
+            }
+        }
+
+        return clientInstance;
+    }
+
+    private ClientMetricsInstance createClientInstanceAndUpdateCache(Uuid clientInstanceId,
+        ClientMetricsInstanceMetadata instanceMetadata, long timestamp) {
+
+        ClientMetricsInstance clientInstance = createClientInstance(clientInstanceId, instanceMetadata,
+            timestamp);
+        clientInstanceCache.put(clientInstanceId, clientInstance);
+        return clientInstance;
+    }
+
+    private ClientMetricsInstance createClientInstance(Uuid clientInstanceId,
+        ClientMetricsInstanceMetadata instanceMetadata, long timestamp) {
+
+        int pushIntervalMs = ClientMetricsConfigs.DEFAULT_INTERVAL_MS;
+        // Keep a set of metrics to avoid duplicates in case of overlapping subscriptions.
+        Set<String> subscribedMetrics = new HashSet<>();
+        boolean allMetricsSubscribed = false;
+
+        for (SubscriptionInfo info : subscriptions()) {
+            if (instanceMetadata.isMatch(info.matchPattern())) {
+                allMetricsSubscribed = allMetricsSubscribed || info.metrics().contains(
+                    ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+                subscribedMetrics.addAll(info.metrics());
+                pushIntervalMs = Math.min(pushIntervalMs, info.intervalMs());
+            }
+        }
+
+        /*
+         If client matches with any subscription that has empty metrics string, then it means that client
+         is subscribed to all the metrics, so just send the empty string as the subscribed metrics.
+        */
+        if (allMetricsSubscribed) {
+            subscribedMetrics.clear();
+            // Add an empty string to indicate that all metrics are subscribed.
+            subscribedMetrics.add(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS);
+        }
+
+        int subscriptionId = computeSubscriptionId(subscribedMetrics, pushIntervalMs, clientInstanceId);
+
+        return new ClientMetricsInstance(clientInstanceId, instanceMetadata, subscriptionId, timestamp,
+            subscribedMetrics, pushIntervalMs);
+    }
+
+    /**
+     * Computes the SubscriptionId as a unique identifier for a client instance's subscription set,
+     * the id is generated by calculating a CRC32 of the configured metrics subscriptions including
+     * the PushIntervalMs, XORed with the ClientInstanceId.
+     */
+    private int computeSubscriptionId(Set<String> metrics, int pushIntervalMs, Uuid clientInstanceId) {
+        CRC32 crc = new CRC32();
+        byte[] metricsBytes = (metrics.toString() + pushIntervalMs).getBytes(StandardCharsets.UTF_8);
+        crc.update(ByteBuffer.wrap(metricsBytes));
+        return (int) crc.getValue() ^ clientInstanceId.hashCode();
+    }
+
+    private GetTelemetrySubscriptionsResponse createGetSubscriptionResponse(Uuid clientInstanceId,
+        ClientMetricsInstance clientInstance, int telemetryMaxBytes, int throttleMs) {
+
+        GetTelemetrySubscriptionsResponseData data = new GetTelemetrySubscriptionsResponseData()
+            .setClientInstanceId(clientInstanceId)
+            .setSubscriptionId(clientInstance.subscriptionId())
+            .setRequestedMetrics(new ArrayList<>(clientInstance.metrics()))
+            .setAcceptedCompressionTypes(SUPPORTED_COMPRESSION_TYPES)
+            .setPushIntervalMs(clientInstance.pushIntervalMs())
+            .setTelemetryMaxBytes(telemetryMaxBytes)
+            .setDeltaTemporality(true)
+            .setErrorCode(Errors.NONE.code())
+            .setThrottleTimeMs(throttleMs);
+
+        return new GetTelemetrySubscriptionsResponse(data);
+    }
+
+    private void validateGetRequest(GetTelemetrySubscriptionsRequest request,
+        ClientMetricsInstance clientInstance, long timestamp) {
+
+        if (!clientInstance.maybeUpdateGetRequestEpoch(timestamp) && (clientInstance.lastKnownError() != Errors.UNKNOWN_SUBSCRIPTION_ID

Review Comment:
   Why only testing these two errors? What about other errors like 
`InvalidRequest`?
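   One way to frame the question is to compare an allow-list of errors that bypass the push-interval throttle with a rule where any non-`NONE` last known error lets the client re-fetch its subscription immediately. The sketch is hypothetical: `LastError` is a reduced stand-in for `org.apache.kafka.common.protocol.Errors`, the allow-list holds only `UNKNOWN_SUBSCRIPTION_ID` (the one error visible in the truncated condition), and `withinPushInterval` is assumed to correspond to `maybeUpdateGetRequestEpoch(timestamp)` returning `false`.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch contrasting two throttling rules for GetTelemetrySubscriptions requests.
public class GetRequestThrottleSketch {

    // Reduced stand-in for org.apache.kafka.common.protocol.Errors.
    enum LastError { NONE, UNKNOWN_SUBSCRIPTION_ID, INVALID_REQUEST }

    // Allow-list rule: only the listed errors let the client skip the push interval.
    static final Set<LastError> IMMEDIATE_RETRY_ERRORS = EnumSet.of(LastError.UNKNOWN_SUBSCRIPTION_ID);

    static boolean throttleWithAllowList(boolean withinPushInterval, LastError lastKnownError) {
        return withinPushInterval && !IMMEDIATE_RETRY_ERRORS.contains(lastKnownError);
    }

    // General rule: throttle an early request only if the previous exchange succeeded.
    static boolean throttleOnlyAfterSuccess(boolean withinPushInterval, LastError lastKnownError) {
        return withinPushInterval && lastKnownError == LastError.NONE;
    }

    public static void main(String[] args) {
        // Print both decisions for an early request after each kind of previous outcome.
        for (LastError error : LastError.values()) {
            System.out.printf("%s: allowList=%b, onlyAfterSuccess=%b%n", error,
                throttleWithAllowList(true, error), throttleOnlyAfterSuccess(true, error));
        }
    }
}
```

   For `INVALID_REQUEST`, the allow-list rule still throttles the early request while the general rule lets it through; that divergence is the case the question points at. The general rule avoids having to keep the list in sync as new push-side errors are added.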



##########
core/src/test/java/kafka/server/ClientMetricsManagerTest.java:
##########
@@ -0,0 +1,921 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsTestUtils;
+import kafka.server.ClientMetricsManager.SubscriptionInfo;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryRequest.Builder;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsManagerTest {
+
+    private ClientMetricsManager clientMetricsManager;
+
+    @BeforeEach
+    public void setUp() {
+        clientMetricsManager = new ClientMetricsManager();
+    }
+
+    @Test
+    public void testUpdateSubscription() {
+        assertTrue(clientMetricsManager.subscriptions().isEmpty());
+
+        long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch();
+        clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
+
+        assertEquals(1, clientMetricsManager.subscriptions().size());
+        assertNotNull(clientMetricsManager.subscriptionInfo("sub-1"));
+
+        SubscriptionInfo subscriptionInfo = clientMetricsManager.subscriptionInfo("sub-1");
+        Set<String> metrics = subscriptionInfo.metrics();
+
+        // Validate metrics.
+        assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, metrics.size());
+        Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric ->
+            assertTrue(metrics.contains(metric)));
+        // Validate push interval.
+        assertEquals(ClientMetricsTestUtils.defaultProperties().getProperty(ClientMetricsConfigs.PUSH_INTERVAL_MS),
+            String.valueOf(subscriptionInfo.intervalMs()));
+
+        // Validate match patterns.
+        assertEquals(ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.size(),
+            subscriptionInfo.matchPattern().size());
+        ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.forEach(pattern -> {
+            String[] split = pattern.split("=");
+            assertTrue(subscriptionInfo.matchPattern().containsKey(split[0]));
+            assertEquals(split[1], subscriptionInfo.matchPattern().get(split[0]).pattern());
+        });
+        assertTrue(clientMetricsManager.lastSubscriptionUpdateEpoch() > previousEpoch);
+    }
+
+    @Test
+    public void testUpdateSubscriptionWithEmptyProperties() {
+        assertTrue(clientMetricsManager.subscriptions().isEmpty());
+        long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch();
+        clientMetricsManager.updateSubscription("sub-1", new Properties());
+        // No subscription should be added as the properties are empty.
+        assertEquals(0, clientMetricsManager.subscriptions().size());
+        assertEquals(previousEpoch, clientMetricsManager.lastSubscriptionUpdateEpoch());
+    }
+
+    @Test
+    public void testUpdateSubscriptionWithNullProperties() {
+        assertTrue(clientMetricsManager.subscriptions().isEmpty());
+        long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch();
+        // Properties shouldn't be passed as null.
+        assertThrows(NullPointerException.class, () ->
+            clientMetricsManager.updateSubscription("sub-1", null));
+        assertEquals(0, clientMetricsManager.subscriptions().size());
+        assertEquals(previousEpoch, clientMetricsManager.lastSubscriptionUpdateEpoch());
+    }
+
+    @Test
+    public void testUpdateSubscriptionWithInvalidMetricsProperties() {
+        assertTrue(clientMetricsManager.subscriptions().isEmpty());
+
+        long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch();
+        Properties properties = new Properties();
+        properties.put("random", "random");
+        clientMetricsManager.updateSubscription("sub-1", properties);
+
+        // Undefined keys are ignored.
+        assertEquals(1, clientMetricsManager.subscriptions().size());
+        assertNotNull(clientMetricsManager.subscriptionInfo("sub-1"));
+        // Config def default properties.
+        assertTrue(clientMetricsManager.subscriptionInfo("sub-1").metrics().isEmpty());
+        assertEquals(ClientMetricsConfigs.DEFAULT_INTERVAL_MS, clientMetricsManager.subscriptionInfo("sub-1").intervalMs());
+        assertTrue(clientMetricsManager.subscriptionInfo("sub-1").matchPattern().isEmpty());
+        assertTrue(clientMetricsManager.lastSubscriptionUpdateEpoch() > previousEpoch);
+    }
+
+    @Test
+    public void testUpdateSubscriptionWithPropertiesDeletion() throws InterruptedException {
+        assertTrue(clientMetricsManager.subscriptions().isEmpty());
+
+        long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch();
+        Properties properties = new Properties();
+        properties.put("interval.ms", "100");
+        clientMetricsManager.updateSubscription("sub-1", properties);
+        assertEquals(1, clientMetricsManager.subscriptions().size());
+        assertNotNull(clientMetricsManager.subscriptionInfo("sub-1"));
+        assertTrue(clientMetricsManager.lastSubscriptionUpdateEpoch() > previousEpoch);
+
+        previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch();
+        Thread.sleep(1);
+        clientMetricsManager.updateSubscription("sub-1", new Properties());
+        // Subscription should be removed as all properties are removed.
+        assertEquals(0, clientMetricsManager.subscriptions().size());
+        assertTrue(clientMetricsManager.lastSubscriptionUpdateEpoch() > previousEpoch);
+    }
+
+    @Test
+    public void testGetTelemetry() throws UnknownHostException {
+        clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
+        assertEquals(1, clientMetricsManager.subscriptions().size());
+
+        GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder(
+            new GetTelemetrySubscriptionsRequestData()).build();
+
+        GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest(
+            request, 100, ClientMetricsTestUtils.requestContext(), 0);
+
+        assertNotNull(response.data().clientInstanceId());
+        assertTrue(response.data().subscriptionId() != 0);
+
+        assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, response.data().requestedMetrics().size());
+        Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric ->
+            assertTrue(response.data().requestedMetrics().contains(metric)));
+
+        assertEquals(4, response.data().acceptedCompressionTypes().size());
+        // validate compression types order.
+        assertEquals(CompressionType.ZSTD.id, response.data().acceptedCompressionTypes().get(0));
+        assertEquals(CompressionType.LZ4.id, response.data().acceptedCompressionTypes().get(1));
+        assertEquals(CompressionType.GZIP.id, response.data().acceptedCompressionTypes().get(2));
+        assertEquals(CompressionType.SNAPPY.id, response.data().acceptedCompressionTypes().get(3));
+        assertEquals(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS, response.data().pushIntervalMs());
+        assertTrue(response.data().deltaTemporality());
+        assertEquals(100, response.data().telemetryMaxBytes());
+        assertEquals(Errors.NONE, response.error());
+
+        ClientMetricsInstance instance = clientMetricsManager.clientInstance(response.data().clientInstanceId());
+        assertNotNull(instance);
+        assertEquals(Errors.NONE, instance.lastKnownError());
+    }
+
+    @Test
+    public void testGetTelemetryWithoutSubscription() throws UnknownHostException {
+        assertTrue(clientMetricsManager.subscriptions().isEmpty());
+
+        GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder(
+            new GetTelemetrySubscriptionsRequestData()).build();
+
+        GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest(
+            request, 1024 * 1024, ClientMetricsTestUtils.requestContext(), 0);
+
+        assertNotNull(response.data().clientInstanceId());
+        assertTrue(response.data().subscriptionId() != 0);
+        assertTrue(response.data().requestedMetrics().isEmpty());
+        assertEquals(4, response.data().acceptedCompressionTypes().size());
+        assertEquals(ClientMetricsConfigs.DEFAULT_INTERVAL_MS, response.data().pushIntervalMs());
+        assertTrue(response.data().deltaTemporality());
+        assertEquals(1024 * 1024, response.data().telemetryMaxBytes());
+        assertEquals(Errors.NONE, response.error());
+
+        ClientMetricsInstance instance = clientMetricsManager.clientInstance(response.data().clientInstanceId());
+        assertNotNull(instance);
+        assertEquals(Errors.NONE, instance.lastKnownError());
+    }
+
+    @Test
+    public void testGetTelemetryAllMetricSubscribedSubscription() throws UnknownHostException {
+        clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
+        Properties properties = new Properties();
+        properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+        clientMetricsManager.updateSubscription("sub-2", properties);
+
+        assertEquals(2, clientMetricsManager.subscriptions().size());
+
+        GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder(
+            new GetTelemetrySubscriptionsRequestData()).build();
+
+        GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest(
+            request, 100, ClientMetricsTestUtils.requestContext(), 0);
+
+        assertNotNull(response.data().clientInstanceId());
+        assertTrue(response.data().subscriptionId() != 0);
+
+        assertEquals(1, response.data().requestedMetrics().size());
+        assertTrue(response.data().requestedMetrics().contains(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS));
+
+        assertEquals(4, response.data().acceptedCompressionTypes().size());
+        assertEquals(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS, response.data().pushIntervalMs());
+        assertTrue(response.data().deltaTemporality());
+        assertEquals(100, response.data().telemetryMaxBytes());
+        assertEquals(Errors.NONE, response.error());
+
+        ClientMetricsInstance instance = clientMetricsManager.clientInstance(response.data().clientInstanceId());
+        assertNotNull(instance);
+        assertEquals(Errors.NONE, instance.lastKnownError());
+    }
+
+    @Test
+    public void testGetTelemetrySameClientImmediateRetryFail() throws UnknownHostException {
+        GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder(
+            new GetTelemetrySubscriptionsRequestData()).build();
+
+        GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest(
+            request, 100, ClientMetricsTestUtils.requestContext(), 0);
+
+        Uuid clientInstanceId = response.data().clientInstanceId();
+        assertNotNull(clientInstanceId);
+        assertEquals(Errors.NONE, response.error());
+
+        request = new GetTelemetrySubscriptionsRequest.Builder(
+            new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId)).build();
+        response = clientMetricsManager.processGetTelemetrySubscriptionRequest(
+            request, 100, ClientMetricsTestUtils.requestContext(), 0);
+
+        assertEquals(Errors.THROTTLING_QUOTA_EXCEEDED, response.error());
+    }
+
+    @Test
+    public void testGetTelemetrySameClientImmediateRetryAfterPushFail() throws 
UnknownHostException {
+        GetTelemetrySubscriptionsRequest request = new 
GetTelemetrySubscriptionsRequest.Builder(
+            new GetTelemetrySubscriptionsRequestData()).build();
+
+        GetTelemetrySubscriptionsResponse response = 
clientMetricsManager.processGetTelemetrySubscriptionRequest(
+            request, 100, ClientMetricsTestUtils.requestContext(), 0);
+
+        Uuid clientInstanceId = response.data().clientInstanceId();
+        assertNotNull(clientInstanceId);
+        assertEquals(Errors.NONE, response.error());
+
+        // Create new client metrics manager which simulates a new server as 
it will not have any
+        // last request information but request should succeed as subscription 
id should match
+        // the one with new client instance.
+
+        ClientMetricsManager newClientMetricsManager = new ClientMetricsManager();
+
+        PushTelemetryRequest pushRequest = new Builder(
+            new PushTelemetryRequestData()
+                .setClientInstanceId(response.data().clientInstanceId())
+                .setSubscriptionId(response.data().subscriptionId())
+                .setCompressionType(CompressionType.NONE.id)
+                .setMetrics("test-data".getBytes(StandardCharsets.UTF_8))).build();
+
+        PushTelemetryResponse pushResponse = newClientMetricsManager.processPushTelemetryRequest(
+            pushRequest, 100, ClientMetricsTestUtils.requestContext(), 0);
+
+        assertEquals(Errors.NONE, pushResponse.error());
+
+        request = new GetTelemetrySubscriptionsRequest.Builder(
+            new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId)).build();
+
+        response = newClientMetricsManager.processGetTelemetrySubscriptionRequest(
+            request, 100, ClientMetricsTestUtils.requestContext(), 0);
+
+        assertEquals(Errors.THROTTLING_QUOTA_EXCEEDED, response.error());
+    }
+
+    @Test
+    public void testGetTelemetryUpdateSubscription() throws Exception {
+        clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
+        assertEquals(1, clientMetricsManager.subscriptions().size());
+
+        GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder(
+            new GetTelemetrySubscriptionsRequestData()).build();
+
+        GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest(
+            request, 100, ClientMetricsTestUtils.requestContext(), 0);
+
+        Uuid clientInstanceId = response.data().clientInstanceId();
+        int subscriptionId = response.data().subscriptionId();
+        assertNotNull(clientInstanceId);
+        assertTrue(subscriptionId != 0);
+        assertEquals(Errors.NONE, response.error());
+
+        // Wait for subscription time to be greater from last client instance time.
+        Thread.sleep(10);
+
+        // Update subscription
+        Properties properties = new Properties();
+        properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+        clientMetricsManager.updateSubscription("sub-2", properties);
+        assertEquals(2, clientMetricsManager.subscriptions().size());
+
+        request = new GetTelemetrySubscriptionsRequest.Builder(
+            new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId)).build();
+        response = clientMetricsManager.processGetTelemetrySubscriptionRequest(
+            request, 100, ClientMetricsTestUtils.requestContext(), 0);
+
+        // No throttle error as the subscription has changed.
+        assertEquals(Errors.NONE, response.error());
+        // Subscription id updated in next request
+        assertTrue(subscriptionId != response.data().subscriptionId());
+    }
+
+    @Test
+    public void testGetTelemetryConcurrentRequestNewClientInstance() throws InterruptedException {
+        GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder(
+            new GetTelemetrySubscriptionsRequestData().setClientInstanceId(Uuid.randomUuid())).build();
+
+        CountDownLatch lock = new CountDownLatch(2);
+        List<GetTelemetrySubscriptionsResponse> responses = Collections.synchronizedList(new ArrayList<>());
+
+        Thread thread = new Thread(() -> {
+            try {
+                GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest(
+                    request, 100, ClientMetricsTestUtils.requestContext(), 0);
+
+                responses.add(response);
+                lock.countDown();
+            } catch (UnknownHostException e) {
+                e.printStackTrace();

Review Comment:
   We should use logging instead of print. Ditto in other places.
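A minimal sketch of the logging suggestion, written as a standalone class rather than the actual test. `process` is a hypothetical stand-in for `processGetTelemetrySubscriptionRequest`, and `java.util.logging` is used only so the sketch runs without extra dependencies; Kafka code generally logs through an SLF4J `Logger`. The sketch also counts the latch down in a `finally` block, a suggestion beyond the comment itself, so an exception in the worker thread does not also mean a missed `countDown()`.

```java
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;

class LoggingInsteadOfPrintSketch {
    // Kafka code generally uses an SLF4J Logger; java.util.logging keeps this sketch dependency-free.
    static final Logger LOG = Logger.getLogger(LoggingInsteadOfPrintSketch.class.getName());

    // Hypothetical stand-in for clientMetricsManager.processGetTelemetrySubscriptionRequest(...).
    static String process(boolean fail) throws UnknownHostException {
        if (fail) {
            throw new UnknownHostException("simulated failure");
        }
        return "ok";
    }

    // Runs one request on a worker thread and returns the responses it collected.
    static List<String> runWorker(boolean fail) {
        List<String> responses = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        Thread thread = new Thread(() -> {
            try {
                responses.add(process(fail));
            } catch (UnknownHostException e) {
                // Log the failure with its stack trace instead of calling e.printStackTrace().
                LOG.log(Level.SEVERE, "Failed to process telemetry request", e);
            } finally {
                // Always count down so the waiting thread is released even on failure.
                done.countDown();
            }
        });
        thread.start();
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return responses;
    }
}
```

The logged record keeps the exception attached, so it lands in the test's log output alongside everything else instead of going straight to stderr.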



##########
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##########
@@ -71,6 +73,31 @@ public PushTelemetryRequestData data() {
         return data;
     }
 
+    public PushTelemetryResponse createResponse(int throttleTimeMs, Errors errors) {
+        PushTelemetryResponseData responseData = new PushTelemetryResponseData();
+        responseData.setErrorCode(errors.code());
+        responseData.setThrottleTimeMs(throttleTimeMs);

Review Comment:
   The following is my understanding. There are two types of throttling.
   
   1. Generic request throttling based on the percentage of CPU a client uses on the broker. This applies to any request, including `PushTelemetry`. In this case, the error code is `NONE` and the `throttleTimeMs` field is set. The client will mute the channel for `throttleTimeMs` before sending future requests.
   2. `PushTelemetry`-specific throttling because `PushTelemetry` is sent too frequently. In this case, we should set THROTTLING_QUOTA_EXCEEDED as the error code and avoid setting the `throttleTimeMs` field, since we don't want the client to mute the channel for all requests.
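To make the distinction concrete, a minimal self-contained sketch. `TelemetryError` and `PushResponseSketch` are hypothetical stand-ins for Kafka's `Errors` and `PushTelemetryResponseData`; the point is only which field carries the signal in each case and how a client would read it.

```java
// Hypothetical stand-in for the relevant Kafka Errors values.
enum TelemetryError { NONE, THROTTLING_QUOTA_EXCEEDED }

// Hypothetical stand-in for PushTelemetryResponseData (error code plus throttleTimeMs).
final class PushResponseSketch {
    final TelemetryError error;
    final int throttleTimeMs;

    private PushResponseSketch(TelemetryError error, int throttleTimeMs) {
        this.error = error;
        this.throttleTimeMs = throttleTimeMs;
    }

    // Case 1: generic request quota (share of broker CPU). No error; throttleTimeMs
    // tells the client to mute the whole channel for that long.
    static PushResponseSketch requestQuotaThrottled(int throttleTimeMs) {
        return new PushResponseSketch(TelemetryError.NONE, throttleTimeMs);
    }

    // Case 2: PushTelemetry sent too frequently. Only the error code is set;
    // throttleTimeMs stays 0 so other requests on the connection are not delayed.
    static PushResponseSketch pushTooFrequent() {
        return new PushResponseSketch(TelemetryError.THROTTLING_QUOTA_EXCEEDED, 0);
    }

    // Client-side view: only a non-zero throttleTimeMs mutes the channel.
    boolean clientMutesChannel() {
        return throttleTimeMs > 0;
    }

    // Client-side view: only the error code says the push itself was rejected.
    boolean pushRejected() {
        return error == TelemetryError.THROTTLING_QUOTA_EXCEEDED;
    }
}
```

Keeping the two signals in separate fields means a too-frequent push never delays unrelated produce or fetch requests on the same connection.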



##########
core/src/main/java/kafka/metrics/ClientMetricsConfigs.java:
##########
@@ -80,6 +82,11 @@ public class ClientMetricsConfigs {
     public static final String CLIENT_SOURCE_ADDRESS = "client_source_address";
     public static final String CLIENT_SOURCE_PORT = "client_source_port";
 
+    // Empty string in client-metrics resource configs indicates that all the metrics are subscribed.
+    public static final String ALL_SUBSCRIBED_METRICS_CONFIG = "\"\"";

Review Comment:
   Hmm, this can be confusing. Would it be better to change the KIP to use something like `*` to represent all metrics? cc @AndrewJSchofield 
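One way to see the confusion: the constant `"\"\""` in the diff is the two-character string `""` (two quote marks), not an empty string, even though the comment above it says "Empty string". The sketch below contrasts it with a `*` wildcard. The wildcard constant and the matcher are hypothetical, and the matcher assumes, for illustration only, that the `metrics` value is a comma-separated list of metric-name prefixes.

```java
// Sketch only: contrasts the quoted-empty constant from the diff with a "*" wildcard.
final class MetricsSubscriptionSketch {
    // Same literal as ALL_SUBSCRIBED_METRICS_CONFIG in the diff: two quote characters, not "".
    static final String QUOTED_EMPTY = "\"\"";

    // Hypothetical wildcard along the lines suggested in the review.
    static final String ALL_METRICS = "*";

    // Hypothetical matcher, assuming the config is a comma-separated list of metric-name prefixes.
    static boolean isSubscribed(String metricsConfig, String metricName) {
        for (String raw : metricsConfig.split(",")) {
            String prefix = raw.trim();
            if (prefix.equals(ALL_METRICS)) {
                return true; // explicit, unambiguous "all metrics"
            }
            if (!prefix.isEmpty() && metricName.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }
}
```

With a straightforward parser like this, the quoted-empty value matches nothing unless it is special-cased, whereas `*` reads as "all metrics" on its own.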



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
