divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1200434460


##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java:
##########
@@ -356,164 +346,100 @@ private static DefaultRecord readFrom(ByteBuffer buffer,
                 throw new InvalidRecordException("Invalid record size: 
expected to read " + sizeOfBodyInBytes +
                         " bytes in record payload, but instead read " + 
(buffer.position() - recordStart));
 
-            return new DefaultRecord(sizeInBytes, attributes, offset, 
timestamp, sequence, key, value, headers);
+            int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + 
sizeOfBodyInBytes;
+            return new DefaultRecord(totalSizeInBytes, attributes, offset, 
timestamp, sequence, key, value, headers);
         } catch (BufferUnderflowException | IllegalArgumentException e) {
             throw new InvalidRecordException("Found invalid record structure", 
e);
         }
     }
 
-    public static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                         byte[] skipArray,
+    public static PartialDefaultRecord readPartiallyFrom(InputStream input,
                                                          long baseOffset,
                                                          long baseTimestamp,
                                                          int baseSequence,
                                                          Long logAppendTime) 
throws IOException {
         int sizeOfBodyInBytes = ByteUtils.readVarint(input);
         int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + 
sizeOfBodyInBytes;
 
-        return readPartiallyFrom(input, skipArray, totalSizeInBytes, 
sizeOfBodyInBytes, baseOffset, baseTimestamp,
+        return readPartiallyFrom(input, totalSizeInBytes, baseOffset, 
baseTimestamp,
             baseSequence, logAppendTime);
     }
 
-    private static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                          byte[] skipArray,
+    private static PartialDefaultRecord readPartiallyFrom(InputStream input,
                                                           int sizeInBytes,
-                                                          int 
sizeOfBodyInBytes,
                                                           long baseOffset,
                                                           long baseTimestamp,
                                                           int baseSequence,
                                                           Long logAppendTime) 
throws IOException {
-        ByteBuffer skipBuffer = ByteBuffer.wrap(skipArray);
-        // set its limit to 0 to indicate no bytes readable yet
-        skipBuffer.limit(0);
-
         try {
-            // reading the attributes / timestamp / offset and key-size does 
not require
-            // any byte array allocation and therefore we can just read them 
straight-forwardly
-            IntRef bytesRemaining = PrimitiveRef.ofInt(sizeOfBodyInBytes);
-
-            byte attributes = readByte(skipBuffer, input, bytesRemaining);
-            long timestampDelta = readVarLong(skipBuffer, input, 
bytesRemaining);
+            byte attributes = (byte) input.read();
+            long timestampDelta = ByteUtils.readVarlong(input);
             long timestamp = baseTimestamp + timestampDelta;
             if (logAppendTime != null)
                 timestamp = logAppendTime;
 
-            int offsetDelta = readVarInt(skipBuffer, input, bytesRemaining);
+            int offsetDelta = ByteUtils.readVarint(input);
             long offset = baseOffset + offsetDelta;
             int sequence = baseSequence >= 0 ?
                 DefaultRecordBatch.incrementSequence(baseSequence, 
offsetDelta) :
                 RecordBatch.NO_SEQUENCE;
 
-            // first skip key
-            int keySize = skipLengthDelimitedField(skipBuffer, input, 
bytesRemaining);
+            // skip key
+            int keySize = ByteUtils.readVarint(input);
+            skipBytes(input, keySize);
 
-            // then skip value
-            int valueSize = skipLengthDelimitedField(skipBuffer, input, 
bytesRemaining);
+            // skip value
+            int valueSize = ByteUtils.readVarint(input);
+            skipBytes(input, valueSize);
 
-            // then skip header
-            int numHeaders = readVarInt(skipBuffer, input, bytesRemaining);
+            // skip header
+            int numHeaders = ByteUtils.readVarint(input);
             if (numHeaders < 0)
                 throw new InvalidRecordException("Found invalid number of 
record headers " + numHeaders);
             for (int i = 0; i < numHeaders; i++) {
-                int headerKeySize = skipLengthDelimitedField(skipBuffer, 
input, bytesRemaining);
+                int headerKeySize = ByteUtils.readVarint(input);
                 if (headerKeySize < 0)
                     throw new InvalidRecordException("Invalid negative header 
key size " + headerKeySize);
+                skipBytes(input, headerKeySize);
 
                 // headerValueSize
-                skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+                int headerValueSize = ByteUtils.readVarint(input);
+                skipBytes(input, headerValueSize);
             }
 
-            if (bytesRemaining.value > 0 || skipBuffer.remaining() > 0)
-                throw new InvalidRecordException("Invalid record size: 
expected to read " + sizeOfBodyInBytes +
-                    " bytes in record payload, but there are still bytes 
remaining");
-
             return new PartialDefaultRecord(sizeInBytes, attributes, offset, 
timestamp, sequence, keySize, valueSize);
         } catch (BufferUnderflowException | IllegalArgumentException e) {
             throw new InvalidRecordException("Found invalid record structure", 
e);
         }
     }
 
-    private static byte readByte(ByteBuffer buffer, DataInput input, IntRef 
bytesRemaining) throws IOException {
-        if (buffer.remaining() < 1 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return buffer.get();
-    }
-
-    private static long readVarLong(ByteBuffer buffer, DataInput input, IntRef 
bytesRemaining) throws IOException {
-        if (buffer.remaining() < 10 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return ByteUtils.readVarlong(buffer);
-    }
-
-    private static int readVarInt(ByteBuffer buffer, DataInput input, IntRef 
bytesRemaining) throws IOException {
-        if (buffer.remaining() < 5 && bytesRemaining.value > 0) {
-            readMore(buffer, input, bytesRemaining);
-        }
-
-        return ByteUtils.readVarint(buffer);
-    }
-
-    private static int skipLengthDelimitedField(ByteBuffer buffer, DataInput 
input, IntRef bytesRemaining) throws IOException {
-        boolean needMore = false;
-        int sizeInBytes = -1;
-        int bytesToSkip = -1;
-
-        while (true) {
-            if (needMore) {
-                readMore(buffer, input, bytesRemaining);
-                needMore = false;
-            }
-
-            if (bytesToSkip < 0) {
-                if (buffer.remaining() < 5 && bytesRemaining.value > 0) {
-                    needMore = true;
-                } else {
-                    sizeInBytes = ByteUtils.readVarint(buffer);
-                    if (sizeInBytes <= 0)
-                        return sizeInBytes;
-                    else
-                        bytesToSkip = sizeInBytes;
 
+    /**
+     * Skips n bytes from the data input.
+     *
+     * No-op for case where bytesToSkip <= 0. This could occur for cases where 
field is expected to be null.
+     * @throws  InvalidRecordException if the number of bytes could not be 
skipped.
+     */
+    private static void skipBytes(InputStream in, int bytesToSkip) throws 
IOException {
+        if (bytesToSkip <= 0) return;
+
+        // Starting JDK 12, this implementation could be replaced by 
InputStream#skipNBytes
+        while (bytesToSkip > 0) {
+            long ns = in.skip(bytesToSkip);
+            if (ns > 0 && ns <= bytesToSkip) {
+                // adjust number to skip
+                bytesToSkip -= ns;
+            } else if (ns == 0) { // no bytes skipped
+                // read one byte to check for EOS
+                if (in.read() == -1) {

Review Comment:
   First, as per the interface contract of InputStream#skip, it is possible 
that it returns smaller number of bytes than expected even if bytes are 
available to be skipped. That is why we iterate over the skip() multiple times 
in this loop. Hence, we keep calling skip() until we reach end of file, since 
any other case (0 or positive) is expected as per the contract.
   
   Also, note that the implementation of this loop is same as 
InputStream#skipNBytes introduced in JDK 12.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to