This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 4c2d86f4114 MINOR: Fixing client telemetry validate request (#19959)
4c2d86f4114 is described below
commit 4c2d86f411440aeaebcf6c1755bdeb690dfdb35a
Author: Apoorv Mittal <[email protected]>
AuthorDate: Thu Jun 12 22:52:50 2025 +0100
MINOR: Fixing client telemetry validate request (#19959)
Minor fix to correct the validate condition for GetTelemetryRequests.
Added respective tests as well.
Reviewers: Andrew Schofield <[email protected]>
---
.../apache/kafka/server/ClientMetricsManager.java | 2 +-
.../kafka/server/ClientMetricsManagerTest.java | 110 ++++++++++++++++++++-
2 files changed, 110 insertions(+), 2 deletions(-)
diff --git
a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
index b143466574c..d7343098b4e 100644
--- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
@@ -391,7 +391,7 @@ public class ClientMetricsManager implements AutoCloseable {
ClientMetricsInstance clientInstance, long timestamp) {
if (!clientInstance.maybeUpdateGetRequestTimestamp(timestamp) &&
(clientInstance.lastKnownError() != Errors.UNKNOWN_SUBSCRIPTION_ID
- || clientInstance.lastKnownError() !=
Errors.UNSUPPORTED_COMPRESSION_TYPE)) {
+ && clientInstance.lastKnownError() !=
Errors.UNSUPPORTED_COMPRESSION_TYPE)) {
clientMetricsStats.recordThrottleCount(clientInstance.clientInstanceId());
String msg = String.format("Request from the client [%s] arrived
before the next push interval time",
request.data().clientInstanceId());
diff --git
a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
index 3b0bd181b91..c5b13eefbe4 100644
--- a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
@@ -826,7 +826,6 @@ public class ClientMetricsManagerTest {
assertEquals((double) 0,
getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_ERROR +
"-count").metricValue());
assertEquals(Double.NaN,
getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_EXPORT_TIME +
"-avg").metricValue());
assertEquals(Double.NaN,
getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_EXPORT_TIME +
"-max").metricValue());
-
}
@Test
@@ -1124,6 +1123,115 @@ public class ClientMetricsManagerTest {
}
}
+ @Test
+ public void
testGetTelemetrySubscriptionAfterPushTelemetryUnknownSubscriptionSucceeds()
throws Exception {
+ clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultTestProperties());
+ assertEquals(1, clientMetricsManager.subscriptions().size());
+
+ GetTelemetrySubscriptionsRequest subscriptionsRequest = new
GetTelemetrySubscriptionsRequest.Builder(
+ new GetTelemetrySubscriptionsRequestData(), true).build();
+
+ GetTelemetrySubscriptionsResponse subscriptionsResponse =
clientMetricsManager.processGetTelemetrySubscriptionRequest(
+ subscriptionsRequest, ClientMetricsTestUtils.requestContext());
+
+ Properties properties = new Properties();
+ properties.put("interval.ms", "100");
+ clientMetricsManager.updateSubscription("sub-2", properties);
+ assertEquals(2, clientMetricsManager.subscriptions().size());
+
+ PushTelemetryRequest request = new Builder(
+ new PushTelemetryRequestData()
+
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
+
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
+ .setCompressionType(CompressionType.NONE.id)
+
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))),
true).build();
+
+ PushTelemetryResponse response =
clientMetricsManager.processPushTelemetryRequest(
+ request, ClientMetricsTestUtils.requestContext());
+
+ assertEquals(Errors.UNKNOWN_SUBSCRIPTION_ID, response.error());
+ ClientMetricsInstance instance =
clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
+ assertNotNull(instance);
+ assertEquals(Errors.UNKNOWN_SUBSCRIPTION_ID,
instance.lastKnownError());
+
+ subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
+ new
GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()),
true).build();
+ subscriptionsResponse =
clientMetricsManager.processGetTelemetrySubscriptionRequest(
+ subscriptionsRequest, ClientMetricsTestUtils.requestContext());
+ assertEquals(Errors.NONE, subscriptionsResponse.error());
+ }
+
+ @Test
+ public void
testGetTelemetrySubscriptionAfterPushTelemetryUnknownCompressionSucceeds()
throws Exception {
+ clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultTestProperties());
+ assertEquals(1, clientMetricsManager.subscriptions().size());
+
+ GetTelemetrySubscriptionsRequest subscriptionsRequest = new
GetTelemetrySubscriptionsRequest.Builder(
+ new GetTelemetrySubscriptionsRequestData(), true).build();
+
+ GetTelemetrySubscriptionsResponse subscriptionsResponse =
clientMetricsManager.processGetTelemetrySubscriptionRequest(
+ subscriptionsRequest, ClientMetricsTestUtils.requestContext());
+
+ PushTelemetryRequest request = new Builder(
+ new PushTelemetryRequestData()
+
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
+
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
+ .setCompressionType((byte) 10) // // Invalid compression type
+
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))),
true).build();
+
+ PushTelemetryResponse response =
clientMetricsManager.processPushTelemetryRequest(
+ request, ClientMetricsTestUtils.requestContext());
+
+ assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, response.error());
+ ClientMetricsInstance instance =
clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
+ assertNotNull(instance);
+ assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE,
instance.lastKnownError());
+
+ subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
+ new
GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()),
true).build();
+ subscriptionsResponse =
clientMetricsManager.processGetTelemetrySubscriptionRequest(
+ subscriptionsRequest, ClientMetricsTestUtils.requestContext());
+ assertEquals(Errors.NONE, subscriptionsResponse.error());
+ }
+
+ @Test
+ public void
testGetTelemetrySubscriptionAfterPushTelemetryBytesExceptionFails() throws
Exception {
+ try (
+ Metrics kafkaMetrics = new Metrics();
+ ClientMetricsManager clientMetricsManager = new
ClientMetricsManager(clientMetricsReceiverPlugin, 1, time, kafkaMetrics)
+ ) {
+ GetTelemetrySubscriptionsRequest subscriptionsRequest = new
GetTelemetrySubscriptionsRequest.Builder(
+ new GetTelemetrySubscriptionsRequestData(), true).build();
+
+ GetTelemetrySubscriptionsResponse subscriptionsResponse =
clientMetricsManager.processGetTelemetrySubscriptionRequest(
+ subscriptionsRequest, ClientMetricsTestUtils.requestContext());
+
+ byte[] metrics = "ab".getBytes(StandardCharsets.UTF_8);
+ assertEquals(2, metrics.length);
+
+ PushTelemetryRequest request = new PushTelemetryRequest.Builder(
+ new PushTelemetryRequestData()
+
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
+
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
+ .setMetrics(ByteBuffer.wrap(metrics)), true).build();
+
+ // Set the max bytes 1 to force the error.
+ PushTelemetryResponse response =
clientMetricsManager.processPushTelemetryRequest(
+ request, ClientMetricsTestUtils.requestContext());
+
+ assertEquals(Errors.TELEMETRY_TOO_LARGE, response.error());
+ ClientMetricsInstance instance =
clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
+ assertNotNull(instance);
+ assertEquals(Errors.TELEMETRY_TOO_LARGE,
instance.lastKnownError());
+
+ subscriptionsRequest = new
GetTelemetrySubscriptionsRequest.Builder(
+ new
GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()),
true).build();
+ subscriptionsResponse =
clientMetricsManager.processGetTelemetrySubscriptionRequest(
+ subscriptionsRequest, ClientMetricsTestUtils.requestContext());
+ assertEquals(Errors.THROTTLING_QUOTA_EXCEEDED,
subscriptionsResponse.error());
+ }
+ }
+
@Test
public void testCacheEviction() throws Exception {
Properties properties = new Properties();