divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1200352336


##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java:
##########
@@ -356,164 +346,100 @@ private static DefaultRecord readFrom(ByteBuffer buffer,
                 throw new InvalidRecordException("Invalid record size: 
expected to read " + sizeOfBodyInBytes +
                         " bytes in record payload, but instead read " + 
(buffer.position() - recordStart));
 
-            return new DefaultRecord(sizeInBytes, attributes, offset, 
timestamp, sequence, key, value, headers);
+            int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + 
sizeOfBodyInBytes;
+            return new DefaultRecord(totalSizeInBytes, attributes, offset, 
timestamp, sequence, key, value, headers);
         } catch (BufferUnderflowException | IllegalArgumentException e) {
             throw new InvalidRecordException("Found invalid record structure", 
e);
         }
     }
 
-    public static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                         byte[] skipArray,
+    public static PartialDefaultRecord readPartiallyFrom(InputStream input,
                                                          long baseOffset,
                                                          long baseTimestamp,
                                                          int baseSequence,
                                                          Long logAppendTime) 
throws IOException {
         int sizeOfBodyInBytes = ByteUtils.readVarint(input);
         int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + 
sizeOfBodyInBytes;
 
-        return readPartiallyFrom(input, skipArray, totalSizeInBytes, 
sizeOfBodyInBytes, baseOffset, baseTimestamp,
+        return readPartiallyFrom(input, totalSizeInBytes, baseOffset, 
baseTimestamp,
             baseSequence, logAppendTime);
     }
 
-    private static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                          byte[] skipArray,
+    private static PartialDefaultRecord readPartiallyFrom(InputStream input,
                                                           int sizeInBytes,
-                                                          int 
sizeOfBodyInBytes,
                                                           long baseOffset,
                                                           long baseTimestamp,
                                                           int baseSequence,
                                                           Long logAppendTime) 
throws IOException {
-        ByteBuffer skipBuffer = ByteBuffer.wrap(skipArray);
-        // set its limit to 0 to indicate no bytes readable yet
-        skipBuffer.limit(0);
-
         try {
-            // reading the attributes / timestamp / offset and key-size does 
not require
-            // any byte array allocation and therefore we can just read them 
straight-forwardly
-            IntRef bytesRemaining = PrimitiveRef.ofInt(sizeOfBodyInBytes);
-
-            byte attributes = readByte(skipBuffer, input, bytesRemaining);
-            long timestampDelta = readVarLong(skipBuffer, input, 
bytesRemaining);
+            byte attributes = (byte) input.read();
+            long timestampDelta = ByteUtils.readVarlong(input);
             long timestamp = baseTimestamp + timestampDelta;
             if (logAppendTime != null)
                 timestamp = logAppendTime;
 
-            int offsetDelta = readVarInt(skipBuffer, input, bytesRemaining);
+            int offsetDelta = ByteUtils.readVarint(input);
             long offset = baseOffset + offsetDelta;
             int sequence = baseSequence >= 0 ?
                 DefaultRecordBatch.incrementSequence(baseSequence, 
offsetDelta) :
                 RecordBatch.NO_SEQUENCE;
 
-            // first skip key
-            int keySize = skipLengthDelimitedField(skipBuffer, input, 
bytesRemaining);
+            // skip key
+            int keySize = ByteUtils.readVarint(input);
+            skipBytes(input, keySize);
 
-            // then skip value
-            int valueSize = skipLengthDelimitedField(skipBuffer, input, 
bytesRemaining);
+            // skip value
+            int valueSize = ByteUtils.readVarint(input);
+            skipBytes(input, valueSize);
 
-            // then skip header
-            int numHeaders = readVarInt(skipBuffer, input, bytesRemaining);
+            // skip header
+            int numHeaders = ByteUtils.readVarint(input);
             if (numHeaders < 0)
                 throw new InvalidRecordException("Found invalid number of 
record headers " + numHeaders);
             for (int i = 0; i < numHeaders; i++) {
-                int headerKeySize = skipLengthDelimitedField(skipBuffer, 
input, bytesRemaining);
+                int headerKeySize = ByteUtils.readVarint(input);
                 if (headerKeySize < 0)
                     throw new InvalidRecordException("Invalid negative header 
key size " + headerKeySize);
+                skipBytes(input, headerKeySize);
 
                 // headerValueSize
-                skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+                int headerValueSize = ByteUtils.readVarint(input);
+                skipBytes(input, headerValueSize);
             }
 
-            if (bytesRemaining.value > 0 || skipBuffer.remaining() > 0)
-                throw new InvalidRecordException("Invalid record size: 
expected to read " + sizeOfBodyInBytes +
-                    " bytes in record payload, but there are still bytes 
remaining");
-
             return new PartialDefaultRecord(sizeInBytes, attributes, offset, 
timestamp, sequence, keySize, valueSize);
         } catch (BufferUnderflowException | IllegalArgumentException e) {
             throw new InvalidRecordException("Found invalid record structure", 
e);
         }
     }
 
-    private static byte readByte(ByteBuffer buffer, DataInput input, IntRef 
bytesRemaining) throws IOException {
-        if (buffer.remaining() < 1 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return buffer.get();
-    }
-
-    private static long readVarLong(ByteBuffer buffer, DataInput input, IntRef 
bytesRemaining) throws IOException {
-        if (buffer.remaining() < 10 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return ByteUtils.readVarlong(buffer);
-    }
-
-    private static int readVarInt(ByteBuffer buffer, DataInput input, IntRef 
bytesRemaining) throws IOException {
-        if (buffer.remaining() < 5 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return ByteUtils.readVarint(buffer);
-    }
-
-    private static int skipLengthDelimitedField(ByteBuffer buffer, DataInput 
input, IntRef bytesRemaining) throws IOException {
-        boolean needMore = false;
-        int sizeInBytes = -1;
-        int bytesToSkip = -1;
-
-        while (true) {
-            if (needMore) {
-                readMore(buffer, input, bytesRemaining);
-                needMore = false;
-            }
-
-            if (bytesToSkip < 0) {
-                if (buffer.remaining() < 5 && bytesRemaining.value > 0) {
-                    needMore = true;
-                } else {
-                    sizeInBytes = ByteUtils.readVarint(buffer);
-                    if (sizeInBytes <= 0)
-                        return sizeInBytes;
-                    else
-                        bytesToSkip = sizeInBytes;
 
+    /**
+     * Skips n bytes from the data input.

Review Comment:
   Ah! Thanks for catching that. Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to