junrao commented on code in PR #15234:
URL: https://github.com/apache/kafka/pull/15234#discussion_r1462316337


##########
server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstance.java:
##########
@@ -120,4 +127,16 @@ public synchronized boolean maybeUpdatePushRequestTimestamp(long currentTime) {
         }
         return canAccept;
     }
+
+    public synchronized void cancelExpirationTimerTask() {
+        if (this.expirationTimerTask != null) {

Review Comment:
   Let's be consistent with the usage of `this`. We don't use `this`
elsewhere. Ditto below.



##########
server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java:
##########
@@ -919,4 +922,102 @@ public void testPushTelemetryConcurrentRequestAfterSubscriptionUpdate() throws U
         // 1 request should fail with throttling error.
         assertEquals(1, throttlingErrorCount);
     }
+
+    @Test
+    public void testCacheEviction() throws UnknownHostException, InterruptedException {
+        Properties properties = new Properties();
+        properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+        properties.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "100");
+        clientMetricsManager.updateSubscription("sub-1", properties);
+
+        GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder(
+            new GetTelemetrySubscriptionsRequestData(), true).build();
+
+        GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest(
+            request, ClientMetricsTestUtils.requestContext());
+        assertEquals(Errors.NONE, response.error());
+
+        assertNotNull(clientMetricsManager.clientInstance(response.data().clientInstanceId()));
+        assertEquals(1, clientMetricsManager.expirationTimer().size());
+        // Cache expiry should occur after 100 * 3 = 300 ms, wait for at most 600 ms for the eviction
+        // to happen as eviction timer is scheduled in different thread.
+        assertTimeoutPreemptively(Duration.ofMillis(600), () -> {
+            // Validate that cache eviction happens and client instance is removed from cache.
+            while (clientMetricsManager.expirationTimer().size() != 0 ||

Review Comment:
   This is a busy loop and is not CPU friendly. Could we use something like
`TestUtils.waitUntilTrue` instead? `TestUtils` is in `core`. We could probably
move it to `server-common`.
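
   To illustrate the difference from a busy loop, here is a minimal sketch of a polling wait that sleeps between checks. `PollingWait` and `waitUntil` are hypothetical names used only for illustration; this is not the `TestUtils.waitUntilTrue` implementation from `core`, and the `AtomicInteger` merely stands in for the expiration timer size that a background thread would reduce to zero.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration only; not Kafka's TestUtils.
public final class PollingWait {
    private PollingWait() { }

    /**
     * Re-checks the condition until it holds or the timeout elapses,
     * sleeping between checks so the waiting thread does not spin.
     * Returns true if the condition held before the deadline.
     */
    public static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pause) {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            // Sleep for the pause interval, but never past the deadline and never less than 1 ms.
            long sleepMs = Math.max(1L, Math.min(pause.toMillis(), remainingNanos / 1_000_000L));
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the cache size; a background thread plays the role of the eviction timer.
        AtomicInteger cacheSize = new AtomicInteger(1);
        Thread evictor = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            cacheSize.set(0);
        });
        evictor.start();

        boolean evicted = waitUntil(() -> cacheSize.get() == 0,
            Duration.ofMillis(600), Duration.ofMillis(10));
        evictor.join();
        if (!evicted) {
            throw new AssertionError("entry was not evicted within 600 ms");
        }
    }
}
```

   In the test above, the condition would presumably be the same check the `while` loop performs, with the 600 ms bound passed as the timeout instead of wrapping the loop in `assertTimeoutPreemptively`.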


