[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on PR #13135: URL: https://github.com/apache/kafka/pull/13135#issuecomment-1561407539 Failing tests are unrelated: ``` Build / JDK 11 and Scala 2.13 / [1] quorum=kraft, isIdempotenceEnabled=true – kafka.api.SaslPlainSslEndToEndAuthorizationTest 14s Build / JDK 8 and Scala 2.12 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest 1m 42s Build / JDK 8 and Scala 2.12 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest 1m 43s Build / JDK 8 and Scala 2.12 / testConnectorBoundary – org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest 57s Build / JDK 8 and Scala 2.12 / shouldFollowLeaderEpochBasicWorkflow() – kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceWithIbp26Test 12s Build / JDK 8 and Scala 2.12 / shouldAddNamedTopologyToRunningApplicationWithMultipleNodes() – org.apache.kafka.streams.integration.NamedTopologyIntegrationTest 10s Build / JDK 8 and Scala 2.12 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on PR #13135: URL: https://github.com/apache/kafka/pull/13135#issuecomment-1521852820 This is ready for review. A summary of the changes is provided below. **On the server:** 1. This PR starts using buffer pools to allocate intermediate buffer which is used by the stream that converts compressed to uncompressed data. This is achieved by using new `ChunkedBytesStream` instead of `BufferedInputStream` for ZSTD & GZIP. For LZ4 and SNAPPY, which weren't using `BufferedInputStream`, this is a no op (see point for 2 for changes to them). The impact of allocation on Zstd can be observed from the [before](https://issues.apache.org/jira/secure/attachment/13057480/flamegraph-trunk-heapalloc-before.html) & [after](https://issues.apache.org/jira/secure/attachment/13057479/flamegraph-pr-heapalloc-after.html) object allocation flamegraph linked to the [JIRA](https://issues.apache.org/jira/browse/KAFKA-14633). Please observe how in the *after* flamegraph, the contribution of allocation by `validateMessagesAndAssignOffsets` and decreased drastically from 39% to 5%. 2. This PR reduces the number of buffer pools used during decompression from 2 to 1. Earlier we created a "skip buffer" of size 2KB for ALL compression algorithms and another intermediate buffer created by `BufferedInputStream` for some of the compression algorithms (ZSTD & GZIP). This PR uses the same intermediate buffer for ZSTD & GZIP, hence reducing the number of allocations to 1 (instead of 2). For LZ4 and SNAPPY, the number of allocations remain same but the 2KB skip buffer is allocated from the buffer pool now. 3. The skip implementation for some compression algorithms allocated new buffers. As an example, skip implementation of ZSTD-JNI allocates new buffer of different size (from buffer pool) on every skip invocation. This PR uses the intermediate buffer to perform skip instead of pushing it to down to ZSTD-JNI. The impact of the above two changes on throughput is observed by `RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize`. You will notice 20-70% improvement there. (see attached benchmark sheet in description) **On the consumer:** The change 1 remains same for consumer and changes 2 & 3 does not impact consumer since it doesn't use a "skip" iterator. The impact of the above two changes on consumer throughput is observed by `RecordBatchIterationBenchmark. measureStreamingIteratorForVariableBatchSize` (note that this a different benchmark that was specified for server, this one doesn't use skipIterator). You will notice mix bag of single digit regression for some compression type to 10-50% improvement for Zstd. The reason that we don't see equivalent gains in consumer is because it copies all uncompressed data in a single buffer and then reads off it. We have not reduced any buffer allocation for consumer scenario(since change 2 & 3 aren't applicable to consumers). There are other optimizations that we can perform for consumer listed below but they are out of scope for this PR. **Future optimisations (out of scope of this PR)** 1. For non-skip iterators (used by consumers), we currently allocate intermediate buffer for decompression and then allocate another buffer for storing key & value. The flow looks like: uncompressed data => intermediate buffer => inputStream => recordByteBuffer. This can be improved to uncompressed data => recordByteBuffer, and hence, we would allocate only 1 byte buffer. 2. We need to revisit whether we require a skipBuffer for LZ4 and SNAPPY. In the current PR, we wanted to maintain parity with legacy implementation, hence a 2KB intermediate buffer in ChunkedBytesStream is used for them but it could potentially be removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on PR #13135: URL: https://github.com/apache/kafka/pull/13135#issuecomment-1497651824 Thank you @ijuma for your review. I will address the comments in next one week since I am on vacation for next few days. > Was this done? Yes, consumer performance impact was benchmarked using `RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize` (the non-skip API used by the clients) and the results were added to the Results section in the Summary at the top. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on PR #13135: URL: https://github.com/apache/kafka/pull/13135#issuecomment-1428471245 @ijuma I have updated this PR by extracting out an interface for the new streams. Please take a look when you get a chance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on PR #13135: URL: https://github.com/apache/kafka/pull/13135#issuecomment-1400407941 TODO (will update PR in a short while) - 1. Add benchmark for case when batch contains single 10 byte message 2. Test consumer performance -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on PR #13135: URL: https://github.com/apache/kafka/pull/13135#issuecomment-1398428117 @ijuma please review when you get a chance since you already have context about this code change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org