Hello,

I'm trying to use Kafka Streams to aggregate some time-series data into 1-second tumbling time windows. The data is ordered approximately by timestamp, with some "jitter". I limit that jitter at the input with a custom TimestampExtractor that moves an event into the future if it arrives too late, which guarantees that an event's timestamp never jumps back by more than 4 seconds relative to the most recent previous event's timestamp. I then give the tumbling windows a grace period of 5 seconds, which is larger than that 4-second bound...
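The clamping rule and the grace period can be checked against each other without Kafka at all. The following is a minimal plain-Java sketch, assuming that `prevTs` (and therefore stream time) is the highest timestamp seen so far in the partition, and that a 1 s window [start, start + 1000) stops accepting records once stream time reaches its end plus the grace period. The class `GraceCheck` and its methods are hypothetical, written only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, Kafka-free sketch: clamps late timestamps like the extractor
 * described above and reports records that would land in a closed window.
 * Assumes prevTs / stream time is the highest timestamp seen so far.
 */
public class GraceCheck {
    static final long WINDOW_MS = 1_000;   // 1 s tumbling windows
    static final long MAX_LAG_MS = 4_000;  // clamp bound used by the extractor
    static final long GRACE_MS = 5_000;    // grace period

    /** Same rule as the extractor: never more than MAX_LAG_MS behind prevTs. */
    public static long clamp(long rawTs, long prevTs) {
        return Math.max(rawTs, Math.max(0L, prevTs - MAX_LAG_MS));
    }

    /** Returns the (clamped) timestamps that would fall into an already-closed window. */
    public static List<Long> lateEvents(long[] rawTimestamps) {
        List<Long> late = new ArrayList<>();
        long streamTime = -1L; // -1 means "no record seen yet"
        for (long raw : rawTimestamps) {
            long ts = clamp(raw, streamTime);
            streamTime = Math.max(streamTime, ts);
            long windowEnd = ts - (ts % WINDOW_MS) + WINDOW_MS;
            // A window stops accepting records once stream time reaches end + grace.
            if (streamTime >= windowEnd + GRACE_MS) {
                late.add(ts);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // 9_500 arrives after 20_000 (10.5 s late); clamping moves it to 16_000.
        long[] raw = {10_000, 12_300, 20_000, 9_500, 21_100};
        System.out.println(lateEvents(raw)); // prints []
    }
}
```

Because a clamped timestamp is at least stream time minus 4 s and its window ends strictly after it, end + 5 s is always later than the current stream time, so no record is dropped. With a shorter grace (for example 3 s), the clamped 16_000 record above would be dropped as late.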

Here's a sample Kafka Streams processor:

    KStream<String, Val> input = builder.stream(
            inputTopic,
            Consumed.with(Serdes.String(), new Val.Serde())
                    .withTimestampExtractor((rec, prevTs) -> {
                        Val val = (Val) rec.value();
                        // Never let a timestamp fall more than 4 s behind prevTs
                        // (prevTs can be -1 when unknown, hence the max with 0).
                        return Math.max(val.getTimestamp(), Math.max(0L, prevTs - 4000));
                    }));

    KStream<Windowed<String>, IntegerList> grouped = input
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofSeconds(1))
                    .advanceBy(Duration.ofSeconds(1))
                    .grace(Duration.ofSeconds(5)))
            .aggregate(
                    IntegerList::new,
                    (k, v, list) -> {
                        list.add(v.getValue());
                        return list;
                    },
                    Materialized.with(Serdes.String(), new IntegerList.Serde()))
            // Public factory for an unbounded strict buffer
            // (instead of the internal StrictBufferConfigImpl class).
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream();

    grouped.to(outputTopic,
            Produced.with(new SelfSerde.TimeWindowed<>(Serdes.String()), new IntegerList.Serde()));


I'm using KTable.suppress with Suppressed.untilWindowCloses to suppress all but the final version of each aggregation. This works as expected: I get exactly one final result per grouping key and window instance in the output topic. But it only works as expected and advertised until I restart the Kafka Streams process while it is still aggregating events. After the restart, the output topic contains some non-final versions of aggregations, followed by the final versions. So the guarantee advertised by Suppressed.untilWindowCloses(), which says:

/"This option is suitable for use cases in which the business logic requires a hard guarantee that only the final result is propagated."/

...holds only as long as the Kafka Streams process is not restarted. Is this expected behavior, or is it a bug?
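For reference, the contract quoted above can be modelled in a few lines of plain Java: each (key, window) aggregate is buffered and emitted exactly once, after stream time reaches the window end plus the grace period. This is a hypothetical in-memory illustration (the class `FinalOnlyModel` and its methods are invented here), not how Kafka Streams implements `suppress()`. It also keeps no state, so it says nothing about how a real buffer is persisted or restored across restarts, which is the situation in question.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory model of the "final result only" contract.
 * Not Kafka's implementation; it keeps no state across restarts.
 */
public class FinalOnlyModel {
    static final long WINDOW_MS = 1_000;
    static final long GRACE_MS = 5_000;

    // windowStart -> (key -> values aggregated so far), oldest window first
    private final TreeMap<Long, Map<String, List<Integer>>> buffer = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();
    private long streamTime = -1L;

    public void process(String key, long ts, int value) {
        streamTime = Math.max(streamTime, ts);
        long windowStart = ts - (ts % WINDOW_MS);
        if (streamTime >= windowStart + WINDOW_MS + GRACE_MS) {
            return; // window already closed: late record is dropped
        }
        buffer.computeIfAbsent(windowStart, w -> new LinkedHashMap<>())
              .computeIfAbsent(key, k -> new ArrayList<>())
              .add(value);
        // Emit each window exactly once, as soon as end + grace is reached.
        while (!buffer.isEmpty()
                && streamTime >= buffer.firstKey() + WINDOW_MS + GRACE_MS) {
            Map.Entry<Long, Map<String, List<Integer>>> closed = buffer.pollFirstEntry();
            for (Map.Entry<String, List<Integer>> e : closed.getValue().entrySet()) {
                emitted.add(e.getKey() + "@" + closed.getKey() + "=" + e.getValue());
            }
        }
    }

    public List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        FinalOnlyModel m = new FinalOnlyModel();
        m.process("a", 10_100, 1);
        m.process("a", 10_700, 2); // same window [10000, 11000)
        m.process("a", 12_000, 3); // no output yet: stream time 12000 < 16000
        m.process("a", 16_000, 4); // 16000 >= 11000 + 5000 closes window 10000
        System.out.println(m.emitted()); // prints [a@10000=[1, 2]]
    }
}
```

In this model every (key, window) pair appears in the output exactly once; the restart behavior described above is what breaks that property in practice.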

Thanks,

Peter Levart
