[ 
https://issues.apache.org/jira/browse/KAFKA-7041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518598#comment-16518598
 ] 

Nikki Thean edited comment on KAFKA-7041 at 6/20/18 9:35 PM:
-------------------------------------------------------------

Hi [~mjsax], I was interested in taking this ticket, but after looking through the 
code I believe that StandbyTasks already use bulk loading when the 
state store is a RocksDBStore. Please tell me if I'm wrong, though, as I'm 
obviously not as familiar with the code as the maintainers, and I would be happy 
to learn more about the Streams library.

Here's what I think is happening: when StandbyTasks are 
[updated|https://github.com/apache/kafka/blob/e5ec3f55c968292ae8fd0cb8ad8b626abed8f503/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java#L167],
 the ProcessorStateManager restores the records through a 
[BatchingStateRestoreCallback|https://github.com/apache/kafka/blob/e5ec3f55c968292ae8fd0cb8ad8b626abed8f503/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L183].
 That callback is either the store's own, truly batching callback [using RocksDB bulk 
loading|https://github.com/apache/kafka/blob/e5ec3f55c968292ae8fd0cb8ad8b626abed8f503/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L547],
 or, for regular StateRestoreCallbacks, a [wrapper that iterates through 
records|https://github.com/apache/kafka/blob/e5ec3f55c968292ae8fd0cb8ad8b626abed8f503/streams/src/main/java/org/apache/kafka/streams/processor/internals/WrappedBatchingStateRestoreCallback.java]
 one at a time. So when the original state store is registered, the 
appropriate callback is registered along with it, [like 
this|https://github.com/apache/kafka/blob/e5ec3f55c968292ae8fd0cb8ad8b626abed8f503/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L168-L172]
 in RocksDBStore.
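To make that dispatch concrete, here is a simplified, self-contained sketch. It does not use the Kafka Streams API: KV, RestoreCallback, BatchingRestoreCallback, WrappedBatchingCallback, CallbackAdapter and BulkLoadingStore are hypothetical stand-ins (with String instead of byte[]) for KeyValue, StateRestoreCallback, BatchingStateRestoreCallback and WrappedBatchingStateRestoreCallback. It only shows the idea from the links above: if the store's callback already batches it is used as-is, otherwise it is wrapped so each record in the batch is replayed one at a time.

```java
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for Kafka's KeyValue<byte[], byte[]>.
final class KV {
    final String key;
    final String value;

    KV(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical stand-in for StateRestoreCallback: restores one record at a time.
interface RestoreCallback {
    void restore(String key, String value);
}

// Hypothetical stand-in for BatchingStateRestoreCallback: can also restore a whole batch.
interface BatchingRestoreCallback extends RestoreCallback {
    void restoreAll(Collection<KV> records);
}

// Hypothetical stand-in for WrappedBatchingStateRestoreCallback:
// replays a batch record by record on a non-batching delegate.
final class WrappedBatchingCallback implements BatchingRestoreCallback {
    private final RestoreCallback delegate;

    WrappedBatchingCallback(RestoreCallback delegate) {
        this.delegate = delegate;
    }

    @Override
    public void restoreAll(Collection<KV> records) {
        for (KV record : records) {
            delegate.restore(record.key, record.value);
        }
    }

    @Override
    public void restore(String key, String value) {
        delegate.restore(key, value);
    }
}

final class CallbackAdapter {
    // Use the store's callback as-is if it already batches; otherwise wrap it.
    static BatchingRestoreCallback toBatching(RestoreCallback callback) {
        if (callback instanceof BatchingRestoreCallback) {
            return (BatchingRestoreCallback) callback;
        }
        return new WrappedBatchingCallback(callback);
    }
}

// Toy store with a truly batching callback: each restoreAll call counts as
// one bulk write, standing in for a RocksDB bulk load.
final class BulkLoadingStore implements BatchingRestoreCallback {
    final Map<String, String> data = new TreeMap<>();
    int bulkWrites = 0;

    @Override
    public void restoreAll(Collection<KV> records) {
        for (KV record : records) {
            data.put(record.key, record.value);
        }
        bulkWrites++;
    }

    @Override
    public void restore(String key, String value) {
        data.put(key, value);
    }
}
```

With this shape, the standby update path only ever calls restoreAll, and whether that turns into a bulk load or a per-record loop depends solely on which callback the store registered.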

However, I do think that something like RocksDBSegmentedBytesStore, which uses 
a [non-batching 
callback|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java#L129-L134],
 could be updated to use RocksDB bulk loading. If that makes sense I would be 
happy to work on that.
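As a rough illustration of what a batching callback for a segmented store could look like, here is a self-contained sketch that touches neither RocksDB nor the Kafka Streams API. TimestampedRecord, SegmentedStoreSketch, the composite "key@timestamp" key and the timestamp / segmentInterval segment mapping are all hypothetical simplifications: each in-memory map stands in for one segment's RocksDB instance, and each writeCalls increment stands in for one write issued against a segment. The point is only that a batch can be grouped by segment first, so the number of writes drops from one per record to one per segment touched.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical record type: the timestamp decides which segment a record lands in.
final class TimestampedRecord {
    final String key;
    final long timestamp;
    final String value;

    TimestampedRecord(String key, long timestamp, String value) {
        this.key = key;
        this.timestamp = timestamp;
        this.value = value;
    }
}

// Toy segmented store: each inner map stands in for one segment's RocksDB instance.
final class SegmentedStoreSketch {
    private final long segmentInterval;
    final Map<Long, Map<String, String>> segments = new TreeMap<>();
    int writeCalls = 0; // number of write operations issued against segments

    SegmentedStoreSketch(long segmentInterval) {
        this.segmentInterval = segmentInterval;
    }

    private long segmentId(long timestamp) {
        return timestamp / segmentInterval;
    }

    private Map<String, String> segmentFor(long segmentId) {
        return segments.computeIfAbsent(segmentId, id -> new TreeMap<>());
    }

    // Simplified composite key; a real segmented store encodes the timestamp in binary.
    private static String storeKey(TimestampedRecord record) {
        return record.key + "@" + record.timestamp;
    }

    // Non-batching restore: one write per record.
    void restore(TimestampedRecord record) {
        segmentFor(segmentId(record.timestamp)).put(storeKey(record), record.value);
        writeCalls++;
    }

    // Batching restore: group the batch by segment, then issue one write per segment.
    void restoreAll(Collection<TimestampedRecord> records) {
        Map<Long, List<TimestampedRecord>> bySegment = new TreeMap<>();
        for (TimestampedRecord record : records) {
            bySegment.computeIfAbsent(segmentId(record.timestamp), id -> new ArrayList<>()).add(record);
        }
        for (Map.Entry<Long, List<TimestampedRecord>> entry : bySegment.entrySet()) {
            Map<String, String> segment = segmentFor(entry.getKey());
            for (TimestampedRecord record : entry.getValue()) {
                segment.put(storeKey(record), record.value);
            }
            writeCalls++; // stands in for writing one batch to this segment
        }
    }
}
```

Both paths end up with the same contents; only the number of writes differs, which is where bulk loading would pay off.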


> Using RocksDB bulk loading for StandbyTasks
> -------------------------------------------
>
>                 Key: KAFKA-7041
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7041
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Major
>
> In KAFKA-5363 we introduced RocksDB bulk loading to speed up store recovery. 
> We could do the same optimization for StandbyTasks to make them more 
> efficient and to reduce the likelihood that StandbyTasks lag behind.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
