This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new b7b2676f534 Cherrypick "MINOR : Handle error for client telemetry 
push (#19881)" (#20179)
b7b2676f534 is described below

commit b7b2676f5345b1683db1599a2c0d04461619ecbf
Author: Kaushik Raina <[email protected]>
AuthorDate: Wed Jul 16 22:09:31 2025 +0530

    Cherrypick "MINOR : Handle error for client telemetry push (#19881)" 
(#20179)
    
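    Update the catch block around telemetry payload compression to handle
    any Throwable (not only IOException), falling back to sending the
    payload uncompressed. Likewise widen the catch around the plugin
    metrics export in ClientMetricsManager from Exception to Throwable.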
    
    Before:
    
    
    
![image](https://github.com/user-attachments/assets/c5ca121e-ba0c-4664-91f1-20b54abf67cc)
    
    After:
    ```
    Sent message: KR Message 376
    [kafka-producer-network-thread | kr-kafka-producer] INFO
    org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
    KR: Failed to compress telemetry payload for compression: zstd, sending
    uncompressed data
    Sent message: KR Message 377
    ```
    
    Reviewers: Apoorv Mittal <[email protected]>, Bill Bejeck
    <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../kafka/common/telemetry/internals/ClientTelemetryReporter.java    | 5 ++---
 .../src/main/java/org/apache/kafka/server/ClientMetricsManager.java  | 2 +-
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
index 78a3900d636..a38d5c19167 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
@@ -41,7 +41,6 @@ import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
@@ -719,8 +718,8 @@ public class ClientTelemetryReporter implements 
MetricsReporter {
             byte[] compressedPayload;
             try {
                 compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
-            } catch (IOException e) {
-                log.info("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);
+            } catch (Throwable e) {
+                log.debug("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);
                 compressedPayload = payload;
                 compressionType = CompressionType.NONE;
             }
diff --git 
a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java 
b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
index 655a3d8625a..c3f76ae9dee 100644
--- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
@@ -211,7 +211,7 @@ public class ClientMetricsManager implements AutoCloseable {
                 long exportTimeStartMs = time.hiResClockMs();
                 receiverPlugin.exportMetrics(requestContext, request);
                 clientMetricsStats.recordPluginExport(clientInstanceId, 
time.hiResClockMs() - exportTimeStartMs);
-            } catch (Exception exception) {
+            } catch (Throwable exception) {
                 clientMetricsStats.recordPluginErrorCount(clientInstanceId);
                 clientInstance.lastKnownError(Errors.INVALID_RECORD);
                 log.error("Error exporting client metrics to the plugin for 
client instance id: {}", clientInstanceId, exception);

Reply via email to