In my stream definition I perform an "aggregate" operation that is
configured with a state store.

*Goal*: Persist the aggregation results into a database, e.g. MySQL or
MongoDB

*Working approach*:
I have created a custom StateStore backed by a changelog topic, like the
built-in state stores. Whenever the store is flushed, I save the records to
the database, mark them as persisted, and log the change in the changelog.

If something goes wrong during processing, the changelog guarantees that I
do not lose data: the state is restored from it, and if some data point was
not persisted, the next stream instance persists it during its flush
operation.
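
A minimal sketch of that flush hook (assuming a simple in-memory cache and
that AggregateHolder exposes accessors for its persisted flag;
DemoStoreFlushSketch, saveToDatabase and logChange are placeholder names,
not Kafka Streams API):

import java.util.HashMap;
import java.util.Map;

public class DemoStoreFlushSketch {

    private final Map<String, AggregateHolder> cache = new HashMap<>();

    // Called on commit: persist anything not yet saved, then log the change.
    public void flush() {
        for (Map.Entry<String, AggregateHolder> entry : cache.entrySet()) {
            AggregateHolder agg = entry.getValue();
            if (!agg.isPersisted()) {
                saveToDatabase(entry.getKey(), agg); // e.g. MySQL/MongoDB upsert
                agg.setPersisted(true);              // mark the record as persisted
            }
            logChange(entry.getKey(), agg);          // append to the changelog topic
        }
    }

    private void saveToDatabase(String key, AggregateHolder agg) { /* database write */ }

    private void logChange(String key, AggregateHolder agg) { /* changelog producer send */ }
}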

*Concerns*:

1. I cannot store too much data in the changelog: even with compaction,
bootstrapping a stream instance would take a long time if there is too much
data.
2. On the other hand, if I take too long to recover from a failure, I may
lose data. So there seems to be a delicate tradeoff here (see the
configuration sketch below).
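
The size/recovery side of this tradeoff is mostly governed by the changelog
topic's configuration. A minimal sketch of the knobs involved (the values
are illustrative assumptions, not recommendations):

Map<String, String> changelogConfig = new HashMap<>();
changelogConfig.put("cleanup.policy", "compact");        // keep only the latest value per key
changelogConfig.put("segment.ms", "600000");             // roll segments sooner so compaction can run
changelogConfig.put("min.cleanable.dirty.ratio", "0.1"); // trigger compaction more aggressively
// supplied as the changelog config of the store (the exact API depends on the Streams version)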

*Questions*:

1. Is this a reasonable use case?
2. In a scenario where my stream has a fanout (multiple sub-streams based
on the same source stream), each branch performs a different "aggregate"
operation, each with its own state store. Are state stores flushed in
parallel or sequentially?
3. The above also applies per partition. As a stream definition is
parallelized by partition, will one instance hold a separate store instance
for each partition?
4. Through synthetic sleeps I simulated flushes slower than the commit
interval (see the sketch below). The stream seems to be fine with it and
didn't throw. I assume the Kafka consumer does not poll more records until
all of the previous poll's records are committed, but I couldn't find
documentation to back this statement. Is there a timeout for "commit"
operations?
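
The synthetic slow flush from question 4 was along these lines (a sketch;
the delay value is arbitrary and the surrounding store class is the one
outlined above):

    @Override
    public void flush() {
        try {
            // Sleep longer than commit.interval.ms to simulate a slow database write
            Thread.sleep(60_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // ... the actual flush/persist work ...
    }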


*Sample code*:

import java.util.ArrayList;
import java.util.List;

public class AggregateHolder {

    private Long commonKey;
    private List<Double> rawValues = new ArrayList<>();
    private boolean persisted;

    // addValue(Double), getAverage() and getters/setters elided ...
}

And the stream definition:

source.groupByKey(Serdes.String(), recordSerdes)
        .aggregate(
                AggregateHolder::new,
                (aggKey, value, aggregate) -> aggregate.addValue(value.getValue()),
                new DemoStoreSupplier<>(/* ... */)
        )
        .foreach((key, agg) -> log.debug("Aggregate: {}={} ({})",
                key, agg.getAverage(), agg));
