Hi Matthias,

When I redeploy the application with the same application id, it causes a
rebalance loop: partition revoked -> rebalance -> offset reset to zero ->
partition assigned -> partition revoked.
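For reference, a graceful shutdown hook of the kind asked about below is
typically registered roughly like this (a minimal sketch; the application id,
bootstrap servers, and the buildTopology() helper are illustrative
placeholders, not the actual app):

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class StreamsApp {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // Placeholder config values -- the real app keeps its existing application.id.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        Topology topology = buildTopology();   // hypothetical helper that builds the actual topology
        KafkaStreams streams = new KafkaStreams(topology, props);
        CountDownLatch latch = new CountDownLatch(1);

        // On SIGTERM (e.g. during a redeploy), close the Streams instance so its
        // threads commit and shut down cleanly before the process exits.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close(Duration.ofSeconds(30));
            latch.countDown();
        }));

        streams.start();
        latch.await();
    }

    private static Topology buildTopology() {
        // placeholder: return the real topology here
        return new org.apache.kafka.streams.StreamsBuilder().build();
    }
}

The close() call gives the stream threads a chance to commit and shut down
cleanly before the process exits during a redeploy.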
The app was running well before the redeployment, but once redeployed it keeps
rebalancing for hours, and I have to switch to a new application id to stop it.

You mentioned that reduce() uses RocksDB stores by default while suppress() is
in-memory. Is that the reason reduce() has both -repartition and -changelog
topics while suppress() only has a -changelog topic? And is that related to
the shutdown hook? If I don't provide a shutdown hook and then redeploy, will
that cause the above issue?

Thanks!

On Thu, Oct 31, 2019 at 5:55 AM Matthias J. Sax <matth...@confluent.io> wrote:
>
> Just a follow up: currently, suppress() only supports in-memory stores
> (note that `suppress()` has its own store).
>
> For the actual `reduce()` store, you can pick between RocksDB and
> in-memory (the default is RocksDB). Hence, if you restart an application
> on the same host, it should not be necessary to reload the state from the
> changelog topic if you use RocksDB.
>
> However, the suppress buffer must be recreated from the
> suppress-changelog topics on restart at the moment.
>
> Originally, `suppress()` was intended to support persistent stores as
> well, but that is not implemented yet. We hope to close this gap in the
> future.
>
> >>> I haven't figured out the reason, but after restart, the app will keep
> >>> resetting the changelog topic offset to ZERO and trigger a rebalance.
>
> Resetting to zero would happen if the full state needs to be recovered.
> However, this should not result in a rebalance. Can you elaborate on the
> rebalancing issue you described?
>
>
> -Matthias
>
> On 10/28/19 5:54 PM, Alex Brekken wrote:
> > I assume you're using RocksDB as your state stores... The bytes out
> > you're seeing on the changelog topics is probably because they are
> > restoring your state stores. If your state stores are in-memory, then
> > on every application startup they're going to be restored from the
> > changelog topics. If your state stores are persistent (saved to disk),
> > then a restore can still happen if you've lost your filesystem. (Maybe
> > you're doing a state store cleanup on startup/shutdown, or have
> > ephemeral storage such as emptyDir in k8s, for example.) So *I think*
> > what you're seeing is normal, though if you want to dig deeper there
> > are RocksDB metrics that can be exposed and will show restore-related
> > info. Additionally, there is a StateRestoreListener interface that you
> > can implement if you'd like to log some of the state store restoration
> > details.
> >
> > Alex
> >
> > On Mon, Oct 28, 2019 at 4:41 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> >
> >> Hi,
> >> I'm using 2.3.1 now and having the same issue. While restarting, I
> >> noticed a lot of logging like the below:
> >> Seeking to EARLIEST offset of partition
> >> XX-KSTREAM-REDUCE-STATE-STORE-0000000014-changelog-41
> >> Seeking to EARLIEST offset of partition
> >> XX-KTABLE-SUPPRESS-STATE-STORE-0000000020-changelog-41
> >>
> >> After restarting, the bytes out of the changelog topic is as high as
> >> 800-900MB/s, while normally it has zero bytes out. Is this expected?
> >> I haven't figured out the reason, but after restart, the app will keep
> >> resetting the changelog topic offset to ZERO and trigger a rebalance.
> >> It seems like a dead loop:
> >> Rebalance -> reset to ZERO -> rebalance
> >>
> >> Is there any config I should set?
> >>
> >> Thanks!
> >>
> >> On Sun, Oct 27, 2019 at 11:25 PM Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>>
> >>> What version are you using?
> >>> We fixed a couple of bugs in `suppress()` -- I would recommend using
> >>> the latest 2.3.1 bug-fix release.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 10/25/19 9:12 AM, Tao Wang wrote:
> >>>> When using the suppress operator with a windowed KTable, it looks
> >>>> like restarting the Kafka Streams app causes the aggregated messages
> >>>> from the SUPPRESS-STATE-STORE to be published again.
> >>>>
> >>>> Here is the pseudo code .. is there anything I am missing, or
> >>>> anything that can be done to avoid this ..
> >>>>
> >>>>
> >>>> KTable<Windowed<String>, String> test = <KStreamObject>
> >>>>   .groupByKey()
> >>>>   .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
> >>>>   .aggregate(
> >>>>     ....
> >>>>     ,
> >>>>     Materialized.<String,String,WindowStore<String,String>>as("aggregated-stream21-store")
> >>>>       .withRetention(Duration.ofMinutes(5))
> >>>>       .with(Serdes.String(), Serdes.String())
> >>>>   )
> >>>>   .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> >>>>   .toStream()
> >>>>   .to("<topic_out>")
> >>>>
> >>>>
> >>>> So when restarting the stream app, <topic_out> will have duplicated
> >>>> messages from a while back ... is this expected behavior?
> >>>>
> >>>> Thanks,
> >>>> Tao Wang
> >>>
> >>
> >
>
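For comparison with the quoted pseudo code, a compiling version of the same
topology would look roughly like this inside the method that builds it (a
minimal sketch assuming String keys and values; the topic names "input-topic"
and "output-topic" and the initializer/aggregator are illustrative
placeholders):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

StreamsBuilder builder = new StreamsBuilder();

builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
    .aggregate(
        () -> "",                          // placeholder initializer
        (key, value, agg) -> agg + value,  // placeholder aggregator
        Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated-stream21-store")
            .withRetention(Duration.ofMinutes(5))
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.String()))
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .map((windowedKey, value) -> KeyValue.pair(windowedKey.key(), value))  // unwrap the window
    .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

The map() step turns the Windowed<String> key back into a plain String before
writing to the sink, so the output topic does not need a windowed key serde.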
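For the StateRestoreListener that Alex mentions above, a minimal sketch of an
implementation that logs restore progress (the class name and log messages are
illustrative):

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

public class LoggingRestoreListener implements StateRestoreListener {

    @Override
    public void onRestoreStart(TopicPartition partition, String storeName,
                               long startingOffset, long endingOffset) {
        System.out.printf("Restore of %s (%s) started: offsets %d..%d%n",
                storeName, partition, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition partition, String storeName,
                                long batchEndOffset, long numRestored) {
        System.out.printf("Restored %d records of %s (%s), now at offset %d%n",
                numRestored, storeName, partition, batchEndOffset);
    }

    @Override
    public void onRestoreEnd(TopicPartition partition, String storeName,
                             long totalRestored) {
        System.out.printf("Restore of %s (%s) finished: %d records%n",
                storeName, partition, totalRestored);
    }
}

// Register it on the KafkaStreams instance before calling start():
// streams.setGlobalStateRestoreListener(new LoggingRestoreListener());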