Hi Greg,

An alternative would be to configure RocksDB's own cache while keeping the 
Streams cache at 0. That might give you what you need, especially if you can 
work with RocksDB and don't need to change the store.

For example, you could set the block cache size to 100MB and the write buffer 
size to 32MB. The RocksDB wiki describes both settings:

https://github.com/facebook/rocksdb/wiki/Block-Cache 
https://github.com/facebook/rocksdb/wiki/Basic-Operations#write-buffer 

You can override these settings by implementing RocksDBConfigSetter and 
setting StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG to that class in your 
Kafka Streams configuration.
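
A minimal sketch of what such a setter might look like, using the 100MB and 
32MB figures above. The class name CustomRocksDBConfig and the helper 
streamsProperties() are made up for illustration, and the required settings 
(application id, bootstrap servers) are left out. It assumes the 0.10.1-era 
interface with a single setConfig method; later Kafka Streams releases also 
add a close method that you may need to implement. Treat the sizes as a 
starting point to tune, not a recommendation.

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Hypothetical class name; Kafka Streams instantiates it reflectively,
// so it needs to be public with a no-argument constructor.
public class CustomRocksDBConfig implements RocksDBConfigSetter {

    private static final long BLOCK_CACHE_BYTES = 100 * 1024 * 1024L; // 100MB
    private static final long WRITE_BUFFER_BYTES = 32 * 1024 * 1024L; // 32MB

    @Override
    public void setConfig(final String storeName,
                          final Options options,
                          final Map<String, Object> configs) {
        // Called once per RocksDB-backed store, so each store gets its own
        // block cache and write buffer of the sizes below.
        // Note: installing a fresh table config replaces any table-level
        // defaults Kafka Streams had applied (block size, for example).
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(BLOCK_CACHE_BYTES);
        options.setTableFormatConfig(tableConfig);

        // Size of a single memtable before it is flushed to disk.
        options.setWriteBufferSize(WRITE_BUFFER_BYTES);
    }

    // Hypothetical helper showing the two Streams settings discussed in
    // this thread; add application.id, bootstrap.servers, etc. as usual.
    public static Properties streamsProperties() {
        final Properties props = new Properties();
        // Keep the Streams record cache off so records are forwarded
        // immediately rather than on flush.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
        // Let RocksDB do the caching instead.
        props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
                  CustomRocksDBConfig.class);
        return props;
    }
}
```

Since setConfig runs for every store, total memory use grows with the number 
of stores and partitions on the instance, so it is worth checking that 
against your heap and off-heap budget.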

Hope this helps,
Eno

> On 10 Oct 2016, at 18:19, Greg Fodor <gfo...@gmail.com> wrote:
> 
> Hey Eno, thanks for the suggestion -- understood that my patch is not
> something that could be accepted given the API change, I posted it to help
> make the discussion concrete and because I needed a workaround. (Likely
> we'll maintain this patch internally so we can move forward with the new
> version, since the consumer heartbeat issue is something we really need
> addressed.)
> 
> Looking at the code, it seems that setting the cache size to zero will
> disable all caching. However, the previous version of Kafka Streams had a
> local cache within the RocksDBStore to reduce I/O. If we were to set the
> cache size to zero, my guess is we'd see a large increase in I/O relative
> to the previous version since we would no longer have caching of any kind
> even intra-store. By the looks of it there isn't an easy way to replicate
> the same caching behavior as the old version of Kafka Streams in the new
> system without increasing latency, but maybe I'm missing something.
> 
> 
> On Oct 10, 2016 3:10 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:
> 
>> Hi Greg,
>> 
>> Thanks for trying 0.10.1. The best option you have for your specific app
>> is to simply turn off caching by setting the cache size to 0. That should
>> give you the old behaviour:
>> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
>> 0L);
>> 
>> Your PR is an alternative, but it requires changing the APIs and would
>> require a KIP.
>> 
>> Thanks
>> Eno
>> 
>>> On 9 Oct 2016, at 23:49, Greg Fodor <gfo...@gmail.com> wrote:
>>> 
>>> JIRA opened here: https://issues.apache.org/jira/browse/KAFKA-4281
>>> 
>>> On Sun, Oct 9, 2016 at 2:02 AM, Greg Fodor <gfo...@gmail.com> wrote:
>>>> I went ahead and did some more testing, and it feels to me one option
>>>> for resolving this issue is having a method on KGroupedStream which
>>>> can be used to configure if the operations on it (reduce/aggregate)
>>>> will forward immediately or not. I did a quick patch and was able to
>>>> determine that if the records are forwarded immediately it resolves
>>>> the issue I am seeing. Having it be done on a per-KGroupedStream basis
>>>> would provide maximum flexibility.
>>>> 
>>>> On Sun, Oct 9, 2016 at 1:06 AM, Greg Fodor <gfo...@gmail.com> wrote:
>>>>> I'm taking 0.10.1 for a spin on our existing Kafka Streams jobs and
>>>>> I'm hitting what seems to be a serious issue (at least, for us) with
>>>>> the changes brought about in KIP-63. In our job, we have a number of
>>>>> steps in the topology where we perform a repartition and aggregation
>>>>> on topics that require low latency. These topics have a very low
>>>>> message volume but require subsecond latency for the aggregations to
>>>>> complete since they are configuration data that drive the rest of the
>>>>> job and need to be applied immediately.
>>>>> 
>>>>> In 0.10.0, we performed a through (for repartitioning) and aggregateBy
>>>>> and this resulted in minimal latency as the aggregateBy would just
>>>>> result in a consumer attached to the output of the through and the
>>>>> processor would consume + aggregate messages immediately passing them
>>>>> to the next step in the topology.
>>>>> 
>>>>> However, in 0.10.1 the aggregateBy API is no longer available and it
>>>>> is necessary to pivot the data through a groupByKey and then
>>>>> aggregate(). The problem is that this mechanism results in the
>>>>> intermediate KTable state store storing the data as usual, but the
>>>>> data is not forwarded downstream until the next store flush. (Due to
>>>>> the use of ForwardingCacheFlushListener instead of calling forward()
>>>>> during the processing of the record.)
>>>>> 
>>>>> As noted in KIP-63 and as I saw in the code, the flush interval of
>>>>> state stores is commit.interval.ms. For us, this has been tuned to a
>>>>> few seconds, and since we have a number of these aggregations in our
>>>>> job sequentially, this now results in many seconds of latency in the
>>>>> worst case for a tuple to travel through our topology.
>>>>> 
>>>>> It seems too inflexible to have the flush interval always be the same
>>>>> as the commit interval across all aggregates. For certain aggregations
>>>>> which are idempotent regardless of messages being reprocessed, being
>>>>> able to flush more often than the commit interval seems like a very
>>>>> important option when lower latency is required. It would still make
>>>>> sense to flush every commit as well, but having an additional
>>>>> configuration to set the maximum time between state store flushes
>>>>> seems like it would solve our problem.
>>>>> 
>>>>> In our case, we'd set our flush interval to a few hundred ms. Ideally,
>>>>> we would really prefer to be able to disable interval based flushing
>>>>> altogether (and just put + forward all processed records) for certain
>>>>> KTables that are low volume, latency sensitive, and which are
>>>>> idempotent under message reprocessing.
>>>>> 
>>>>> Thanks for any help! Right now the only option it seems is for us to
>>>>> radically lower the commit interval and accept any leftover latency,
>>>>> but unless we can find a sweet spot this may be a blocker for us
>>>>> moving to 0.10.1.
>> 
>> 
