This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 70c51641fbf Cherrypick "MINOR : Handle error for client telemetry push
(#19881)" (#20176)
70c51641fbf is described below
commit 70c51641fbf54c3cea12b2ab2d730683813d00e8
Author: Kaushik Raina <[email protected]>
AuthorDate: Wed Jul 16 22:08:02 2025 +0530
Cherrypick "MINOR : Handle error for client telemetry push (#19881)"
(#20176)
Update the catch clauses to handle all compression and plugin export errors (catch Throwable rather than only IOException/Exception)
Before:

After:
```
Sent message: KR Message 376
[kafka-producer-network-thread | kr-kafka-producer] INFO
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
KR: Failed to compress telemetry payload for compression: zstd, sending
uncompressed data
Sent message: KR Message 377
```
Reviewers: Apoorv Mittal <[email protected]>, Bill Bejeck
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../kafka/common/telemetry/internals/ClientTelemetryReporter.java | 5 ++---
.../src/main/java/org/apache/kafka/server/ClientMetricsManager.java | 2 +-
2 files changed, 3 insertions(+), 4 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
index 705aafaaa70..e0491943fef 100644
---
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
+++
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
@@ -41,7 +41,6 @@ import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
@@ -718,8 +717,8 @@ public class ClientTelemetryReporter implements
MetricsReporter {
ByteBuffer compressedPayload;
try {
compressedPayload = ClientTelemetryUtils.compress(payload,
compressionType);
- } catch (IOException e) {
- log.info("Failed to compress telemetry payload for
compression: {}, sending uncompressed data", compressionType);
+ } catch (Throwable e) {
+ log.debug("Failed to compress telemetry payload for
compression: {}, sending uncompressed data", compressionType);
compressedPayload = ByteBuffer.wrap(payload.toByteArray());
compressionType = CompressionType.NONE;
}
diff --git
a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
index dcd17a3ecc0..2487ccb6f3d 100644
--- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
@@ -217,7 +217,7 @@ public class ClientMetricsManager implements AutoCloseable {
long exportTimeStartMs = time.hiResClockMs();
receiverPlugin.exportMetrics(requestContext, request);
clientMetricsStats.recordPluginExport(clientInstanceId,
time.hiResClockMs() - exportTimeStartMs);
- } catch (Exception exception) {
+ } catch (Throwable exception) {
clientMetricsStats.recordPluginErrorCount(clientInstanceId);
clientInstance.lastKnownError(Errors.INVALID_RECORD);
log.error("Error exporting client metrics to the plugin for
client instance id: {}", clientInstanceId, exception);