Hi Alessandro,

>  - how do I specify the retention period of the data? Just by setting the
> max retention time for the changelog topic?

For window and session stores, you can set retention time on a local
state store by using Materialized.withRetention(...). Consult the
javadocs for details. If the changelog topic on the broker does not
yet exist, the retention time of the changelog topic will have the
same retention time as the local store.

Additionally, you could try to set the retention time in the streams
configuration by using `StreamsConfig.TopicPrefix("retention.ms")`.

>  - wouldn't be possible, for example for my LastValueStore to compact the
> changelog and keep only the last value stored for each key? Because that's
> all I would need for my use case

That is what changelogs do. They just keep the last value written.

>  - why is it also writing that much?

Could it be that after restore of the state stores from the
changelogs, your stream app just starts working and there is a peek in
writes because the the app has a large lag initially? Maybe somebody
else has a better explanation.

Best,
Bruno

On Wed, Jul 10, 2019 at 12:39 AM Alessandro Tagliapietra
<tagliapietra.alessan...@gmail.com> wrote:
>
> I've just noticed that the store topic created automatically by our streams
> app have different cleanup.policy.
> I think that's the main reason I'm seeing that big read/write IO, having a
> compact policy instead of delete would make the topic much smaller.
> I'll try that to also see the impact on our storage usage.
>
> --
> Alessandro Tagliapietra
>
>
> On Tue, Jul 9, 2019 at 6:06 AM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > Hi Bruno,
> >
> > Oh I see, I'll try to add a persistent disk where the local stores are.
> > I've other questions then:
> >  - why is it also writing that much?
> >  - how do I specify the retention period of the data? Just by setting the
> > max retention time for the changelog topic?
> >  - wouldn't be possible, for example for my LastValueStore to compact the
> > changelog and keep only the last value stored for each key? Because that's
> > all I would need for my use case
> >
> > Thank you very much for your help
> >
> > On Tue, Jul 9, 2019, 4:00 AM Bruno Cadonna <br...@confluent.io> wrote:
> >
> >> Hi Alessandro,
> >>
> >> I am not sure I understand your issue completely. If you start your
> >> streams app in a new container without any existing local state, then
> >> it is expected that the changelog topics are read from the beginning
> >> to restore the local state stores. Am I misunderstanding you?
> >>
> >> Best,
> >> Bruno
> >>
> >> On Tue, Jul 9, 2019 at 6:42 AM Alessandro Tagliapietra
> >> <tagliapietra.alessan...@gmail.com> wrote:
> >> >
> >> > I think I'm having again this issue, this time though it only happens on
> >> > some state stores.
> >> >
> >> > Here you can find the code and the logs
> >> > https://gist.github.com/alex88/1d60f63f0eee9f1568d89d5e1900fffc
> >> > We first seen that our confluent cloud bill went up 10x, then seen that
> >> our
> >> > streams processor was restarted 12 times (kubernetes pod), checking
> >> > confluent cloud usage it seems that the writes/reads went up from the
> >> usual
> >> > 1-2 KB/s to 12-20 MB/s during app restarts.
> >> >
> >> > I've then deployed a new version on a new container (no local
> >> store/state)
> >> > to see what happened:
> >> >  - first, it logs everything up to line 460 of the log file in the gist
> >> >  - at this point confluent cloud reports high read usage and the
> >> consumer
> >> > lag starts to increase, the app is accumulating messages behind
> >> >  - after a certain point, writes go up as well
> >> >  - when streams app transition to RUNNING state, the final aggregation
> >> > function resumes back to where it stopped (without reprocessing old
> >> data)
> >> >  - consumer lag goes back to 0
> >> >
> >> > What makes me think it's re-reading everything are these lines:
> >> >
> >> > Resetting offset for partition
> >> > myapp-id-KSTREAM-AGGREGATE-STATE-STORE-0000000004-changelog-2 to offset
> >> > 20202847
> >> > Restoring task 0_2's state store
> >> KSTREAM-AGGREGATE-STATE-STORE-0000000004
> >> > from beginning of the changelog
> >> > myapp-id-KSTREAM-AGGREGATE-STATE-STORE-0000000004-changelog-2
> >> >
> >> > At first I thought it's because I don't persist the aggregate store
> >> > changelog as I do with the "LastValueStore" store which has
> >> > "withLoggingEnabled()", but even that store has:
> >> >
> >> > Resetting offset for partition myapp-id-LastValueStore-changelog-0 to
> >> > offset 403910
> >> > Restoring task 0_0's state store LastValueStore from beginning of the
> >> > changelog myapp-id-LastValueStore-changelog-0
> >> >
> >> > Thank you everyone in advance
> >> >
> >> > --
> >> > Alessandro Tagliapietra
> >> >
> >> > On Thu, Jun 6, 2019 at 11:37 AM Alessandro Tagliapietra <
> >> > tagliapietra.alessan...@gmail.com> wrote:
> >> >
> >> > > I'm not sure, one thing I know for sure is that on the cloud control
> >> > > panel, in the consumer lag page, the offset didn't reset on the input
> >> > > topic, so it was probably something after that.
> >> > >
> >> > > Anyway, thanks a lot for helping, if we experience that again I'll
> >> try to
> >> > > add more verbose logging to better understand what's going on.
> >> > >
> >> > > Have a great day
> >> > >
> >> > > --
> >> > > Alessandro Tagliapietra
> >> > >
> >> > >
> >> > > On Thu, Jun 6, 2019 at 11:27 AM Guozhang Wang <wangg...@gmail.com>
> >> wrote:
> >> > >
> >> > >> Honestly I cannot think of an issue that fixed in 2.2.1 but not in
> >> 2.2.0
> >> > >> which could be correlated to your observations:
> >> > >>
> >> > >>
> >> > >>
> >> https://issues.apache.org/jira/issues/?filter=-1&jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Fixed%20AND%20fixVersion%20%3D%202.2.1%20AND%20component%20%3D%20streams%20order%20by%20updated%20DESC
> >> > >>
> >> > >> If you observed that on the cloud, both partitions of the source
> >> topic
> >> > >> gets
> >> > >> re-processed from the beginning, then it means the committed offsets
> >> were
> >> > >> somehow lost, and hence has to start reading the source topic from
> >> > >> scratch.
> >> > >> If this is a re-producible issue maybe there are some lurking things
> >> in
> >> > >> 2.2.0.
> >> > >>
> >> > >> On Thu, Jun 6, 2019 at 11:10 AM Alessandro Tagliapietra <
> >> > >> tagliapietra.alessan...@gmail.com> wrote:
> >> > >>
> >> > >> > Yes that's right,
> >> > >> >
> >> > >> > could that be the problem? Anyway, so far after upgrading to 2.2.1
> >> from
> >> > >> > 2.2.0 we didn't experience that problem anymore.
> >> > >> >
> >> > >> > Regards
> >> > >> >
> >> > >> > --
> >> > >> > Alessandro Tagliapietra
> >> > >> >
> >> > >> >
> >> > >> > On Thu, Jun 6, 2019 at 10:50 AM Guozhang Wang <wangg...@gmail.com>
> >> > >> wrote:
> >> > >> >
> >> > >> > > That's right, but local state is used as a "materialized view"
> >> of your
> >> > >> > > changelog topics: if you have nothing locally, then it has to
> >> > >> bootstrap
> >> > >> > > from the beginning of your changelog topic.
> >> > >> > >
> >> > >> > > But I think your question was about the source "sensors-input"
> >> topic,
> >> > >> not
> >> > >> > > the changelog topic. I looked at the logs from two runs, and it
> >> seems
> >> > >> > > locally your sensors-input has one partition, but on the cloud
> >> your
> >> > >> > > sensors-input has two partitions. Is that right?
> >> > >> > >
> >> > >> > >
> >> > >> > > Guozhang
> >> > >> > >
> >> > >> > >
> >> > >> > > On Thu, Jun 6, 2019 at 10:23 AM Alessandro Tagliapietra <
> >> > >> > > tagliapietra.alessan...@gmail.com> wrote:
> >> > >> > >
> >> > >> > > > Isn't the windowing state stored in the additional state store
> >> > >> topics
> >> > >> > > that
> >> > >> > > > I had to additionally create?
> >> > >> > > >
> >> > >> > > > Like these I have here:
> >> > >> > > >
> >> > >> > > >
> >> sensors-pipeline-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog
> >> > >> > > >
> >> sensors-pipeline-KTABLE-SUPPRESS-STATE-STORE-0000000004-changelog
> >> > >> > > >
> >> > >> > > > Thank you
> >> > >> > > >
> >> > >> > > > --
> >> > >> > > > Alessandro Tagliapietra
> >> > >> > > >
> >> > >> > > >
> >> > >> > > > On Thu, Jun 6, 2019 at 10:13 AM Guozhang Wang <
> >> wangg...@gmail.com>
> >> > >> > > wrote:
> >> > >> > > >
> >> > >> > > > > If you deploy your streams app into a docker container, you'd
> >> > >> need to
> >> > >> > > > make
> >> > >> > > > > sure local state directories are preserved, since otherwise
> >> > >> whenever
> >> > >> > > you
> >> > >> > > > > restart all the state would be lost and Streams then has to
> >> > >> bootstrap
> >> > >> > > > from
> >> > >> > > > > scratch. E.g. if you are using K8s for cluster management,
> >> you'd
> >> > >> > better
> >> > >> > > > use
> >> > >> > > > > stateful sets to make sure local states are preserves across
> >> > >> > > > re-deployment.
> >> > >> > > > >
> >> > >> > > > >
> >> > >> > > > > Guozhang
> >> > >> > > > >
> >> > >> > > > > On Wed, Jun 5, 2019 at 4:52 PM Alessandro Tagliapietra <
> >> > >> > > > > tagliapietra.alessan...@gmail.com> wrote:
> >> > >> > > > >
> >> > >> > > > > > Hi Guozhang,
> >> > >> > > > > >
> >> > >> > > > > > sorry, by "app" i mean the stream processor app, the one
> >> shown
> >> > >> in
> >> > >> > > > > > pipeline.kt.
> >> > >> > > > > >
> >> > >> > > > > > The app reads a topic of data sent by a sensor each second
> >> and
> >> > >> > > > generates
> >> > >> > > > > a
> >> > >> > > > > > 20 second window output to another topic.
> >> > >> > > > > > My "problem" is that when running locally with my local
> >> kafka
> >> > >> > setup,
> >> > >> > > > > let's
> >> > >> > > > > > say I stop it and start it again, it continues processing
> >> the
> >> > >> last
> >> > >> > > > > window.
> >> > >> > > > > > When deploying the app into a docker container and using
> >> the
> >> > >> > > confluent
> >> > >> > > > > > cloud as broker, every time I restart the app it starts
> >> > >> processing
> >> > >> > > > again
> >> > >> > > > > > from the beginning of the input topic and generates again
> >> old
> >> > >> > windows
> >> > >> > > > it
> >> > >> > > > > > already processed.
> >> > >> > > > > >
> >> > >> > > > > > In the meantime I'm trying to upgrade to kafka 2.2.1 to
> >> see if I
> >> > >> > get
> >> > >> > > > any
> >> > >> > > > > > improvement.
> >> > >> > > > > >
> >> > >> > > > > > --
> >> > >> > > > > > Alessandro Tagliapietra
> >> > >> > > > > >
> >> > >> > > > > >
> >> > >> > > > > > On Wed, Jun 5, 2019 at 4:45 PM Guozhang Wang <
> >> > >> wangg...@gmail.com>
> >> > >> > > > wrote:
> >> > >> > > > > >
> >> > >> > > > > > > Hello Alessandro,
> >> > >> > > > > > >
> >> > >> > > > > > > What did you do for `restarting the app online`? I'm not
> >> sure
> >> > >> I
> >> > >> > > > follow
> >> > >> > > > > > the
> >> > >> > > > > > > difference between "restart the streams app" and
> >> "restart the
> >> > >> app
> >> > >> > > > > online"
> >> > >> > > > > > > from your description.
> >> > >> > > > > > >
> >> > >> > > > > > >
> >> > >> > > > > > > Guozhang
> >> > >> > > > > > >
> >> > >> > > > > > >
> >> > >> > > > > > > On Wed, Jun 5, 2019 at 10:42 AM Alessandro Tagliapietra <
> >> > >> > > > > > > tagliapietra.alessan...@gmail.com> wrote:
> >> > >> > > > > > > >
> >> > >> > > > > > > > Hello everyone,
> >> > >> > > > > > > >
> >> > >> > > > > > > > I've a small streams app, the configuration and part
> >> of the
> >> > >> > code
> >> > >> > > > I'm
> >> > >> > > > > > > using
> >> > >> > > > > > > > can be found here
> >> > >> > > > > > > >
> >> > >> > https://gist.github.com/alex88/6b7b31c2b008817a24f63246557099bc
> >> > >> > > > > > > > There's also the log when the app is started locally
> >> and
> >> > >> when
> >> > >> > the
> >> > >> > > > app
> >> > >> > > > > > is
> >> > >> > > > > > > > started on our servers connecting to the confluent
> >> cloud
> >> > >> kafka
> >> > >> > > > > broker.
> >> > >> > > > > > > >
> >> > >> > > > > > > > The problem is that locally everything is working
> >> properly,
> >> > >> if
> >> > >> > I
> >> > >> > > > > > restart
> >> > >> > > > > > > > the streams app it just continues where it left, if I
> >> > >> restart
> >> > >> > the
> >> > >> > > > app
> >> > >> > > > > > > > online it reprocesses the whole topic.
> >> > >> > > > > > > >
> >> > >> > > > > > > > That shouldn't happen right?
> >> > >> > > > > > > >
> >> > >> > > > > > > > Thanks in advance
> >> > >> > > > > > > >
> >> > >> > > > > > > > --
> >> > >> > > > > > > > Alessandro Tagliapietra
> >> > >> > > > > > >
> >> > >> > > > > > >
> >> > >> > > > > > >
> >> > >> > > > > > > --
> >> > >> > > > > > > -- Guozhang
> >> > >> > > > > > >
> >> > >> > > > > >
> >> > >> > > > >
> >> > >> > > > >
> >> > >> > > > > --
> >> > >> > > > > -- Guozhang
> >> > >> > > > >
> >> > >> > > >
> >> > >> > >
> >> > >> > >
> >> > >> > > --
> >> > >> > > -- Guozhang
> >> > >> > >
> >> > >> >
> >> > >>
> >> > >>
> >> > >> --
> >> > >> -- Guozhang
> >> > >>
> >> > >
> >>
> >

Reply via email to