junrao commented on code in PR #15234: URL: https://github.com/apache/kafka/pull/15234#discussion_r1462316337
########## server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstance.java: ########## @@ -120,4 +127,16 @@ public synchronized boolean maybeUpdatePushRequestTimestamp(long currentTime) { } return canAccept; } + + public synchronized void cancelExpirationTimerTask() { + if (this.expirationTimerTask != null) { Review Comment: Let's be consistent with the usage of `this`. We don't use `this` else where. Ditto below. ########## server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java: ########## @@ -919,4 +922,102 @@ public void testPushTelemetryConcurrentRequestAfterSubscriptionUpdate() throws U // 1 request should fail with throttling error. assertEquals(1, throttlingErrorCount); } + + @Test + public void testCacheEviction() throws UnknownHostException, InterruptedException { + Properties properties = new Properties(); + properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG); + properties.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "100"); + clientMetricsManager.updateSubscription("sub-1", properties); + + GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext()); + assertEquals(Errors.NONE, response.error()); + + assertNotNull(clientMetricsManager.clientInstance(response.data().clientInstanceId())); + assertEquals(1, clientMetricsManager.expirationTimer().size()); + // Cache expiry should occur after 100 * 3 = 300 ms, wait for at most 600 ms for the eviction + // to happen as eviction timer is scheduled in different thread. + assertTimeoutPreemptively(Duration.ofMillis(600), () -> { + // Validate that cache eviction happens and client instance is removed from cache. + while (clientMetricsManager.expirationTimer().size() != 0 || Review Comment: The is a busy loop and is not CPU friendly. Could we use sth like `TestUtils.waitUntilTrue` instead? TestUtils is in `core`. We could probably move it to `server-common`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org