[ https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17373572#comment-17373572 ]

Sagar Rao edited comment on KAFKA-8295 at 7/2/21, 2:36 PM:
-----------------------------------------------------------

[~ableegoldman], I did some more analysis and here's a summary of it:

1) The RocksDB merge operator is useful for both associative and partial 
merges. The examples in the documentation range from a simple counter, to 
appending values to a string/list, to partial updates of a JSON document.
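
To make the associative-merge idea concrete, here is a minimal self-contained sketch (plain Java, not RocksDB's actual API) of the two operator shapes mentioned above, a counter and a string append:
{code:java}
import java.util.function.BinaryOperator;

// Minimal sketch of the merge-operator idea (plain Java, NOT the RocksDB API).
// An associative merge lets the store combine pending operands in any grouping,
// e.g. during compaction, without reading the current value first.
public class MergeSketch {
    public static void main(String[] args) {
        // Counter merge: associative addition over pending increments.
        BinaryOperator<Long> counterMerge = Long::sum;

        // Append merge: associative concatenation with a delimiter.
        BinaryOperator<String> appendMerge = (a, b) -> a + "," + b;

        // Associativity: (a op b) op c == a op (b op c),
        // so operands can be folded in any grouping.
        long left  = counterMerge.apply(counterMerge.apply(1L, 2L), 3L);
        long right = counterMerge.apply(1L, counterMerge.apply(2L, 3L));
        System.out.println(left == right); // true

        System.out.println(appendMerge.apply(appendMerge.apply("a", "b"), "c")); // a,b,c
    }
}
{code}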

2) Here's where I feel it could be useful in Kafka Streams context:

2.1) One option could be to introduce a merge operator in the StateStore 
interface. This merge operator could be used for incrementing a counter or 
appending a value to an existing String/List. It could be useful since all the 
operations I described above are RMW operations. As per the RocksDB 
documentation, a random Get() is relatively slow, so avoiding it might be one 
advantage. In fact, this could easily be extended to in-memory state stores as 
well, so we aren't tied to RocksDB here.
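
As a rough illustration of what such an interface addition might look like (all names here are hypothetical, not the actual Kafka Streams API), a merge() default method could fall back to get+put, and a RocksDB-backed store could later override it with a native merge():
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical sketch; SketchKeyValueStore is NOT the real Kafka Streams API.
public class MergeStoreSketch {

    // A merge() default that any store gets for free as a plain RMW sequence;
    // a RocksDB-backed implementation could override it with db.merge().
    interface SketchKeyValueStore<K, V> {
        V get(K key);
        void put(K key, V value);

        default void merge(K key, V value, BinaryOperator<V> mergeOp) {
            V old = get(key);                                           // read
            put(key, old == null ? value : mergeOp.apply(old, value));  // modify + write
        }
    }

    static class InMemorySketchStore<K, V> implements SketchKeyValueStore<K, V> {
        private final Map<K, V> map = new HashMap<>();
        public V get(K key) { return map.get(key); }
        public void put(K key, V value) { map.put(key, value); }
    }

    public static void main(String[] args) {
        SketchKeyValueStore<String, Long> store = new InMemorySketchStore<>();
        store.merge("counter", 1L, Long::sum); // counter increment
        store.merge("counter", 1L, Long::sum);
        System.out.println(store.get("counter")); // 2
    }
}
{code}
The same merge() call with a concatenating mergeOp would cover the String/List-append case.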

The only downside I see is that it looks like we can set only one merge 
operator on the Options. So, we might want to set the correct merge operator 
based upon the value type.

If you think such an operator is useful, I can do a benchmarking exercise by 
adding an interface and comparing the performance of the same counter example 
implemented via RMW vs. the merge() operator.

2.2) I also looked at the count() API in KGroupedStream and one of its 
implementations. As per my understanding, the logic eventually resides in the 
KStreamAggregate.process() method, where it is still an RMW sequence:
{code:java}
final ValueAndTimestamp<T> oldAggAndTimestamp = store.get(key);
T oldAgg = getValueOrNull(oldAggAndTimestamp);

final T newAgg;
final long newTimestamp;

if (oldAgg == null) {
    oldAgg = initializer.apply();
    newTimestamp = context().timestamp();
} else {
    oldAgg = oldAggAndTimestamp.value();
    newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
}

newAgg = aggregator.apply(key, value, oldAgg);
{code}

 
 and the aggregator is a countAggregator:
{code:java}
final Aggregator<K, V, Long> countAggregator = (aggKey, value, aggregate) -> aggregate + 1;
{code}
The merge API in C++ supports both providing an initial value, when the key 
does not yet exist, and updating an existing value. The initial value is 
handled here:
{code:java}
final Initializer<Long> countInitializer = () -> 0L;
{code}
 

So, in theory, this sequence could also be replaced, but I will have to try it 
out. One thing to note is that the value is a ValueAndTimestamp, which I 
believe is not supported by the UInt64AddOperator. So, we might need a custom 
merge operator in C++ and then wire it in here, and I am not sure of the 
performance :)
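
Ignoring the ValueAndTimestamp wrapper for a moment, a small sketch can show that the RMW count above and an associative add-style merge (what uint64add does natively) produce the same aggregate:
{code:java}
import java.util.HashMap;
import java.util.Map;

// Sketch (ignoring ValueAndTimestamp) showing the RMW count in
// KStreamAggregate and an associative "add" merge agree on the result.
public class CountEquivalence {
    public static void main(String[] args) {
        String[] records = {"k", "k", "k"};

        // RMW path: initializer + aggregator, like count() today.
        Map<String, Long> rmwStore = new HashMap<>();
        for (String key : records) {
            Long oldAgg = rmwStore.get(key);
            if (oldAgg == null) oldAgg = 0L;   // countInitializer
            rmwStore.put(key, oldAgg + 1);     // countAggregator
        }

        // Merge path: defer to an associative add, no prior read needed.
        Map<String, Long> mergeStore = new HashMap<>();
        for (String key : records) {
            mergeStore.merge(key, 1L, Long::sum); // single merge operand
        }

        System.out.println(rmwStore.get("k") + " == " + mergeStore.get("k")); // 3 == 3
    }
}
{code}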

I can give it a try if needed.

WDYT? Option 1 is simpler, so should we go ahead with that and, if we get good 
results, try option 2?


> Optimize count() using RocksDB merge operator
> ---------------------------------------------
>
>                 Key: KAFKA-8295
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8295
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: Sagar Rao
>            Priority: Major
>
> In addition to regular put/get/delete RocksDB provides a fourth operation, 
> merge. This essentially provides an optimized read/update/write path in a 
> single operation. One of the built-in (C++) merge operators exposed over the 
> Java API is a counter. We should be able to leverage this for a more 
> efficient implementation of count()
>  
> (Note: Unfortunately it seems unlikely we can use this to optimize general 
> aggregations, even if RocksJava allowed for a custom merge operator, unless 
> we provide a way for the user to specify and connect a C++ implemented 
> aggregator – otherwise we incur too much cost crossing the jni for a net 
> performance benefit)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
