Hi Bruno,

Oh, I see. I'll try to add a persistent disk where the local stores are kept.
I have a few other questions then:
 - Why is it also writing that much, and not just reading?
 - How do I specify the retention period of the data? Just by setting the
max retention time for the changelog topic?
 - Wouldn't it be possible, for example for my LastValueStore, to compact
the changelog and keep only the last value stored for each key? That's all
I would need for my use case (see the sketch after this list).
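
For reference, here is roughly what I mean, as a minimal sketch (the topic
name and serdes are simplified, and the config map is my guess at how to
request compaction; from what I read, changelogs of non-windowed stores
should already be compacted by default):

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.state.KeyValueStore

val builder = StreamsBuilder()

// Topic configs passed here should be applied to the store's changelog;
// "compact" keeps only the last value stored for each key
val changelogConfig = mapOf("cleanup.policy" to "compact")

builder.table(
    "sensors-input",
    Materialized.`as`<String, String, KeyValueStore<Bytes, ByteArray>>("LastValueStore")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.String())
        .withLoggingEnabled(changelogConfig)
)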

Thank you very much for your help

On Tue, Jul 9, 2019, 4:00 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Alessandro,
>
> I am not sure I understand your issue completely. If you start your
> streams app in a new container without any existing local state, then
> it is expected that the changelog topics are read from the beginning
> to restore the local state stores. Am I misunderstanding you?
>
> Best,
> Bruno
>
> On Tue, Jul 9, 2019 at 6:42 AM Alessandro Tagliapietra
> <tagliapietra.alessan...@gmail.com> wrote:
> >
> > I think I'm having this issue again, though this time it only happens on
> > some state stores.
> >
> > Here you can find the code and the logs
> > https://gist.github.com/alex88/1d60f63f0eee9f1568d89d5e1900fffc
> > We first saw that our Confluent Cloud bill went up 10x, then saw that
> > our streams processor was restarted 12 times (Kubernetes pod). Checking
> > Confluent Cloud usage, it seems that the writes/reads went up from the
> > usual 1-2 KB/s to 12-20 MB/s during app restarts.
> >
> > I then deployed a new version in a new container (no local store/state)
> > to see what happened:
> >  - first, it logs everything up to line 460 of the log file in the gist
> >  - at this point Confluent Cloud reports high read usage and the
> > consumer lag starts to increase; the app is falling behind on messages
> >  - after a certain point, writes go up as well
> >  - when the streams app transitions to the RUNNING state, the final
> > aggregation function resumes from where it stopped (without reprocessing
> > old data)
> >  - consumer lag goes back to 0
> >
> > What makes me think it's re-reading everything are these lines:
> >
> > Resetting offset for partition
> > myapp-id-KSTREAM-AGGREGATE-STATE-STORE-0000000004-changelog-2 to offset
> > 20202847
> > Restoring task 0_2's state store KSTREAM-AGGREGATE-STATE-STORE-0000000004
> > from beginning of the changelog
> > myapp-id-KSTREAM-AGGREGATE-STATE-STORE-0000000004-changelog-2
> >
> > At first I thought it was because I don't persist the aggregate store
> > changelog as I do with the "LastValueStore" store, which has
> > "withLoggingEnabled()", but even that store shows:
> >
> > Resetting offset for partition myapp-id-LastValueStore-changelog-0 to
> > offset 403910
> > Restoring task 0_0's state store LastValueStore from beginning of the
> > changelog myapp-id-LastValueStore-changelog-0
> >
> > Thank you everyone in advance
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Thu, Jun 6, 2019 at 11:37 AM Alessandro Tagliapietra <
> > tagliapietra.alessan...@gmail.com> wrote:
> >
> > > I'm not sure. One thing I know for sure is that on the cloud control
> > > panel, in the consumer lag page, the offset didn't reset on the input
> > > topic, so it was probably something after that.
> > >
> > > Anyway, thanks a lot for helping. If we experience that again, I'll
> > > try to add more verbose logging to better understand what's going on.
> > >
> > > Have a great day
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> > >
> > > On Thu, Jun 6, 2019 at 11:27 AM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > >> Honestly, I cannot think of an issue that was fixed in 2.2.1 but not
> > >> in 2.2.0 which could be correlated with your observations:
> > >>
> > >> https://issues.apache.org/jira/issues/?filter=-1&jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Fixed%20AND%20fixVersion%20%3D%202.2.1%20AND%20component%20%3D%20streams%20order%20by%20updated%20DESC
> > >>
> > >> If you observed on the cloud that both partitions of the source topic
> > >> get re-processed from the beginning, then it means the committed
> > >> offsets were somehow lost, and hence the app has to start reading the
> > >> source topic from scratch. If this is a reproducible issue, maybe
> > >> there is something lurking in 2.2.0.
> > >>
> > >> On Thu, Jun 6, 2019 at 11:10 AM Alessandro Tagliapietra <
> > >> tagliapietra.alessan...@gmail.com> wrote:
> > >>
> > >> > Yes, that's right.
> > >> >
> > >> > Could that be the problem? Anyway, since upgrading from 2.2.0 to
> > >> > 2.2.1, we haven't experienced that problem anymore.
> > >> >
> > >> > Regards
> > >> >
> > >> > --
> > >> > Alessandro Tagliapietra
> > >> >
> > >> >
> > >> > On Thu, Jun 6, 2019 at 10:50 AM Guozhang Wang <wangg...@gmail.com>
> > >> > wrote:
> > >> >
> > >> > > That's right, but local state is used as a "materialized view" of
> > >> > > your changelog topics: if you have nothing locally, then it has to
> > >> > > bootstrap from the beginning of your changelog topic.
> > >> > >
> > >> > > But I think your question was about the source "sensors-input"
> > >> > > topic, not the changelog topic. I looked at the logs from two
> > >> > > runs, and it seems locally your sensors-input has one partition,
> > >> > > but on the cloud your sensors-input has two partitions. Is that
> > >> > > right?
> > >> > >
> > >> > >
> > >> > > Guozhang
> > >> > >
> > >> > >
> > >> > > On Thu, Jun 6, 2019 at 10:23 AM Alessandro Tagliapietra <
> > >> > > tagliapietra.alessan...@gmail.com> wrote:
> > >> > >
> > >> > > > Isn't the windowing state stored in the additional state store
> > >> > > > topics that I had to create?
> > >> > > >
> > >> > > > Like these I have here:
> > >> > > >
> > >> > > > sensors-pipeline-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog
> > >> > > > sensors-pipeline-KTABLE-SUPPRESS-STATE-STORE-0000000004-changelog
> > >> > > >
> > >> > > > Thank you
> > >> > > >
> > >> > > > --
> > >> > > > Alessandro Tagliapietra
> > >> > > >
> > >> > > >
> > >> > > > On Thu, Jun 6, 2019 at 10:13 AM Guozhang Wang <wangg...@gmail.com>
> > >> > > > wrote:
> > >> > > >
> > >> > > > > If you deploy your streams app into a docker container, you'd
> > >> > > > > need to make sure local state directories are preserved, since
> > >> > > > > otherwise whenever you restart, all the state would be lost
> > >> > > > > and Streams then has to bootstrap from scratch. E.g. if you
> > >> > > > > are using K8s for cluster management, you'd better use
> > >> > > > > stateful sets to make sure local state is preserved across
> > >> > > > > re-deployment.
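> > >> > > > >
> > >> > > > > As a minimal sketch (the app id and mount path below are just
> > >> > > > > placeholders), the idea is to point Streams' state.dir at the
> > >> > > > > volume that survives pod restarts:
> > >> > > > >
> > >> > > > > import java.util.Properties
> > >> > > > > import org.apache.kafka.streams.StreamsConfig
> > >> > > > >
> > >> > > > > val props = Properties()
> > >> > > > > props[StreamsConfig.APPLICATION_ID_CONFIG] = "your-app-id"
> > >> > > > > // path where the persistent volume is mounted in the pod, so
> > >> > > > > // local RocksDB state survives container restarts
> > >> > > > > props[StreamsConfig.STATE_DIR_CONFIG] = "/mnt/streams-state"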
> > >> > > > >
> > >> > > > >
> > >> > > > > Guozhang
> > >> > > > >
> > >> > > > > On Wed, Jun 5, 2019 at 4:52 PM Alessandro Tagliapietra <
> > >> > > > > tagliapietra.alessan...@gmail.com> wrote:
> > >> > > > >
> > >> > > > > > Hi Guozhang,
> > >> > > > > >
> > >> > > > > > Sorry, by "app" I mean the stream processor app, the one
> > >> > > > > > shown in pipeline.kt.
> > >> > > > > >
> > >> > > > > > The app reads a topic of data sent by a sensor every second
> > >> > > > > > and generates a 20-second windowed output to another topic.
> > >> > > > > > My "problem" is that when running locally with my local
> > >> > > > > > Kafka setup, if I stop it and start it again, it continues
> > >> > > > > > processing from the last window.
> > >> > > > > > When deploying the app into a Docker container and using
> > >> > > > > > Confluent Cloud as the broker, every time I restart the app
> > >> > > > > > it starts processing again from the beginning of the input
> > >> > > > > > topic and regenerates old windows it already processed.
> > >> > > > > >
> > >> > > > > > In the meantime I'm trying to upgrade to Kafka 2.2.1 to see
> > >> > > > > > if I get any improvement.
> > >> > > > > >
> > >> > > > > > --
> > >> > > > > > Alessandro Tagliapietra
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > On Wed, Jun 5, 2019 at 4:45 PM Guozhang Wang <wangg...@gmail.com>
> > >> > > > > > wrote:
> > >> > > > > >
> > >> > > > > > > Hello Alessandro,
> > >> > > > > > >
> > >> > > > > > > What did you do for `restarting the app online`? I'm not
> > >> > > > > > > sure I follow the difference between "restart the streams
> > >> > > > > > > app" and "restart the app online" from your description.
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > Guozhang
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > On Wed, Jun 5, 2019 at 10:42 AM Alessandro Tagliapietra <
> > >> > > > > > > tagliapietra.alessan...@gmail.com> wrote:
> > >> > > > > > > >
> > >> > > > > > > > Hello everyone,
> > >> > > > > > > >
> > >> > > > > > > > I have a small streams app; the configuration and part
> > >> > > > > > > > of the code I'm using can be found here:
> > >> > > > > > > >
> > >> > > > > > > > https://gist.github.com/alex88/6b7b31c2b008817a24f63246557099bc
> > >> > > > > > > >
> > >> > > > > > > > There's also the log from when the app is started
> > >> > > > > > > > locally and from when the app is started on our servers
> > >> > > > > > > > connecting to the Confluent Cloud Kafka broker.
> > >> > > > > > > >
> > >> > > > > > > > The problem is that locally everything works properly:
> > >> > > > > > > > if I restart the streams app, it just continues where it
> > >> > > > > > > > left off. If I restart the app online, it reprocesses
> > >> > > > > > > > the whole topic.
> > >> > > > > > > >
> > >> > > > > > > > That shouldn't happen, right?
> > >> > > > > > > >
> > >> > > > > > > > Thanks in advance
> > >> > > > > > > >
> > >> > > > > > > > --
> > >> > > > > > > > Alessandro Tagliapietra
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > --
> > >> > > > > > > -- Guozhang
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > > >
> > >> > > > > --
> > >> > > > > -- Guozhang
> > >> > > > >
> > >> > > >
> > >> > >
> > >> > >
> > >> > > --
> > >> > > -- Guozhang
> > >> > >
> > >> >
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
>
