Guozhang Wang created KAFKA-13286:
-------------------------------------

             Summary: Revisit Streams State Store and Serde Implementation
                 Key: KAFKA-13286
                 URL: https://issues.apache.org/jira/browse/KAFKA-13286
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Guozhang Wang


Kafka Streams state stores are built in hierarchical layers as metered -> cached 
-> logged -> [convert] -> raw stores (RocksDB, in-memory), and they leverage the 
built-in Serde libraries to serialize / deserialize. There are several 
inefficiencies in the current design:

* The API only supports serdes using byte arrays. This means we generate a lot 
of garbage and spend unnecessary time copying bytes, especially when working 
with windowed state stores that rely on composite keys. In many places in the 
code we extract parts of the composite key to deserialize either the timestamp 
or the message key from the state store key (e.g. the methods in 
WindowStoreUtils); a simplified sketch of this copying pattern is shown after 
this list.
* The serde operations can happen on multiple layers of the state store 
hierarchy, which means we make extra byte array copies as we move along doing 
serdes. For example, we do serdes in the metered layer, then again in the 
cached layer with the cache functions, and also in the logged stores to 
generate the key/value bytes to send to Kafka.
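
For illustration, here is a simplified sketch of the copy-heavy extraction 
pattern the first bullet refers to; the key layout and helper names are 
assumptions in the spirit of WindowStoreUtils, not the exact code:

{code}
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustrative only: a composite window key stored as a single byte array of the
// form <key bytes, 8-byte timestamp, 4-byte seqnum>. Each extraction helper either
// copies bytes into a fresh array or re-wraps the whole array, rather than handing
// out a slice of an existing buffer.
class CompositeKeyExtractionSketch {
    private static final int SUFFIX_SIZE = Long.BYTES + Integer.BYTES;

    static byte[] extractStoreKeyBytes(final byte[] binaryKey) {
        // allocates and copies a brand new array for the embedded message key on every call
        return Arrays.copyOfRange(binaryKey, 0, binaryKey.length - SUFFIX_SIZE);
    }

    static long extractStoreTimestamp(final byte[] binaryKey) {
        // wraps the whole array just to read 8 bytes near the end
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - SUFFIX_SIZE);
    }
}
{code}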

To improve on this, we can consider adding support for serdes into/from 
ByteBuffers, which would allow us to reuse the underlying byte arrays and just 
pass around slices of the underlying buffers to avoid the unnecessary copying. 

1) More specifically, the serialize interface could, for example, be refactored to:

{code}
ByteBuffer serialize(String topic, T data, ByteBuffer buffer);
{code}

Where the serialized bytes would be appended to the ByteBuffer. When a series 
of serialize functions is called along the state store hierarchy, we then just 
need to make sure that whatever should be appended first to the ByteBuffer is 
serialized first. E.g. if the serialized byte format of a WindowSchema is 
<timestamp, boolean, key>

Then we would need to call the serializers as in:

{code}
serialize(topic, key, serialize(topic, leftRightBoolean, serialize(topic, timestamp, buffer)));
{code}
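
As a minimal sketch of how such append-style serializers could compose for a 
window key: the interface and class names below are hypothetical and only 
illustrate the nesting order; they are not part of the existing codebase.

{code}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical append-style serializers: each writes its bytes at the buffer's
// current position and returns the same buffer, so calls can be nested.
interface AppendingSerializer<T> {
    ByteBuffer serialize(String topic, T data, ByteBuffer buffer);
}

class TimestampSerializer implements AppendingSerializer<Long> {
    @Override
    public ByteBuffer serialize(final String topic, final Long data, final ByteBuffer buffer) {
        return buffer.putLong(data);                              // appended first: timestamp
    }
}

class LeftRightSerializer implements AppendingSerializer<Boolean> {
    @Override
    public ByteBuffer serialize(final String topic, final Boolean data, final ByteBuffer buffer) {
        return buffer.put((byte) (data ? 1 : 0));                 // then the left/right flag
    }
}

class Utf8KeySerializer implements AppendingSerializer<String> {
    @Override
    public ByteBuffer serialize(final String topic, final String data, final ByteBuffer buffer) {
        return buffer.put(data.getBytes(StandardCharsets.UTF_8)); // finally the raw key
    }
}

class WindowKeySerializationSketch {
    // Usage matching <timestamp, boolean, key>: the innermost call appends first.
    static ByteBuffer serializeWindowKey(final String topic, final String key,
                                         final boolean leftRight, final long timestamp,
                                         final ByteBuffer buffer) {
        return new Utf8KeySerializer().serialize(topic, key,
                new LeftRightSerializer().serialize(topic, leftRight,
                        new TimestampSerializer().serialize(topic, timestamp, buffer)));
    }
}
{code}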

2) In addition, we can consider having a pool of ByteBuffers representing a set 
of byte arrays that can be re-used. This can be captured as an intelligent 
{{ByteBufferSupplier}}, which provides:

{code}
ByteBuffer ByteBufferSupplier#allocate(long size)
{code}

Its implementation can choose to either create new byte arrays or re-use 
existing ones in the pool; the gotcha, though, is that we usually would not 
know the serialized byte length for raw keys (think: in practice the keys would 
be in JSON/Avro etc.), and hence would not know what {{size}} to pass in for 
serialization, so we may need to be conservative, or use trial and error, etc.

Of course, callers would then be responsible for returning the used ByteBuffer 
to the Supplier via

{code}
ByteBufferSupplier#deallocate(ByteBuffer buffer)
{code}
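
A minimal sketch of such a pooling supplier, assuming a simple concurrent 
free-list (the class below is illustrative only, not an existing Streams API; 
note that {{ByteBuffer.allocate}} itself takes an int, so a long {{size}} would 
need range checking):

{code}
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedDeque;

// Illustrative pooling supplier: hands out a pooled buffer when one is large enough,
// otherwise allocates a fresh one; deallocate() clears the buffer and returns it to
// the pool for re-use.
class PooledByteBufferSupplier {
    private final ConcurrentLinkedDeque<ByteBuffer> pool = new ConcurrentLinkedDeque<>();

    ByteBuffer allocate(final int size) {
        for (final ByteBuffer buffer : pool) {
            if (buffer.capacity() >= size && pool.remove(buffer)) {
                return buffer;
            }
        }
        return ByteBuffer.allocate(size);   // nothing suitable pooled; create a new array
    }

    void deallocate(final ByteBuffer buffer) {
        buffer.clear();                     // reset position/limit before pooling
        pool.offer(buffer);
    }
}
{code}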

3) With RocksDB's direct byte-buffer support (KAFKA-9168) we can optionally 
also allocate direct ByteBuffers so that using them for puts/gets avoids 
copying across the JNI boundary, and hence is more efficient. The Supplier 
would then need to be careful to deallocate these direct byte-buffers, since 
they live outside the JVM heap and are not reclaimed promptly by GC.
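
For reference, RocksJava exposes ByteBuffer-based put/get variants from the 
work referenced above; a rough sketch of using them follows (exact signatures 
should be double-checked against the RocksDB version in use, and the direct 
buffers are assumed to come from the Supplier described in 2)):

{code}
import java.nio.ByteBuffer;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;

// Rough sketch: put/get with direct ByteBuffers so key/value bytes need not be
// copied into byte[] at the JNI boundary. The direct buffers must be returned to
// their pool explicitly, since they are not reclaimed promptly by GC.
class DirectBufferRocksDBSketch {
    static void putDirect(final RocksDB db, final ByteBuffer key, final ByteBuffer value)
            throws RocksDBException {
        try (final WriteOptions writeOptions = new WriteOptions()) {
            db.put(writeOptions, key, value);       // direct-buffer overload
        }
    }

    static int getDirect(final RocksDB db, final ByteBuffer key, final ByteBuffer value)
            throws RocksDBException {
        try (final ReadOptions readOptions = new ReadOptions()) {
            return db.get(readOptions, key, value); // fills `value`, returns the value size
        }
    }
}
{code}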



