When using the suppress operator on a windowed KTable, it looks like restarting
the Kafka Streams application causes the aggregated messages from the
SUPPRESS-STATE-STORE to be published again.
Here is the pseudocode. Is there anything I am missing, or anything that can be
done to avoid this?
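KTable<Windowed<String>, String> test = <KStreamObject>
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
    .aggregate(
        ....
        ,
        // windowed aggregations are backed by a WindowStore<Bytes, byte[]>
        Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated-stream21-store")
            .withRetention(Duration.ofMinutes(5))
            // Materialized.with(...) is static and would drop the store name and retention
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.String())
    )
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
// to() returns void, so it cannot be part of the KTable assignment
test.toStream()
    .to("<topic_out>");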
So after restarting the Streams app, <topic_out> contains duplicates of
messages that were already emitted some time earlier. Is this expected behavior?
Thanks,
Tao Wang