divijvaidya opened a new pull request, #13814:
URL: https://github.com/apache/kafka/pull/13814

   ## Background
   
In Kafka's code, every batch of records is stored in an in-memory byte 
buffer. For compressed workloads, this buffer contains the data in compressed form. 
Before writing the batch to the log, Kafka performs validations such as ensuring 
that offsets are monotonically increasing. To perform this validation, 
Kafka needs to decompress the data stored in the byte buffer.
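
   The offset check mentioned above can be sketched as follows (an illustrative stand-in, not Kafka's actual validation code; the class and method names are hypothetical):

```java
// Illustrative sketch of the monotonicity validation described above.
// Not Kafka's real LogValidator logic; names here are hypothetical.
final class OffsetValidation {
    // Returns true if each offset is strictly greater than the previous one.
    static boolean offsetsMonotonicallyIncreasing(long[] offsets) {
        for (int i = 1; i < offsets.length; i++) {
            if (offsets[i] <= offsets[i - 1]) {
                return false;
            }
        }
        return true;
    }
}
```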
   
   For zstd-compressed batches, Kafka uses the ZstdInputStreamNoFinalizer interface 
provided by the downstream zstd-jni library to perform decompression. 
   
   ZstdInputStreamNoFinalizer takes an InputStream as input and provides an 
InputStream as output. Since Kafka stores the entire batch in a ByteBuffer, it wraps 
the ByteBuffer in an InputStream to satisfy the input contract of 
ZstdInputStreamNoFinalizer.
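
   The ByteBuffer-to-InputStream adaptation can be sketched as follows (a minimal stand-in for Kafka's own ByteBufferInputStream utility; the class name here is hypothetical):

```java
import java.io.InputStream;
import java.nio.ByteBuffer;

// Minimal sketch of wrapping a ByteBuffer in an InputStream so it can be
// handed to an InputStream-based decompressor such as ZstdInputStreamNoFinalizer.
final class ByteBufferBackedInputStream extends InputStream {
    private final ByteBuffer buffer;

    ByteBufferBackedInputStream(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public int read() {
        // Single-byte read: return -1 once the buffer is exhausted.
        return buffer.hasRemaining() ? (buffer.get() & 0xff) : -1;
    }

    @Override
    public int read(byte[] dst, int off, int len) {
        if (!buffer.hasRemaining()) return -1;
        int n = Math.min(len, buffer.remaining());
        buffer.get(dst, off, n);
        return n;
    }
}
```

   A stream created this way satisfies the input contract, but as described below it forces the decompressor to copy the bytes back out again chunk by chunk.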
   
   ## Problem
   
   ZstdInputStreamNoFinalizer is not a good fit for our use case because we 
already have the entire compressed batch in a buffer. We don't need an 
interface that takes an InputStream as input; our requirement is an interface 
that takes a ByteBuffer as input and provides a stream of uncompressed data as 
output. Prior to zstd-jni 1.5.5, no such interface existed, hence we were 
forced to use ZstdInputStreamNoFinalizer.
   
   Usage of ZstdInputStreamNoFinalizer has the following problems:
   1. When decompression of a batch is complete, we try to read one more byte to 
check whether the actual batch size is equal to the declared batch size. This is 
done in RecordIterator#next(). With the existing interface, this extra read 
leads to a JNI call.
   2. Since this interface requires its input as an InputStream, we take the 
ByteBuffer containing the compressed batch and convert it into an InputStream. The 
interface internally uses an intermediate buffer to read data from this 
InputStream in chunks. The chunk size is determined by the underlying zstd library, 
so we allocate a new buffer with every batch. This leads to the 
following transformation: ByteBuffer (compressed batch) -> InputStream 
(compressed batch) -> data copy to intermediate ByteBuffer (chunk of compressed 
batch) -> send chunk to zstd library for decompression -> refill the 
intermediate buffer by copying the next chunk of the compressed batch into it
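
   The chunked read-and-copy pattern in point 2 can be sketched as follows (a simplified, hypothetical model of the wrapper's internal loop; the actual native decompression call is elided):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical, simplified model of the copy chain described above: an
// intermediate chunk buffer is allocated and repeatedly refilled from the
// InputStream before each (elided) native decompress call.
final class ChunkedCopyDemo {
    // Drains the stream chunk by chunk and returns how many bytes were
    // copied into the intermediate buffer along the way.
    static int drainInChunks(InputStream compressed, int chunkSize) {
        byte[] intermediate = new byte[chunkSize]; // fresh allocation per batch
        int copied = 0;
        try {
            int n;
            while ((n = compressed.read(intermediate, 0, chunkSize)) != -1) {
                // the real wrapper would now hand 'intermediate' across the
                // JNI boundary for decompression, then loop to refill it
                copied += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return copied;
    }
}
```

   Every byte of the compressed batch is copied at least once into the intermediate buffer before it ever reaches the native library.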
   
   ## Solution
   
   I have extended an interface in the downstream zstd-jni library to suit the 
use case of Kafka. The new interface is called 
ZstdBufferDecompressingStreamNoFinalizer. It takes a ByteBuffer containing 
compressed data as input and provides output as 
an InputStream. It solves the above problems as follows:
   1. When we read the final decompressed frame, this interface sets a flag to 
mark that all uncompressed data has been consumed. When RecordIterator#next() 
tries to determine whether the stream has ended, we simply read the flag and 
hence do not have to make a JNI call.
   2. It does not require any buffer allocation for the input. It takes the input 
buffer and passes it across the JNI boundary without any intermediate copying, 
hence we don't perform any buffer allocation.
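
   The flag-based end-of-stream check in point 1 can be sketched with a stdlib-only model (the real ZstdBufferDecompressingStreamNoFinalizer lives in zstd-jni; the names and the trivial "decompression" here are simplified assumptions):

```java
import java.nio.ByteBuffer;

// Stdlib-only sketch of the flag-based end-of-stream idea. Instead of
// probing for one more byte (a JNI call in the old interface), the
// consumer just reads a boolean flag.
final class BufferDecompressingStreamSketch {
    private final ByteBuffer source; // compressed batch, passed without copying
    private boolean allDataConsumed = false;

    BufferDecompressingStreamSketch(ByteBuffer source) {
        this.source = source;
    }

    // Stand-in for the native decompress step: drains bytes from the source
    // buffer into the target and sets the flag once the source is exhausted.
    int read(ByteBuffer target) {
        int n = Math.min(source.remaining(), target.remaining());
        for (int i = 0; i < n; i++) target.put(source.get());
        if (!source.hasRemaining()) allDataConsumed = true;
        return n;
    }

    // RecordIterator#next() can call this without crossing the JNI boundary.
    boolean hasMoreData() {
        return !allDataConsumed;
    }
}
```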
   
   ## References
   ### Changes in downstream zstd-jni
   
   Add new interface - 
   https://github.com/luben/zstd-jni/commit/d65490e8b8aadc4b59545755e55f7dd368fe8aa5
   
   Bug fixes in new interface - 
   https://github.com/luben/zstd-jni/commit/8bf8066438785ce55b62fc7e6816faafe1e3b39e
   https://github.com/luben/zstd-jni/commit/100c434dfcec17a865ca2c2b844afe1046ce1b10
   https://github.com/luben/zstd-jni/commit/355b8511a2967d097a619047a579930cac2ccd9d

