Hello,
I'm trying to use Kafka Streams to aggregate some time series data using
1-second tumbling time windows. The data is ordered approximately by
timestamp, with some "jitter" which I'm limiting at the input with a
custom TimestampExtractor. It moves events into the future if they come
in too late, guaranteeing that the timestamp of each event never jumps
back by more than 4 seconds relative to the most recent previous event
timestamp. I then give the tumbling windows a grace period of 5 seconds.
Here's a sample Kafka Streams processor:

    KStream<String, Val> input = builder.stream(
        inputTopic,
        Consumed.with(Serdes.String(), new Val.Serde())
            .withTimestampExtractor((rec, prevTs) -> {
                String key = (String) rec.key();
                Val val = (Val) rec.value();
                return Math.max(val.getTimestamp(),
                                Math.max(0L, prevTs - 4000));
            }));

    KStream<Windowed<String>, IntegerList> grouped = input
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(1))
            .advanceBy(Duration.ofSeconds(1))
            .grace(Duration.ofSeconds(5)))
        .aggregate(
            IntegerList::new,
            (k, v, list) -> {
                list.add(v.getValue());
                return list;
            },
            Materialized.with(Serdes.String(), new IntegerList.Serde()))
        .suppress(Suppressed.untilWindowCloses(new StrictBufferConfigImpl()))
        .toStream();

    grouped.to(
        outputTopic,
        Produced.with(new SelfSerde.TimeWindowed<>(Serdes.String()),
                      new IntegerList.Serde()));

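
To make the timestamp rule concrete, here is a minimal sketch of the same
arithmetic in plain Java, without any Kafka dependency. The class name
JitterClamp and its helper methods are hypothetical and exist only for
illustration. The sketch assumes that the prevTs passed to the extractor
tracks the same stream time that the windowed aggregation observes, which
may not hold exactly in a real topology.

```java
public class JitterClamp {
    // Values taken from the processor above
    static final long MAX_JITTER_MS = 4000L;
    static final long WINDOW_SIZE_MS = 1000L;
    static final long GRACE_MS = 5000L;

    // Same rule as the extractor lambda: a timestamp never falls
    // more than MAX_JITTER_MS behind the previous timestamp.
    static long clamp(long eventTs, long prevTs) {
        return Math.max(eventTs, Math.max(0L, prevTs - MAX_JITTER_MS));
    }

    // Start of the tumbling window that contains ts
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE_MS);
    }

    // Stream time at which the window containing ts stops
    // accepting records (window end plus grace period)
    static long windowCloseTime(long ts) {
        return windowStart(ts) + WINDOW_SIZE_MS + GRACE_MS;
    }

    public static void main(String[] args) {
        long prevTs = 10_000L;   // most recent timestamp seen so far
        long lateTs = 3_500L;    // event arriving 6.5 s behind

        long clamped = clamp(lateTs, prevTs);
        System.out.println(clamped);                  // 6000
        System.out.println(windowCloseTime(clamped)); // 12000
        // The window closes at 12000, which is later than prevTs,
        // so the clamped event still lands in an open window.
        System.out.println(windowCloseTime(clamped) > prevTs); // true
    }
}
```

Under that assumption, a clamped timestamp is at least prevTs - 4000, its
window ends after that timestamp, and the 5-second grace period pushes the
close time past prevTs, so no clamped event should be dropped as late.
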
I'm using KTable.suppress with Suppressed.untilWindowCloses to suppress
all but the final versions of aggregations. This works as expected: I
get only one final result per grouping key and window instance in the
output topic. But it only works as advertised until I restart the Kafka
Streams process while the events are being aggregated. After the
restart, I can see some non-final versions of aggregations in the output
topic, followed by the final versions. So the guarantee advertised by
Suppressed.untilWindowCloses(), which says:
/"This option is suitable for use cases in which the business logic
requires a hard guarantee that only the final result is propagated."/
...only holds when the Kafka Streams process is not restarted. Is this
expected behavior, or is it a bug?
Thanks,
Peter Levart