I went ahead and did some more testing, and it feels to me that one option for resolving this issue is a method on KGroupedStream that configures whether the operations on it (reduce/aggregate) forward records immediately or not. I did a quick patch and confirmed that forwarding records immediately resolves the issue I am seeing. Doing this on a per-KGroupedStream basis would provide maximum flexibility.
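For concreteness, here is a rough sketch of the kind of API I have in mind. forwardImmediately() is purely hypothetical (it is the idea from my patch, not anything in 0.10.1), and the counting aggregate is just a stand-in:

    KTable<String, Long> counts = stream
        .groupByKey(Serdes.String(), Serdes.String())
        // Hypothetical toggle: bypass the cache and forward every update
        // downstream as it is processed, instead of waiting for a flush.
        .forwardImmediately(true)
        .aggregate(() -> 0L, (key, value, acc) -> acc + 1L,
                   Serdes.Long(), "agg-store");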
On Sun, Oct 9, 2016 at 1:06 AM, Greg Fodor <gfo...@gmail.com> wrote:
> I'm taking 0.10.1 for a spin on our existing Kafka Streams jobs and
> I'm hitting what seems to be a serious issue (at least, for us) with
> the changes brought about in KIP-63. In our job, we have a number of
> steps in the topology where we perform a repartition and aggregation
> on topics that require low latency. These topics have a very low
> message volume but require subsecond latency for the aggregations to
> complete, since they are configuration data that drive the rest of
> the job and need to be applied immediately.
>
> In 0.10.0, we performed a through (for repartitioning) and
> aggregateBy, and this resulted in minimal latency: the aggregateBy
> would just result in a consumer attached to the output of the
> through, and the processor would consume and aggregate messages
> immediately, passing them to the next step in the topology.
>
> However, in 0.10.1 the aggregateBy API is no longer available, and it
> is necessary to pivot the data through a groupByKey and then
> aggregate(). The problem is that this mechanism results in the
> intermediate KTable state store storing the data as usual, but the
> data is not forwarded downstream until the next store flush (due to
> the use of ForwardingCacheFlushListener instead of calling forward()
> while processing the record).
>
> As noted in KIP-63, and as I saw in the code, the flush interval of
> state stores is commit.interval.ms. For us, this has been tuned to a
> few seconds, and since we have a number of these aggregations in our
> job sequentially, this now results in many seconds of latency in the
> worst case for a tuple to travel through our topology.
>
> It seems too inflexible to have the flush interval always be the same
> as the commit interval across all aggregates. For certain
> aggregations which are idempotent regardless of messages being
> reprocessed, being able to flush more often than the commit interval
> seems like a very important option when lower latency is required. It
> would still make sense to flush on every commit as well, but an
> additional configuration setting the maximum time between state store
> flushes seems like it would solve our problem.
>
> In our case, we'd set our flush interval to a few hundred ms.
> Ideally, we would prefer to be able to disable interval-based
> flushing altogether (and just put + forward all processed records)
> for certain KTables that are low volume, latency sensitive, and
> idempotent under message reprocessing.
>
> Thanks for any help! Right now the only option seems to be for us to
> radically lower the commit interval and accept any leftover latency,
> but unless we can find a sweet spot this may be a blocker for us
> moving to 0.10.1.
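To make the migration and the stopgap concrete, here is a minimal sketch of what our situation looks like against the 0.10.1 API. The topic names, application id, and counting aggregator are simplified stand-ins for our actual job; the 100 ms commit interval is illustrative, not a recommendation. (The 0.10.0 method was aggregateByKey, abbreviated to aggregateBy above; the old call is shown commented out from memory.)

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class LowLatencyAggregationExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "low-latency-agg-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // State store flushes (and therefore forwarding of cached
            // aggregate updates) are tied to the commit interval, so
            // lowering it is currently the only knob for worst-case latency.
            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

            KStreamBuilder builder = new KStreamBuilder();
            KStream<String, String> stream =
                builder.stream(Serdes.String(), Serdes.String(), "config-input");

            // 0.10.0 style, no longer available in 0.10.1:
            //   stream.through("config-repartition")
            //         .aggregateByKey(() -> 0L, (k, v, acc) -> acc + 1L,
            //                         Serdes.String(), Serdes.Long(), "agg-store");
            // Each update was forwarded downstream as the record was processed.

            // 0.10.1 style: updates land in the state store cache and are
            // only forwarded downstream when the store flushes.
            KTable<String, Long> counts = stream
                .through(Serdes.String(), Serdes.String(), "config-repartition")
                .groupByKey(Serdes.String(), Serdes.String())
                .aggregate(() -> 0L, (key, value, acc) -> acc + 1L,
                           Serdes.Long(), "agg-store");

            new KafkaStreams(builder, props).start();
        }
    }

The catch, as described above, is that the lower commit interval applies job-wide: every aggregate and every commit pays for latency that only a few low-volume KTables actually need.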