divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1100085805
##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -108,12 +117,18 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         @Override
         public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier,
-                        messageVersion == RecordBatch.MAGIC_VALUE_V0);
+                return new ChunkedDataInputStream(
Review Comment:
Note to reviewer:
We use `ChunkedDataInputStream` here instead of `SkippableChunkedDataInputStream` because we want to push the `skip()` implementation down to `KafkaLZ4BlockInputStream`, where it is optimized (unlike `ZstdInputStream#skip()`, for which we have to fall back to our custom skip implementation in `SkippableChunkedDataInputStream`).
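To illustrate the distinction, here is a minimal sketch of the two skip strategies (illustrative only; the class names and constructors are assumptions, not the PR's actual `ChunkedDataInputStream` / `SkippableChunkedDataInputStream`): one wrapper delegates `skip()` to the source stream, which can jump over data efficiently, while the other has to emulate `skip()` by reading into a scratch buffer and discarding the bytes.

```java
import java.io.IOException;
import java.io.InputStream;

// Sketch 1: push skip() down to a source stream that already skips efficiently
// (e.g. KafkaLZ4BlockInputStream in this PR).
class DelegatingSkipStream extends InputStream {
    private final InputStream source;

    DelegatingSkipStream(InputStream source) {
        this.source = source;
    }

    @Override
    public int read() throws IOException {
        return source.read();
    }

    @Override
    public long skip(long n) throws IOException {
        // The source stream handles the skip itself.
        return source.skip(n);
    }
}

// Sketch 2: emulate skip() by decompressing into a scratch buffer and
// discarding it (needed when the source's skip(), e.g. ZstdInputStream#skip(),
// is not optimized).
class ReadAndDiscardSkipStream extends InputStream {
    private final InputStream source;
    private final byte[] scratch = new byte[2 * 1024];

    ReadAndDiscardSkipStream(InputStream source) {
        this.source = source;
    }

    @Override
    public int read() throws IOException {
        return source.read();
    }

    @Override
    public long skip(long n) throws IOException {
        long skipped = 0;
        while (skipped < n) {
            int read = source.read(scratch, 0, (int) Math.min(scratch.length, n - skipped));
            if (read < 0)
                break; // end of stream
            skipped += read;
        }
        return skipped;
    }
}
```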
##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java:
##########
@@ -270,30 +270,28 @@ public int partitionLeaderEpoch() {
         return buffer.getInt(PARTITION_LEADER_EPOCH_OFFSET);
     }
-    public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
+    public InputStream recordInputStream(BufferSupplier bufferSupplier) {
         final ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
-        return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier));
+        InputStream is = compressionType().wrapForInput(buffer, magic(), bufferSupplier);
+        return (is instanceof DataInput) ? is : new DataInputStream(is);
     }
     private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
-        final DataInputStream inputStream = recordInputStream(bufferSupplier);
+        final InputStream inputStream = recordInputStream(bufferSupplier);
         if (skipKeyValue) {
-            // this buffer is used to skip length delimited fields like key, value, headers
-            byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
Review Comment:
Note to reviewers
Removal of this intermediate buffer affects all compression types. For Zstd and the other types that were already using an intermediate buffer, this is an optimisation since we reduce buffer allocation and data copying. For the types that weren't using an intermediate buffer, we now start using a 2KB intermediate buffer (similar to this) via `ChunkedDataInputStream`.
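As a rough sketch of the intent (not the PR's actual code; the method and variable names below are made up for illustration): instead of allocating a fresh `byte[MAX_SKIP_BUFFER_SIZE]` per compressed iterator, the 2KB intermediate buffer can be borrowed from the `BufferSupplier` and returned afterwards, so repeated iterations reuse the same chunk of memory.

```java
import java.nio.ByteBuffer;
import org.apache.kafka.common.utils.BufferSupplier;

final class IntermediateBufferExample {
    // Illustrative only: borrow the 2KB intermediate buffer from the supplier
    // rather than allocating a new array for every iterator.
    static void withIntermediateBuffer(BufferSupplier bufferSupplier) {
        ByteBuffer intermediate = bufferSupplier.get(2 * 1024); // borrowed, not newly allocated
        try {
            // ... decompress into `intermediate` and serve reads/skips from it ...
        } finally {
            bufferSupplier.release(intermediate); // hand the buffer back for reuse
        }
    }
}
```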
##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java:
##########
@@ -103,6 +104,12 @@ public void init() {
         }
     }
+    @TearDown
+    public void cleanUp() {
+        if (requestLocal != null)
+            requestLocal.close();
Review Comment:
Note to reviewers
This benchmark has a flaw. We use `@Setup` and `@TearDown` at the default level of `Level.Trial`, which means these methods are executed before/after each run of the benchmark. A benchmark may be executed using multiple threads, and hence it is not guaranteed that each invocation of the benchmark will use the same BufferPool.
I will fix this in a separate PR. This does not have a negative impact on the results; at worst, it will understate the effect of re-using buffers in the code.
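One way to address the flaw, sketched below (the class and field names are assumptions, not the benchmark's actual code, which keeps its state in `requestLocal`): hold the buffer-supplying state in a thread-scoped `@State` object, so every benchmark thread gets its own supplier regardless of how many threads JMH runs.

```java
import org.apache.kafka.common.utils.BufferSupplier;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

// Hypothetical per-thread state holder: Scope.Thread gives each benchmark
// thread its own BufferSupplier, so buffer reuse is measured consistently
// even when the benchmark runs with multiple threads.
@State(Scope.Thread)
public class ThreadLocalBuffers {
    BufferSupplier bufferSupplier;

    @Setup(Level.Trial)
    public void setUp() {
        bufferSupplier = BufferSupplier.create();
    }

    @TearDown(Level.Trial)
    public void tearDown() {
        bufferSupplier.close();
    }
}
```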
##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java:
##########
@@ -363,157 +361,79 @@ private static DefaultRecord readFrom(ByteBuffer buffer,
     }
     public static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                         byte[] skipArray,
                                                          long baseOffset,
                                                          long baseTimestamp,
                                                          int baseSequence,
                                                          Long logAppendTime) throws IOException {
         int sizeOfBodyInBytes = ByteUtils.readVarint(input);
         int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
-        return readPartiallyFrom(input, skipArray, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp,
+        return readPartiallyFrom(input, totalSizeInBytes, baseOffset, baseTimestamp,
             baseSequence, logAppendTime);
     }
     private static PartialDefaultRecord readPartiallyFrom(DataInput input,
-                                                          byte[] skipArray,
                                                           int sizeInBytes,
-                                                          int sizeOfBodyInBytes,
                                                           long baseOffset,
                                                           long baseTimestamp,
                                                           int baseSequence,
                                                           Long logAppendTime) throws IOException {
-        ByteBuffer skipBuffer = ByteBuffer.wrap(skipArray);
-        // set its limit to 0 to indicate no bytes readable yet
-        skipBuffer.limit(0);
-
         try {
-            // reading the attributes / timestamp / offset and key-size does not require
-            // any byte array allocation and therefore we can just read them straight-forwardly
-            IntRef bytesRemaining = PrimitiveRef.ofInt(sizeOfBodyInBytes);
-
-            byte attributes = readByte(skipBuffer, input, bytesRemaining);
-            long timestampDelta = readVarLong(skipBuffer, input, bytesRemaining);
+            byte attributes = input.readByte();
+            long timestampDelta = ByteUtils.readVarlong(input);
             long timestamp = baseTimestamp + timestampDelta;
             if (logAppendTime != null)
                 timestamp = logAppendTime;
-            int offsetDelta = readVarInt(skipBuffer, input, bytesRemaining);
+            int offsetDelta = ByteUtils.readVarint(input);
             long offset = baseOffset + offsetDelta;
             int sequence = baseSequence >= 0 ?
                 DefaultRecordBatch.incrementSequence(baseSequence, offsetDelta) :
                 RecordBatch.NO_SEQUENCE;
-            // first skip key
-            int keySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+            // skip key
+            int keySize = ByteUtils.readVarint(input);
+            skipBytes(input, keySize);
-            // then skip value
-            int valueSize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+            // skip value
+            int valueSize = ByteUtils.readVarint(input);
+            skipBytes(input, valueSize);
-            // then skip header
-            int numHeaders = readVarInt(skipBuffer, input, bytesRemaining);
+            // skip header
+            int numHeaders = ByteUtils.readVarint(input);
             if (numHeaders < 0)
                 throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);
             for (int i = 0; i < numHeaders; i++) {
-                int headerKeySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+                int headerKeySize = ByteUtils.readVarint(input);
                 if (headerKeySize < 0)
                     throw new InvalidRecordException("Invalid negative header key size " + headerKeySize);
+                skipBytes(input, headerKeySize);
                 // headerValueSize
-                skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+                int headerValueSize = ByteUtils.readVarint(input);
+                skipBytes(input, headerValueSize);
             }
-            if (bytesRemaining.value > 0 || skipBuffer.remaining() > 0)
-                throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
-                    " bytes in record payload, but there are still bytes remaining");
Review Comment:
Note for reviewer:
It is OK to remove this validation because we perform a similar validation at the end of every record; see
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L608
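For context, the `skipBytes(input, n)` calls in the hunk above assume a small helper along these lines (a sketch under that assumption; the PR's actual helper may differ): `DataInput#skipBytes(int)` is allowed to skip fewer bytes than requested, so the helper loops until the requested count is consumed.

```java
import java.io.DataInput;
import java.io.IOException;

final class SkipUtil {
    // Keep calling DataInput#skipBytes(int) until the requested number of
    // bytes has been consumed, since a single call may skip fewer bytes.
    static void skipBytes(DataInput input, int bytesToSkip) throws IOException {
        while (bytesToSkip > 0) {
            int skipped = input.skipBytes(bytesToSkip);
            if (skipped == 0) {
                // Force progress (or raise EOFException) when skipBytes() cannot advance.
                input.readByte();
                skipped = 1;
            }
            bytesToSkip -= skipped;
        }
    }
}
```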
##########
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##########
@@ -108,12 +117,18 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         @Override
         public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier,
-                        messageVersion == RecordBatch.MAGIC_VALUE_V0);
+                return new ChunkedDataInputStream(
+                    new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier, messageVersion == RecordBatch.MAGIC_VALUE_V0),
+                    decompressionBufferSupplier, getRecommendedDOutSize());
             } catch (Throwable e) {
                 throw new KafkaException(e);
             }
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 2 * 1024; // 2KB
Review Comment:
The size of 2KB is based on the size of `skipArray`, which has now been removed in this PR.
##########
clients/src/main/java/org/apache/kafka/common/utils/ByteBufferInputStream.java:
##########
@@ -29,6 +30,11 @@ public ByteBufferInputStream(ByteBuffer buffer) {
         this.buffer = buffer;
     }
+    @Override
+    public int available() throws IOException {
Review Comment:
Note to reviewer
This was a bug. Prior to this change, `available()` fell through to the default `InputStream#available()`, which always returns 0. This impacts cases where we rely on the value of `available()` to determine the end of input.
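A minimal sketch of what such an override typically looks like for a ByteBuffer-backed stream (illustrative only; the class below is not the PR's `ByteBufferInputStream`, though the override body is the usual one-liner):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// A ByteBuffer-backed stream that reports the bytes still readable from the
// buffer instead of the InputStream default of 0.
class ByteBufferBackedStream extends InputStream {
    private final ByteBuffer buffer;

    ByteBufferBackedStream(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public int read() {
        return buffer.hasRemaining() ? (buffer.get() & 0xFF) : -1;
    }

    @Override
    public int available() throws IOException {
        // Callers can now use available() to detect end of input.
        return buffer.remaining();
    }
}
```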