[ 
https://issues.apache.org/jira/browse/KAFKA-15057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josep Prat updated KAFKA-15057:
-------------------------------
    Fix Version/s: 3.9.0
                       (was: 3.8.0)

> Use new interface ZstdBufferDecompressingStreamNoFinalizer from zstd-jni
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-15057
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15057
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>    Affects Versions: 3.6.0
>            Reporter: Divij Vaidya
>            Assignee: Divij Vaidya
>            Priority: Major
>             Fix For: 3.9.0
>
>         Attachments: zstd-upgrade.xlsx
>
>
> h1. Background
> In Kafka's code, every batch of records is stored in an in-memory byte 
> buffer. For compressed workloads, this buffer contains data in compressed 
> form. Before writing it to the log, Kafka performs some validations, such as 
> ensuring that offsets are monotonically increasing. To perform this 
> validation, Kafka needs to decompress the data stored in the byte buffer.
> For zstd-compressed batches, Kafka uses the ZstdInputStreamNoFinalizer 
> interface provided by the downstream zstd-jni library to perform 
> decompression. ZstdInputStreamNoFinalizer takes an InputStream as input and 
> provides an InputStream as output. Since Kafka stores the entire batch in a 
> ByteBuffer, it wraps the ByteBuffer in an InputStream to satisfy the input 
> contract of ZstdInputStreamNoFinalizer.
> h1. Problem
> ZstdInputStreamNoFinalizer is not a good fit for our use case because we 
> already have the entire compressed data stored in a buffer. We don't have a 
> need for an interface which takes InputStream as an input. Our requirement is 
> for an interface which takes a ByteBuffer as an input and provides a stream 
> of uncompressed data as output. Prior to zstd-jni 1.5.5, no such interface 
> existed. Hence, we were forced to use ZstdInputStreamNoFinalizer.
> Usage of ZstdInputStreamNoFinalizer has the following problems:
> 1. When decompression of a batch is complete, we try to read another byte to 
> check whether the actual batch size is equal to the declared batch size. This 
> is done at RecordIterator#next(). This extra read leads to a JNI call in the 
> existing interface.
> 2. Since this interface requires an InputStream as input, we take the 
> ByteBuffer containing the compressed batch and convert it into an 
> InputStream. The interface internally uses an intermediate buffer to read 
> data from this InputStream in chunks. The chunk size is determined by the 
> underlying zstd library, and hence we allocate a new buffer with every batch. 
> This leads to the following transformation: ByteBuffer (compressed batch) -> 
> InputStream (compressed batch) -> data copy to intermediate ByteBuffer (chunk 
> of compressed batch) -> send chunk to zstd library for decompression -> 
> refill the intermediate ByteBuffer by copying in the next chunk of the 
> compressed batch.
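
The copy chain described in point 2 can be illustrated with a small, stdlib-only Java sketch. It is not Kafka or zstd-jni code: the class and method names (CopyChainSketch, ByteBufferBackedStream, drainThroughStream) are hypothetical, the chunk size of 4096 is an arbitrary assumption, and the decompression step is left out entirely. It only counts how many bytes are copied into the intermediate buffer when a batch that already sits in a ByteBuffer is read through an InputStream.

```java
import java.io.InputStream;
import java.nio.ByteBuffer;

public class CopyChainSketch {
    /** Minimal InputStream view over a ByteBuffer (illustrative only, not Kafka's class). */
    static final class ByteBufferBackedStream extends InputStream {
        private final ByteBuffer buf;

        ByteBufferBackedStream(ByteBuffer buf) {
            this.buf = buf;
        }

        @Override
        public int read() {
            return buf.hasRemaining() ? (buf.get() & 0xFF) : -1;
        }

        @Override
        public int read(byte[] dst, int off, int len) {
            if (len == 0) return 0;
            if (!buf.hasRemaining()) return -1;
            int n = Math.min(len, buf.remaining());
            buf.get(dst, off, n); // copies bytes out of the batch buffer
            return n;
        }
    }

    /** Returns {bytesCopied, chunksRead} for draining the batch through the stream path. */
    public static long[] drainThroughStream(ByteBuffer batch, int chunkSize) {
        // duplicate() so the caller's position is left untouched
        ByteBufferBackedStream in = new ByteBufferBackedStream(batch.duplicate());
        byte[] intermediate = new byte[chunkSize]; // the extra per-batch allocation
        long copied = 0;
        long chunks = 0;
        int n;
        while ((n = in.read(intermediate, 0, intermediate.length)) != -1) {
            copied += n; // a decompressor would consume `intermediate` here
            chunks++;
        }
        return new long[] {copied, chunks};
    }

    public static void main(String[] args) {
        ByteBuffer batch = ByteBuffer.allocate(10_000); // stand-in for a compressed batch
        long[] r = drainThroughStream(batch, 4096);
        System.out.println("bytes copied=" + r[0] + ", chunks=" + r[1]);
    }
}
```

In this sketch every byte of the batch is copied once more before a decompressor could see it, which is the overhead the issue aims to remove.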
> h1. Solution
> I have extended an interface in the downstream library zstd-jni to suit 
> Kafka's use case. The new interface is called 
> ZstdBufferDecompressingStreamNoFinalizer. It takes a ByteBuffer containing 
> compressed data as input and provides an InputStream as output. It solves the 
> above problems as follows:
> 1. When we read the final decompressed frame, this interface sets a flag to 
> mark that all uncompressed data has been consumed. When RecordIterator#next() 
> tries to determine if the stream has ended, we simply read the flag and 
> hence, do not have to make a JNI call.
> 2. It does not require any buffer allocation for input. It takes the input 
> buffer and passes it across the JNI boundary without any intermediate 
> copying. Hence, we don't perform any buffer allocation.
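
The two properties above (buffer-based input and a flag-based end-of-stream check) can be sketched with the JDK alone. This is an analogy under stated assumptions, not the zstd-jni API: java.util.zip.Inflater (DEFLATE, Java 11+) stands in for the zstd decompressor because it also accepts a ByteBuffer as input and reports completion through finished(). The class and method names (BufferInputSketch, compress, decompress) are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class BufferInputSketch {
    /** Compresses bytes with DEFLATE; stands in for a zstd-compressed batch. */
    public static ByteBuffer compress(byte[] raw) {
        Deflater deflater = new Deflater();
        deflater.setInput(raw);
        deflater.finish();
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] chunk = new byte[256];
        while (!deflater.finished()) {
            int n = deflater.deflate(chunk);
            bos.write(chunk, 0, n);
        }
        deflater.end();
        return ByteBuffer.wrap(bos.toByteArray());
    }

    /** Decompresses directly from a ByteBuffer, with no InputStream wrapper on the input side. */
    public static byte[] decompress(ByteBuffer compressed, int outChunk) {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(compressed); // the batch buffer itself is the input
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ByteBuffer out = ByteBuffer.allocate(outChunk);
            // finished() is the end-of-stream signal; no extra read of the input is attempted
            while (!inflater.finished()) {
                out.clear();
                int n = inflater.inflate(out);
                if (n == 0 && inflater.needsInput()) {
                    throw new IllegalArgumentException("truncated input");
                }
                bos.write(out.array(), 0, n);
            }
            return bos.toByteArray();
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("corrupt input", e);
        } finally {
            inflater.end(); // release native resources explicitly
        }
    }

    public static void main(String[] args) {
        byte[] raw = "kafka record batch ".repeat(50).getBytes(StandardCharsets.UTF_8);
        byte[] restored = decompress(compress(raw), 128);
        System.out.println("round trip ok: " + Arrays.equals(raw, restored));
    }
}
```

Whether the real zstd-jni class behaves the same way in every detail is not something this sketch demonstrates; it only shows the shape of the calling code once the input is a buffer and the end of the stream is exposed as a flag.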
> h1. References
> h2. Changes in downstream zstd-jni
> Add new interface - 
> [https://github.com/luben/zstd-jni/commit/d65490e8b8aadc4b59545755e55f7dd368fe8aa5]
> Bug fixes in new interface - 
> [https://github.com/luben/zstd-jni/commit/8bf8066438785ce55b62fc7e6816faafe1e3b39e]
> [https://github.com/luben/zstd-jni/commit/100c434dfcec17a865ca2c2b844afe1046ce1b10]
> [https://github.com/luben/zstd-jni/commit/355b8511a2967d097a619047a579930cac2ccd9d]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
