Hi,
I'm using 2.3.1 now and having the same issue. During restart, I
noticed a lot of log lines like the following:
Seeking to EARLIEST offset of partition
XX-KSTREAM-REDUCE-STATE-STORE-0000000014-changelog-41
Seeking to EARLIEST offset of partition
XX-KTABLE-SUPPRESS-STATE-STORE-0000000020-changelog-41

After restarting, the bytes-out rate of the changelog topic is as high as
800-900MB/s, while normally it is zero. Is this expected?
I haven't figured out the reason, but after a restart the app keeps
resetting the changelog topic offset to ZERO and triggering a rebalance. It
seems to be an endless loop:
Rebalance -> reset to ZERO -> rebalance

Is there any config I should set?
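
For reference, a minimal sketch of two settings that are commonly checked when stores are restored from offset zero on every restart. It is not a confirmed fix for this issue. The config keys are real Kafka Streams settings; the class name, method name, and all values are hypothetical. The reasoning is that persistent stores keep their checkpoint files under `state.dir` (default `/tmp/kafka-streams`), and when no checkpoint is found after a restart the store is rebuilt by reading its changelog from the beginning.

```java
import java.util.Properties;

// Hypothetical helper: builds Streams settings with plain string keys,
// so the sketch compiles without any Kafka jars on the classpath.
final class RestartConfigSketch {

    static Properties build(String applicationId, String bootstrapServers, String stateDir) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Assumption: stateDir points at storage that survives a restart
        // (not a wiped /tmp or an ephemeral container filesystem).
        props.put("state.dir", stateDir);
        // Assumption: one standby replica is affordable; the default is 0.
        props.put("num.standby.replicas", "1");
        return props;
    }

    public static void main(String[] args) {
        // Example values only; replace with the real application settings.
        Properties props = build("my-streams-app", "localhost:9092", "/var/lib/kafka-streams");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting `Properties` object can be passed to the `KafkaStreams` constructor as usual.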

Thanks!

On Sun, Oct 27, 2019 at 11:25 PM Matthias J. Sax <matth...@confluent.io> wrote:
>
> What version are you using? We fixed a couple of bugs in `suppress()` -- I
> would recommend using the latest 2.3.1 bug-fix release.
>
>
> -Matthias
>
> On 10/25/19 9:12 AM, Tao Wang wrote:
> > When using the suppress operator with a windowed KTable, it looks like 
> > restarting the Kafka Streams app causes the aggregated messages from the 
> > SUPPRESS-STATE-STORE to be published again.
> >
> > Here is the pseudocode. Am I missing anything, or can anything be done to 
> > avoid this?
> >
> >
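> > KTable<Windowed<String>, String> test = <KStreamObject>
> >     .groupByKey()
> >     // 30-second windows that accept late records for 3 more seconds
> >     .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
> >     .aggregate(
> >         ....
> >         ,
> >         // aggregate() expects a WindowStore<Bytes, byte[]> store type
> >         Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated-stream21-store")
> >             .withRetention(Duration.ofMinutes(5))
> >             // Materialized.with(...) is static and would drop the store name
> >             .withKeySerde(Serdes.String())
> >             .withValueSerde(Serdes.String())
> >     )
> >     // emit one final result per window once the window has closed
> >     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
> >
> > test.toStream().to("<topic_out>");
> >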
> > So after restarting the Streams app, <topic_out> contains duplicated 
> > messages from a while back. Is this expected behavior?
> >
> > Thanks,
> > Tao Wang
>
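
For readers following the quoted snippet, here is a rough, self-contained model of what `suppress(untilWindowCloses(...))` is meant to guarantee: one final result per window, emitted only after stream time has passed the window end plus the grace period. This is a simplified sketch and not Kafka Streams code. The class name `SuppressSketch`, the single-key simplification, and string concatenation as the aggregate are all assumptions made for illustration. The 30-second window and 3-second grace period come from the quoted snippet.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified single-key model of windowed aggregation followed by
// "suppress until window closes". Not the Kafka Streams implementation.
final class SuppressSketch {
    static final long WINDOW_MS = 30_000L; // window size from the quoted snippet
    static final long GRACE_MS = 3_000L;   // grace period from the quoted snippet

    // windowStart -> latest aggregate for that window, ordered by window start
    private final TreeMap<Long, String> buffer = new TreeMap<>();
    private long streamTime = Long.MIN_VALUE;

    // Processes one record and returns the final results released by it.
    List<String> process(long timestampMs, String value) {
        streamTime = Math.max(streamTime, timestampMs);
        long windowStart = timestampMs - (timestampMs % WINDOW_MS);

        // Accept the record only while its window is still open;
        // records for already closed windows are dropped as late.
        if (windowStart + WINDOW_MS + GRACE_MS > streamTime) {
            buffer.merge(windowStart, value, (oldAgg, v) -> oldAgg + v);
        }

        // Release every buffered window whose end + grace has been reached.
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, String>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> entry = it.next();
            if (entry.getKey() + WINDOW_MS + GRACE_MS > streamTime) {
                break; // later windows are still open
            }
            emitted.add(entry.getKey() + ":" + entry.getValue());
            it.remove();
        }
        return emitted;
    }
}
```

In this model each window is emitted exactly once because the entry is removed from the buffer when it is released. Duplicates after a restart, as reported in the thread, would correspond to released entries reappearing in the buffer once it is rebuilt.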
