ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1084344515


##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java:
##########
@@ -273,20 +272,32 @@ public int partitionLeaderEpoch() {
     public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
         final ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
-        return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier));
+        final InputStream decompressedStream = compressionType().wrapForInput(buffer, magic(), bufferSupplier);
+        return decompressedStream instanceof DataInputStream ? (DataInputStream) decompressedStream : new DataInputStream(decompressedStream);
     }
 
    private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
         final DataInputStream inputStream = recordInputStream(bufferSupplier);
 
         if (skipKeyValue) {
            // this buffer is used to skip length delimited fields like key, value, headers
-            byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
+            final ByteBuffer skipBuffer = bufferSupplier.get(compressionType().getRecommendedDOutSize());

Review Comment:
   Since we cache buffers per thread, I think you mean we will use two buffers 
instead of one per thread (for the zstd case).
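
   The two-buffers-per-thread point can be sketched with a simplified, hypothetical analogue of a thread-cached buffer supplier (this is not Kafka's actual `BufferSupplier` implementation, and the buffer sizes are made up): each thread caches released buffers keyed by capacity, so requesting a decompression output buffer and a separate skip buffer leaves two buffers cached per thread rather than one.

   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical sketch of a per-thread buffer cache: each thread keeps
   // released buffers keyed by capacity, so requesting two distinct sizes
   // ends up caching two buffers per thread.
   public class ThreadCachedBufferSupplier {
       private static final ThreadLocal<Map<Integer, Deque<ByteBuffer>>> CACHE =
               ThreadLocal.withInitial(HashMap::new);

       public static ByteBuffer get(int capacity) {
           Deque<ByteBuffer> pool = CACHE.get().get(capacity);
           ByteBuffer cached = (pool == null) ? null : pool.pollFirst();
           return (cached != null) ? cached : ByteBuffer.allocate(capacity);
       }

       public static void release(ByteBuffer buffer) {
           buffer.clear();
           CACHE.get()
                .computeIfAbsent(buffer.capacity(), k -> new ArrayDeque<>())
                .addFirst(buffer);
       }

       public static int cachedBufferCount() {
           int n = 0;
           for (Deque<ByteBuffer> pool : CACHE.get().values())
               n += pool.size();
           return n;
       }

       public static void main(String[] args) {
           // e.g. one buffer for decompression output, one for skipping fields
           // (both sizes are illustrative, not Kafka's real values)
           ByteBuffer decompressBuffer = get(128 * 1024);
           ByteBuffer skipBuffer = get(2048);
           release(decompressBuffer);
           release(skipBuffer);
           // two buffers are now cached for this thread, not one
           System.out.println(cachedBufferCount()); // prints 2
       }
   }
   ```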



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
