This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 70c51641fbf Cherrypick "MINOR : Handle error for client telemetry push 
(#19881)" (#20176)
70c51641fbf is described below

commit 70c51641fbf54c3cea12b2ab2d730683813d00e8
Author: Kaushik Raina <[email protected]>
AuthorDate: Wed Jul 16 22:08:02 2025 +0530

    Cherrypick "MINOR : Handle error for client telemetry push (#19881)" 
(#20176)
    
    Update catch to handle compression errors
    
    Before :
    
    
    
![image](https://github.com/user-attachments/assets/c5ca121e-ba0c-4664-91f1-20b54abf67cc)
    
    After
    ```
    Sent message: KR Message 376
    [kafka-producer-network-thread | kr-kafka-producer] INFO
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    KR: Failed to compress telemetry payload for compression: zstd, sending
    uncompressed data
    Sent message: KR Message 377
    ```
    
    Reviewers: Apoorv Mittal <[email protected]>, Bill Bejeck
    <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../kafka/common/telemetry/internals/ClientTelemetryReporter.java    | 5 ++---
 .../src/main/java/org/apache/kafka/server/ClientMetricsManager.java  | 2 +-
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
index 705aafaaa70..e0491943fef 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
@@ -41,7 +41,6 @@ import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.Collections;
@@ -718,8 +717,8 @@ public class ClientTelemetryReporter implements 
MetricsReporter {
             ByteBuffer compressedPayload;
             try {
                 compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
-            } catch (IOException e) {
-                log.info("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);
+            } catch (Throwable e) {
+                log.debug("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);
                 compressedPayload = ByteBuffer.wrap(payload.toByteArray());
                 compressionType = CompressionType.NONE;
             }
diff --git 
a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java 
b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
index dcd17a3ecc0..2487ccb6f3d 100644
--- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
@@ -217,7 +217,7 @@ public class ClientMetricsManager implements AutoCloseable {
                 long exportTimeStartMs = time.hiResClockMs();
                 receiverPlugin.exportMetrics(requestContext, request);
                 clientMetricsStats.recordPluginExport(clientInstanceId, 
time.hiResClockMs() - exportTimeStartMs);
-            } catch (Exception exception) {
+            } catch (Throwable exception) {
                 clientMetricsStats.recordPluginErrorCount(clientInstanceId);
                 clientInstance.lastKnownError(Errors.INVALID_RECORD);
                 log.error("Error exporting client metrics to the plugin for 
client instance id: {}", clientInstanceId, exception);

Reply via email to