divijvaidya commented on code in PR #13135: URL: https://github.com/apache/kafka/pull/13135#discussion_r1200352336
########## clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java: ########## @@ -356,164 +346,100 @@ private static DefaultRecord readFrom(ByteBuffer buffer, throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes + " bytes in record payload, but instead read " + (buffer.position() - recordStart)); - return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers); + int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes; + return new DefaultRecord(totalSizeInBytes, attributes, offset, timestamp, sequence, key, value, headers); } catch (BufferUnderflowException | IllegalArgumentException e) { throw new InvalidRecordException("Found invalid record structure", e); } } - public static PartialDefaultRecord readPartiallyFrom(DataInput input, - byte[] skipArray, + public static PartialDefaultRecord readPartiallyFrom(InputStream input, long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) throws IOException { int sizeOfBodyInBytes = ByteUtils.readVarint(input); int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes; - return readPartiallyFrom(input, skipArray, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp, + return readPartiallyFrom(input, totalSizeInBytes, baseOffset, baseTimestamp, baseSequence, logAppendTime); } - private static PartialDefaultRecord readPartiallyFrom(DataInput input, - byte[] skipArray, + private static PartialDefaultRecord readPartiallyFrom(InputStream input, int sizeInBytes, - int sizeOfBodyInBytes, long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) throws IOException { - ByteBuffer skipBuffer = ByteBuffer.wrap(skipArray); - // set its limit to 0 to indicate no bytes readable yet - skipBuffer.limit(0); - try { - // reading the attributes / timestamp / offset and key-size does not require - // any byte array allocation and therefore we can 
just read them straight-forwardly - IntRef bytesRemaining = PrimitiveRef.ofInt(sizeOfBodyInBytes); - - byte attributes = readByte(skipBuffer, input, bytesRemaining); - long timestampDelta = readVarLong(skipBuffer, input, bytesRemaining); + byte attributes = (byte) input.read(); + long timestampDelta = ByteUtils.readVarlong(input); long timestamp = baseTimestamp + timestampDelta; if (logAppendTime != null) timestamp = logAppendTime; - int offsetDelta = readVarInt(skipBuffer, input, bytesRemaining); + int offsetDelta = ByteUtils.readVarint(input); long offset = baseOffset + offsetDelta; int sequence = baseSequence >= 0 ? DefaultRecordBatch.incrementSequence(baseSequence, offsetDelta) : RecordBatch.NO_SEQUENCE; - // first skip key - int keySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining); + // skip key + int keySize = ByteUtils.readVarint(input); + skipBytes(input, keySize); - // then skip value - int valueSize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining); + // skip value + int valueSize = ByteUtils.readVarint(input); + skipBytes(input, valueSize); - // then skip header - int numHeaders = readVarInt(skipBuffer, input, bytesRemaining); + // skip header + int numHeaders = ByteUtils.readVarint(input); if (numHeaders < 0) throw new InvalidRecordException("Found invalid number of record headers " + numHeaders); for (int i = 0; i < numHeaders; i++) { - int headerKeySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining); + int headerKeySize = ByteUtils.readVarint(input); if (headerKeySize < 0) throw new InvalidRecordException("Invalid negative header key size " + headerKeySize); + skipBytes(input, headerKeySize); // headerValueSize - skipLengthDelimitedField(skipBuffer, input, bytesRemaining); + int headerValueSize = ByteUtils.readVarint(input); + skipBytes(input, headerValueSize); } - if (bytesRemaining.value > 0 || skipBuffer.remaining() > 0) - throw new InvalidRecordException("Invalid record size: expected to read " + 
sizeOfBodyInBytes + - " bytes in record payload, but there are still bytes remaining"); - return new PartialDefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, keySize, valueSize); } catch (BufferUnderflowException | IllegalArgumentException e) { throw new InvalidRecordException("Found invalid record structure", e); } } - private static byte readByte(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException { - if (buffer.remaining() < 1 && bytesRemaining.value > 0) { - readMore(buffer, input, bytesRemaining); - } - - return buffer.get(); - } - - private static long readVarLong(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException { - if (buffer.remaining() < 10 && bytesRemaining.value > 0) { - readMore(buffer, input, bytesRemaining); - } - - return ByteUtils.readVarlong(buffer); - } - - private static int readVarInt(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException { - if (buffer.remaining() < 5 && bytesRemaining.value > 0) { - readMore(buffer, input, bytesRemaining); - } - - return ByteUtils.readVarint(buffer); - } - - private static int skipLengthDelimitedField(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException { - boolean needMore = false; - int sizeInBytes = -1; - int bytesToSkip = -1; - - while (true) { - if (needMore) { - readMore(buffer, input, bytesRemaining); - needMore = false; - } - - if (bytesToSkip < 0) { - if (buffer.remaining() < 5 && bytesRemaining.value > 0) { - needMore = true; - } else { - sizeInBytes = ByteUtils.readVarint(buffer); - if (sizeInBytes <= 0) - return sizeInBytes; - else - bytesToSkip = sizeInBytes; + /** + * Skips n bytes from the data input. Review Comment: Ah! Thanks for catching. Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org