Hi Matthias,

Could you help with the above issue, or do you have any suggestions?
Thanks a lot!

On Thu, Oct 31, 2019 at 4:00 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
>
> Hi Matthias,
>
> Some additional information: after I restarted the app, it went into
> endless rebalancing. The join rate looks like the attachment below; it
> basically rebalances every 5 minutes. I checked the logs on each node
> and found the warning below.
>
> On node A:
> 2019/10/31 10:13:46 | 2019-10-31 10:13:46,543 WARN
> [kafka-coordinator-heartbeat-thread | XXX]
> o.a.k.c.c.i.AbstractCoordinator [Consumer
> clientId=XXX-StreamThread-1-consumer, groupId=XXX] This member will
> leave the group because consumer poll timeout has expired. This means
> the time between subsequent calls to poll() was longer than the
> configured max.poll.interval.ms, which typically implies that the poll
> loop is spending too much time processing messages. You can address
> this either by increasing max.poll.interval.ms or by reducing the
> maximum size of batches returned in poll() with max.poll.records.
> 2019/10/31 10:13:46 | 2019-10-31 10:13:46,544 INFO
> [kafka-coordinator-heartbeat-thread | XXX]
> o.a.k.c.c.i.AbstractCoordinator [Consumer
> clientId=XXX-StreamThread-1-consumer, groupId=XXX] Member
> XXX-StreamThread-1-consumer-7307ab88-9724-4af8-99b8-5d1c3ef5294f
> sending LeaveGroup request to coordinator xx:9092 (id: 2147483644
> rack: null)
> 2019/10/31 10:13:52 | 2019-10-31 10:13:52,766 INFO
> [XXX-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore Opening store
> KSTREAM-REDUCE-STATE-STORE-0000000003.1572480000000 in regular mode
> 2019/10/31 10:13:52 | 2019-10-31 10:13:52,767 INFO
> [XXX-StreamThread-1] o.a.k.s.p.i.StoreChangelogReader stream-thread
> [XXX-StreamThread-1] Restoring task 1_3's state store
> KTABLE-SUPPRESS-STATE-STORE-0000000009 from beginning of the changelog
> XXX-KTABLE-SUPPRESS-STATE-STORE-0000000009-changelog-3
> 2019/10/31 10:13:52 | 2019-10-31 10:13:52,794 INFO
> [XXX-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [Consumer
> clientId=XXX-StreamThread-1-consumer,
> groupId=XXX] Revoking previously
> assigned partitions
> [XXX-KSTREAM-REDUCE-STATE-STORE-0000000003-repartition-3]
> 2019/10/31 10:13:52 | 2019-10-31 10:13:52,794 INFO
> [XXX-StreamThread-1] o.a.k.s.p.internals.StreamThread stream-thread
> [XXX-StreamThread-1] State transition from PARTITIONS_ASSIGNED to
> PARTITIONS_REVOKED
>
> Meanwhile, the other nodes say:
> 2019/10/31 10:13:47 Attempt to heartbeat failed since group is rebalancing.
>
> If my understanding is correct, the warning above caused the group
> rebalancing? My questions are:
> 1) Why did it only happen after a restart?
> 2) Even if it rebalanced, why does it keep rebalancing in an endless
> loop? I can't understand the behavior here.
> 3) You mentioned that reduce() uses RocksDB by default, but I also
> noticed that the offset has been set to zero for the reduce changelog
> topic. Is that wrong?
>
> Thanks a lot for the help!
>
> On Thu, Oct 31, 2019 at 8:11 AM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> >
> > Hi Matthias,
> >
> > When I redeploy the application with the same application ID, it
> > causes a rebalance loop: partition revoked -> rebalance -> offset
> > reset to zero -> partition assigned -> partition revoked.
> >
> > The app was running well before the redeployment, but once redeployed,
> > it keeps rebalancing for hours and I have to switch to a new
> > application ID to stop that.
> >
> > You mentioned that reduce() uses RocksDB stores by default
> > while suppress() is in memory. Is that the reason that reduce() has
> > both -repartition and -changelog topics while suppress() only has
> > a -changelog topic?
> >
> > And could that be related to the shutdown hook? If I don't provide a
> > shutdown hook and perform a redeployment, will it cause the above issue?
> >
> > Thanks!
> >
> > On Thu, Oct 31, 2019 at 5:55 AM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> > >
> > > Just a follow up: currently, suppress() only supports in-memory stores
> > > (note that `suppress()` has its own store).
> > >
> > > For the actual `reduce()` store, you can pick between RocksDB and
> > > in-memory (default is RocksDB). Hence, if you restart an application on
> > > the same host, it should not be necessary to reload the state from the
> > > changelog topic if you use RocksDB.
> > >
> > > However, the suppress buffer must be recreated from the
> > > suppress-changelog topics on restart at the moment.
> > >
> > > Originally, `suppress()` was intended to support persistent stores as
> > > well, but that has not been implemented yet. We hope to close this gap
> > > in the future.
> > >
> > > >>> I haven't figured out the reason, but after restart, the app keeps
> > > >>> resetting the changelog topic offset to ZERO and triggering a
> > > >>> rebalance.
> > >
> > > Resetting to zero would happen if the full state needs to be recovered.
> > > However, this should not result in a rebalance. Can you elaborate on the
> > > rebalancing issue you described?
> > >
> > >
> > > -Matthias
> > >
> > > On 10/28/19 5:54 PM, Alex Brekken wrote:
> > > > I assume you're using RocksDB for your state stores... The bytes out
> > > > you're seeing on the changelog topics is probably because they are
> > > > restoring your state stores. If your state stores are in-memory, then
> > > > on every application startup they're going to be restored from the
> > > > changelog topics. If your state stores are persistent (saved to disk),
> > > > then a restore can still happen if you've lost your filesystem (maybe
> > > > you're doing a state store cleanup on startup/shutdown, or have
> > > > ephemeral storage such as emptyDir in k8s, for example). So *I think*
> > > > what you're seeing is normal, though if you want to dig deeper there
> > > > are RocksDB metrics that can be exposed and will show restore-related
> > > > info. Additionally, there is a StateRestoreListener interface that you
> > > > can implement if you'd like to log some of the state store restoration
> > > > details.
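To make the "lost your filesystem" case above concrete, here is a minimal sketch of a startup check that looks for surviving local task directories. It assumes the usual Kafka Streams on-disk layout of `<state.dir>/<application.id>/<task id>` with task ids such as `1_3`. The class name `LocalStateCheck`, the method `taskDirs`, and the default path in `main` are hypothetical and only for illustration; this is not part of Kafka itself.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper: did local Kafka Streams state survive the restart? */
final class LocalStateCheck {

    /**
     * Returns the sorted names of task directories (for example "1_3") found
     * under stateDir/applicationId. An empty list suggests that the stores
     * will have to be restored from the changelog topics.
     */
    static List<String> taskDirs(Path stateDir, String applicationId) {
        Path appDir = stateDir.resolve(applicationId);
        if (!Files.isDirectory(appDir)) {
            return Collections.emptyList();
        }
        try (Stream<Path> children = Files.list(appDir)) {
            return children
                    .filter(Files::isDirectory)
                    .map(p -> p.getFileName().toString())
                    // Assumed naming scheme: <subtopology>_<partition>
                    .filter(name -> name.matches("\\d+_\\d+"))
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Assumed values; pass your own state.dir and application.id.
        Path stateDir = Paths.get(args.length > 0 ? args[0] : "/tmp/kafka-streams");
        String applicationId = args.length > 1 ? args[1] : "XXX";
        List<String> tasks = taskDirs(stateDir, applicationId);
        System.out.println(tasks.isEmpty()
                ? "No local task state found; expect a restore from the changelog topics."
                : "Local task directories: " + tasks);
    }
}
```

Running this on the host (or inside the container) right before the application starts shows whether the RocksDB directories are still there. If the list is empty after every redeployment, the state directory is probably on storage that does not survive the restart.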
> > > >
> > > > Alex
> > > >
> > > > On Mon, Oct 28, 2019 at 4:41 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> > > >
> > > >> Hi,
> > > >> I'm using 2.3.1 now and having the same issue. During the restart, I
> > > >> noticed a lot of log lines like the ones below:
> > > >> Seeking to EARLIEST offset of partition
> > > >> XX-KSTREAM-REDUCE-STATE-STORE-0000000014-changelog-41
> > > >> Seeking to EARLIEST offset of partition
> > > >> XX-KTABLE-SUPPRESS-STATE-STORE-0000000020-changelog-41
> > > >>
> > > >> After restarting, the bytes out of the changelog topic is as high as
> > > >> 800-900MB/s, while normally it has zero bytes out. Is this expected?
> > > >> I haven't figured out the reason, but after restart, the app keeps
> > > >> resetting the changelog topic offset to ZERO and triggering a
> > > >> rebalance. It seems to be an endless loop?
> > > >> Rebalance -> reset to ZERO -> rebalance
> > > >>
> > > >> Is there any config I should set?
> > > >>
> > > >> Thanks!
> > > >>
> > > >> On Sun, Oct 27, 2019 at 11:25 PM Matthias J. Sax
> > > >> <matth...@confluent.io>
> > > >> wrote:
> > > >>>
> > > >>> What version are you using? We fixed a couple of bugs in `suppress()`.
> > > >>> I would recommend using the latest 2.3.1 bug-fix release.
> > > >>>
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>> On 10/25/19 9:12 AM, Tao Wang wrote:
> > > >>>> When using the suppress operator with a windowed KTable, it looks like
> > > >>>> restarting the Kafka Streams app causes the aggregated messages from
> > > >>>> the SUPPRESS-STATE-STORE to be published again.
> > > >>>>
> > > >>>> Here is the pseudocode. Is there anything I am missing, or anything
> > > >>>> that can be done to avoid this?
> > > >>>>
> > > >>>>
> > > >>>> KTable<Windowed<String>, String> test = <KStreamObject>
> > > >>>>     .groupByKey()
> > > >>>>     .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
> > > >>>>     .aggregate(
> > > >>>>         ....
> > > >>>>         ,
> > > >>>>         Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated-stream21-store")
> > > >>>>             .withRetention(Duration.ofMinutes(5))
> > > >>>>             .withKeySerde(Serdes.String())
> > > >>>>             .withValueSerde(Serdes.String())
> > > >>>>     )
> > > >>>>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > >>>>     .toStream()
> > > >>>>     .to("<topic_out>")
> > > >>>>
> > > >>>>
> > > >>>> So when restarting the streams app, <topic_out> will have
> > > >>>> duplicated messages from a while back. Is this expected behavior?
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Tao Wang
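The poll-timeout warning quoted earlier in the thread names two consumer settings, `max.poll.interval.ms` and `max.poll.records`. As a rough sketch of where those would go, the snippet below collects them in a plain `java.util.Properties` object, which is the form the `KafkaStreams` constructor accepts. The values, the class name `RestartTuning`, and the state directory path are illustrative assumptions, not recommendations from the thread, and nothing here is confirmed to resolve the rebalance loop.

```java
import java.util.Properties;

/** Hypothetical settings to experiment with when poll timeouts trigger rebalances. */
final class RestartTuning {

    static Properties streamsOverrides(String applicationId, String stateDir) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        // Keep RocksDB state on a path that survives redeployments
        // (assumption: the current location is wiped between deployments).
        props.put("state.dir", stateDir);
        // Allow more time between poll() calls; 10 minutes is an
        // illustrative value only.
        props.put("max.poll.interval.ms", "600000");
        // Fetch smaller batches so each poll loop iteration finishes sooner;
        // 100 is likewise only an example.
        props.put("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder application id and path; bootstrap.servers and serdes
        // would still have to be added before passing this to KafkaStreams.
        Properties props = streamsOverrides("XXX", "/var/lib/kafka-streams");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The plain consumer default for `max.poll.interval.ms` is 300000 ms (five minutes), which may be why the rebalances described above recur at roughly that interval, though the thread does not confirm this.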