mjsax commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1446041813
########## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java: ########## @@ -175,16 +182,42 @@ public static boolean validateRequiredResourceLabels(Map<String, String> metadat } public static CompressionType preferredCompressionType(List<CompressionType> acceptedCompressionTypes) { - // TODO: Support compression in client telemetry. + if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) { + // Broker is providing the compression types in order of preference. Grab the + // first one. + return acceptedCompressionTypes.get(0); + } return CompressionType.NONE; } public static ByteBuffer compress(byte[] raw, CompressionType compressionType) { - // TODO: Support compression in client telemetry. - if (compressionType == CompressionType.NONE) { - return ByteBuffer.wrap(raw); - } else { - throw new UnsupportedOperationException("Compression is not supported"); + try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) { + try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) { + out.write(raw); + out.flush(); + } + compressedOut.buffer().flip(); + return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer())); + } catch (IOException e) { + throw new KafkaException("Failed to compress metrics data", e); Review Comment: Is it intentional to crash for this case? Or should we send data uncompressed if anything goes wrong? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org