On a stream definition I perform an "aggregate" which is configured with a state store.
*Goal*: Persist the aggregation results into a database, e.g. MySQL or MongoDB *Working approach*: I have created a custom StateStore backed by a changelog topic like the builtin state stores. Whenever the store gets flushed I save to the database, mark the record as being persisted and log the change in the changelog. If something goes wrong during processing, the changelog guarantees that I do not lose data, restores the state and if some data point was not persisted, the next stream instance will persist it on its flush operation. 1. I cannot store too much data in the changelog, even with compaction, if I have too much data, bootstrapping a stream instance would take a long time 2. On the other hand, if I take too long to recover from a failure, I may lose data. So there seems to be a delicate tradeoff here *Questions*: 1. Is this a reasonable use case? 2. In a scenario where my stream would have a fanout (multiple sub-streams based on the same stream), each branch would perform different "aggregate" operations, each with its own state store. Are state stores flushed in parallel or sequentially? 3. The above also applies per-partition. As a stream definition is parallelized by partition, will one instance hold different store instances for each one? 4. Through synthetic sleeps I simulated slow flushes, slower than the commit interval. The stream seems to be ok with it and didn't throw, I assume the Kafka consumer does not poll more records until all of the previous poll's are committed, but I couldn't find documentation to back this statement. Is there a timeout for "commit" operations? Sample code public class AggregateHolder { private Long commonKey; private List<Double> rawValues = new ArrayList<>(); private boolean persisted; // ... } And stream definition source.groupByKey(Serdes.String(), recordSerdes) .aggregate( AggregateHolder::new, (aggKey, value, aggregate) -> aggregate.addValue(value.getValue()), new DemoStoreSupplier<>(/* ... */) ) .foreach((key, agg) -> log.debug("Aggregate: {}={} ({})", key, agg.getAverage(), agg));