Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]
chia7712 merged PR #15589: URL: https://github.com/apache/kafka/pull/15589

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
chiacyu commented on code in PR #15589: URL: https://github.com/apache/kafka/pull/15589#discussion_r154312

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
@@ -204,16 +202,14 @@ public static byte[] compress(byte[] raw, CompressionType compressionType) throw
     public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
         ByteBuffer data = ByteBuffer.wrap(metrics);
         try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-            ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+             ByteBufferOutputStream out = new ByteBufferOutputStream(512)) {

Review Comment:
Done. Thanks for the reminder.
chiacyu commented on code in PR #15589: URL: https://github.com/apache/kafka/pull/15589#discussion_r1543121911

## clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java:
@@ -132,9 +133,9 @@ public void testCompressDecompress(CompressionType compressionType) throws IOExc
         } else {
             assertArrayEquals(testString, compressed);
         }
-        ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, compressionType);
+        byte[] actualResult = Utils.toArray(decompressed);

Review Comment:
Done. Thanks for the reminder.
AndrewJSchofield commented on code in PR #15589: URL: https://github.com/apache/kafka/pull/15589#discussion_r1542082131

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
@@ -204,16 +202,14 @@ public static byte[] compress(byte[] raw, CompressionType compressionType) throw
     public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
         ByteBuffer data = ByteBuffer.wrap(metrics);
         try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-            ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+             ByteBufferOutputStream out = new ByteBufferOutputStream(512)) {

Review Comment:
One tiny, tiny comment. The indentation of this line is out by 1 space. Apart from that, lgtm.
apoorvmittal10 commented on code in PR #15589: URL: https://github.com/apache/kafka/pull/15589#discussion_r1541573651

## clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java:
@@ -132,9 +133,9 @@ public void testCompressDecompress(CompressionType compressionType) throws IOExc
         } else {
             assertArrayEquals(testString, compressed);
         }
-        ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, compressionType);
+        byte[] actualResult = Utils.toArray(decompressed);

Review Comment:
Can we please move the conversion to after `assertNotNull(decompressed);`, since `decompressed` is used inside the `Utils.toArray` method.
chia7712 commented on PR #15589: URL: https://github.com/apache/kafka/pull/15589#issuecomment-2021800318

@apoorvmittal10 Could you please take a look at this PR if your queue is not full :)
chiacyu commented on code in PR #15589: URL: https://github.com/apache/kafka/pull/15589#discussion_r1539400840

## clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java:
@@ -132,9 +133,9 @@ public void testCompressDecompress(CompressionType compressionType) throws IOExc
         } else {
             assertArrayEquals(testString, compressed);
         }
-        ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, compressionType);
+        byte[] actualResult = Utils.toArray(decompressed, testString.length);

Review Comment:
Got it, thanks!
chia7712 commented on code in PR #15589: URL: https://github.com/apache/kafka/pull/15589#discussion_r1538726557

## clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java:
@@ -132,9 +133,9 @@ public void testCompressDecompress(CompressionType compressionType) throws IOExc
         } else {
             assertArrayEquals(testString, compressed);
         }
-        ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, compressionType);
+        byte[] actualResult = Utils.toArray(decompressed, testString.length);

Review Comment:
`byte[] actualResult = Utils.toArray(decompressed);` — we should not set the size, so that this also checks the `flip` (i.e. that the returned buffer is ready to read).
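The reasoning in this comment — that converting the buffer without an explicit length only yields the payload if `decompress` flipped the buffer — can be sketched with a simplified stand-in for the helper (an illustration, not Kafka's exact `Utils.toArray` implementation):

```java
import java.nio.ByteBuffer;

public class ToArrayDemo {
    // Simplified stand-in for Utils.toArray(ByteBuffer): copies whatever
    // is readable between position and limit. If decompress() forgot to
    // flip the buffer, position sits past the payload, so this would
    // return the unused tail instead of the decompressed bytes.
    public static byte[] toArray(ByteBuffer buffer) {
        byte[] dest = new byte[buffer.remaining()];
        buffer.duplicate().get(dest); // duplicate() leaves the caller's position untouched
        return dest;
    }

    public static void main(String[] args) {
        ByteBuffer written = ByteBuffer.allocate(16);
        written.put(new byte[]{1, 2, 3});

        // Without flip: position=3, limit=16 -> 13 stale bytes, not the payload.
        System.out.println(toArray(written).length); // 13

        written.flip(); // position=0, limit=3 -> ready to read
        System.out.println(toArray(written).length); // 3
    }
}
```

So a fixed-length copy would mask a missing `flip`, while the no-size conversion makes the test fail loudly.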
chiacyu commented on code in PR #15589: URL: https://github.com/apache/kafka/pull/15589#discussion_r1537633908

## clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java:
@@ -132,9 +132,9 @@ public void testCompressDecompress(CompressionType compressionType) throws IOExc
         } else {
             assertArrayEquals(testString, compressed);
         }
-        ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, compressionType);
+        byte[] actualResult = Arrays.copyOfRange(decompressed.array(), 0, testString.length);

Review Comment:
Sounds good. Will do. Thanks!
chia7712 commented on code in PR #15589: URL: https://github.com/apache/kafka/pull/15589#discussion_r1537626874

## clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java:
@@ -132,9 +132,9 @@ public void testCompressDecompress(CompressionType compressionType) throws IOExc
         } else {
             assertArrayEquals(testString, compressed);
         }
-        ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, compressionType);
+        byte[] actualResult = Arrays.copyOfRange(decompressed.array(), 0, testString.length);

Review Comment:
How about using `Utils.toArray`?
chia7712 commented on code in PR #15589: URL: https://github.com/apache/kafka/pull/15589#discussion_r1536848215

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
@@ -204,16 +202,14 @@ public static byte[] compress(byte[] raw, CompressionType compressionType) throw
     public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
         ByteBuffer data = ByteBuffer.wrap(metrics);
         try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-            ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+             ByteBufferOutputStream out = new ByteBufferOutputStream(1)) {

Review Comment:
Yep, we can set a larger size to initialize the buffer. Maybe we can use the same size (512 bytes) as line #192.
brandboat commented on code in PR #15589: URL: https://github.com/apache/kafka/pull/15589#discussion_r1536846618

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
@@ -204,16 +202,14 @@ public static byte[] compress(byte[] raw, CompressionType compressionType) throw
     public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
         ByteBuffer data = ByteBuffer.wrap(metrics);
         try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-            ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+             ByteBufferOutputStream out = new ByteBufferOutputStream(1)) {

Review Comment:
`1` is a little too small, I think. To avoid too many `expandBuffer` invocations, maybe we can use 32 (the default buffer size in `ByteArrayOutputStream`) or 64? @chia7712 WDYT?
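The cost brandboat is pointing at can be sketched with a back-of-the-envelope count of reallocations, assuming a simple doubling growth policy (Kafka's `ByteBufferOutputStream` uses its own growth factor, so the exact numbers differ, but the shape is the same):

```java
public class GrowthDemo {
    // Hypothetical illustration: how many reallocations a buffer that
    // doubles its capacity needs to absorb `total` bytes, starting from
    // `initial` bytes of capacity.
    public static int reallocations(int initial, int total) {
        int capacity = initial;
        int count = 0;
        while (capacity < total) {
            capacity *= 2;
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // Starting from 1 byte, 10 KB of decompressed metrics means many
        // grow-and-copy cycles; starting from 512 bytes, only a handful.
        System.out.println(reallocations(1, 10_000));   // 14
        System.out.println(reallocations(512, 10_000)); // 5
    }
}
```

This is why the thread converges on 512 bytes: large enough to skip most early reallocations, small enough not to waste memory per payload.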
chiacyu commented on code in PR #15589: URL: https://github.com/apache/kafka/pull/15589#discussion_r1536831206

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
@@ -203,20 +201,19 @@ public static byte[] compress(byte[] raw, CompressionType compressionType) throw
     public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
         ByteBuffer data = ByteBuffer.wrap(metrics);
-        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-            ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create())) {
             byte[] bytes = new byte[data.capacity() * 2];
             int nRead;
             while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
-                out.write(bytes, 0, nRead);
+                try (ByteBufferOutputStream out = new ByteBufferOutputStream(nRead)) {

Review Comment:
The helper function already comes with [unit tests](https://github.com/apache/kafka/blob/bf9a27fefdb3d93c7a510f871433c4c9e07de71a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java#L126).
chiacyu commented on code in PR #15589: URL: https://github.com/apache/kafka/pull/15589#discussion_r1536831097

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
@@ -203,20 +201,19 @@ public static byte[] compress(byte[] raw, CompressionType compressionType) throw
     public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
         ByteBuffer data = ByteBuffer.wrap(metrics);
-        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-            ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create())) {
             byte[] bytes = new byte[data.capacity() * 2];
             int nRead;
             while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
-                out.write(bytes, 0, nRead);
+                try (ByteBufferOutputStream out = new ByteBufferOutputStream(nRead)) {
+                    out.write(bytes, 0, nRead);
+                    return out.buffer();

Review Comment:
Done. Thanks!
chia7712 commented on code in PR #15589: URL: https://github.com/apache/kafka/pull/15589#discussion_r1536830823

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
@@ -204,17 +202,16 @@ public static byte[] compress(byte[] raw, CompressionType compressionType) throw
     public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
         ByteBuffer data = ByteBuffer.wrap(metrics);
         try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-            ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
-            byte[] bytes = new byte[data.capacity() * 2];
-            int nRead;
-            while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
-                out.write(bytes, 0, nRead);
-            }
-
-            out.flush();
-            return ByteBuffer.wrap(out.toByteArray());
-        } catch (IOException e) {
+             ByteBufferOutputStream out = new ByteBufferOutputStream(1)) {
+                byte[] bytes = new byte[data.capacity() * 2];

Review Comment:
Could you please remove the unnecessary indent?
chiacyu commented on code in PR #15589: URL: https://github.com/apache/kafka/pull/15589#discussion_r1536789689

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
@@ -203,17 +201,13 @@ public static byte[] compress(byte[] raw, CompressionType compressionType) throw
     public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
         ByteBuffer data = ByteBuffer.wrap(metrics);
-        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-            ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create())) {
             byte[] bytes = new byte[data.capacity() * 2];
-            int nRead;
-            while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
+            int nRead = in.read(bytes, 0, bytes.length);

Review Comment:
Thanks for the reminder. Since the initialization of `ByteBufferOutputStream` requires the capacity, I use `totalReads` to record the total bytes read.
chia7712 commented on code in PR #15589: URL: https://github.com/apache/kafka/pull/15589#discussion_r1536789605

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
@@ -203,17 +201,17 @@ public static byte[] compress(byte[] raw, CompressionType compressionType) throw
     public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
         ByteBuffer data = ByteBuffer.wrap(metrics);
-        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-            ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create())) {
             byte[] bytes = new byte[data.capacity() * 2];
             int nRead;
+            int totalReads = 0;
             while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
-                out.write(bytes, 0, nRead);
+                totalReads += nRead;

Review Comment:
You don't need to do this, since `ByteBufferOutputStream` can expand its inner buffer.
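The point of this comment — growable output streams track their own size, so the caller never needs a running byte count — can be shown with the JDK's `ByteArrayOutputStream`, which auto-grows the same way Kafka's `ByteBufferOutputStream` expands its backing `ByteBuffer`:

```java
import java.io.ByteArrayOutputStream;

public class AutoGrowDemo {
    // Write `chunks` chunks of `chunkSize` bytes into a stream that starts
    // with a 1-byte capacity; the stream grows itself and reports the total,
    // so no manual `totalReads` counter is needed on the caller's side.
    public static int totalWritten(int chunkSize, int chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(1);
        byte[] chunk = new byte[chunkSize];
        for (int i = 0; i < chunks; i++) {
            out.write(chunk, 0, chunk.length);
        }
        return out.size();
    }

    public static void main(String[] args) {
        System.out.println(totalWritten(300, 2)); // 600
    }
}
```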
chia7712 commented on code in PR #15589: URL: https://github.com/apache/kafka/pull/15589#discussion_r1536789020

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
@@ -203,20 +201,19 @@ public static byte[] compress(byte[] raw, CompressionType compressionType) throw
     public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
         ByteBuffer data = ByteBuffer.wrap(metrics);
-        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-            ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create())) {
             byte[] bytes = new byte[data.capacity() * 2];
             int nRead;
             while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
-                out.write(bytes, 0, nRead);
+                try (ByteBufferOutputStream out = new ByteBufferOutputStream(nRead)) {

Review Comment:
Could we use a single try-catch to release both `in` and `out`?

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
@@ -203,20 +201,19 @@ public static byte[] compress(byte[] raw, CompressionType compressionType) throw
     public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
         ByteBuffer data = ByteBuffer.wrap(metrics);
-        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-            ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create())) {
             byte[] bytes = new byte[data.capacity() * 2];
             int nRead;
             while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
-                out.write(bytes, 0, nRead);
+                try (ByteBufferOutputStream out = new ByteBufferOutputStream(nRead)) {
+                    out.write(bytes, 0, nRead);
+                    return out.buffer();

Review Comment:
Should we call `flip` before returning it?
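The `flip` question above is about `ByteBuffer` read/write state. A minimal sketch of the write-then-read pattern under discussion (the buffer and sizes here are illustrative, not Kafka's actual code):

```java
import java.nio.ByteBuffer;

public class FlipDemo {
    // After writing, position points past the data and limit is the
    // capacity, so a reader starting at position would see only the
    // unused tail. flip() sets limit = position and position = 0,
    // making the payload readable by the caller.
    public static ByteBuffer fill(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(payload.length + 8);
        buf.put(payload);
        buf.flip(); // without this, remaining() would be the unused tail (8)
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer buf = fill(new byte[]{10, 20, 30});
        System.out.println(buf.remaining()); // 3 readable bytes
    }
}
```

Returning an unflipped buffer is exactly the bug the later test-side change (using `Utils.toArray` without an explicit length) is meant to catch.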
chia7712 commented on code in PR #15589: URL: https://github.com/apache/kafka/pull/15589#discussion_r1536784932

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
@@ -203,17 +201,13 @@ public static byte[] compress(byte[] raw, CompressionType compressionType) throw
     public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
         ByteBuffer data = ByteBuffer.wrap(metrics);
-        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-            ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create())) {
             byte[] bytes = new byte[data.capacity() * 2];
-            int nRead;
-            while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
+            int nRead = in.read(bytes, 0, bytes.length);

Review Comment:
We can't remove the while loop, since `InputStream#read` does NOT guarantee to read ALL data at once. Please take a look at the API spec: https://docs.oracle.com/javase/8/docs/api/java/io/InputStream.html#read-byte:A-int-int-
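The contract point above can be demonstrated with a stream that deliberately returns short reads (a hypothetical `TrickleStream`, not part of Kafka): a single `read()` stops early, while the loop from the PR drains everything.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

public class ReadLoopDemo {
    // A stream that hands back at most 4 bytes per read() call, which the
    // InputStream contract permits even when more data is available.
    public static class TrickleStream extends ByteArrayInputStream {
        public TrickleStream(byte[] buf) { super(buf); }
        @Override
        public synchronized int read(byte[] b, int off, int len) {
            return super.read(b, off, Math.min(len, 4));
        }
    }

    // The pattern from the PR: keep reading until -1, accumulating bytes.
    public static int drain(InputStream in, byte[] sink) {
        try {
            int total = 0;
            int nRead;
            while ((nRead = in.read(sink, total, sink.length - total)) != -1) {
                total += nRead;
            }
            return total;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] sink = new byte[32];
        // A single read() would return only 4 bytes; the loop collects all 10.
        System.out.println(drain(new TrickleStream(new byte[10]), sink)); // 10
    }
}
```

Compressed input streams behave similarly in practice: each `read` may return only one decoded block, so dropping the loop would silently truncate the decompressed metrics.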