[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-05-24 Thread via GitHub


divijvaidya commented on PR #13135:
URL: https://github.com/apache/kafka/pull/13135#issuecomment-1561407539

   Failing tests are unrelated:
   ```
Build / JDK 11 and Scala 2.13 / [1] quorum=kraft, isIdempotenceEnabled=true – kafka.api.SaslPlainSslEndToEndAuthorizationTest
14s
Build / JDK 8 and Scala 2.12 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest
1m 42s
Build / JDK 8 and Scala 2.12 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest
1m 43s
Build / JDK 8 and Scala 2.12 / testConnectorBoundary – org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest
57s
Build / JDK 8 and Scala 2.12 / shouldFollowLeaderEpochBasicWorkflow() – kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceWithIbp26Test
12s
Build / JDK 8 and Scala 2.12 / shouldAddNamedTopologyToRunningApplicationWithMultipleNodes() – org.apache.kafka.streams.integration.NamedTopologyIntegrationTest
10s
Build / JDK 8 and Scala 2.12 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-04-25 Thread via GitHub


divijvaidya commented on PR #13135:
URL: https://github.com/apache/kafka/pull/13135#issuecomment-1521852820

   This is ready for review. A summary of the changes is provided below.
   
   **On the server:**
   1. This PR starts using buffer pools to allocate the intermediate buffer used by the stream that converts compressed data to uncompressed data. This is achieved by using the new `ChunkedBytesStream` instead of `BufferedInputStream` for ZSTD & GZIP. For LZ4 and SNAPPY, which weren't using `BufferedInputStream`, this is a no-op (see point 2 for changes to them). The impact of allocation on Zstd can be observed in the [before](https://issues.apache.org/jira/secure/attachment/13057480/flamegraph-trunk-heapalloc-before.html) & [after](https://issues.apache.org/jira/secure/attachment/13057479/flamegraph-pr-heapalloc-after.html) object allocation flamegraphs linked to the [JIRA](https://issues.apache.org/jira/browse/KAFKA-14633). Note how in the *after* flamegraph, the contribution of allocation by `validateMessagesAndAssignOffsets` has decreased drastically, from 39% to 5%.
   2. This PR reduces the number of buffers allocated during decompression from 2 to 1. Earlier we created a 2KB "skip buffer" for ALL compression algorithms, plus another intermediate buffer created by `BufferedInputStream` for some of the compression algorithms (ZSTD & GZIP). This PR uses the same intermediate buffer for ZSTD & GZIP, reducing the number of allocations to 1 (instead of 2). For LZ4 and SNAPPY, the number of allocations remains the same, but the 2KB skip buffer is now allocated from the buffer pool.
   3. The skip implementation of some compression libraries allocates new buffers. As an example, the skip implementation of ZSTD-JNI allocates a new buffer of a different size (from its buffer pool) on every skip invocation. This PR performs the skip within the intermediate buffer instead of pushing it down to ZSTD-JNI (see the sketch after this list).
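   To make changes 1-3 concrete, here is a minimal sketch of the idea (a hypothetical simplification with invented names, not the actual `ChunkedBytesStream` code): one intermediate buffer is borrowed from a pool and shared by both `read()` and `skip()`, so skips never reach down into the decompressor.
   ```java
   import java.io.IOException;
   import java.io.InputStream;

   // Hypothetical simplification of the ChunkedBytesStream idea: a single
   // pooled intermediate buffer serves both read() and skip(), replacing the
   // BufferedInputStream buffer plus the separate 2KB skip buffer.
   public class PooledChunkedStream extends InputStream {

       // Hypothetical minimal pool interface for this sketch.
       public interface BufferPool {
           byte[] borrow(int size);
           void release(byte[] buffer);
       }

       private final InputStream decompressed; // e.g. a Zstd or GZIP stream
       private final BufferPool pool;
       private final byte[] chunk;              // the single intermediate buffer
       private int pos = 0;
       private int limit = 0;

       public PooledChunkedStream(InputStream decompressed, BufferPool pool, int chunkSize) {
           this.decompressed = decompressed;
           this.pool = pool;
           this.chunk = pool.borrow(chunkSize); // change 1: allocated from a pool
       }

       private boolean refill() throws IOException {
           limit = decompressed.read(chunk, 0, chunk.length);
           pos = 0;
           return limit > 0;
       }

       @Override
       public int read() throws IOException {
           if (pos >= limit && !refill())
               return -1;
           return chunk[pos++] & 0xFF;
       }

       @Override
       public long skip(long n) throws IOException {
           // change 3: skip by advancing within the pooled chunk; the underlying
           // decompressor (e.g. zstd-jni) is never asked to allocate a skip buffer.
           long skipped = 0;
           while (skipped < n) {
               if (pos >= limit && !refill())
                   break;
               int step = (int) Math.min(n - skipped, limit - pos);
               pos += step;
               skipped += step;
           }
           return skipped;
       }

       @Override
       public void close() throws IOException {
           pool.release(chunk); // change 2: the one buffer goes back for reuse
           decompressed.close();
       }
   }
   ```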
   
   The impact of the above changes on throughput can be observed via `RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize`. You will notice a 20-70% improvement there (see the benchmark sheet attached to the description).
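   For reference, the kind of loop this benchmark exercises looks roughly like the following (an illustrative sketch, not the benchmark code itself; it assumes the `skipKeyValueIterator(BufferSupplier)` accessor that the server-side skip path goes through):
   ```java
   import org.apache.kafka.common.record.MemoryRecords;
   import org.apache.kafka.common.record.MutableRecordBatch;
   import org.apache.kafka.common.record.Record;
   import org.apache.kafka.common.utils.BufferSupplier;
   import org.apache.kafka.common.utils.CloseableIterator;

   public class SkipIterationSketch {
       // Walk every record without materializing key/value buffers; this is
       // the validation pattern whose allocations the PR reduces.
       static long countRecords(MemoryRecords records) {
           long count = 0;
           BufferSupplier supplier = BufferSupplier.create();
           for (MutableRecordBatch batch : records.batches()) {
               try (CloseableIterator<Record> iterator = batch.skipKeyValueIterator(supplier)) {
                   while (iterator.hasNext()) {
                       iterator.next();
                       count++;
                   }
               }
           }
           supplier.close();
           return count;
       }
   }
   ```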
   
   **On the consumer:**
   Change 1 applies to the consumer as well; changes 2 & 3 do not impact the consumer since it doesn't use a "skip" iterator.
   
   The impact of the above changes on consumer throughput can be observed via `RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize` (note that this is a different benchmark from the one specified for the server; this one doesn't use the skip iterator). You will notice a mixed bag: single-digit regressions for some compression types, and 10-50% improvements for Zstd. The reason we don't see equivalent gains on the consumer is that it copies all uncompressed data into a single buffer and then reads off it. We have not reduced any buffer allocation for the consumer scenario (since changes 2 & 3 aren't applicable to consumers). There are other optimizations we could perform for the consumer, listed below, but they are out of scope for this PR.
   
   **Future optimisations (out of scope of this PR)**
   1. For non-skip iterators (used by consumers), we currently allocate an intermediate buffer for decompression and then allocate another buffer for storing the key & value. The flow looks like: uncompressed data => intermediate buffer => inputStream => recordByteBuffer. This could be improved to uncompressed data => recordByteBuffer, so that we allocate only 1 byte buffer (see the sketch after this list).
   2. We need to revisit whether we require a skip buffer for LZ4 and SNAPPY at all. In the current PR we wanted to maintain parity with the legacy implementation, hence a 2KB intermediate buffer in `ChunkedBytesStream` is used for them, but it could potentially be removed.
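   To make future optimisation 1 concrete, here is a hedged sketch of the two flows (all names here are hypothetical):
   ```java
   import java.io.IOException;
   import java.io.InputStream;
   import java.nio.ByteBuffer;

   public class ConsumerFlowSketch {

       // Current flow (simplified): uncompressed data => intermediate buffer
       // => inputStream => recordByteBuffer, i.e. two buffers and an extra copy.
       static ByteBuffer readRecordViaIntermediate(InputStream decompressed, int recordSize) throws IOException {
           byte[] intermediate = new byte[8 * 1024]; // the extra allocation
           ByteBuffer record = ByteBuffer.allocate(recordSize);
           while (record.hasRemaining()) {
               int n = decompressed.read(intermediate, 0, Math.min(intermediate.length, record.remaining()));
               if (n < 0)
                   break;
               record.put(intermediate, 0, n); // the extra copy
           }
           record.flip();
           return record;
       }

       // Hypothetical decompressor that can fill a ByteBuffer directly;
       // returns the number of bytes written, or -1 at end of stream.
       interface DirectDecompressor {
           int read(ByteBuffer destination) throws IOException;
       }

       // Proposed flow: uncompressed data => recordByteBuffer, one allocation.
       static ByteBuffer readRecordDirect(DirectDecompressor decompressor, int recordSize) throws IOException {
           ByteBuffer record = ByteBuffer.allocate(recordSize);
           while (record.hasRemaining() && decompressor.read(record) > 0) {
               // all bytes land straight in the record buffer
           }
           record.flip();
           return record;
       }
   }
   ```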
   





[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-04-05 Thread via GitHub


divijvaidya commented on PR #13135:
URL: https://github.com/apache/kafka/pull/13135#issuecomment-1497651824

   Thank you @ijuma for your review. I will address the comments within the next week, since I am on vacation for the next few days.
   
   > Was this done?
   
   Yes, the consumer performance impact was benchmarked using `RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize` (the non-skip API used by the clients), and the results were added to the Results section in the summary at the top.
   





[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-02-13 Thread via GitHub


divijvaidya commented on PR #13135:
URL: https://github.com/apache/kafka/pull/13135#issuecomment-1428471245

   @ijuma I have updated this PR by extracting an interface for the new streams. Please take a look when you get a chance. A rough sketch of the idea is below.
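   For readers without the diff in front of them, the extracted interface could look something like this (a hypothetical shape, not necessarily the names the PR uses):
   ```java
   import java.io.Closeable;
   import java.io.IOException;

   // Hypothetical shape of an interface extracted for the new streams: the
   // common surface that chunked, pool-backed implementations would share.
   public interface BytesStream extends Closeable {
       int read() throws IOException;
       int read(byte[] buffer, int offset, int length) throws IOException;
       long skip(long n) throws IOException;
   }
   ```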





[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-23 Thread via GitHub


divijvaidya commented on PR #13135:
URL: https://github.com/apache/kafka/pull/13135#issuecomment-1400407941

   TODO (will update PR in a short while):
   
   1. Add a benchmark for the case where a batch contains a single 10-byte message
   2. Test consumer performance
   





[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-20 Thread GitBox


divijvaidya commented on PR #13135:
URL: https://github.com/apache/kafka/pull/13135#issuecomment-1398428117

   @ijuma please review when you get a chance, since you already have context on this code change.

