I went ahead and did some more testing, and it seems to me that one
option for resolving this issue is a method on KGroupedStream which
can be used to configure whether the operations on it
(reduce/aggregate) forward records immediately or not. I did a quick
patch and confirmed that forwarding the records immediately resolves
the issue I am seeing. Doing this on a per-KGroupedStream basis would
provide maximum flexibility.
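
To sketch what I mean, here is roughly how it could look from the DSL
side; the forwardResultsImmediately() call is purely illustrative and
does not exist in any released API (the rest is the normal 0.10.1
DSL):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, String> configs =
        builder.stream(Serdes.String(), Serdes.String(), "config-updates");

    KTable<String, String> latest = configs
        .groupByKey(Serdes.String(), Serdes.String())
        // hypothetical switch: forward each aggregation result
        // downstream as the record is processed, instead of waiting
        // for the next cache flush
        .forwardResultsImmediately(true)
        .reduce((oldValue, newValue) -> newValue, "latest-config-store");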

On Sun, Oct 9, 2016 at 1:06 AM, Greg Fodor <gfo...@gmail.com> wrote:
> I'm taking 0.10.1 for a spin on our existing Kafka Streams jobs and
> I'm hitting what seems to be a serious issue (at least, for us) with
> the changes brought about in KIP-63. In our job, we have a number of
> steps in the topology where we perform a repartition and aggregation
> on topics that require low latency. These topics have a very low
> message volume but require subsecond latency for the aggregations to
> complete since they are configuration data that drive the rest of the
> job and need to be applied immediately.
>
> In 0.10.0, we performed a through() (for repartitioning) followed by
> an aggregateBy(), and this resulted in minimal latency: the
> aggregateBy() simply attached a consumer to the output of the
> through() topic, and the processor consumed and aggregated messages
> immediately, passing them on to the next step in the topology.
>
> However, in 0.10.1 the aggregateBy API is no longer available, and
> it is necessary to pivot the data through a groupByKey() and then an
> aggregate(). The problem is that with this mechanism the intermediate
> KTable state store still stores the data as usual, but the data is
> not forwarded downstream until the next store flush (due to the use
> of ForwardingCacheFlushListener instead of calling forward() while
> each record is processed).
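>
> For reference, the shape of one of these steps in 0.10.1 looks
> roughly like this (topic names, store names, and the aggregation
> itself are just placeholders; imports and serde setup as usual):
>
>     KStreamBuilder builder = new KStreamBuilder();
>
>     KStream<String, String> updates =
>         builder.stream(Serdes.String(), Serdes.String(), "config-updates");
>
>     KTable<String, Long> counts = updates
>         .through(Serdes.String(), Serdes.String(), "config-updates-repartition")
>         .groupByKey(Serdes.String(), Serdes.String())
>         .aggregate(
>             () -> 0L,                        // Initializer
>             (key, value, agg) -> agg + 1L,   // Aggregator
>             Serdes.Long(),
>             "config-counts-store");
>     // results only reach downstream processors when the store cache
>     // is flushed, not as each record is processed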
>
> As noted in KIP-63 and as I saw in the code, the flush interval of
> state stores is commit.interval.ms. For us, this has been tuned to a
> few seconds, and since we have a number of these aggregations in our
> job sequentially, this now results in many seconds of latency in the
> worst case for a tuple to travel through our topology.
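>
> For concreteness, this is roughly how that looks in our config
> (values illustrative):
>
>     Properties props = new Properties();
>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "our-streams-job");
>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
>     // state store caches are flushed (and cached aggregation results
>     // forwarded) on commit, so this bounds the per-step latency
>     props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);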
>
> It seems too inflexible to have the flush interval always be the
> same as the commit interval across all aggregates. For aggregations
> that are idempotent under message reprocessing, being able to flush
> more often than the commit interval seems like a very important
> option when lower latency is required. It would still make sense to
> flush on every commit as well, but an additional configuration
> setting the maximum time between state store flushes seems like it
> would solve our problem.
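>
> Something along these lines is what I have in mind; the property
> name below is made up and does not exist in Kafka Streams today:
>
>     // hypothetical upper bound on the time between state store
>     // flushes, independent of commit.interval.ms
>     props.put("state.store.flush.interval.ms", 200);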
>
> In our case, we'd set our flush interval to a few hundred ms.
> Ideally, we would prefer to be able to disable interval-based
> flushing altogether (and just put + forward every processed record)
> for certain KTables that are low volume, latency sensitive, and
> idempotent under message reprocessing.
>
> Thanks for any help! Right now the only option, it seems, is for us
> to radically lower the commit interval and accept any leftover
> latency, but unless we can find a sweet spot this may be a blocker
> for us moving to 0.10.1.
