Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-03-20 Thread via GitHub


apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1532971164


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -175,16 +182,44 @@ public static boolean 
validateRequiredResourceLabels(Map metadat
 }
 
 public static CompressionType 
preferredCompressionType(List acceptedCompressionTypes) {
-// TODO: Support compression in client telemetry.
+if (acceptedCompressionTypes != null && 
!acceptedCompressionTypes.isEmpty()) {
+// Broker is providing the compression types in order of 
preference. Grab the
+// first one.
+return acceptedCompressionTypes.get(0);
+}
 return CompressionType.NONE;
 }
 
 public static ByteBuffer compress(byte[] raw, CompressionType 
compressionType) {
-// TODO: Support compression in client telemetry.
-if (compressionType == CompressionType.NONE) {
-return ByteBuffer.wrap(raw);
-} else {
-throw new UnsupportedOperationException("Compression is not 
supported");
+try {
+try (ByteBufferOutputStream compressedOut = new 
ByteBufferOutputStream(512)) {
+try (OutputStream out = 
compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) {
+out.write(raw);
+out.flush();
+}
+compressedOut.buffer().flip();
+return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer()));
+}
+} catch (IOException e) {
+throw new KafkaException("Failed to compress metrics data", e);
+}
+}
+
+public static ByteBuffer decompress(byte[] metrics, CompressionType 
compressionType) {
+ByteBuffer data = ByteBuffer.wrap(metrics);
+try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
+ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+
+byte[] bytes = new byte[data.capacity() * 2];
+int nRead;
+while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
+out.write(bytes, 0, nRead);
+}
+
+out.flush();
+return ByteBuffer.wrap(out.toByteArray());

Review Comment:
   @chia7712 Sounds good. Could you please create an improvement Jira and 
assign it to me? I'll address that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-03-20 Thread via GitHub


chia7712 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1531889679


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -175,16 +182,44 @@ public static boolean 
validateRequiredResourceLabels(Map metadat
 }
 
 public static CompressionType 
preferredCompressionType(List acceptedCompressionTypes) {
-// TODO: Support compression in client telemetry.
+if (acceptedCompressionTypes != null && 
!acceptedCompressionTypes.isEmpty()) {
+// Broker is providing the compression types in order of 
preference. Grab the
+// first one.
+return acceptedCompressionTypes.get(0);
+}
 return CompressionType.NONE;
 }
 
 public static ByteBuffer compress(byte[] raw, CompressionType 
compressionType) {
-// TODO: Support compression in client telemetry.
-if (compressionType == CompressionType.NONE) {
-return ByteBuffer.wrap(raw);
-} else {
-throw new UnsupportedOperationException("Compression is not 
supported");
+try {
+try (ByteBufferOutputStream compressedOut = new 
ByteBufferOutputStream(512)) {
+try (OutputStream out = 
compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) {
+out.write(raw);
+out.flush();
+}
+compressedOut.buffer().flip();
+return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer()));
+}
+} catch (IOException e) {
+throw new KafkaException("Failed to compress metrics data", e);
+}
+}
+
+public static ByteBuffer decompress(byte[] metrics, CompressionType 
compressionType) {
+ByteBuffer data = ByteBuffer.wrap(metrics);
+try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
+ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+
+byte[] bytes = new byte[data.capacity() * 2];
+int nRead;
+while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
+out.write(bytes, 0, nRead);
+}
+
+out.flush();
+return ByteBuffer.wrap(out.toByteArray());

Review Comment:
   Hi @apoorvmittal10, I have a question: is it worth replacing 
`ByteArrayOutputStream` with `ByteBufferOutputStream`? We could avoid the 
array copy by taking the buffer from `ByteBufferOutputStream` directly.
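
   To illustrate the copy in question, here is a minimal sketch that uses only 
the JDK. `ExposedByteArrayOutputStream` is a hypothetical helper, not Kafka's 
`ByteBufferOutputStream`; it only shows the difference between `toByteArray()`, 
which copies, and wrapping the stream's backing array directly.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

final class ExposedBufferSketch {

    // Hypothetical helper: exposes the stream's backing array instead of copying it.
    static final class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
        // Wraps the internal array without copying. The view is only
        // reliable until the next write, which may reallocate the array.
        ByteBuffer asByteBuffer() {
            return ByteBuffer.wrap(buf, 0, count);
        }
    }

    public static void main(String[] args) {
        ExposedByteArrayOutputStream out = new ExposedByteArrayOutputStream();
        byte[] data = {1, 2, 3};
        out.write(data, 0, data.length);

        ByteBuffer view = out.asByteBuffer();   // no copy
        byte[] copy = out.toByteArray();        // allocates and copies

        System.out.println(view.remaining());           // bytes written so far
        System.out.println(view.array() == copy);       // false: toByteArray() made a copy
    }
}
```

   The trade-off is that the returned buffer aliases the stream's internal 
state, so the stream should not be written to after the buffer is handed out.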



Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-17 Thread via GitHub


mjsax merged PR #15148:
URL: https://github.com/apache/kafka/pull/15148


Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-17 Thread via GitHub


mjsax commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1456522509


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -715,15 +716,22 @@ private Optional> 
createPushRequest(ClientTelemetrySubscription local
 }
 
 CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
-ByteBuffer buffer = ClientTelemetryUtils.compress(payload, 
compressionType);
+byte[] compressedPayload;
+try {
+compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
+} catch (IOException e) {
+log.info("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);

Review Comment:
   Fine with me.



Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-16 Thread via GitHub


philipnee commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1453824378


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -715,15 +716,22 @@ private Optional> 
createPushRequest(ClientTelemetrySubscription local
 }
 
 CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
-ByteBuffer buffer = ClientTelemetryUtils.compress(payload, 
compressionType);
+byte[] compressedPayload;
+try {
+compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
+} catch (IOException e) {
+log.info("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);

Review Comment:
   Thanks Apoorv, I'm OK with INFO-level logging. What do you think, @mjsax? 
   
   I suggested logging the error at DEBUG because the caught exception does 
not seem to be logged anywhere. I was wondering whether we should at least 
log it somewhere.
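
   One way to combine both ideas is sketched below: a short INFO line for the 
fallback and the exception itself at a debug-level line. This is an 
illustration only. It uses `java.util.logging` (where `FINE` plays the role of 
DEBUG) so that it runs on the JDK alone, whereas the client code uses its own 
logger; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

final class FallbackLoggingSketch {

    // Logs the fallback at INFO without a stack trace, then attaches the
    // exception to a FINE (debug-level) record so the cause is not lost.
    static void logCompressionFailure(Logger log, String compressionType, IOException e) {
        // java.util.logging uses {0}-style placeholders, unlike the {} in the PR.
        log.log(Level.INFO,
                "Failed to compress telemetry payload for compression: {0}, sending uncompressed data",
                compressionType);
        log.log(Level.FINE, "Compression failure details", e);
    }

    public static void main(String[] args) {
        Logger log = Logger.getLogger("telemetry-sketch");
        logCompressionFailure(log, "GZIP", new IOException("simulated failure"));
    }
}
```

   With the default `java.util.logging` configuration only the INFO line is 
shown; the exception appears once the logger and its handlers are set to `FINE`.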



Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-16 Thread via GitHub


apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1453672248


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -715,15 +716,22 @@ private Optional> 
createPushRequest(ClientTelemetrySubscription local
 }
 
 CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
-ByteBuffer buffer = ClientTelemetryUtils.compress(payload, 
compressionType);
+byte[] compressedPayload;
+try {
+compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
+} catch (IOException e) {
+log.info("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);

Review Comment:
   Makes sense, @philipnee. The user cannot fix this, so I avoided WARN and 
ERROR. DEBUG would let errors on the client go unnoticed, so I avoided that 
too. I wanted a log line that tells us something needs fixing in the client 
when we see it. 



Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-16 Thread via GitHub


philipnee commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1453662602


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -715,15 +716,22 @@ private Optional> 
createPushRequest(ClientTelemetrySubscription local
 }
 
 CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
-ByteBuffer buffer = ClientTelemetryUtils.compress(payload, 
compressionType);
+byte[] compressedPayload;
+try {
+compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
+} catch (IOException e) {
+log.info("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);

Review Comment:
   I think WARN or INFO is sufficient, as this is not necessarily a fatal path. 
How much should the user care about the data being sent uncompressed? If they 
should try to fix it, it probably deserves WARN-level logging. If this is a 
potential issue for the system, then WARN is better. 
   
   Also, should we log the error at DEBUG?



Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-15 Thread via GitHub


apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1452404874


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -715,15 +716,22 @@ private Optional> 
createPushRequest(ClientTelemetrySubscription local
 }
 
 CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
-ByteBuffer buffer = ClientTelemetryUtils.compress(payload, 
compressionType);
+byte[] compressedPayload;
+try {
+compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
+} catch (IOException e) {
+log.info("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);

Review Comment:
   cc: @AndrewJSchofield 



Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-13 Thread via GitHub


apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1451062702


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -715,15 +716,22 @@ private Optional> 
createPushRequest(ClientTelemetrySubscription local
 }
 
 CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
-ByteBuffer buffer = ClientTelemetryUtils.compress(payload, 
compressionType);
+byte[] compressedPayload;
+try {
+compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
+} catch (IOException e) {
+log.info("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);

Review Comment:
   I think clients generally log such internal handling errors at INFO; 
otherwise the end application user sees them as an issue (when the logs are at 
WARN or ERROR). That's my understanding. Maybe @philipnee or @kirktrue can 
address this better.
   
   I don't think there are other scenarios where we fall back to uncompressed 
data, but with client metrics the compression type is not defined by the end 
user; it is sent by the server and picked automatically by the client.



Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-12 Thread via GitHub


apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1451062702


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -715,15 +716,22 @@ private Optional> 
createPushRequest(ClientTelemetrySubscription local
 }
 
 CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
-ByteBuffer buffer = ClientTelemetryUtils.compress(payload, 
compressionType);
+byte[] compressedPayload;
+try {
+compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
+} catch (IOException e) {
+log.info("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);

Review Comment:
   I think clients generally log such issues at INFO; otherwise the customer 
opens an escalation when they see logs at WARN or ERROR. That's my 
understanding. Maybe @philipnee or @kirktrue can address this better.
   
   I don't think there are other scenarios where we fall back to uncompressed 
data, but with client metrics the compression type is not defined by the user; 
it is sent by the server and picked automatically by the client.



Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-12 Thread via GitHub


apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1451062702


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -715,15 +716,22 @@ private Optional> 
createPushRequest(ClientTelemetrySubscription local
 }
 
 CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
-ByteBuffer buffer = ClientTelemetryUtils.compress(payload, 
compressionType);
+byte[] compressedPayload;
+try {
+compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
+} catch (IOException e) {
+log.info("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);

Review Comment:
   I think clients generally log such issues at INFO; otherwise the customer 
opens an escalation when they see logs at WARN or ERROR. That's my 
understanding. Maybe @philipnee or @kirktrue can address this better.
   
   I don't think there are other scenarios where we fall back to uncompressed 
data, but with client metrics the compression type is not defined by the user; 
it is sent by the server and picked automatically by the client.



Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-12 Thread via GitHub


mjsax commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1451046445


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -715,15 +716,22 @@ private Optional> 
createPushRequest(ClientTelemetrySubscription local
 }
 
 CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
-ByteBuffer buffer = ClientTelemetryUtils.compress(payload, 
compressionType);
+byte[] compressedPayload;
+try {
+compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
+} catch (IOException e) {
+log.info("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);

Review Comment:
   Should this be WARN level (or maybe even ERROR)? Do we have similar existing 
cases elsewhere in the client where we fall back to uncompressed data?



Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-12 Thread via GitHub


mjsax commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1446041813


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -175,16 +182,42 @@ public static boolean 
validateRequiredResourceLabels(Map metadat
 }
 
 public static CompressionType 
preferredCompressionType(List acceptedCompressionTypes) {
-// TODO: Support compression in client telemetry.
+if (acceptedCompressionTypes != null && 
!acceptedCompressionTypes.isEmpty()) {
+// Broker is providing the compression types in order of 
preference. Grab the
+// first one.
+return acceptedCompressionTypes.get(0);
+}
 return CompressionType.NONE;
 }
 
 public static ByteBuffer compress(byte[] raw, CompressionType 
compressionType) {
-// TODO: Support compression in client telemetry.
-if (compressionType == CompressionType.NONE) {
-return ByteBuffer.wrap(raw);
-} else {
-throw new UnsupportedOperationException("Compression is not 
supported");
+try (ByteBufferOutputStream compressedOut = new 
ByteBufferOutputStream(512)) {
+try (OutputStream out = 
compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) {
+out.write(raw);
+out.flush();
+}
+compressedOut.buffer().flip();
+return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer()));
+} catch (IOException e) {
+throw new KafkaException("Failed to compress metrics data", e);

Review Comment:
   Is it intentional to crash for this case? Or should we send data 
uncompressed if anything goes wrong?



Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-11 Thread via GitHub


apoorvmittal10 commented on PR #15148:
URL: https://github.com/apache/kafka/pull/15148#issuecomment-1888052029

   @mjsax Could you please take another look? I have addressed the comments.


Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-09 Thread via GitHub


philipnee commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1446637026


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -175,16 +182,40 @@ public static boolean 
validateRequiredResourceLabels(Map metadat
 }
 
 public static CompressionType 
preferredCompressionType(List acceptedCompressionTypes) {
-// TODO: Support compression in client telemetry.
+if (acceptedCompressionTypes != null && 
!acceptedCompressionTypes.isEmpty()) {
+// Broker is providing the compression types in order of 
preference. Grab the
+// first one.
+return acceptedCompressionTypes.get(0);

Review Comment:
   thanks for the clarification.



Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-09 Thread via GitHub


apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1446625255


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -175,16 +182,40 @@ public static boolean 
validateRequiredResourceLabels(Map metadat
 }
 
 public static CompressionType 
preferredCompressionType(List acceptedCompressionTypes) {
-// TODO: Support compression in client telemetry.
+if (acceptedCompressionTypes != null && 
!acceptedCompressionTypes.isEmpty()) {
+// Broker is providing the compression types in order of 
preference. Grab the
+// first one.
+return acceptedCompressionTypes.get(0);

Review Comment:
   Thanks for looking at the PR, Philip. Never for the Java client, as we 
support all compression types in the Java client. Below is what the KIP says:
   
   `
   The broker will return a prioritized list of supported compression types in 
the GetTelemetrySubscriptionsResponse.AcceptedCompressionTypes array, the 
client is free to pick any supported compression type but should pick the first 
mutually supported type in the returned list. If the AcceptedCompressionTypes 
array is empty the client must send metrics uncompressed. The default 
compression types list as returned from the broker should be: ZStd, LZ4, GZip, 
Snappy.`
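
   For a client that does not support every type, the "first mutually 
supported type" rule quoted above could look roughly like the sketch below. 
The `Codec` enum and the method name are hypothetical stand-ins, not the 
Kafka `CompressionType` API, and returning `NONE` when nothing matches is an 
assumption; the KIP text above only specifies the empty-list case.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

final class CompressionPreferenceSketch {

    // Hypothetical stand-in for the client's compression type enum.
    enum Codec { NONE, GZIP, SNAPPY, LZ4, ZSTD }

    // Walks the broker's list in priority order and returns the first codec
    // the client also supports. Falls back to NONE when the list is null,
    // empty, or has no mutually supported entry (the last case is an assumption).
    static Codec firstMutuallySupported(List<Codec> accepted, Set<Codec> supported) {
        if (accepted == null) {
            return Codec.NONE;
        }
        for (Codec codec : accepted) {
            if (supported.contains(codec)) {
                return codec;
            }
        }
        return Codec.NONE;
    }

    public static void main(String[] args) {
        // Default broker order from the KIP text: ZStd, LZ4, GZip, Snappy.
        List<Codec> brokerOrder = Arrays.asList(Codec.ZSTD, Codec.LZ4, Codec.GZIP, Codec.SNAPPY);
        Set<Codec> clientSupports = EnumSet.of(Codec.GZIP, Codec.SNAPPY);
        System.out.println(firstMutuallySupported(brokerOrder, clientSupports)); // GZIP
    }
}
```

   When the client supports every listed type, as the Java client does, this 
reduces to taking the first element of the list, which is what the PR does.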



Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-09 Thread via GitHub


philipnee commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1446620286


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -175,16 +182,40 @@ public static boolean 
validateRequiredResourceLabels(Map metadat
 }
 
 public static CompressionType 
preferredCompressionType(List acceptedCompressionTypes) {
-// TODO: Support compression in client telemetry.
+if (acceptedCompressionTypes != null && 
!acceptedCompressionTypes.isEmpty()) {
+// Broker is providing the compression types in order of 
preference. Grab the
+// first one.
+return acceptedCompressionTypes.get(0);

Review Comment:
   Out of curiosity: when would we not use the first one?



Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-09 Thread via GitHub


apoorvmittal10 commented on PR #15148:
URL: https://github.com/apache/kafka/pull/15148#issuecomment-1883799949

   The build passed on all environments, with only unrelated test failures.


Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-09 Thread via GitHub


apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1446222582


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -175,16 +182,42 @@ public static boolean 
validateRequiredResourceLabels(Map metadat
 }
 
 public static CompressionType 
preferredCompressionType(List acceptedCompressionTypes) {
-// TODO: Support compression in client telemetry.
+if (acceptedCompressionTypes != null && 
!acceptedCompressionTypes.isEmpty()) {
+// Broker is providing the compression types in order of 
preference. Grab the
+// first one.
+return acceptedCompressionTypes.get(0);
+}
 return CompressionType.NONE;
 }
 
 public static ByteBuffer compress(byte[] raw, CompressionType 
compressionType) {
-// TODO: Support compression in client telemetry.
-if (compressionType == CompressionType.NONE) {
-return ByteBuffer.wrap(raw);
-} else {
-throw new UnsupportedOperationException("Compression is not 
supported");
+try (ByteBufferOutputStream compressedOut = new 
ByteBufferOutputStream(512)) {
+try (OutputStream out = 
compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) {
+out.write(raw);
+out.flush();
+}
+compressedOut.buffer().flip();
+return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer()));
+} catch (IOException e) {
+throw new KafkaException("Failed to compress metrics data", e);

Review Comment:
   Added additional tests in ClientTelemetryReporterTest.java to validate the 
above scenario.



Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-09 Thread via GitHub


apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1446218297


##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java:
##
@@ -111,5 +116,20 @@ public void testValidateIntervalMsInvalid(int 
pushIntervalMs) {
 public void testPreferredCompressionType() {
 assertEquals(CompressionType.NONE, 
ClientTelemetryUtils.preferredCompressionType(Collections.emptyList()));
 assertEquals(CompressionType.NONE, 
ClientTelemetryUtils.preferredCompressionType(null));
+assertEquals(CompressionType.NONE, 
ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.NONE,
 CompressionType.GZIP)));
+assertEquals(CompressionType.GZIP, 
ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.GZIP,
 CompressionType.NONE)));
+}
+
+@ParameterizedTest
+@EnumSource(CompressionType.class)
+public void testCompressDecompress(CompressionType compressionType) {
+byte[] testString = "test string".getBytes(StandardCharsets.UTF_8);
+ByteBuffer compressed = ClientTelemetryUtils.compress(testString, 
compressionType);
+assertNotNull(compressed);

Review Comment:
   Done.



Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-09 Thread via GitHub


apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1446217619


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -175,16 +182,42 @@ public static boolean 
validateRequiredResourceLabels(Map metadat
 }
 
 public static CompressionType 
preferredCompressionType(List acceptedCompressionTypes) {
-// TODO: Support compression in client telemetry.
+if (acceptedCompressionTypes != null && 
!acceptedCompressionTypes.isEmpty()) {
+// Broker is providing the compression types in order of 
preference. Grab the
+// first one.
+return acceptedCompressionTypes.get(0);
+}
 return CompressionType.NONE;
 }
 
 public static ByteBuffer compress(byte[] raw, CompressionType 
compressionType) {
-// TODO: Support compression in client telemetry.
-if (compressionType == CompressionType.NONE) {
-return ByteBuffer.wrap(raw);
-} else {
-throw new UnsupportedOperationException("Compression is not 
supported");
+try (ByteBufferOutputStream compressedOut = new 
ByteBufferOutputStream(512)) {
+try (OutputStream out = 
compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) {
+out.write(raw);
+out.flush();
+}
+compressedOut.buffer().flip();
+return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer()));
+} catch (IOException e) {
+throw new KafkaException("Failed to compress metrics data", e);

Review Comment:
   I think you are right. It would be wise to send uncompressed data if an 
error occurs. I have changed the code to address that.
   cc: @AndrewJSchofield 
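
   The fallback described here can be sketched as follows. This is an 
illustration under stated assumptions, not the code in the PR: the 
`Compressor` interface, the `Payload` holder, and the method names are 
hypothetical, and GZIP from the JDK stands in for the real codecs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

final class CompressionFallbackSketch {

    // Hypothetical abstraction over a codec that may fail with an IOException.
    interface Compressor {
        byte[] compress(byte[] raw) throws IOException;
    }

    // Carries the bytes to send and whether they ended up compressed, so the
    // caller can label the request with the matching compression type.
    static final class Payload {
        final byte[] bytes;
        final boolean compressed;

        Payload(byte[] bytes, boolean compressed) {
            this.bytes = bytes;
            this.compressed = compressed;
        }
    }

    // Tries to compress; on failure returns the raw bytes instead of throwing,
    // so a codec problem does not stop metrics from being pushed.
    static Payload compressOrFallback(byte[] raw, Compressor compressor) {
        try {
            return new Payload(compressor.compress(raw), true);
        } catch (IOException e) {
            return new Payload(raw, false);
        }
    }

    // JDK-only GZIP compressor used as a stand-in codec.
    static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (OutputStream out = new GZIPOutputStream(sink)) {
            out.write(raw);
        }
        return sink.toByteArray();
    }
}
```

   The important detail is that the request must then be marked as 
uncompressed, otherwise the receiver would try to decompress raw bytes.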



##
clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java:
##
@@ -34,4 +53,65 @@ public void testGetErrorResponse() {
 assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 1), response.errorCounts());
 }
 
+@ParameterizedTest
+@EnumSource(CompressionType.class)
+public void testMetricsDataCompression(CompressionType compressionType) {
+MetricsData metricsData = getMetricsData();
+PushTelemetryRequest req = getPushTelemetryRequest(metricsData, compressionType);
+
+ByteBuffer receivedMetricsBuffer = req.metricsData();
+assertNotNull(receivedMetricsBuffer);
+assertTrue(receivedMetricsBuffer.capacity() > 0);
+
+MetricsData receivedData = ClientTelemetryUtils.deserializeMetricsData(receivedMetricsBuffer);
+assertEquals(metricsData, receivedData);
+}
+
+private PushTelemetryRequest getPushTelemetryRequest(MetricsData metricsData, CompressionType compressionType) {
+byte[] data = metricsData.toByteArray();
+ByteBuffer metricsBuffer = ClientTelemetryUtils.compress(data, compressionType);
+if (compressionType != CompressionType.NONE) {
+assertTrue(metricsBuffer.array().length < data.length);
+} else {
+assertEquals(metricsBuffer.array().length, data.length);

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-09 Thread via GitHub


mjsax commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1446040488


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -175,16 +182,42 @@ public static boolean validateRequiredResourceLabels(Map metadat
 }
 
 public static CompressionType preferredCompressionType(List acceptedCompressionTypes) {
-// TODO: Support compression in client telemetry.
+if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) {
+// Broker is providing the compression types in order of preference. Grab the
+// first one.
+return acceptedCompressionTypes.get(0);
+}
 return CompressionType.NONE;
 }
 
 public static ByteBuffer compress(byte[] raw, CompressionType compressionType) {
-// TODO: Support compression in client telemetry.
-if (compressionType == CompressionType.NONE) {

Review Comment:
   Why can we remove this case? It seems `compress(...)` might be called with 
`NONE`, so it's still a valid case?
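
One way the `NONE` case can stay valid without its own branch is for the codec itself to hand back the underlying stream unchanged. The sketch below illustrates that idea only; the `Codec` enum and `compress` helper are hypothetical, `java.util.zip` replaces Kafka's codecs, and whether `CompressionType.NONE` behaves this way is an assumption not verified here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch: a pass-through codec removes the need for a NONE branch.
final class PassThroughCodec {

    enum Codec {
        NONE {
            @Override
            OutputStream wrap(OutputStream sink) {
                return sink; // no wrapping: bytes reach the sink unchanged
            }
        },
        GZIP {
            @Override
            OutputStream wrap(OutputStream sink) throws IOException {
                return new GZIPOutputStream(sink);
            }
        };

        abstract OutputStream wrap(OutputStream sink) throws IOException;
    }

    // Single code path for every codec, including NONE.
    static byte[] compress(byte[] raw, Codec codec) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (OutputStream out = codec.wrap(sink)) {
            out.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }
}
```

With this shape, `compress(raw, Codec.NONE)` returns a copy of the input bytes, so callers that pass `NONE` still get a usable result.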






Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-09 Thread via GitHub


mjsax commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1446048966


##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java:
##
@@ -111,5 +116,20 @@ public void testValidateIntervalMsInvalid(int pushIntervalMs) {
 public void testPreferredCompressionType() {
 assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Collections.emptyList()));
 assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(null));
+assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.NONE, CompressionType.GZIP)));
+assertEquals(CompressionType.GZIP, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.GZIP, CompressionType.NONE)));
+}
+
+@ParameterizedTest
+@EnumSource(CompressionType.class)
+public void testCompressDecompress(CompressionType compressionType) {
+byte[] testString = "test string".getBytes(StandardCharsets.UTF_8);
+ByteBuffer compressed = ClientTelemetryUtils.compress(testString, compressionType);
+assertNotNull(compressed);

Review Comment:
   Should we check if array length was reduced?
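
A caveat for a length check on an input as short as `"test string"`: a gzip stream carries a fixed header and trailer, so very small payloads come out larger than they went in. The sketch below uses plain `java.util.zip` rather than Kafka's `CompressionType`, and `GzipSizeDemo` and `gzipLength` are hypothetical names; the other codecs are not covered here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch: compare input size with gzip output size.
final class GzipSizeDemo {

    static int gzipLength(byte[] raw) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (OutputStream out = new GZIPOutputStream(sink)) {
            out.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Measured after close, so header, data and trailer are all counted.
        return sink.size();
    }

    public static void main(String[] args) {
        byte[] tiny = "test string".getBytes(StandardCharsets.UTF_8);
        byte[] repetitive = new byte[4096]; // all zeros, highly compressible
        System.out.println(tiny.length + " -> " + gzipLength(tiny));
        System.out.println(repetitive.length + " -> " + gzipLength(repetitive));
    }
}
```

A size assertion is therefore only meaningful with a payload that is large and repetitive enough to compress.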






Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-09 Thread via GitHub


mjsax commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1446048539


##
clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java:
##
@@ -34,4 +53,65 @@ public void testGetErrorResponse() {
 assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 1), response.errorCounts());
 }
 
+@ParameterizedTest
+@EnumSource(CompressionType.class)
+public void testMetricsDataCompression(CompressionType compressionType) {
+MetricsData metricsData = getMetricsData();
+PushTelemetryRequest req = getPushTelemetryRequest(metricsData, compressionType);
+
+ByteBuffer receivedMetricsBuffer = req.metricsData();
+assertNotNull(receivedMetricsBuffer);
+assertTrue(receivedMetricsBuffer.capacity() > 0);
+
+MetricsData receivedData = ClientTelemetryUtils.deserializeMetricsData(receivedMetricsBuffer);
+assertEquals(metricsData, receivedData);
+}
+
+private PushTelemetryRequest getPushTelemetryRequest(MetricsData metricsData, CompressionType compressionType) {
+byte[] data = metricsData.toByteArray();
+ByteBuffer metricsBuffer = ClientTelemetryUtils.compress(data, compressionType);
+if (compressionType != CompressionType.NONE) {
+assertTrue(metricsBuffer.array().length < data.length);
+} else {
+assertEquals(metricsBuffer.array().length, data.length);

Review Comment:
   Should we check if both arrays are the same (not just their length)?
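
The difference between the two checks is easy to see with two arrays of equal length and different content. A minimal sketch, with hypothetical helper names and no JUnit dependency:

```java
import java.util.Arrays;

// Hypothetical sketch: a length check passes where a content check fails.
final class LengthVsContent {

    static boolean sameLength(byte[] a, byte[] b) {
        return a.length == b.length;
    }

    static boolean sameContent(byte[] a, byte[] b) {
        // Compares length and every element.
        return Arrays.equals(a, b);
    }

    public static void main(String[] args) {
        byte[] expected = {1, 2, 3};
        byte[] corrupted = {1, 2, 4};
        System.out.println(sameLength(expected, corrupted));  // length check passes
        System.out.println(sameContent(expected, corrupted)); // content check fails
    }
}
```

In a JUnit test the content comparison would normally be written as `assertArrayEquals(expected, actual)`.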






Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-09 Thread via GitHub


mjsax commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1446041813


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -175,16 +182,42 @@ public static boolean validateRequiredResourceLabels(Map metadat
 }
 
 public static CompressionType preferredCompressionType(List acceptedCompressionTypes) {
-// TODO: Support compression in client telemetry.
+if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) {
+// Broker is providing the compression types in order of preference. Grab the
+// first one.
+return acceptedCompressionTypes.get(0);
+}
 return CompressionType.NONE;
 }
 
 public static ByteBuffer compress(byte[] raw, CompressionType compressionType) {
-// TODO: Support compression in client telemetry.
-if (compressionType == CompressionType.NONE) {
-return ByteBuffer.wrap(raw);
-} else {
-throw new UnsupportedOperationException("Compression is not supported");
+try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) {
+try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) {
+out.write(raw);
+out.flush();
+}
+compressedOut.buffer().flip();
+return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer()));
+} catch (IOException e) {
+throw new KafkaException("Failed to compress metrics data", e);

Review Comment:
   Is it intentional to crash in this case? Or should we send the data 
uncompressed if anything goes wrong?






Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-09 Thread via GitHub


apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1445983300


##
clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java:
##
@@ -34,4 +53,65 @@ public void testGetErrorResponse() {
 assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 1), response.errorCounts());
 }
 
+@ParameterizedTest
+@EnumSource(CompressionType.class)
+public void testMetricsDataCompressionTypeNone(CompressionType compressionType) {

Review Comment:
   My bad, I merged multiple methods into a single parameterized test and 
missed correcting the method name. Done.






Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-09 Thread via GitHub


AndrewJSchofield commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1445978961


##
clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java:
##
@@ -34,4 +53,65 @@ public void testGetErrorResponse() {
 assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 1), response.errorCounts());
 }
 
+@ParameterizedTest
+@EnumSource(CompressionType.class)
+public void testMetricsDataCompressionTypeNone(CompressionType compressionType) {

Review Comment:
   It's slightly odd that this is a parameterized test when its name says 
`CompressionTypeNone`.






Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-08 Thread via GitHub


apoorvmittal10 commented on PR #15148:
URL: https://github.com/apache/kafka/pull/15148#issuecomment-1881633923

   @AndrewJSchofield @mjsax @kirktrue @philipnee Please review.





Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-08 Thread via GitHub


apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1445133177


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -175,16 +182,44 @@ public static boolean validateRequiredResourceLabels(Map metadat
 }
 
 public static CompressionType preferredCompressionType(List acceptedCompressionTypes) {
-// TODO: Support compression in client telemetry.
+if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) {
+// Broker is providing the compression types in order of preference. Grab the
+// first one.
+return acceptedCompressionTypes.get(0);
+}
 return CompressionType.NONE;
 }
 
 public static ByteBuffer compress(byte[] raw, CompressionType compressionType) {
-// TODO: Support compression in client telemetry.
-if (compressionType == CompressionType.NONE) {
-return ByteBuffer.wrap(raw);
-} else {
-throw new UnsupportedOperationException("Compression is not supported");
+try {
+try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) {
+try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) {
+out.write(raw);
+out.flush();
+}
+compressedOut.buffer().flip();
+return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer()));
+}
+} catch (IOException e) {
+throw new KafkaException("Failed to compress metrics data", e);
+}
+}
+
+public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
+ByteBuffer data = ByteBuffer.wrap(metrics);
+try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
+ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+
+byte[] bytes = new byte[data.capacity() * 2];
+int nRead;
+while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
+out.write(bytes, 0, nRead);
+}
+
+out.flush();
+return ByteBuffer.wrap(out.toByteArray());

Review Comment:
   I avoided using the code below from Utils because the size to allocate for 
the ByteBuffer is not known, and `Utils.readFully` only fills data up to the 
buffer's capacity, which means we would have to resize the ByteBuffer, much as 
the loop above already does.
   
   The reason `Utils.readFully` works in MemoryRecords is that the uncompressed 
data size comes along with the Produce request, so the buffer allocation is 
accurate.
   
   ```
   ByteBuffer decompressedData = ByteBuffer.allocate();
   Utils.readFully(in, decompressedData);
   return (ByteBuffer) decompressedData.flip();
   ```
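
The read-until-EOF approach from the diff can be sketched in isolation as follows. This is an illustration under stated assumptions, not the PR code: `java.util.zip` stands in for `CompressionType.wrapForInput`, and `ReadUntilEof`, `gzip` and `gunzip` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch: decompress without knowing the uncompressed size.
final class ReadUntilEof {

    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (OutputStream out = new GZIPOutputStream(sink)) {
            out.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    static byte[] gunzip(byte[] compressed) {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            // Deliberately small chunk: the loop, not the chunk size,
            // guarantees that all data is read.
            byte[] chunk = new byte[64];
            int nRead;
            while ((nRead = in.read(chunk, 0, chunk.length)) != -1) {
                out.write(chunk, 0, nRead); // the output stream grows as needed
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because `ByteArrayOutputStream` grows on demand, the caller never has to guess the uncompressed size up front, which is the constraint described above.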
   






Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-08 Thread via GitHub


apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1445128354


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -175,16 +182,44 @@ public static boolean validateRequiredResourceLabels(Map metadat
 }
 
 public static CompressionType preferredCompressionType(List acceptedCompressionTypes) {
-// TODO: Support compression in client telemetry.
+if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) {
+// Broker is providing the compression types in order of preference. Grab the
+// first one.
+return acceptedCompressionTypes.get(0);
+}
 return CompressionType.NONE;
 }
 
 public static ByteBuffer compress(byte[] raw, CompressionType compressionType) {
-// TODO: Support compression in client telemetry.
-if (compressionType == CompressionType.NONE) {
-return ByteBuffer.wrap(raw);
-} else {
-throw new UnsupportedOperationException("Compression is not supported");
+try {
+try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) {
+try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) {
+out.write(raw);
+out.flush();
+}
+compressedOut.buffer().flip();
+return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer()));

Review Comment:
   This is equivalent to the code below; the line makes sure only the filled 
bytes are returned from the buffer.
   
   ```
   ByteBuffer compressedBuffer = ByteBuffer.allocate(bufferOutputStream.buffer().remaining());
   compressedBuffer.put(bufferOutputStream.buffer()).flip();
   return compressedBuffer;
   ```
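
The "only filled bytes" point can be shown with plain `java.nio`, without the Kafka helpers. A minimal sketch; `FilledBytesOnly` and `filledBytes` are hypothetical names:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: copy only the bytes that were written into a buffer.
final class FilledBytesOnly {

    static byte[] filledBytes(ByteBuffer written) {
        // Work on a duplicate so the caller's position and limit are untouched.
        ByteBuffer view = written.duplicate();
        view.flip(); // limit = bytes written so far, position = 0
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(512);
        buffer.put(new byte[] {10, 20, 30});
        System.out.println(buffer.array().length);      // whole backing array
        System.out.println(filledBytes(buffer).length); // written bytes only
    }
}
```

Wrapping the backing array directly would expose the unused tail of the 512-byte allocation, which is what the flip-then-copy step avoids.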


