divijvaidya commented on code in PR #13135: URL: https://github.com/apache/kafka/pull/13135#discussion_r1200434460
########## clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java: ########## @@ -356,164 +346,100 @@ private static DefaultRecord readFrom(ByteBuffer buffer, throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes + " bytes in record payload, but instead read " + (buffer.position() - recordStart)); - return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers); + int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes; + return new DefaultRecord(totalSizeInBytes, attributes, offset, timestamp, sequence, key, value, headers); } catch (BufferUnderflowException | IllegalArgumentException e) { throw new InvalidRecordException("Found invalid record structure", e); } } - public static PartialDefaultRecord readPartiallyFrom(DataInput input, - byte[] skipArray, + public static PartialDefaultRecord readPartiallyFrom(InputStream input, long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) throws IOException { int sizeOfBodyInBytes = ByteUtils.readVarint(input); int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes; - return readPartiallyFrom(input, skipArray, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp, + return readPartiallyFrom(input, totalSizeInBytes, baseOffset, baseTimestamp, baseSequence, logAppendTime); } - private static PartialDefaultRecord readPartiallyFrom(DataInput input, - byte[] skipArray, + private static PartialDefaultRecord readPartiallyFrom(InputStream input, int sizeInBytes, - int sizeOfBodyInBytes, long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) throws IOException { - ByteBuffer skipBuffer = ByteBuffer.wrap(skipArray); - // set its limit to 0 to indicate no bytes readable yet - skipBuffer.limit(0); - try { - // reading the attributes / timestamp / offset and key-size does not require - // any byte array allocation and therefore we can 
just read them straight-forwardly - IntRef bytesRemaining = PrimitiveRef.ofInt(sizeOfBodyInBytes); - - byte attributes = readByte(skipBuffer, input, bytesRemaining); - long timestampDelta = readVarLong(skipBuffer, input, bytesRemaining); + byte attributes = (byte) input.read(); + long timestampDelta = ByteUtils.readVarlong(input); long timestamp = baseTimestamp + timestampDelta; if (logAppendTime != null) timestamp = logAppendTime; - int offsetDelta = readVarInt(skipBuffer, input, bytesRemaining); + int offsetDelta = ByteUtils.readVarint(input); long offset = baseOffset + offsetDelta; int sequence = baseSequence >= 0 ? DefaultRecordBatch.incrementSequence(baseSequence, offsetDelta) : RecordBatch.NO_SEQUENCE; - // first skip key - int keySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining); + // skip key + int keySize = ByteUtils.readVarint(input); + skipBytes(input, keySize); - // then skip value - int valueSize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining); + // skip value + int valueSize = ByteUtils.readVarint(input); + skipBytes(input, valueSize); - // then skip header - int numHeaders = readVarInt(skipBuffer, input, bytesRemaining); + // skip header + int numHeaders = ByteUtils.readVarint(input); if (numHeaders < 0) throw new InvalidRecordException("Found invalid number of record headers " + numHeaders); for (int i = 0; i < numHeaders; i++) { - int headerKeySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining); + int headerKeySize = ByteUtils.readVarint(input); if (headerKeySize < 0) throw new InvalidRecordException("Invalid negative header key size " + headerKeySize); + skipBytes(input, headerKeySize); // headerValueSize - skipLengthDelimitedField(skipBuffer, input, bytesRemaining); + int headerValueSize = ByteUtils.readVarint(input); + skipBytes(input, headerValueSize); } - if (bytesRemaining.value > 0 || skipBuffer.remaining() > 0) - throw new InvalidRecordException("Invalid record size: expected to read " + 
sizeOfBodyInBytes + - " bytes in record payload, but there are still bytes remaining"); - return new PartialDefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, keySize, valueSize); } catch (BufferUnderflowException | IllegalArgumentException e) { throw new InvalidRecordException("Found invalid record structure", e); } } - private static byte readByte(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException { - if (buffer.remaining() < 1 && bytesRemaining.value > 0) { - readMore(buffer, input, bytesRemaining); - } - - return buffer.get(); - } - - private static long readVarLong(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException { - if (buffer.remaining() < 10 && bytesRemaining.value > 0) { - readMore(buffer, input, bytesRemaining); - } - - return ByteUtils.readVarlong(buffer); - } - - private static int readVarInt(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException { - if (buffer.remaining() < 5 && bytesRemaining.value > 0) { - readMore(buffer, input, bytesRemaining); - } - - return ByteUtils.readVarint(buffer); - } - - private static int skipLengthDelimitedField(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException { - boolean needMore = false; - int sizeInBytes = -1; - int bytesToSkip = -1; - - while (true) { - if (needMore) { - readMore(buffer, input, bytesRemaining); - needMore = false; - } - - if (bytesToSkip < 0) { - if (buffer.remaining() < 5 && bytesRemaining.value > 0) { - needMore = true; - } else { - sizeInBytes = ByteUtils.readVarint(buffer); - if (sizeInBytes <= 0) - return sizeInBytes; - else - bytesToSkip = sizeInBytes; + /** + * Skips n bytes from the data input. + * + * No-op for case where bytesToSkip <= 0. This could occur for cases where field is expected to be null. + * @throws InvalidRecordException if the number of bytes could not be skipped. 
+ */ + private static void skipBytes(InputStream in, int bytesToSkip) throws IOException { + if (bytesToSkip <= 0) return; + + // Starting JDK 12, this implementation could be replaced by InputStream#skipNBytes + while (bytesToSkip > 0) { + long ns = in.skip(bytesToSkip); + if (ns > 0 && ns <= bytesToSkip) { + // adjust number to skip + bytesToSkip -= ns; + } else if (ns == 0) { // no bytes skipped + // read one byte to check for EOS + if (in.read() == -1) { Review Comment: First, as per the interface contract of InputStream#skip, it is possible that it returns a smaller number of bytes than expected even if bytes are available to be skipped. That is why we iterate over the skip() multiple times in this loop. Hence, we keep calling skip() until we reach end of file, since any other case (0 or positive) is expected as per the contract. Also, note that the implementation of this loop is the same as InputStream#skipNBytes introduced in JDK 12. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org