divijvaidya commented on PR #13135: URL: https://github.com/apache/kafka/pull/13135#issuecomment-1521852820
This is ready for review. A summary of the changes is provided below.

**On the server:**

1. This PR starts using buffer pools to allocate the intermediate buffer used by the stream that converts compressed to uncompressed data. This is achieved by using the new `ChunkedBytesStream` instead of `BufferedInputStream` for ZSTD & GZIP. For LZ4 and SNAPPY, which weren't using `BufferedInputStream`, this is a no-op (see point 2 for changes to them). The impact of allocation on Zstd can be observed in the [before](https://issues.apache.org/jira/secure/attachment/13057480/flamegraph-trunk-heapalloc-before.html) & [after](https://issues.apache.org/jira/secure/attachment/13057479/flamegraph-pr-heapalloc-after.html) object-allocation flamegraphs linked to the [JIRA](https://issues.apache.org/jira/browse/KAFKA-14633). Please observe how, in the *after* flamegraph, the allocation attributed to `validateMessagesAndAssignOffsets` decreased drastically, from 39% to 5%.
2. This PR reduces the number of buffers allocated during decompression from 2 to 1. Earlier we created a "skip buffer" of size 2KB for ALL compression algorithms, plus another intermediate buffer created by `BufferedInputStream` for some of the compression algorithms (ZSTD & GZIP). This PR reuses the same intermediate buffer for ZSTD & GZIP, reducing the number of allocations to 1 (instead of 2). For LZ4 and SNAPPY, the number of allocations remains the same, but the 2KB skip buffer is now allocated from the buffer pool.
3. The skip implementation for some compression algorithms allocated new buffers. As an example, the skip implementation of ZSTD-JNI allocates a new buffer of a different size (from its buffer pool) on every skip invocation. This PR performs the skip using the intermediate buffer instead of pushing it down to ZSTD-JNI.

The impact of these changes on throughput can be observed in `RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize`. You will notice a 20-70% improvement there.
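To illustrate the idea behind points 1-3, here is a minimal, hypothetical sketch (the class names `SimpleBufferPool` and `PooledChunkedStream` are invented for illustration and are not Kafka's actual `ChunkedBytesStream`): the stream borrows one intermediate buffer from a pool, and `skip()` is satisfied by discarding bytes already staged in that buffer rather than delegating to the underlying (potentially native, allocating) stream.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical buffer pool: hands out fixed-size chunks and reuses released ones.
class SimpleBufferPool {
    private final Deque<byte[]> free = new ArrayDeque<>();
    private final int chunkSize;
    SimpleBufferPool(int chunkSize) { this.chunkSize = chunkSize; }
    byte[] borrow() {
        byte[] b = free.poll();
        return b != null ? b : new byte[chunkSize];
    }
    void release(byte[] b) { free.push(b); }
}

// Hypothetical chunked stream: one pooled intermediate buffer per stream,
// and skip() drains that buffer instead of calling skip() on `in`.
class PooledChunkedStream extends InputStream {
    private final InputStream in;    // e.g. the decompressor stream
    private final SimpleBufferPool pool;
    private final byte[] chunk;      // pooled intermediate buffer
    private int pos = 0, limit = 0;

    PooledChunkedStream(InputStream in, SimpleBufferPool pool) {
        this.in = in;
        this.pool = pool;
        this.chunk = pool.borrow();  // single pooled allocation per stream
    }

    private boolean fill() throws IOException {
        limit = in.read(chunk, 0, chunk.length);
        pos = 0;
        return limit > 0;
    }

    @Override public int read() throws IOException {
        if (pos >= limit && !fill()) return -1;
        return chunk[pos++] & 0xFF;
    }

    // No per-skip allocation, and no skip() pushed down to the wrapped stream.
    @Override public long skip(long n) throws IOException {
        long skipped = 0;
        while (skipped < n) {
            if (pos >= limit && !fill()) break;
            int step = (int) Math.min(n - skipped, limit - pos);
            pos += step;
            skipped += step;
        }
        return skipped;
    }

    @Override public void close() throws IOException {
        pool.release(chunk);         // return the buffer for reuse
        in.close();
    }
}
```

The key property is that skipping a record's value costs at most a few `fill()` calls into the already-pooled buffer, which is what removes the per-skip allocations seen in the ZSTD-JNI path.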
(see attached benchmark sheet in the description)

**On the consumer:**

Change 1 remains the same for the consumer; changes 2 & 3 do not impact the consumer, since it doesn't use a "skip" iterator. The impact of the changes on consumer throughput is observed in `RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize` (note that this is a different benchmark from the one specified for the server; this one doesn't use the skip iterator). You will notice a mixed bag, ranging from a single-digit regression for some compression types to a 10-50% improvement for Zstd. The reason we don't see equivalent gains on the consumer is that it copies all uncompressed data into a single buffer and then reads off it. We have not reduced any buffer allocation in the consumer scenario (since changes 2 & 3 aren't applicable to consumers). There are other optimizations we could perform for the consumer, listed below, but they are out of scope for this PR.

**Future optimisations (out of scope of this PR)**

1. For non-skip iterators (used by consumers), we currently allocate an intermediate buffer for decompression and then allocate another buffer for storing the key & value. The flow looks like: uncompressed data => intermediate buffer => inputStream => recordByteBuffer. This can be improved to uncompressed data => recordByteBuffer, so that we allocate only 1 byte buffer.
2. We need to revisit whether we require a skip buffer for LZ4 and SNAPPY at all. In the current PR we wanted to maintain parity with the legacy implementation, hence a 2KB intermediate buffer in `ChunkedBytesStream` is used for them, but it could potentially be removed.
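The difference between the two flows in future optimisation 1 can be sketched as follows. This is a hypothetical illustration (the class name `RecordRead` and both method names are invented, not Kafka APIs): the first method stages decompressed bytes in an intermediate buffer before copying them into the record's `ByteBuffer`, while the second reads straight into the record's backing array, eliminating one buffer and one copy.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

class RecordRead {
    // Current flow: decompressed data => intermediate buffer => recordByteBuffer
    // (two buffers, two copies of the payload).
    static ByteBuffer viaIntermediate(InputStream decompressed, int recordSize) throws IOException {
        byte[] intermediate = new byte[2 * 1024];      // staging buffer
        ByteBuffer record = ByteBuffer.allocate(recordSize);
        int remaining = recordSize;
        while (remaining > 0) {
            int n = decompressed.read(intermediate, 0, Math.min(intermediate.length, remaining));
            if (n < 0) throw new IOException("truncated record");
            record.put(intermediate, 0, n);
            remaining -= n;
        }
        record.flip();
        return record;
    }

    // Proposed flow: decompressed data => recordByteBuffer directly
    // (one buffer, one copy).
    static ByteBuffer direct(InputStream decompressed, int recordSize) throws IOException {
        ByteBuffer record = ByteBuffer.allocate(recordSize);
        int off = 0;
        while (off < recordSize) {
            int n = decompressed.read(record.array(), off, recordSize - off);
            if (n < 0) throw new IOException("truncated record");
            off += n;
        }
        return record;
    }
}
```

Both methods yield the same bytes; the second simply avoids the intermediate staging copy, which is why it would save one allocation per record on the consumer path.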