This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b4bf0bf693f KAFKA-19506 Implement dynamic compression type selection
and fallback for client telemetry (#20144)
b4bf0bf693f is described below
commit b4bf0bf693f98d21312c4a3971911d5a60f67bd6
Author: Kaushik Raina <[email protected]>
AuthorDate: Sat Aug 23 22:49:19 2025 +0530
KAFKA-19506 Implement dynamic compression type selection and fallback for
client telemetry (#20144)
#### Summary
This PR implements dynamic compression type selection and a fallback
mechanism for client telemetry to handle cases where compression
libraries are not available on the client classpath.
#### Problem
Currently, when a compression library is missing (e.g.,
NoClassDefFoundError), the client telemetry system catches the generic
Throwable but doesn't learn from the failure. As a result, the same
unsupported compression type is attempted on every telemetry push.
#### Solution
This PR introduces a comprehensive fallback mechanism:
- Specific Exception Handling: Treat recoverable failures (e.g.,
IOException, or NoClassDefFoundError thrown directly or as a cause) as
fallback triggers, while other Errors (e.g., OutOfMemoryError) terminate
client telemetry
- Unsupported Compression Tracking: Add unsupportedCompressionTypes
collection to track compression types that have failed due to missing
libraries
- Dynamic Selection: Enhance
ClientTelemetryUtils.preferredCompressionType() to accept an unsupported
types parameter and filter out known problematic compression types
- Thread Safety: Use ConcurrentHashMap.newKeySet() for thread-safe
access to the unsupported types collection
- Improved Logging: Include exception details in log messages for better
debugging
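
The selection and tracking described above can be sketched roughly as follows. This is a minimal, self-contained illustration, not the code from this commit: `Codec` is a hypothetical stand-in for Kafka's `CompressionType`, and `CompressionFallbackSketch`, `choose`, and `markUnsupported` are names invented for the example.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Kafka's CompressionType so the sketch compiles on its own.
enum Codec { NONE, GZIP, SNAPPY, LZ4, ZSTD }

final class CompressionFallbackSketch {
    // Thread-safe set of codecs that failed on an earlier push attempt.
    private final Set<Codec> unsupported = ConcurrentHashMap.newKeySet();

    // Returns the first broker-accepted codec that is not known to be
    // unsupported, or NONE if every accepted codec has been ruled out.
    static Codec preferred(List<Codec> accepted, Set<Codec> unsupported) {
        return accepted.stream()
                .filter(c -> !unsupported.contains(c))
                .findFirst()
                .orElse(Codec.NONE);
    }

    // Picks a codec for the next push using the failures recorded so far.
    Codec choose(List<Codec> accepted) {
        return preferred(accepted, unsupported);
    }

    // Records a codec whose library turned out to be missing.
    void markUnsupported(Codec codec) {
        unsupported.add(codec);
    }
}
```

With accepted types `[ZSTD, LZ4, SNAPPY]`, the sketch picks ZSTD first, LZ4 once ZSTD has been marked unsupported, and NONE once all three have been marked.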
#### Key Changes
- Modified createPushRequest() to track failed compression types in
unsupportedCompressionTypes
- Updated ClientTelemetryUtils.preferredCompressionType() to filter out
unsupported types
- Enhanced exception handling to distinguish recoverable failures from
fatal Errors, which now terminate client telemetry
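
As a rough sketch of how createPushRequest() separates recoverable failures from fatal ones, the predicate below mirrors the condition in this commit's diff. The class and method names (`CompressionErrorSketch`, `isFatal`) are hypothetical and exist only for illustration.

```java
final class CompressionErrorSketch {
    // A failure is treated as fatal when it is an Error other than
    // NoClassDefFoundError and is not caused by a NoClassDefFoundError.
    // Everything else (IOException, a KafkaException wrapping a missing
    // class, and so on) is treated as recoverable and triggers the
    // fallback to uncompressed data.
    static boolean isFatal(Throwable e) {
        return e instanceof Error
                && !(e instanceof NoClassDefFoundError)
                && !(e.getCause() instanceof NoClassDefFoundError);
    }
}
```

Under this check an OutOfMemoryError is fatal, while a NoClassDefFoundError, or a RuntimeException whose cause is a NoClassDefFoundError, is not.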
#### Testing
- Added unit tests
- Tested Apache Kafka locally; client logs:
```
✗ cat ~/Desktop/kafka-client.log | grep "org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter"
2025-07-17 07:56:52:602 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry subscription request with client instance id
AAAAAAAAAAAAAAAAAAAAAA
2025-07-17 07:56:52:602 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from SUBSCRIPTION_NEEDED to
SUBSCRIPTION_IN_PROGRESS
2025-07-17 07:56:52:640 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from SUBSCRIPTION_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:56:52:640 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Telemetry subscription push interval value from broker was 5000; to
stagger requests the first push interval is being adjusted to 4551
2025-07-17 07:56:52:640 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Updating subscription - subscription:
ClientTelemetrySubscription{clientInstanceId=aVd3fzviRGSgEuAWNY5mMA,
subscriptionId=1650084878, pushIntervalMs=5000,
acceptedCompressionTypes=[zstd, lz4, snappy, none],
deltaTemporality=true,
selector=org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils$$Lambda$308/0x00000005011ce470@2f16e398};
intervalMs: 4551, lastRequestMs: 1752739012639
2025-07-17 07:56:52:640 [kafka-producer-network-thread |
kr-kafka-producer] INFO
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Client telemetry registered with client instance id:
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:56:57:196 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:56:57:196 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:56:57:224 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Compression library zstd not found, sending uncompressed data
at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722)
at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703)
at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389)
2025-07-17 07:56:57:295 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:02:296 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:02:297 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:02:300 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Compression library lz4 not found, sending uncompressed data
at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722)
at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703)
at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389)
2025-07-17 07:57:02:329 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:07:329 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:07:330 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:07:331 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Compression library snappy not found, sending uncompressed data
at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722)
at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703)
at
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389)
2025-07-17 07:57:07:344 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:12:346 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:12:346 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:12:400 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:17:402 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:17:402 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:17:442 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:22:442 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:22:442 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:22:508 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:27:512 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:27:512 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:27:555 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:32:555 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:32:555 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:32:578 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:37:580 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:37:580 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:37:606 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:42:606 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:42:606 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:42:646 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:47:647 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:47:647 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:47:673 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:52:673 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:52:673 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:52:711 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
2025-07-17 07:57:57:711 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Creating telemetry push request with client instance id
aVd3fzviRGSgEuAWNY5mMA
2025-07-17 07:57:57:711 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS
2025-07-17 07:57:57:765 [kafka-producer-network-thread |
kr-kafka-producer] DEBUG
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED
```
Reviewers: Apoorv Mittal <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../internals/ClientTelemetryReporter.java | 20 +++-
.../telemetry/internals/ClientTelemetryUtils.java | 25 ++--
.../internals/ClientTelemetryReporterTest.java | 131 +++++++++++++++++++++
.../internals/ClientTelemetryUtilsTest.java | 43 ++++---
4 files changed, 195 insertions(+), 24 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
index e0491943fef..bef65977be4 100644
---
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
+++
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
@@ -50,6 +50,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@@ -269,6 +270,7 @@ public class ClientTelemetryReporter implements
MetricsReporter {
private static final double INITIAL_PUSH_JITTER_LOWER = 0.5;
private static final double INITIAL_PUSH_JITTER_UPPER = 1.5;
+ private final Set<CompressionType> unsupportedCompressionTypes =
ConcurrentHashMap.newKeySet();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Condition subscriptionLoaded =
lock.writeLock().newCondition();
/*
@@ -713,12 +715,26 @@ public class ClientTelemetryReporter implements
MetricsReporter {
return Optional.empty();
}
- CompressionType compressionType =
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
+ CompressionType compressionType =
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes(),
unsupportedCompressionTypes);
ByteBuffer compressedPayload;
try {
compressedPayload = ClientTelemetryUtils.compress(payload,
compressionType);
} catch (Throwable e) {
- log.debug("Failed to compress telemetry payload for
compression: {}, sending uncompressed data", compressionType);
+ // Distinguish between recoverable errors
(NoClassDefFoundError for missing compression libs)
+ // and fatal errors (OutOfMemoryError, etc.) that should
terminate telemetry.
+ if (e instanceof Error && !(e instanceof NoClassDefFoundError)
&& !(e.getCause() instanceof NoClassDefFoundError)) {
+ lock.writeLock().lock();
+ try {
+ state = ClientTelemetryState.TERMINATED;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ log.error("Unexpected error occurred while compressing
telemetry payload for compression: {}, stopping client telemetry",
compressionType, e);
+ throw new KafkaException("Unexpected compression error",
e);
+ }
+
+ log.debug("Failed to compress telemetry payload for
compression: {}, sending uncompressed data", compressionType, e);
+ unsupportedCompressionTypes.add(compressionType);
compressedPayload = ByteBuffer.wrap(payload.toByteArray());
compressionType = CompressionType.NONE;
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
index 3c555afb3b0..111b041946c 100644
---
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
+++
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
@@ -39,6 +39,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.function.Predicate;
import io.opentelemetry.proto.metrics.v1.MetricsData;
@@ -181,13 +182,23 @@ public class ClientTelemetryUtils {
return validateResourceLabel(metadata, MetricsContext.NAMESPACE);
}
- public static CompressionType
preferredCompressionType(List<CompressionType> acceptedCompressionTypes) {
- if (acceptedCompressionTypes != null &&
!acceptedCompressionTypes.isEmpty()) {
- // Broker is providing the compression types in order of
preference. Grab the
- // first one.
- return acceptedCompressionTypes.get(0);
- }
- return CompressionType.NONE;
+ /**
+ * Determines the preferred compression type from broker-accepted types,
avoiding unsupported ones.
+ *
+ * @param acceptedCompressionTypes the list of compression types accepted
by the broker in order
+ * of preference (must not be null, use
empty list if no compression is accepted)
+ * @param unsupportedCompressionTypes the set of compression types that
should be avoided due to
+ * missing libraries or previous
failures (must not be null)
+ * @return the preferred compression type to use, or {@link
CompressionType#NONE} if no acceptable
+ * compression type is available
+ */
+ public static CompressionType
preferredCompressionType(List<CompressionType> acceptedCompressionTypes,
Set<CompressionType> unsupportedCompressionTypes) {
+ // Broker is providing the compression types in order of preference.
Grab the
+ // first one that's supported.
+ return acceptedCompressionTypes.stream()
+ .filter(t -> !unsupportedCompressionTypes.contains(t))
+ .findFirst()
+ .orElse(CompressionType.NONE);
}
public static ByteBuffer compress(MetricsData metrics, CompressionType
compressionType) throws IOException {
diff --git
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
index b708b4eeb60..c06e853b073 100644
---
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.telemetry.internals;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
@@ -63,8 +64,10 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
public class ClientTelemetryReporterTest {
@@ -413,6 +416,134 @@ public class ClientTelemetryReporterTest {
}
}
+ @Test
+ public void testCreateRequestPushCompressionFallbackToNextType() {
+ clientTelemetryReporter.configure(configs);
+ clientTelemetryReporter.contextChange(metricsContext);
+
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender =
(ClientTelemetryReporter.DefaultClientTelemetrySender)
clientTelemetryReporter.telemetrySender();
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+ // Set up subscription with multiple compression types: GZIP -> LZ4 ->
SNAPPY
+ ClientTelemetryReporter.ClientTelemetrySubscription subscription = new
ClientTelemetryReporter.ClientTelemetrySubscription(
+ uuid, 1234, 20000, List.of(CompressionType.GZIP,
CompressionType.LZ4, CompressionType.SNAPPY), true, null);
+ telemetrySender.updateSubscriptionResult(subscription,
time.milliseconds());
+
+ try (MockedStatic<ClientTelemetryUtils> mockedCompress =
Mockito.mockStatic(ClientTelemetryUtils.class, new CallsRealMethods())) {
+ // First request: GZIP fails with NoClassDefFoundError, should use
NONE for this request
+ mockedCompress.when(() -> ClientTelemetryUtils.compress(any(),
eq(CompressionType.GZIP))).thenThrow(new NoClassDefFoundError("GZIP not
available"));
+
+ Optional<AbstractRequest.Builder<?>> requestOptional =
telemetrySender.createRequest();
+ assertNotNull(requestOptional);
+ assertTrue(requestOptional.isPresent());
+ assertInstanceOf(PushTelemetryRequest.class,
requestOptional.get().build());
+ PushTelemetryRequest request = (PushTelemetryRequest)
requestOptional.get().build();
+
+ // Should fallback to NONE for this request (GZIP gets cached as
unsupported)
+ assertEquals(CompressionType.NONE.id,
request.data().compressionType());
+ assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS,
telemetrySender.state());
+
+ // Reset state for next request
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+ // Second request: LZ4 is selected (since GZIP is now cached as
unsupported), LZ4 fails, should use NONE
+ // Note that some libraries eg. LZ4 return KafkaException with
cause as NoClassDefFoundError
+ mockedCompress.when(() -> ClientTelemetryUtils.compress(any(),
eq(CompressionType.LZ4))).thenThrow(new KafkaException(new
NoClassDefFoundError("LZ4 not available")));
+
+ requestOptional = telemetrySender.createRequest();
+ assertNotNull(requestOptional);
+ assertTrue(requestOptional.isPresent());
+ assertInstanceOf(PushTelemetryRequest.class,
requestOptional.get().build());
+ request = (PushTelemetryRequest) requestOptional.get().build();
+
+ // Should fallback to NONE for this request (LZ4 gets cached as
unsupported)
+ assertEquals(CompressionType.NONE.id,
request.data().compressionType());
+ assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS,
telemetrySender.state());
+
+ // Reset state for next request
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+ // Third request: SNAPPY is selected (since GZIP and LZ4 are now
cached as unsupported), SNAPPY fails, should use NONE
+ mockedCompress.when(() -> ClientTelemetryUtils.compress(any(),
eq(CompressionType.SNAPPY))).thenThrow(new NoClassDefFoundError("SNAPPY not
available"));
+
+ requestOptional = telemetrySender.createRequest();
+ assertNotNull(requestOptional);
+ assertTrue(requestOptional.isPresent());
+ assertInstanceOf(PushTelemetryRequest.class,
requestOptional.get().build());
+ request = (PushTelemetryRequest) requestOptional.get().build();
+
+ // Should fallback to NONE for this request (SNAPPY gets cached as
unsupported)
+ assertEquals(CompressionType.NONE.id,
request.data().compressionType());
+ assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS,
telemetrySender.state());
+
+ // Reset state for next request
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+ // Fourth request: All compression types are now cached as
unsupported, should use NONE directly
+ requestOptional = telemetrySender.createRequest();
+ assertNotNull(requestOptional);
+ assertTrue(requestOptional.isPresent());
+ assertInstanceOf(PushTelemetryRequest.class,
requestOptional.get().build());
+ request = (PushTelemetryRequest) requestOptional.get().build();
+
+ // Should use NONE directly (no compression types are supported)
+ assertEquals(CompressionType.NONE.id,
request.data().compressionType());
+ assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS,
telemetrySender.state());
+ }
+ }
+
+ @Test
+ public void testCreateRequestPushCompressionFallbackAndTermination() {
+ clientTelemetryReporter.configure(configs);
+ clientTelemetryReporter.contextChange(metricsContext);
+
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender =
(ClientTelemetryReporter.DefaultClientTelemetrySender)
clientTelemetryReporter.telemetrySender();
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+ // Set up subscription with ZSTD compression type
+ ClientTelemetryReporter.ClientTelemetrySubscription subscription = new
ClientTelemetryReporter.ClientTelemetrySubscription(
+ uuid, 1234, 20000, List.of(CompressionType.ZSTD,
CompressionType.LZ4), true, null);
+ telemetrySender.updateSubscriptionResult(subscription,
time.milliseconds());
+
+ try (MockedStatic<ClientTelemetryUtils> mockedCompress =
Mockito.mockStatic(ClientTelemetryUtils.class, new CallsRealMethods())) {
+
+ // === Test 1: NoClassDefFoundError fallback (recoverable) ===
+ mockedCompress.when(() -> ClientTelemetryUtils.compress(any(),
eq(CompressionType.ZSTD)))
+ .thenThrow(new
NoClassDefFoundError("com/github/luben/zstd/BufferPool"));
+
+ assertEquals(ClientTelemetryState.PUSH_NEEDED,
telemetrySender.state());
+
+ Optional<AbstractRequest.Builder<?>> request1 =
telemetrySender.createRequest();
+ assertNotNull(request1);
+ assertTrue(request1.isPresent());
+ assertInstanceOf(PushTelemetryRequest.class,
request1.get().build());
+ PushTelemetryRequest pushRequest1 = (PushTelemetryRequest)
request1.get().build();
+ assertEquals(CompressionType.NONE.id,
pushRequest1.data().compressionType()); // Fallback to NONE
+ assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS,
telemetrySender.state());
+
+ // Reset state (simulate successful response handling)
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+ // === Test 2: OutOfMemoryError causes termination
(non-recoverable Error) ===
+ mockedCompress.reset();
+ mockedCompress.when(() -> ClientTelemetryUtils.compress(any(),
eq(CompressionType.LZ4)))
+ .thenThrow(new OutOfMemoryError("Out of memory during
compression"));
+
+ assertEquals(ClientTelemetryState.PUSH_NEEDED,
telemetrySender.state());
+
+ assertThrows(KafkaException.class, () ->
telemetrySender.createRequest());
+ assertEquals(ClientTelemetryState.TERMINATED,
telemetrySender.state());
+
+ // === Test 3: After termination, no more requests ===
+ Optional<AbstractRequest.Builder<?>> request3 =
telemetrySender.createRequest();
+ assertNotNull(request3);
+ assertFalse(request3.isPresent()); // No request created
+ assertEquals(ClientTelemetryState.TERMINATED,
telemetrySender.state()); // State remains TERMINATED
+ }
+ }
+
@Test
public void testHandleResponseGetSubscriptions() {
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender =
(ClientTelemetryReporter.DefaultClientTelemetrySender)
clientTelemetryReporter.telemetrySender();
diff --git
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
index 41679bed3f7..47925ff8e0a 100644
---
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
@@ -30,10 +30,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.function.Predicate;
import io.opentelemetry.proto.metrics.v1.Metric;
@@ -69,12 +68,12 @@ public class ClientTelemetryUtilsTest {
@Test
public void testGetSelectorFromRequestedMetrics() {
// no metrics selector
- assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS,
ClientTelemetryUtils.getSelectorFromRequestedMetrics(Collections.emptyList()));
+ assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS,
ClientTelemetryUtils.getSelectorFromRequestedMetrics(List.of()));
assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS,
ClientTelemetryUtils.getSelectorFromRequestedMetrics(null));
// all metrics selector
- assertEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS,
ClientTelemetryUtils.getSelectorFromRequestedMetrics(Collections.singletonList("*")));
+ assertEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS,
ClientTelemetryUtils.getSelectorFromRequestedMetrics(List.of("*")));
// specific metrics selector
- Predicate<? super MetricKeyable> selector =
ClientTelemetryUtils.getSelectorFromRequestedMetrics(Arrays.asList("metric1",
"metric2"));
+ Predicate<? super MetricKeyable> selector =
ClientTelemetryUtils.getSelectorFromRequestedMetrics(List.of("metric1",
"metric2"));
assertNotEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, selector);
assertNotEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, selector);
assertTrue(selector.test(new MetricKey("metric1.test")));
@@ -86,7 +85,7 @@ public class ClientTelemetryUtilsTest {
@Test
public void testGetCompressionTypesFromAcceptedList() {
assertEquals(0,
ClientTelemetryUtils.getCompressionTypesFromAcceptedList(null).size());
- assertEquals(0,
ClientTelemetryUtils.getCompressionTypesFromAcceptedList(Collections.emptyList()).size());
+ assertEquals(0,
ClientTelemetryUtils.getCompressionTypesFromAcceptedList(List.of()).size());
List<Byte> compressionTypes = new ArrayList<>();
compressionTypes.add(CompressionType.GZIP.id);
@@ -123,10 +122,24 @@ public class ClientTelemetryUtilsTest {
@Test
public void testPreferredCompressionType() {
- assertEquals(CompressionType.NONE,
ClientTelemetryUtils.preferredCompressionType(Collections.emptyList()));
- assertEquals(CompressionType.NONE,
ClientTelemetryUtils.preferredCompressionType(null));
- assertEquals(CompressionType.NONE,
ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.NONE,
CompressionType.GZIP)));
- assertEquals(CompressionType.GZIP,
ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.GZIP,
CompressionType.NONE)));
+ // Test with no unsupported types
+ assertEquals(CompressionType.NONE,
ClientTelemetryUtils.preferredCompressionType(List.of(), Set.of()));
+ assertEquals(CompressionType.NONE,
ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.NONE,
CompressionType.GZIP), Set.of()));
+ assertEquals(CompressionType.GZIP,
ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP,
CompressionType.NONE), Set.of()));
+
+ // Test unsupported type filtering (returns first available type, or
NONE if all are unsupported)
+ assertEquals(CompressionType.LZ4,
ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP,
CompressionType.LZ4), Set.of(CompressionType.GZIP)));
+ assertEquals(CompressionType.SNAPPY,
ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP,
CompressionType.LZ4, CompressionType.SNAPPY), Set.of(CompressionType.GZIP,
CompressionType.LZ4)));
+ assertEquals(CompressionType.NONE,
ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP,
CompressionType.LZ4), Set.of(CompressionType.GZIP, CompressionType.LZ4)));
+
+ // Test edge case: no match between requested and supported types
+ assertEquals(CompressionType.GZIP,
ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP,
CompressionType.LZ4), Set.of(CompressionType.SNAPPY)));
+
+ // Test NullPointerException for null parameters
+ assertThrows(NullPointerException.class, () ->
+ ClientTelemetryUtils.preferredCompressionType(null, Set.of()));
+ assertThrows(NullPointerException.class, () ->
+
ClientTelemetryUtils.preferredCompressionType(List.of(CompressionType.GZIP,
CompressionType.NONE), null));
}
@ParameterizedTest
@@ -150,19 +163,19 @@ public class ClientTelemetryUtilsTest {
private MetricsData getMetricsData() {
List<Metric> metricsList = new ArrayList<>();
metricsList.add(SinglePointMetric.sum(
- new MetricKey("metricName"), 1.0, true, Instant.now(),
null, Collections.emptySet())
+ new MetricKey("metricName"), 1.0, true, Instant.now(),
null, Set.of())
.builder().build());
metricsList.add(SinglePointMetric.sum(
- new MetricKey("metricName1"), 100.0, false,
Instant.now(), Instant.now(), Collections.emptySet())
+ new MetricKey("metricName1"), 100.0, false,
Instant.now(), Instant.now(), Set.of())
.builder().build());
metricsList.add(SinglePointMetric.deltaSum(
- new MetricKey("metricName2"), 1.0, true,
Instant.now(), Instant.now(), Collections.emptySet())
+ new MetricKey("metricName2"), 1.0, true,
Instant.now(), Instant.now(), Set.of())
.builder().build());
metricsList.add(SinglePointMetric.gauge(
- new MetricKey("metricName3"), 1.0, Instant.now(),
Collections.emptySet())
+ new MetricKey("metricName3"), 1.0, Instant.now(),
Set.of())
.builder().build());
metricsList.add(SinglePointMetric.gauge(
- new MetricKey("metricName4"), Long.valueOf(100),
Instant.now(), Collections.emptySet())
+ new MetricKey("metricName4"), Long.valueOf(100),
Instant.now(), Set.of())
.builder().build());
MetricsData.Builder builder = MetricsData.newBuilder();