Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1532971164 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java: ## @@ -175,16 +182,44 @@ public static boolean validateRequiredResourceLabels(Map metadat } public static CompressionType preferredCompressionType(List acceptedCompressionTypes) { -// TODO: Support compression in client telemetry. +if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) { +// Broker is providing the compression types in order of preference. Grab the +// first one. +return acceptedCompressionTypes.get(0); +} return CompressionType.NONE; } public static ByteBuffer compress(byte[] raw, CompressionType compressionType) { -// TODO: Support compression in client telemetry. -if (compressionType == CompressionType.NONE) { -return ByteBuffer.wrap(raw); -} else { -throw new UnsupportedOperationException("Compression is not supported"); +try { +try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) { +try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) { +out.write(raw); +out.flush(); +} +compressedOut.buffer().flip(); +return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer())); +} +} catch (IOException e) { +throw new KafkaException("Failed to compress metrics data", e); +} +} + +public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) { +ByteBuffer data = ByteBuffer.wrap(metrics); +try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create()); +ByteArrayOutputStream out = new ByteArrayOutputStream()) { + +byte[] bytes = new byte[data.capacity() * 2]; +int nRead; +while ((nRead = in.read(bytes, 0, bytes.length)) != -1) { +out.write(bytes, 0, nRead); +} + +out.flush(); +return ByteBuffer.wrap(out.toByteArray()); Review Comment: @chia7712 Sounds good, can you please create an 
improvement Jira for me? I'll address that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
chia7712 commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1531889679 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java: ## @@ -175,16 +182,44 @@ public static boolean validateRequiredResourceLabels(Map metadat } public static CompressionType preferredCompressionType(List acceptedCompressionTypes) { -// TODO: Support compression in client telemetry. +if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) { +// Broker is providing the compression types in order of preference. Grab the +// first one. +return acceptedCompressionTypes.get(0); +} return CompressionType.NONE; } public static ByteBuffer compress(byte[] raw, CompressionType compressionType) { -// TODO: Support compression in client telemetry. -if (compressionType == CompressionType.NONE) { -return ByteBuffer.wrap(raw); -} else { -throw new UnsupportedOperationException("Compression is not supported"); +try { +try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) { +try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) { +out.write(raw); +out.flush(); +} +compressedOut.buffer().flip(); +return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer())); +} +} catch (IOException e) { +throw new KafkaException("Failed to compress metrics data", e); +} +} + +public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) { +ByteBuffer data = ByteBuffer.wrap(metrics); +try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create()); +ByteArrayOutputStream out = new ByteArrayOutputStream()) { + +byte[] bytes = new byte[data.capacity() * 2]; +int nRead; +while ((nRead = in.read(bytes, 0, bytes.length)) != -1) { +out.write(bytes, 0, nRead); +} + +out.flush(); +return ByteBuffer.wrap(out.toByteArray()); Review Comment: hi @apoorvmittal10 I have a question: Is it worth 
using `ByteBufferOutputStream` to replace `ByteArrayOutputStream`? We can avoid the array copy by taking the buffer from `ByteBufferOutputStream` directly.
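The copy being discussed can be illustrated with the JDK alone. This is a minimal sketch, not Kafka's `ByteBufferOutputStream`: the class `ExposedByteArrayOutputStream` and its method `asByteBuffer` are hypothetical names used only to show the difference between `toByteArray()`, which allocates a fresh array on every call, and wrapping the stream's internal array directly.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; Kafka's ByteBufferOutputStream is a different type.
final class ExposedByteArrayOutputStream extends ByteArrayOutputStream {

    // Wraps the internal array (protected fields buf and count) without copying.
    // The returned buffer is only safe to read until the next write to the stream.
    ByteBuffer asByteBuffer() {
        return ByteBuffer.wrap(buf, 0, count);
    }

    public static void main(String[] args) {
        ExposedByteArrayOutputStream out = new ExposedByteArrayOutputStream();
        byte[] data = "metrics".getBytes(StandardCharsets.UTF_8);
        out.write(data, 0, data.length);

        // toByteArray() copies: two calls give two distinct arrays.
        System.out.println(out.toByteArray() == out.toByteArray());

        // asByteBuffer() shares: both buffers are backed by the same array.
        System.out.println(out.asByteBuffer().array() == out.asByteBuffer().array());
    }
}
```

The trade-off is aliasing: a caller holding the shared buffer sees later writes, so the copy-free form fits best when the stream is discarded right after the buffer is handed off.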
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
mjsax merged PR #15148: URL: https://github.com/apache/kafka/pull/15148
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
mjsax commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1456522509 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ## @@ -715,15 +716,22 @@ private Optional> createPushRequest(ClientTelemetrySubscription local } CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes()); -ByteBuffer buffer = ClientTelemetryUtils.compress(payload, compressionType); +byte[] compressedPayload; +try { +compressedPayload = ClientTelemetryUtils.compress(payload, compressionType); +} catch (IOException e) { +log.info("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType); Review Comment: Fine with me.
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
philipnee commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1453824378 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ## @@ -715,15 +716,22 @@ private Optional> createPushRequest(ClientTelemetrySubscription local } CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes()); -ByteBuffer buffer = ClientTelemetryUtils.compress(payload, compressionType); +byte[] compressedPayload; +try { +compressedPayload = ClientTelemetryUtils.compress(payload, compressionType); +} catch (IOException e) { +log.info("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType); Review Comment: Thanks Apoorv, I'm OK with INFO-level logging. What do you think, @mjsax? I suggested logging the error at DEBUG because the caught exception does not seem to be logged anywhere; I was wondering whether we should at least log it somewhere.
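The point about the caught exception not being logged can be sketched as follows. This is an illustration under assumptions, not the PR's code: it uses `java.util.logging` so that it runs without dependencies, whereas the Kafka clients log through SLF4J, where passing the exception as the final argument has the same effect of attaching the cause. The class name `LogWithCause` and the logger name `telemetry-sketch` are hypothetical.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical illustration class; the logger name is an arbitrary choice.
final class LogWithCause {
    private static final Logger LOG = Logger.getLogger("telemetry-sketch");

    // Logs at INFO, as agreed in the thread, but keeps the cause attached
    // so the stack trace is not lost.
    static void reportCompressionFailure(String compressionType, IOException cause) {
        LOG.log(Level.INFO,
                "Failed to compress telemetry payload for compression: " + compressionType
                        + ", sending uncompressed data",
                cause);
    }

    public static void main(String[] args) {
        reportCompressionFailure("GZIP", new IOException("simulated failure"));
    }
}
```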
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1453672248 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ## @@ -715,15 +716,22 @@ private Optional> createPushRequest(ClientTelemetrySubscription local } CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes()); -ByteBuffer buffer = ClientTelemetryUtils.compress(payload, compressionType); +byte[] compressedPayload; +try { +compressedPayload = ClientTelemetryUtils.compress(payload, compressionType); +} catch (IOException e) { +log.info("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType); Review Comment: Makes sense, @philipnee. Since the user cannot fix it, I avoided WARN and ERROR. DEBUG would let client errors go unnoticed, so I avoided that too. I wanted a log line that tells us something can be fixed in the client when we see it.
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
philipnee commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1453662602 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ## @@ -715,15 +716,22 @@ private Optional> createPushRequest(ClientTelemetrySubscription local } CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes()); -ByteBuffer buffer = ClientTelemetryUtils.compress(payload, compressionType); +byte[] compressedPayload; +try { +compressedPayload = ClientTelemetryUtils.compress(payload, compressionType); +} catch (IOException e) { +log.info("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType); Review Comment: I think WARN or INFO is sufficient, as this is not necessarily a fatal path. How much should the user care about getting uncompressed data? If they should try to fix it, it may deserve WARN-level logging. If this is a potential issue for the system, then WARN is better. Also, should we log the error at DEBUG?
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1452404874 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ## @@ -715,15 +716,22 @@ private Optional> createPushRequest(ClientTelemetrySubscription local } CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes()); -ByteBuffer buffer = ClientTelemetryUtils.compress(payload, compressionType); +byte[] compressedPayload; +try { +compressedPayload = ClientTelemetryUtils.compress(payload, compressionType); +} catch (IOException e) { +log.info("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType); Review Comment: cc: @AndrewJSchofield
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1451062702 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ## @@ -715,15 +716,22 @@ private Optional> createPushRequest(ClientTelemetrySubscription local } CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes()); -ByteBuffer buffer = ClientTelemetryUtils.compress(payload, compressionType); +byte[] compressedPayload; +try { +compressedPayload = ClientTelemetryUtils.compress(payload, compressionType); +} catch (IOException e) { +log.info("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType); Review Comment: I think clients generally log such internal handling errors at INFO; otherwise the end application user sees them as an issue (if they appear at WARN or ERROR). That's my understanding. Maybe @philipnee or @kirktrue can address this better. I don't think there are other scenarios where we fall back to uncompressed data, but client metrics are a case where the compression type is not defined by the end user; it is sent by the server and picked automatically by the client.
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
mjsax commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1451046445 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java: ## @@ -715,15 +716,22 @@ private Optional> createPushRequest(ClientTelemetrySubscription local } CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes()); -ByteBuffer buffer = ClientTelemetryUtils.compress(payload, compressionType); +byte[] compressedPayload; +try { +compressedPayload = ClientTelemetryUtils.compress(payload, compressionType); +} catch (IOException e) { +log.info("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType); Review Comment: Should this be WARN-level (or maybe even ERROR)? Do we have similar existing cases elsewhere in the client where we fall back to uncompressed?
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
mjsax commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1446041813 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java: ## @@ -175,16 +182,42 @@ public static boolean validateRequiredResourceLabels(Map metadat } public static CompressionType preferredCompressionType(List acceptedCompressionTypes) { -// TODO: Support compression in client telemetry. +if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) { +// Broker is providing the compression types in order of preference. Grab the +// first one. +return acceptedCompressionTypes.get(0); +} return CompressionType.NONE; } public static ByteBuffer compress(byte[] raw, CompressionType compressionType) { -// TODO: Support compression in client telemetry. -if (compressionType == CompressionType.NONE) { -return ByteBuffer.wrap(raw); -} else { -throw new UnsupportedOperationException("Compression is not supported"); +try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) { +try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) { +out.write(raw); +out.flush(); +} +compressedOut.buffer().flip(); +return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer())); +} catch (IOException e) { +throw new KafkaException("Failed to compress metrics data", e); Review Comment: Is it intentional to crash for this case? Or should we send data uncompressed if anything goes wrong?
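The alternative raised here, sending uncompressed data when compression fails, might look roughly like the sketch below. It is written against the JDK's GZIP stream rather than Kafka's `CompressionType`, and the names `CompressWithFallback`, `Payload`, and `Compressor` are hypothetical. The detail worth noting is that the caller needs to know which form it ended up with, so the request can declare the matching compression type.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Hypothetical illustration class, not part of the Kafka client.
final class CompressWithFallback {

    // Hypothetical result type: the bytes to send plus whether they are compressed.
    static final class Payload {
        final byte[] bytes;
        final boolean compressed;

        Payload(byte[] bytes, boolean compressed) {
            this.bytes = bytes;
            this.compressed = compressed;
        }
    }

    // Hypothetical hook so that a failing compressor can be plugged in.
    interface Compressor {
        byte[] compress(byte[] raw) throws IOException;
    }

    static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (OutputStream out = new GZIPOutputStream(sink)) {
            out.write(raw);
        }
        // The GZIP stream is closed at this point, so the trailer has been written.
        return sink.toByteArray();
    }

    // Telemetry is best effort: on failure, return the raw bytes and flag them
    // as uncompressed instead of propagating the exception.
    static Payload compressOrFallback(byte[] raw, Compressor compressor) {
        try {
            return new Payload(compressor.compress(raw), true);
        } catch (IOException e) {
            return new Payload(raw, false);
        }
    }

    public static void main(String[] args) {
        byte[] raw = "metrics payload".getBytes(StandardCharsets.UTF_8);
        System.out.println(compressOrFallback(raw, CompressWithFallback::gzip).compressed);
        System.out.println(compressOrFallback(raw, r -> {
            throw new IOException("simulated failure");
        }).compressed);
    }
}
```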
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
apoorvmittal10 commented on PR #15148: URL: https://github.com/apache/kafka/pull/15148#issuecomment-1888052029 @mjsax Can you please take another look? I have addressed the comments.
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
philipnee commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1446637026 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java: ## @@ -175,16 +182,40 @@ public static boolean validateRequiredResourceLabels(Map metadat } public static CompressionType preferredCompressionType(List acceptedCompressionTypes) { -// TODO: Support compression in client telemetry. +if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) { +// Broker is providing the compression types in order of preference. Grab the +// first one. +return acceptedCompressionTypes.get(0); Review Comment: Thanks for the clarification.
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1446625255 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java: ## @@ -175,16 +182,40 @@ public static boolean validateRequiredResourceLabels(Map metadat } public static CompressionType preferredCompressionType(List acceptedCompressionTypes) { -// TODO: Support compression in client telemetry. +if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) { +// Broker is providing the compression types in order of preference. Grab the +// first one. +return acceptedCompressionTypes.get(0); Review Comment: Thanks for looking at the PR, Philip. Never for the Java client, as it supports all compression types. Below is what the KIP says: `The broker will return a prioritized list of supported compression types in the GetTelemetrySubscriptionsResponse.AcceptedCompressionTypes array, the client is free to pick any supported compression type but should pick the first mutually supported type in the returned list. If the AcceptedCompressionTypes array is empty the client must send metrics uncompressed. The default compression types list as returned from the broker should be: ZStd, LZ4, GZip, Snappy.`
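For a client that does not support every codec, the KIP rule quoted above (pick the first mutually supported type, send uncompressed when the list is empty) could be sketched as below. The `Codec` enum is a hypothetical stand-in for Kafka's `CompressionType`, and `pick` is an illustrative name; the Java client in this PR simply takes the first entry because it supports all types.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration class, not part of the Kafka client.
final class CodecSelection {

    // Hypothetical stand-in for Kafka's CompressionType enum.
    enum Codec { NONE, GZIP, SNAPPY, LZ4, ZSTD }

    // Returns the first broker-accepted codec that the client also supports,
    // or NONE when the list is null, empty, or has no mutual match.
    static Codec pick(List<Codec> accepted, Set<Codec> supported) {
        if (accepted == null) {
            return Codec.NONE;
        }
        for (Codec codec : accepted) {
            if (supported.contains(codec)) {
                return codec;
            }
        }
        return Codec.NONE;
    }

    public static void main(String[] args) {
        // Default broker preference order as quoted from the KIP.
        List<Codec> accepted = List.of(Codec.ZSTD, Codec.LZ4, Codec.GZIP, Codec.SNAPPY);
        System.out.println(pick(accepted, EnumSet.allOf(Codec.class)));          // ZSTD
        System.out.println(pick(accepted, EnumSet.of(Codec.GZIP, Codec.SNAPPY))); // GZIP
        System.out.println(pick(List.of(), EnumSet.allOf(Codec.class)));         // NONE
    }
}
```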
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
philipnee commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1446620286 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java: ## @@ -175,16 +182,40 @@ public static boolean validateRequiredResourceLabels(Map metadat } public static CompressionType preferredCompressionType(List acceptedCompressionTypes) { -// TODO: Support compression in client telemetry. +if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) { +// Broker is providing the compression types in order of preference. Grab the +// first one. +return acceptedCompressionTypes.get(0); Review Comment: Out of curiosity, when would we not use the first one?
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
apoorvmittal10 commented on PR #15148: URL: https://github.com/apache/kafka/pull/15148#issuecomment-1883799949 Build passed on all environments, with unrelated test failures.
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1446222582 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java: ## @@ -175,16 +182,42 @@ public static boolean validateRequiredResourceLabels(Map metadat } public static CompressionType preferredCompressionType(List acceptedCompressionTypes) { -// TODO: Support compression in client telemetry. +if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) { +// Broker is providing the compression types in order of preference. Grab the +// first one. +return acceptedCompressionTypes.get(0); +} return CompressionType.NONE; } public static ByteBuffer compress(byte[] raw, CompressionType compressionType) { -// TODO: Support compression in client telemetry. -if (compressionType == CompressionType.NONE) { -return ByteBuffer.wrap(raw); -} else { -throw new UnsupportedOperationException("Compression is not supported"); +try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) { +try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) { +out.write(raw); +out.flush(); +} +compressedOut.buffer().flip(); +return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer())); +} catch (IOException e) { +throw new KafkaException("Failed to compress metrics data", e); Review Comment: Added additional tests in ClientTelemetryReporterTest.java to validate the above scenario.
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1446218297 ## clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java: ## @@ -111,5 +116,20 @@ public void testValidateIntervalMsInvalid(int pushIntervalMs) { public void testPreferredCompressionType() { assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Collections.emptyList())); assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(null)); +assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.NONE, CompressionType.GZIP))); +assertEquals(CompressionType.GZIP, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.GZIP, CompressionType.NONE))); +} + +@ParameterizedTest +@EnumSource(CompressionType.class) +public void testCompressDecompress(CompressionType compressionType) { +byte[] testString = "test string".getBytes(StandardCharsets.UTF_8); +ByteBuffer compressed = ClientTelemetryUtils.compress(testString, compressionType); +assertNotNull(compressed); Review Comment: Done.
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1446217619 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java: ## @@ -175,16 +182,42 @@ public static boolean validateRequiredResourceLabels(Map metadat } public static CompressionType preferredCompressionType(List acceptedCompressionTypes) { -// TODO: Support compression in client telemetry. +if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) { +// Broker is providing the compression types in order of preference. Grab the +// first one. +return acceptedCompressionTypes.get(0); +} return CompressionType.NONE; } public static ByteBuffer compress(byte[] raw, CompressionType compressionType) { -// TODO: Support compression in client telemetry. -if (compressionType == CompressionType.NONE) { -return ByteBuffer.wrap(raw); -} else { -throw new UnsupportedOperationException("Compression is not supported"); +try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) { +try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) { +out.write(raw); +out.flush(); +} +compressedOut.buffer().flip(); +return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer())); +} catch (IOException e) { +throw new KafkaException("Failed to compress metrics data", e); Review Comment: I think you are right. It would be wise to send uncompressed data in case some error occurs, I have changed the code to address that. 
cc: @AndrewJSchofield ## clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java: ## @@ -34,4 +53,65 @@ public void testGetErrorResponse() { assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 1), response.errorCounts()); } +@ParameterizedTest +@EnumSource(CompressionType.class) +public void testMetricsDataCompression(CompressionType compressionType) { +MetricsData metricsData = getMetricsData(); +PushTelemetryRequest req = getPushTelemetryRequest(metricsData, compressionType); + +ByteBuffer receivedMetricsBuffer = req.metricsData(); +assertNotNull(receivedMetricsBuffer); +assertTrue(receivedMetricsBuffer.capacity() > 0); + +MetricsData receivedData = ClientTelemetryUtils.deserializeMetricsData(receivedMetricsBuffer); +assertEquals(metricsData, receivedData); +} + +private PushTelemetryRequest getPushTelemetryRequest(MetricsData metricsData, CompressionType compressionType) { +byte[] data = metricsData.toByteArray(); +ByteBuffer metricsBuffer = ClientTelemetryUtils.compress(data, compressionType); +if (compressionType != CompressionType.NONE) { +assertTrue(metricsBuffer.array().length < data.length); +} else { +assertEquals(metricsBuffer.array().length, data.length); Review Comment: Done.
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
mjsax commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1446040488 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java: ## @@ -175,16 +182,42 @@ public static boolean validateRequiredResourceLabels(Map metadat } public static CompressionType preferredCompressionType(List acceptedCompressionTypes) { -// TODO: Support compression in client telemetry. +if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) { +// Broker is providing the compression types in order of preference. Grab the +// first one. +return acceptedCompressionTypes.get(0); +} return CompressionType.NONE; } public static ByteBuffer compress(byte[] raw, CompressionType compressionType) { -// TODO: Support compression in client telemetry. -if (compressionType == CompressionType.NONE) { Review Comment: Why can we remove this case? It seems `compress(...)` might be called with `NONE`, so it's still a valid case?
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
mjsax commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1446048966

## clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java: ##
@@ -111,5 +116,20 @@ public void testValidateIntervalMsInvalid(int pushIntervalMs) {
     public void testPreferredCompressionType() {
         assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Collections.emptyList()));
         assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(null));
+        assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.NONE, CompressionType.GZIP)));
+        assertEquals(CompressionType.GZIP, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.GZIP, CompressionType.NONE)));
+    }
+
+    @ParameterizedTest
+    @EnumSource(CompressionType.class)
+    public void testCompressDecompress(CompressionType compressionType) {
+        byte[] testString = "test string".getBytes(StandardCharsets.UTF_8);
+        ByteBuffer compressed = ClientTelemetryUtils.compress(testString, compressionType);
+        assertNotNull(compressed);

Review Comment:
Should we check if array length was reduced?
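One caveat on asserting a size reduction: for a very short input such as "test string", the compressed form can be larger than the input, because the container format adds fixed overhead. The sketch below illustrates this with the JDK's GZIP streams, which stand in here for Kafka's `CompressionType` (an assumption made to keep the example self-contained; the class and method names are hypothetical and not part of the PR).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch: JDK GZIP stands in for Kafka's CompressionType.GZIP.
class SmallInputCompressionSketch {

    /** Compresses the given bytes with GZIP and returns only the bytes written. */
    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(sink)) {
            out.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is closed at this point, so the GZIP trailer is included.
        return sink.toByteArray();
    }

    public static void main(String[] args) {
        byte[] tiny = "test string".getBytes(StandardCharsets.UTF_8);
        byte[] repetitive = new byte[10_000];
        Arrays.fill(repetitive, (byte) 'a');

        // The tiny input grows (header and trailer overhead); the repetitive one shrinks.
        System.out.println(tiny.length + " -> " + gzip(tiny).length);
        System.out.println(repetitive.length + " -> " + gzip(repetitive).length);
    }
}
```

A size assertion is therefore only meaningful when the test payload is large and compressible enough; a round-trip equality check holds regardless of payload size.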
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
mjsax commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1446048539

## clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java: ##
@@ -34,4 +53,65 @@ public void testGetErrorResponse() {
         assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 1), response.errorCounts());
     }
+    @ParameterizedTest
+    @EnumSource(CompressionType.class)
+    public void testMetricsDataCompression(CompressionType compressionType) {
+        MetricsData metricsData = getMetricsData();
+        PushTelemetryRequest req = getPushTelemetryRequest(metricsData, compressionType);
+
+        ByteBuffer receivedMetricsBuffer = req.metricsData();
+        assertNotNull(receivedMetricsBuffer);
+        assertTrue(receivedMetricsBuffer.capacity() > 0);
+
+        MetricsData receivedData = ClientTelemetryUtils.deserializeMetricsData(receivedMetricsBuffer);
+        assertEquals(metricsData, receivedData);
+    }
+
+    private PushTelemetryRequest getPushTelemetryRequest(MetricsData metricsData, CompressionType compressionType) {
+        byte[] data = metricsData.toByteArray();
+        ByteBuffer metricsBuffer = ClientTelemetryUtils.compress(data, compressionType);
+        if (compressionType != CompressionType.NONE) {
+            assertTrue(metricsBuffer.array().length < data.length);
+        } else {
+            assertEquals(metricsBuffer.array().length, data.length);

Review Comment:
Should we check if both arrays are the same (not just their length)?
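To make the difference concrete: two arrays can have the same length and still differ in content, so a length-only assertion would pass on corrupted data. A minimal sketch using only the JDK follows; the class and method names are hypothetical and chosen for illustration.

```java
import java.util.Arrays;

// Hypothetical sketch contrasting a length check with a content check.
class SameContentSketch {

    /** True when both arrays have the same number of bytes. */
    static boolean sameLength(byte[] expected, byte[] actual) {
        return expected.length == actual.length;
    }

    /** True only when both arrays hold identical bytes in identical order. */
    static boolean sameContent(byte[] expected, byte[] actual) {
        return Arrays.equals(expected, actual);
    }

    public static void main(String[] args) {
        byte[] original = {1, 2, 3};
        byte[] corrupted = {3, 2, 1};
        // Same length, different content: only the content check catches it.
        System.out.println(sameLength(original, corrupted));
        System.out.println(sameContent(original, corrupted));
    }
}
```

In a JUnit test the content check would typically be written with `assertArrayEquals(data, metricsBuffer.array())`.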
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
mjsax commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1446041813

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java: ##
@@ -175,16 +182,42 @@ public static boolean validateRequiredResourceLabels(Map metadat
     }

     public static CompressionType preferredCompressionType(List acceptedCompressionTypes) {
-        // TODO: Support compression in client telemetry.
+        if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) {
+            // Broker is providing the compression types in order of preference. Grab the
+            // first one.
+            return acceptedCompressionTypes.get(0);
+        }
         return CompressionType.NONE;
     }

     public static ByteBuffer compress(byte[] raw, CompressionType compressionType) {
-        // TODO: Support compression in client telemetry.
-        if (compressionType == CompressionType.NONE) {
-            return ByteBuffer.wrap(raw);
-        } else {
-            throw new UnsupportedOperationException("Compression is not supported");
+        try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) {
+            try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) {
+                out.write(raw);
+                out.flush();
+            }
+            compressedOut.buffer().flip();
+            return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer()));
+        } catch (IOException e) {
+            throw new KafkaException("Failed to compress metrics data", e);

Review Comment:
Is it intentional to crash in this case? Or should we send the data uncompressed if anything goes wrong?
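For illustration, the fallback alternative raised in this comment could look roughly like the sketch below. It is not the PR's code: JDK GZIP stands in for Kafka's `CompressionType`, and `Payload`, `StreamWrapper`, and `compressOrFallback` are hypothetical names introduced only for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch: fall back to uncompressed data instead of throwing.
class CompressWithFallbackSketch {

    /** Pairs the payload with a flag saying whether compression was applied. */
    static final class Payload {
        final ByteBuffer data;
        final boolean compressed;

        Payload(ByteBuffer data, boolean compressed) {
            this.data = data;
            this.compressed = compressed;
        }
    }

    /** Hypothetical hook so that a failing compressor can be simulated. */
    interface StreamWrapper {
        OutputStream wrap(OutputStream target) throws IOException;
    }

    static Payload compressOrFallback(byte[] raw, StreamWrapper wrapper) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream(512);
        try (OutputStream out = wrapper.wrap(sink)) {
            out.write(raw);
        } catch (IOException e) {
            // Fallback: ship the raw bytes and report that nothing was compressed.
            return new Payload(ByteBuffer.wrap(raw), false);
        }
        // The compressing stream is closed here, so its trailer has been written.
        return new Payload(ByteBuffer.wrap(sink.toByteArray()), true);
    }

    public static void main(String[] args) {
        byte[] raw = "metrics payload".getBytes(StandardCharsets.UTF_8);
        Payload ok = compressOrFallback(raw, GZIPOutputStream::new);
        Payload failed = compressOrFallback(raw, target -> {
            throw new IOException("simulated failure");
        });
        System.out.println(ok.compressed + " " + failed.compressed);
    }
}
```

The flag matters because, with a fallback, the sender must also report the compression type that was actually used; otherwise the receiver would try to decompress raw bytes.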
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1445983300

## clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java: ##
@@ -34,4 +53,65 @@ public void testGetErrorResponse() {
         assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 1), response.errorCounts());
     }
+    @ParameterizedTest
+    @EnumSource(CompressionType.class)
+    public void testMetricsDataCompressionTypeNone(CompressionType compressionType) {

Review Comment:
My bad. I merged multiple methods into a single parameterized test and missed correcting the method name. Done.
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
AndrewJSchofield commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1445978961

## clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java: ##
@@ -34,4 +53,65 @@ public void testGetErrorResponse() {
         assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 1), response.errorCounts());
     }
+    @ParameterizedTest
+    @EnumSource(CompressionType.class)
+    public void testMetricsDataCompressionTypeNone(CompressionType compressionType) {

Review Comment:
It's slightly odd that this is a parameterized test even though its name says `CompressionTypeNone`.
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
apoorvmittal10 commented on PR #15148:
URL: https://github.com/apache/kafka/pull/15148#issuecomment-1881633923

@AndrewJSchofield @mjsax @kirktrue @philipnee Please review.
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1445133177

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java: ##
@@ -175,16 +182,44 @@ public static boolean validateRequiredResourceLabels(Map metadat
     }

     public static CompressionType preferredCompressionType(List acceptedCompressionTypes) {
-        // TODO: Support compression in client telemetry.
+        if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) {
+            // Broker is providing the compression types in order of preference. Grab the
+            // first one.
+            return acceptedCompressionTypes.get(0);
+        }
         return CompressionType.NONE;
     }

     public static ByteBuffer compress(byte[] raw, CompressionType compressionType) {
-        // TODO: Support compression in client telemetry.
-        if (compressionType == CompressionType.NONE) {
-            return ByteBuffer.wrap(raw);
-        } else {
-            throw new UnsupportedOperationException("Compression is not supported");
+        try {
+            try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) {
+                try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) {
+                    out.write(raw);
+                    out.flush();
+                }
+                compressedOut.buffer().flip();
+                return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer()));
+            }
+        } catch (IOException e) {
+            throw new KafkaException("Failed to compress metrics data", e);
+        }
+    }
+
+    public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
+        ByteBuffer data = ByteBuffer.wrap(metrics);
+        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
+            ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+
+            byte[] bytes = new byte[data.capacity() * 2];
+            int nRead;
+            while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
+                out.write(bytes, 0, nRead);
+            }
+
+            out.flush();
+            return ByteBuffer.wrap(out.toByteArray());

Review Comment:
I avoided the code below from Utils because the required ByteBuffer allocation size is not known, and `Utils.readFully` only fills data up to the buffer's capacity, which means we would have to resize the ByteBuffer, much as is done above. `Utils.readFully` works in MemoryRecords because the uncompressed data size comes along with the Produce request, so the buffer allocation is accurate.

```
ByteBuffer decompressedData = ByteBuffer.allocate();
Utils.readFully(in, decompressedData);
return (ByteBuffer) decompressedData.flip();
```
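The read loop described in this comment can be reproduced with JDK classes alone. The sketch below assumes GZIP as a stand-in for Kafka's `CompressionType.wrapForInput`; the class name, method names, and the 1024-byte chunk size are hypothetical choices for illustration. The point is that a growable `ByteArrayOutputStream` removes the need to know the decompressed size up front.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch: decompress without knowing the output size in advance.
class UnknownSizeDecompressSketch {

    /** Helper that produces GZIP-compressed input for the demo. */
    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(sink)) {
            out.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    /** Reads fixed-size chunks until end of stream; the sink grows as needed. */
    static ByteBuffer gunzip(byte[] compressed) {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] chunk = new byte[1024];
            int nRead;
            while ((nRead = in.read(chunk, 0, chunk.length)) != -1) {
                out.write(chunk, 0, nRead);
            }
            return ByteBuffer.wrap(out.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] raw = new byte[10_000];
        Arrays.fill(raw, (byte) 'm');
        ByteBuffer restored = gunzip(gzip(raw));
        System.out.println(restored.remaining() == raw.length);
    }
}
```

Because the chunk buffer is reused on every iteration, its size affects only the number of reads, not correctness.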
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1445128354

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java: ##
@@ -175,16 +182,44 @@ public static boolean validateRequiredResourceLabels(Map metadat
     }

     public static CompressionType preferredCompressionType(List acceptedCompressionTypes) {
-        // TODO: Support compression in client telemetry.
+        if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) {
+            // Broker is providing the compression types in order of preference. Grab the
+            // first one.
+            return acceptedCompressionTypes.get(0);
+        }
         return CompressionType.NONE;
     }

     public static ByteBuffer compress(byte[] raw, CompressionType compressionType) {
-        // TODO: Support compression in client telemetry.
-        if (compressionType == CompressionType.NONE) {
-            return ByteBuffer.wrap(raw);
-        } else {
-            throw new UnsupportedOperationException("Compression is not supported");
+        try {
+            try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) {
+                try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) {
+                    out.write(raw);
+                    out.flush();
+                }
+                compressedOut.buffer().flip();
+                return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer()));

Review Comment:
This is equivalent to the code below; the line makes sure that only the filled bytes are returned from the buffer.

```
ByteBuffer compressedBuffer = ByteBuffer.allocate(bufferOutputStream.buffer().remaining());
compressedBuffer.put(bufferOutputStream.buffer()).flip();
return compressedBuffer;
```
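To show why the flip-and-copy step matters, here is a small sketch using only `java.nio.ByteBuffer`. A 512-byte buffer that holds three written bytes stands in for the stream's backing buffer; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: return only the bytes that were actually written.
class FilledBytesSketch {

    /** Copies the written region of an over-allocated buffer into an exact-size one. */
    static ByteBuffer copyFilled(ByteBuffer written) {
        // flip() sets limit to the current position and position to 0,
        // so remaining() now equals the number of bytes written.
        written.flip();
        ByteBuffer exact = ByteBuffer.allocate(written.remaining());
        exact.put(written);
        // Make the copy readable from the start.
        exact.flip();
        return exact;
    }

    public static void main(String[] args) {
        ByteBuffer backing = ByteBuffer.allocate(512);
        backing.put(new byte[] {1, 2, 3});
        ByteBuffer exact = copyFilled(backing);
        // Capacity of the backing buffer versus the exact-size copy.
        System.out.println(backing.capacity() + " vs " + exact.capacity());
    }
}
```

Without the copy, calling `array()` on the backing buffer would expose all 512 bytes, including the unwritten tail.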