Hi,

I'm using 2.3.1 now and am seeing the same issue. During restart, I noticed a lot of log lines like the following:

Seeking to EARLIEST offset of partition XX-KSTREAM-REDUCE-STATE-STORE-0000000014-changelog-41
Seeking to EARLIEST offset of partition XX-KTABLE-SUPPRESS-STATE-STORE-0000000020-changelog-41

After the restart, bytes-out on the changelog topics is as high as 800-900 MB/s, while normally it is zero. Is this expected?

I haven't figured out the reason, but after the restart the app keeps resetting the changelog topic offsets to zero and triggering a rebalance. It looks like an endless loop: rebalance -> reset to zero -> rebalance.

Is there any config I should set?

Thanks!

On Sun, Oct 27, 2019 at 11:25 PM Matthias J. Sax <matth...@confluent.io> wrote:
>
> What version are you using? We fixed a couple of bugs in `suppress()` -- I
> would recommend using the latest 2.3.1 bug-fix release.
>
>
> -Matthias
>
> On 10/25/19 9:12 AM, Tao Wang wrote:
> > When using the suppress operator with a windowed KTable, it looks like restarting
> > the Kafka Streams app causes the aggregated messages from the
> > SUPPRESS-STATE-STORE to be published again.
> >
> > Here is the pseudocode. Is there anything I am missing, or anything that can be
> > done to avoid this?
> >
> > KTable<Windowed<String>, String> test = <KStreamObject>
> >     .groupByKey()
> >     .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
> >     .aggregate(
> >         ....
> >         ,
> >         Materialized.<String,String,WindowStore<String,String>>as("aggregated-stream21-store")
> >             .withRetention(Duration.ofMinutes(5))
> >             .with(Serdes.String(), Serdes.String())
> >     )
> >     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> >     .toStream()
> >     .to("<topic_out>")
> >
> > So when restarting the streams app, <topic_out> will have duplicated
> > messages from a while back. Is this expected behavior?
> >
> > Thanks,
> > Tao Wang
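
For reference on the config question, here is a minimal sketch of the Kafka Streams settings that may be relevant to restore-on-restart and duplicate output. Nothing in this thread confirms that any of them resolves the issue. The class name, method name, and all values are hypothetical. The keys are written as plain strings (they are the standard Kafka Streams config names), so the snippet compiles without the Kafka dependency. The assumption behind `state.dir` is that a store with no local checkpoint gets rebuilt from its changelog, which would match the "Seeking to EARLIEST offset" lines.

```java
import java.util.Properties;

// Hypothetical helper: collects restart-related Kafka Streams settings.
final class StreamsRestartConfig {

    static Properties build(String applicationId, String bootstrapServers, String stateDir) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);

        // Keep local state somewhere that survives restarts. The default
        // location is under /tmp, which may be cleaned between runs
        // (assumption: a lost state directory forces a full changelog replay).
        props.setProperty("state.dir", stateDir);

        // Exactly-once processing writes output transactionally. This may help
        // with duplicate records in the output topic, at some throughput cost.
        props.setProperty("processing.guarantee", "exactly_once");

        // A standby replica keeps a warm copy of each state store on another
        // instance, which can shorten restoration after a rebalance.
        props.setProperty("num.standby.replicas", "1");

        return props;
    }
}
```

The resulting `Properties` object would be passed to the `KafkaStreams` constructor together with the topology. Downstream consumers only benefit from `exactly_once` if they read with `isolation.level=read_committed`.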