Re: Store flushing on commit.interval.ms from KIP-63 introduces aggregation latency

2016-10-12 Thread Greg Fodor
Ah, thanks so much for the insights -- we should be in a position to profile
the new library against real data in the next week or so, so I'll let you
know how it goes.

On Oct 11, 2016 6:26 PM, "Guozhang Wang"  wrote:

> Hello Greg,
>
> I can share some context of KIP-63 here:
>
> 1. As Eno mentioned, we believe RocksDB's own mem-table already optimizes
> a large portion of IO access for write performance, so adding an extra
> caching layer on top of it was mainly about saving ser-de costs (note that
> you still need to serialize / deserialize key-value objects into bytes when
> interacting with RocksDB). Although it may further help IO, that is not
> the main motivation.
>
> 2. As part of KIP-63 Bill helped investigate the pros / cons of such
> object caching (https://issues.apache.org/jira/browse/KAFKA-3973), and our
> conclusion was that although it saves serde costs, it also makes memory
> management very hard in the long run, because the cache is bounded by
> num.records, not num.bytes. And when one of the instances hits an OOM, it
> may well result in cascading failures from rebalances and task migration.
> Ideally, we want a strict memory bound for better capacity planning and
> integration with cluster resource managers (see
> https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Memory+Management+in+Kafka+Streams
> for more details).
>
> 3. So as part of KIP-63, we removed the object-based caching and replaced
> it with byte-based caches, and in addition added RocksDBConfigSetter to
> allow users to configure RocksDB and tune its write / space amplification
> for IO.
>
>
> With that, I think shutting off caching in your case should not degrade
> performance too much, assuming RocksDB itself already does a good job with
> write access; it may add extra serde costs though, depending on your use
> case (originally the cache held on the order of 1000 records, so roughly
> speaking you are saving that many serde calls per store). But if you do
> observe significant performance degradation I'd personally love to learn
> more and help on that end.
>
>
> Guozhang
>
>
>
>
>
> On Tue, Oct 11, 2016 at 10:10 AM, Greg Fodor  wrote:
>
> > Thanks Eno -- my understanding is that the RocksDB cache is already enabled
> > at 100MB per store, so it should be on, but I'll check. I was
> > wondering if you could shed some light on the changes between 0.10.0
> > and 0.10.1 -- in 0.10.0 there was an intermediate cache within
> > RocksDBStore -- presumably this was there to improve performance,
> > despite there still being a lower-level cache managed by RocksDB. Can
> > you shed some light on why this cache was needed in 0.10.0? If it sounds
> > like our use case won't warrant the same need, then we might be OK.
> >
> > Overall however, this is really problematic for us, since we will have
> > to turn off caching for effectively all of our jobs. The way our
> > system works is that we have a number of jobs running kafka streams
> > that are configured via database tables we change via our web stack.
> > For example, when we want to tell our jobs to begin processing data
> > for a user, we insert a record for that user into the database which
> > gets passed via kafka connect to a kafka topic. The kafka streams job
> > is consuming this topic, does some basic group by operations and
> > repartitions on it, and joins it against other data streams so that it
> > knows what users should be getting processed.
> >
> > So fundamentally we have two types of aggregations: the typical case
> > that I think was the target of the optimizations in KIP-63, where
> > latency is less critical since we are counting and emitting counts for
> > analysis, etc. And the other type of aggregation is where we are doing
> > simple transformations on data coming from the database in a way to
> > configure the live behavior of the job. Latency here is very
> > sensitive: users expect the job to react and start sending data for a
> > user immediately after the database records are changed.
> >
> > So as you can see, since this is the paradigm we use to operate jobs,
> > we're in a bad position if we ever want to take advantage of the work
> > in KIP-63. All of our jobs are set up to work in this way, so we will
> > either have to maintain our fork or will have to shut off caching for
> > all of our jobs, neither of which sounds like a very good path.
> >
> > On Tue, Oct 11, 2016 at 4:16 AM, Eno Thereska 
> > wrote:
> > > Hi Greg,
> > >
> > > An alternative would be to set up RocksDB's cache, while keeping the
> > > streams cache at 0. That might give you what you need, especially if you
> > > can work with RocksDB and don't need to change the store.
> > >
> > > For example, here is how to set the Block Cache size to 100MB and the
> > > Write Buffer size to 32MB:
> > >
> > > https://github.com/facebook/rocksdb/wiki/Block-Cache
> > > https://github.com/facebook/r

Re: Store flushing on commit.interval.ms from KIP-63 introduces aggregation latency

2016-10-11 Thread Guozhang Wang
Hello Greg,

I can share some context of KIP-63 here:

1. As Eno mentioned, we believe RocksDB's own mem-table already optimizes
a large portion of IO access for write performance, so adding an extra
caching layer on top of it was mainly about saving ser-de costs (note that
you still need to serialize / deserialize key-value objects into bytes when
interacting with RocksDB). Although it may further help IO, that is not
the main motivation.

2. As part of KIP-63 Bill helped investigate the pros / cons of such
object caching (https://issues.apache.org/jira/browse/KAFKA-3973), and our
conclusion was that although it saves serde costs, it also makes memory
management very hard in the long run, because the cache is bounded by
num.records, not num.bytes. And when one of the instances hits an OOM, it
may well result in cascading failures from rebalances and task migration.
Ideally, we want a strict memory bound for better capacity planning and
integration with cluster resource managers (see
https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Memory+Management+in+Kafka+Streams
for more details).

3. So as part of KIP-63, we removed the object-based caching and replaced
it with byte-based caches, and in addition added RocksDBConfigSetter to
allow users to configure RocksDB and tune its write / space amplification
for IO.
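
To make that hook concrete, here is a minimal sketch of a RocksDBConfigSetter
tuned toward lower write amplification. The class name and option values are
illustrative (not from this thread), and the available org.rocksdb.Options
methods depend on the RocksDB version bundled with your Kafka Streams release:

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;

// Illustrative setter: trades some space amplification for lower write
// amplification; the numbers are placeholders to tune per workload.
public class WriteTunedRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        options.setWriteBufferSize(32 * 1024 * 1024L);  // 32MB memtable
        options.setMaxWriteBufferNumber(3);             // a few memtables before stalls
        options.setCompactionStyle(CompactionStyle.UNIVERSAL);
    }
}

The setter is registered via StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
as Eno describes further down the thread.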


With that, I think shutting off caching in your case should not degrade
performance too much, assuming RocksDB itself already does a good job with
write access; it may add extra serde costs though, depending on your use
case (originally the cache held on the order of 1000 records, so roughly
speaking you are saving that many serde calls per store). But if you do
observe significant performance degradation I'd personally love to learn
more and help on that end.


Guozhang





On Tue, Oct 11, 2016 at 10:10 AM, Greg Fodor  wrote:

> Thanks Eno -- my understanding is that the RocksDB cache is already enabled
> at 100MB per store, so it should be on, but I'll check. I was
> wondering if you could shed some light on the changes between 0.10.0
> and 0.10.1 -- in 0.10.0 there was an intermediate cache within
> RocksDBStore -- presumably this was there to improve performance,
> despite there still being a lower-level cache managed by RocksDB. Can
> you shed some light on why this cache was needed in 0.10.0? If it sounds
> like our use case won't warrant the same need, then we might be OK.
>
> Overall however, this is really problematic for us, since we will have
> to turn off caching for effectively all of our jobs. The way our
> system works is that we have a number of jobs running kafka streams
> that are configured via database tables we change via our web stack.
> For example, when we want to tell our jobs to begin processing data
> for a user, we insert a record for that user into the database which
> gets passed via kafka connect to a kafka topic. The kafka streams job
> is consuming this topic, does some basic group by operations and
> repartitions on it, and joins it against other data streams so that it
> knows what users should be getting processed.
>
> So fundamentally we have two types of aggregations: the typical case
> that I think was the target of the optimizations in KIP-63, where
> latency is less critical since we are counting and emitting counts for
> analysis, etc. And the other type of aggregation is where we are doing
> simple transformations on data coming from the database in a way to
> configure the live behavior of the job. Latency here is very
> sensitive: users expect the job to react and start sending data for a
> user immediately after the database records are changed.
>
> So as you can see, since this is the paradigm we use to operate jobs,
> we're in a bad position if we ever want to take advantage of the work
> in KIP-63. All of our jobs are set up to work in this way, so we will
> either have to maintain our fork or will have to shut off caching for
> all of our jobs, neither of which sounds like a very good path.
>
> On Tue, Oct 11, 2016 at 4:16 AM, Eno Thereska 
> wrote:
> > Hi Greg,
> >
> > An alternative would be to set up RocksDB's cache, while keeping the
> > streams cache at 0. That might give you what you need, especially if you
> > can work with RocksDB and don't need to change the store.
> >
> > For example, here is how to set the Block Cache size to 100MB and the
> > Write Buffer size to 32MB:
> >
> > https://github.com/facebook/rocksdb/wiki/Block-Cache
> > https://github.com/facebook/rocksdb/wiki/Basic-Operations#write-buffer
> >
> > You can override these settings by creating an implementation of
> > RocksDBConfigSetter and setting
> > StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG in Kafka Streams.
> >
> > Hope this helps,
> > Eno
> >
> >> On 10 Oct 2016, at 18:19, Greg Fodor  wrote:
> >>
> >> Hey Eno, tha

Re: Store flushing on commit.interval.ms from KIP-63 introduces aggregation latency

2016-10-11 Thread Greg Fodor
Thanks Eno -- my understanding is that the RocksDB cache is already enabled
at 100MB per store, so it should be on, but I'll check. I was
wondering if you could shed some light on the changes between 0.10.0
and 0.10.1 -- in 0.10.0 there was an intermediate cache within
RocksDBStore -- presumably this was there to improve performance,
despite there still being a lower-level cache managed by RocksDB. Can
you shed some light on why this cache was needed in 0.10.0? If it sounds
like our use case won't warrant the same need, then we might be OK.

Overall however, this is really problematic for us, since we will have
to turn off caching for effectively all of our jobs. The way our
system works is that we have a number of jobs running kafka streams
that are configured via database tables we change via our web stack.
For example, when we want to tell our jobs to begin processing data
for a user, we insert a record for that user into the database which
gets passed via kafka connect to a kafka topic. The kafka streams job
is consuming this topic, does some basic group by operations and
repartitions on it, and joins it against other data streams so that it
knows what users should be getting processed.

So fundamentally we have two types of aggregations: the typical case
that I think was the target of the optimizations in KIP-63, where
latency is less critical since we are counting and emitting counts for
analysis, etc. And the other type of aggregation is where we are doing
simple transformations on data coming from the database in a way to
configure the live behavior of the job. Latency here is very
sensitive: users expect the job to react and start sending data for a
user immediately after the database records are changed.

So as you can see, since this is the paradigm we use to operate jobs,
we're in a bad position if we ever want to take advantage of the work
in KIP-63. All of our jobs are set up to work in this way, so we will
either have to maintain our fork or will have to shut off caching for
all of our jobs, neither of which sounds like a very good path.

On Tue, Oct 11, 2016 at 4:16 AM, Eno Thereska  wrote:
> Hi Greg,
>
> An alternative would be to set up RocksDB's cache, while keeping the streams
> cache at 0. That might give you what you need, especially if you can work
> with RocksDB and don't need to change the store.
>
> For example, here is how to set the Block Cache size to 100MB and the Write
> Buffer size to 32MB:
>
> https://github.com/facebook/rocksdb/wiki/Block-Cache
> https://github.com/facebook/rocksdb/wiki/Basic-Operations#write-buffer
>
> You can override these settings by creating an implementation of RocksDBConfigSetter
> and setting StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG in Kafka Streams.
>
> Hope this helps,
> Eno
>
>> On 10 Oct 2016, at 18:19, Greg Fodor  wrote:
>>
>> Hey Eno, thanks for the suggestion -- understood that my patch is not
>> something that could be accepted given the API change; I posted it to help
>> make the discussion concrete and because I needed a workaround. (Likely
>> we'll maintain this patch internally so we can move forward with the new
>> version, since the consumer heartbeat issue is something we really need
>> addressed.)
>>
>> Looking at the code, it seems that setting the cache size to zero will
>> disable all caching. However, the previous version of Kafka Streams had a
>> local cache within the RocksDBStore to reduce I/O. If we were to set the
>> cache size to zero, my guess is we'd see a large increase in I/O relative
>> to the previous version since we would no longer have caching of any kind
>> even intra-store. By the looks of it there isn't an easy way to replicate
>> the same caching behavior as the old version of Kafka Streams in the new
>> system without increasing latency, but maybe I'm missing something.
>>
>>
>> On Oct 10, 2016 3:10 AM, "Eno Thereska"  wrote:
>>
>>> Hi Greg,
>>>
>>> Thanks for trying 0.10.1. The best option you have for your specific app
>>> is to simply turn off caching by setting the cache size to 0. That should
>>> give you the old behaviour:
>>> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
>>> 0L);
>>>
>>> Your PR is an alternative, but it requires changing the APIs and would
>>> require a KIP.
>>>
>>> Thanks
>>> Eno
>>>
 On 9 Oct 2016, at 23:49, Greg Fodor  wrote:

 JIRA opened here: https://issues.apache.org/jira/browse/KAFKA-4281

 On Sun, Oct 9, 2016 at 2:02 AM, Greg Fodor  wrote:
> I went ahead and did some more testing, and it feels to me one option
> for resolving this issue is having a method on KGroupedStream which
> can be used to configure if the operations on it (reduce/aggregate)
> will forward immediately or not. I did a quick patch and was able to
> determine that if the records are forwarded immediately it re

Re: Store flushing on commit.interval.ms from KIP-63 introduces aggregation latency

2016-10-11 Thread Eno Thereska
Hi Greg,

An alternative would be to set up RocksDB's cache, while keeping the streams
cache at 0. That might give you what you need, especially if you can work with
RocksDB and don't need to change the store.

For example, here is how to set the Block Cache size to 100MB and the Write
Buffer size to 32MB:

https://github.com/facebook/rocksdb/wiki/Block-Cache
https://github.com/facebook/rocksdb/wiki/Basic-Operations#write-buffer

You can override these settings by creating an implementation of RocksDBConfigSetter
and setting StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG in Kafka Streams.
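
For concreteness, a sketch of such a setter with those sizes. The class name is
illustrative, and setBlockCacheSize reflects the RocksDB API of that era (newer
RocksDB releases configure the block cache differently):

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Illustrative setter: 100MB block cache and 32MB write buffer per store.
public class CustomRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(100 * 1024 * 1024L);  // 100MB block cache
        options.setTableFormatConfig(tableConfig);
        options.setWriteBufferSize(32 * 1024 * 1024L);      // 32MB write buffer
    }
}

// Then register it, and keep the streams cache at 0:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
// props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);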

Hope this helps,
Eno

> On 10 Oct 2016, at 18:19, Greg Fodor  wrote:
> 
> Hey Eno, thanks for the suggestion -- understood that my patch is not
> something that could be accepted given the API change; I posted it to help
> make the discussion concrete and because I needed a workaround. (Likely
> we'll maintain this patch internally so we can move forward with the new
> version, since the consumer heartbeat issue is something we really need
> addressed.)
> 
> Looking at the code, it seems that setting the cache size to zero will
> disable all caching. However, the previous version of Kafka Streams had a
> local cache within the RocksDBStore to reduce I/O. If we were to set the
> cache size to zero, my guess is we'd see a large increase in I/O relative
> to the previous version since we would no longer have caching of any kind
> even intra-store. By the looks of it there isn't an easy way to replicate
> the same caching behavior as the old version of Kafka Streams in the new
> system without increasing latency, but maybe I'm missing something.
> 
> 
> On Oct 10, 2016 3:10 AM, "Eno Thereska"  wrote:
> 
>> Hi Greg,
>> 
>> Thanks for trying 0.10.1. The best option you have for your specific app
>> is to simply turn off caching by setting the cache size to 0. That should
>> give you the old behaviour:
>> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
>> 0L);
>> 
>> Your PR is an alternative, but it requires changing the APIs and would
>> require a KIP.
>> 
>> Thanks
>> Eno
>> 
>>> On 9 Oct 2016, at 23:49, Greg Fodor  wrote:
>>> 
>>> JIRA opened here: https://issues.apache.org/jira/browse/KAFKA-4281
>>> 
>>> On Sun, Oct 9, 2016 at 2:02 AM, Greg Fodor  wrote:
 I went ahead and did some more testing, and it feels to me one option
 for resolving this issue is having a method on KGroupedStream which
 can be used to configure if the operations on it (reduce/aggregate)
 will forward immediately or not. I did a quick patch and was able to
 determine that if the records are forwarded immediately it resolves
 the issue I am seeing. Having it be done on a per-KGroupedStream basis
 would provide maximum flexibility.
 
 On Sun, Oct 9, 2016 at 1:06 AM, Greg Fodor  wrote:
> I'm taking 0.10.1 for a spin on our existing Kafka Streams jobs and
> I'm hitting what seems to be a serious issue (at least, for us) with
> the changes brought about in KIP-63. In our job, we have a number of
> steps in the topology where we perform a repartition and aggregation
> on topics that require low latency. These topics have a very low
> message volume but require subsecond latency for the aggregations to
> complete since they are configuration data that drive the rest of the
> job and need to be applied immediately.
> 
> In 0.10.0, we performed a through (for repartitioning) and aggregateBy
> and this resulted in minimal latency as the aggregateBy would just
> result in a consumer attached to the output of the through and the
> processor would consume + aggregate messages immediately passing them
> to the next step in the topology.
> 
> However, in 0.10.1 the aggregateBy API is no longer available and it
> is necessary to pivot the data through a groupByKey and then
> aggregate(). The problem is that this mechanism results in the
> intermediate KTable state store storing the data as usual, but the
> data is not forwarded downstream until the next store flush. (Due to
> the use of ForwardingCacheFlushListener instead of calling forward()
> during the process of the record.)
> 
> As noted in KIP-63 and as I saw in the code, the flush interval of
> state stores is commit.interval.ms. For us, this has been tuned to a
> few seconds, and since we have a number of these aggregations in our
> job sequentially, this now results in many seconds of latency in the
> worst case for a tuple to travel through our topology.
> 
> It seems too inflexible to have the flush interval always be the same
> as the commit interval across all aggregates. For certain aggregations
> which are idempotent regardless of messages being reprocessed, being
> able to flush more oft

Re: Store flushing on commit.interval.ms from KIP-63 introduces aggregation latency

2016-10-10 Thread Greg Fodor
Hey Eno, thanks for the suggestion -- understood that my patch is not
something that could be accepted given the API change; I posted it to help
make the discussion concrete and because I needed a workaround. (Likely
we'll maintain this patch internally so we can move forward with the new
version, since the consumer heartbeat issue is something we really need
addressed.)

Looking at the code, it seems that setting the cache size to zero will
disable all caching. However, the previous version of Kafka Streams had a
local cache within the RocksDBStore to reduce I/O. If we were to set the
cache size to zero, my guess is we'd see a large increase in I/O relative
to the previous version since we would no longer have caching of any kind
even intra-store. By the looks of it there isn't an easy way to replicate
the same caching behavior as the old version of Kafka Streams in the new
system without increasing latency, but maybe I'm missing something.


On Oct 10, 2016 3:10 AM, "Eno Thereska"  wrote:

> Hi Greg,
>
> Thanks for trying 0.10.1. The best option you have for your specific app
> is to simply turn off caching by setting the cache size to 0. That should
> give you the old behaviour:
> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
> 0L);
>
> Your PR is an alternative, but it requires changing the APIs and would
> require a KIP.
>
> Thanks
> Eno
>
> > On 9 Oct 2016, at 23:49, Greg Fodor  wrote:
> >
> > JIRA opened here: https://issues.apache.org/jira/browse/KAFKA-4281
> >
> > On Sun, Oct 9, 2016 at 2:02 AM, Greg Fodor  wrote:
> >> I went ahead and did some more testing, and it feels to me one option
> >> for resolving this issue is having a method on KGroupedStream which
> >> can be used to configure if the operations on it (reduce/aggregate)
> >> will forward immediately or not. I did a quick patch and was able to
> >> determine that if the records are forwarded immediately it resolves
> >> the issue I am seeing. Having it be done on a per-KGroupedStream basis
> >> would provide maximum flexibility.
> >>
> >> On Sun, Oct 9, 2016 at 1:06 AM, Greg Fodor  wrote:
> >>> I'm taking 0.10.1 for a spin on our existing Kafka Streams jobs and
> >>> I'm hitting what seems to be a serious issue (at least, for us) with
> >>> the changes brought about in KIP-63. In our job, we have a number of
> >>> steps in the topology where we perform a repartition and aggregation
> >>> on topics that require low latency. These topics have a very low
> >>> message volume but require subsecond latency for the aggregations to
> >>> complete since they are configuration data that drive the rest of the
> >>> job and need to be applied immediately.
> >>>
> >>> In 0.10.0, we performed a through (for repartitioning) and aggregateBy
> >>> and this resulted in minimal latency as the aggregateBy would just
> >>> result in a consumer attached to the output of the through and the
> >>> processor would consume + aggregate messages immediately passing them
> >>> to the next step in the topology.
> >>>
> >>> However, in 0.10.1 the aggregateBy API is no longer available and it
> >>> is necessary to pivot the data through a groupByKey and then
> >>> aggregate(). The problem is that this mechanism results in the
> >>> intermediate KTable state store storing the data as usual, but the
> >>> data is not forwarded downstream until the next store flush. (Due to
> >>> the use of ForwardingCacheFlushListener instead of calling forward()
> >>> during the process of the record.)
> >>>
> >>> As noted in KIP-63 and as I saw in the code, the flush interval of
> >>> state stores is commit.interval.ms. For us, this has been tuned to a
> >>> few seconds, and since we have a number of these aggregations in our
> >>> job sequentially, this now results in many seconds of latency in the
> >>> worst case for a tuple to travel through our topology.
> >>>
> >>> It seems too inflexible to have the flush interval always be the same
> >>> as the commit interval across all aggregates. For certain aggregations
> >>> which are idempotent regardless of messages being reprocessed, being
> >>> able to flush more often than the commit interval seems like a very
> >>> important option when lower latency is required. It would still make
> >>> sense to flush every commit as well, but having an additional
> >>> configuration to set the maximum time between state store flushes
> >>> seems like it would solve our problem.
> >>>
> >>> In our case, we'd set our flush interval to a few hundred ms. Ideally,
> >>> we would really prefer to be able to disable interval based flushing
> >>> altogether (and just put + forward all processed records) for certain
> >>> KTables that are low volume, latency sensitive, and which are
> >>> idempotent under message reprocessing.
> >>>
> >>> Thanks for any help! Right now the only option it seems is for us to
> >>> radically lower the commit interval and accept any leftover latency,
> >>> but unless we can find a sweet spot this ma

Re: Store flushing on commit.interval.ms from KIP-63 introduces aggregation latency

2016-10-10 Thread Eno Thereska
Hi Greg,

Thanks for trying 0.10.1. The best option you have for your specific app is to 
simply turn off caching by setting the cache size to 0. That should give you 
the old behaviour:
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
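
For completeness, a minimal sketch of where that line fits in an application; the
application id, bootstrap servers, and placeholder topology are illustrative:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class CacheDisabledApp {
    public static void main(final String[] args) {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Disable the KIP-63 record cache to restore the old immediate-forwarding behaviour.
        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);

        final KStreamBuilder builder = new KStreamBuilder();
        // Placeholder topology; replace with your real processing graph.
        builder.stream("input-topic").to("output-topic");

        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
        streams.start();
    }
}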

Your PR is an alternative, but it requires changing the APIs and would require 
a KIP. 

Thanks
Eno

> On 9 Oct 2016, at 23:49, Greg Fodor  wrote:
> 
> JIRA opened here: https://issues.apache.org/jira/browse/KAFKA-4281
> 
> On Sun, Oct 9, 2016 at 2:02 AM, Greg Fodor  wrote:
>> I went ahead and did some more testing, and it feels to me one option
>> for resolving this issue is having a method on KGroupedStream which
>> can be used to configure if the operations on it (reduce/aggregate)
>> will forward immediately or not. I did a quick patch and was able to
>> determine that if the records are forwarded immediately it resolves
>> the issue I am seeing. Having it be done on a per-KGroupedStream basis
>> would provide maximum flexibility.
>> 
>> On Sun, Oct 9, 2016 at 1:06 AM, Greg Fodor  wrote:
>>> I'm taking 0.10.1 for a spin on our existing Kafka Streams jobs and
>>> I'm hitting what seems to be a serious issue (at least, for us) with
>>> the changes brought about in KIP-63. In our job, we have a number of
>>> steps in the topology where we perform a repartition and aggregation
>>> on topics that require low latency. These topics have a very low
>>> message volume but require subsecond latency for the aggregations to
>>> complete since they are configuration data that drive the rest of the
>>> job and need to be applied immediately.
>>> 
>>> In 0.10.0, we performed a through (for repartitioning) and aggregateBy
>>> and this resulted in minimal latency as the aggregateBy would just
>>> result in a consumer attached to the output of the through and the
>>> processor would consume + aggregate messages immediately passing them
>>> to the next step in the topology.
>>> 
>>> However, in 0.10.1 the aggregateBy API is no longer available and it
>>> is necessary to pivot the data through a groupByKey and then
>>> aggregate(). The problem is that this mechanism results in the
>>> intermediate KTable state store storing the data as usual, but the
>>> data is not forwarded downstream until the next store flush. (Due to
>>> the use of ForwardingCacheFlushListener instead of calling forward()
>>> during the process of the record.)
>>> 
>>> As noted in KIP-63 and as I saw in the code, the flush interval of
>>> state stores is commit.interval.ms. For us, this has been tuned to a
>>> few seconds, and since we have a number of these aggregations in our
>>> job sequentially, this now results in many seconds of latency in the
>>> worst case for a tuple to travel through our topology.
>>> 
>>> It seems too inflexible to have the flush interval always be the same
>>> as the commit interval across all aggregates. For certain aggregations
>>> which are idempotent regardless of messages being reprocessed, being
>>> able to flush more often than the commit interval seems like a very
>>> important option when lower latency is required. It would still make
>>> sense to flush every commit as well, but having an additional
>>> configuration to set the maximum time between state store flushes
>>> seems like it would solve our problem.
>>> 
>>> In our case, we'd set our flush interval to a few hundred ms. Ideally,
>>> we would really prefer to be able to disable interval based flushing
>>> altogether (and just put + forward all processed records) for certain
>>> KTables that are low volume, latency sensitive, and which are
>>> idempotent under message reprocessing.
>>> 
>>> Thanks for any help! Right now the only option it seems is for us to
>>> radically lower the commit interval and accept any leftover latency,
>>> but unless we can find a sweet spot this may be a blocker for us
>>> moving to 0.10.1.



Re: Store flushing on commit.interval.ms from KIP-63 introduces aggregation latency

2016-10-09 Thread Greg Fodor
JIRA opened here: https://issues.apache.org/jira/browse/KAFKA-4281

On Sun, Oct 9, 2016 at 2:02 AM, Greg Fodor  wrote:
> I went ahead and did some more testing, and it feels to me one option
> for resolving this issue is having a method on KGroupedStream which
> can be used to configure if the operations on it (reduce/aggregate)
> will forward immediately or not. I did a quick patch and was able to
> determine that if the records are forwarded immediately it resolves
> the issue I am seeing. Having it be done on a per-KGroupedStream basis
> would provide maximum flexibility.
>
> On Sun, Oct 9, 2016 at 1:06 AM, Greg Fodor  wrote:
>> I'm taking 0.10.1 for a spin on our existing Kafka Streams jobs and
>> I'm hitting what seems to be a serious issue (at least, for us) with
>> the changes brought about in KIP-63. In our job, we have a number of
>> steps in the topology where we perform a repartition and aggregation
>> on topics that require low latency. These topics have a very low
>> message volume but require subsecond latency for the aggregations to
>> complete since they are configuration data that drive the rest of the
>> job and need to be applied immediately.
>>
>> In 0.10.0, we performed a through (for repartitioning) and aggregateBy
>> and this resulted in minimal latency as the aggregateBy would just
>> result in a consumer attached to the output of the through and the
>> processor would consume + aggregate messages immediately passing them
>> to the next step in the topology.
>>
>> However, in 0.10.1 the aggregateBy API is no longer available and it
>> is necessary to pivot the data through a groupByKey and then
>> aggregate(). The problem is that this mechanism results in the
>> intermediate KTable state store storing the data as usual, but the
>> data is not forwarded downstream until the next store flush. (Due to
>> the use of ForwardingCacheFlushListener instead of calling forward()
>> during the process of the record.)
>>
>> As noted in KIP-63 and as I saw in the code, the flush interval of
>> state stores is commit.interval.ms. For us, this has been tuned to a
>> few seconds, and since we have a number of these aggregations in our
>> job sequentially, this now results in many seconds of latency in the
>> worst case for a tuple to travel through our topology.
>>
>> It seems too inflexible to have the flush interval always be the same
>> as the commit interval across all aggregates. For certain aggregations
>> which are idempotent regardless of messages being reprocessed, being
>> able to flush more often than the commit interval seems like a very
>> important option when lower latency is required. It would still make
>> sense to flush every commit as well, but having an additional
>> configuration to set the maximum time between state store flushes
>> seems like it would solve our problem.
>>
>> In our case, we'd set our flush interval to a few hundred ms. Ideally,
>> we would really prefer to be able to disable interval based flushing
>> altogether (and just put + forward all processed records) for certain
>> KTables that are low volume, latency sensitive, and which are
>> idempotent under message reprocessing.
>>
>> Thanks for any help! Right now the only option it seems is for us to
>> radically lower the commit interval and accept any leftover latency,
>> but unless we can find a sweet spot this may be a blocker for us
>> moving to 0.10.1.


Re: Store flushing on commit.interval.ms from KIP-63 introduces aggregation latency

2016-10-09 Thread Greg Fodor
I went ahead and did some more testing, and it feels to me one option
for resolving this issue is having a method on KGroupedStream which
can be used to configure if the operations on it (reduce/aggregate)
will forward immediately or not. I did a quick patch and was able to
determine that if the records are forwarded immediately it resolves
the issue I am seeing. Having it be done on a per-KGroupedStream basis
would provide maximum flexibility.

On Sun, Oct 9, 2016 at 1:06 AM, Greg Fodor  wrote:
> I'm taking 0.10.1 for a spin on our existing Kafka Streams jobs and
> I'm hitting what seems to be a serious issue (at least, for us) with
> the changes brought about in KIP-63. In our job, we have a number of
> steps in the topology where we perform a repartition and aggregation
> on topics that require low latency. These topics have a very low
> message volume but require subsecond latency for the aggregations to
> complete since they are configuration data that drive the rest of the
> job and need to be applied immediately.
>
> In 0.10.0, we performed a through (for repartitioning) and aggregateBy
> and this resulted in minimal latency as the aggregateBy would just
> result in a consumer attached to the output of the through and the
> processor would consume + aggregate messages immediately passing them
> to the next step in the topology.
>
> However, in 0.10.1 the aggregateBy API is no longer available and it
> is necessary to pivot the data through a groupByKey and then
> aggregate(). The problem is that this mechanism results in the
> intermediate KTable state store storing the data as usual, but the
> data is not forwarded downstream until the next store flush. (Due to
> the use of ForwardingCacheFlushListener instead of calling forward()
> during the process of the record.)
>
> As noted in KIP-63 and as I saw in the code, the flush interval of
> state stores is commit.interval.ms. For us, this has been tuned to a
> few seconds, and since we have a number of these aggregations in our
> job sequentially, this now results in many seconds of latency in the
> worst case for a tuple to travel through our topology.
>
> It seems too inflexible to have the flush interval always be the same
> as the commit interval across all aggregates. For certain aggregations
> which are idempotent regardless of messages being reprocessed, being
> able to flush more often than the commit interval seems like a very
> important option when lower latency is required. It would still make
> sense to flush every commit as well, but having an additional
> configuration to set the maximum time between state store flushes
> seems like it would solve our problem.
>
> In our case, we'd set our flush interval to a few hundred ms. Ideally,
> we would really prefer to be able to disable interval based flushing
> altogether (and just put + forward all processed records) for certain
> KTables that are low volume, latency sensitive, and which are
> idempotent under message reprocessing.
>
> Thanks for any help! Right now the only option it seems is for us to
> radically lower the commit interval and accept any leftover latency,
> but unless we can find a sweet spot this may be a blocker for us
> moving to 0.10.1.


Store flushing on commit.interval.ms from KIP-63 introduces aggregation latency

2016-10-09 Thread Greg Fodor
I'm taking 0.10.1 for a spin on our existing Kafka Streams jobs and
I'm hitting what seems to be a serious issue (at least, for us) with
the changes brought about in KIP-63. In our job, we have a number of
steps in the topology where we perform a repartition and aggregation
on topics that require low latency. These topics have a very low
message volume but require subsecond latency for the aggregations to
complete since they are configuration data that drive the rest of the
job and need to be applied immediately.

In 0.10.0, we performed a through (for repartitioning) and aggregateBy
and this resulted in minimal latency as the aggregateBy would just
result in a consumer attached to the output of the through and the
processor would consume + aggregate messages immediately passing them
to the next step in the topology.

However, in 0.10.1 the aggregateBy API is no longer available and it
is necessary to pivot the data through a groupByKey and then
aggregate(). The problem is that this mechanism results in the
intermediate KTable state store storing the data as usual, but the
data is not forwarded downstream until the next store flush. (Due to
the use of ForwardingCacheFlushListener instead of calling forward()
during the process of the record.)
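
To make the 0.10.1 shape concrete, here is a sketch of the pattern being
described; the topic, store, and type names are illustrative, and the
aggregate is a simple count rather than whatever the real job computes:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class RepartitionAggregateSketch {
    public static void main(final String[] args) {
        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<String, String> configStream = builder.stream("config-updates");

        // 0.10.1 style: repartition via through(), then groupByKey() + aggregate().
        // The aggregate lands in the KTable's cache and is only forwarded
        // downstream when the cache flushes (on commit.interval.ms), which is
        // the latency problem described above.
        final KTable<String, Long> perKeyCounts = configStream
            .through("config-updates-repartitioned")  // pre-created repartition topic
            .groupByKey()
            .aggregate(
                () -> 0L,                                   // initializer
                (key, value, aggregate) -> aggregate + 1L,  // aggregator
                Serdes.Long(),                              // aggregate value serde
                "config-updates-store");                    // state store name
    }
}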

As noted in KIP-63 and as I saw in the code, the flush interval of
state stores is commit.interval.ms. For us, this has been tuned to a
few seconds, and since we have a number of these aggregations in our
job sequentially, this now results in many seconds of latency in the
worst case for a tuple to travel through our topology.

It seems too inflexible to have the flush interval always be the same
as the commit interval across all aggregates. For certain aggregations
which are idempotent regardless of messages being reprocessed, being
able to flush more often than the commit interval seems like a very
important option when lower latency is required. It would still make
sense to flush every commit as well, but having an additional
configuration to set the maximum time between state store flushes
seems like it would solve our problem.

In our case, we'd set our flush interval to a few hundred ms. Ideally,
we would really prefer to be able to disable interval based flushing
altogether (and just put + forward all processed records) for certain
KTables that are low volume, latency sensitive, and which are
idempotent under message reprocessing.

Thanks for any help! Right now the only option it seems is for us to
radically lower the commit interval and accept any leftover latency,
but unless we can find a sweet spot this may be a blocker for us
moving to 0.10.1.
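
For reference, the two workarounds discussed in the replies (which appear
earlier on this page, since the archive is newest-first) both boil down to
StreamsConfig settings; a short fragment with placeholder values:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
// Workaround 1: lower commit.interval.ms so caches flush, and forward, more often.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500L);
// Workaround 2: set the cache size to 0 so every update is forwarded immediately.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);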