I've just noticed that the store topics created automatically by our streams
app have different cleanup.policy settings.
I think that's the main reason I'm seeing such heavy read/write IO: with a
compact policy instead of delete, the topic would be much smaller.
I'll try that, also to see the impact on our storage usage.
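
To illustrate why the policy matters for restore traffic, here is a toy model
of a changelog topic. It is not Kafka code: the class and method names
(ChangelogCompactionSketch, retainedWithDelete, retainedWithCompact) and the
three sensor keys are made up for this sketch. It also assumes compaction has
fully run, whereas a real broker compacts asynchronously and does not touch
the active segment.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of changelog retention semantics; hypothetical names, no Kafka APIs.
final class ChangelogCompactionSketch {

    // One changelog record: a store key and the value written for it.
    static final class Entry {
        final String key;
        final String value;

        Entry(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // cleanup.policy=delete: every record stays until retention expires,
    // so a restore from the beginning replays all of them.
    static List<Entry> retainedWithDelete(List<Entry> log) {
        return new ArrayList<>(log);
    }

    // cleanup.policy=compact (idealized): only the latest record per key survives.
    static List<Entry> retainedWithCompact(List<Entry> log) {
        Map<String, Entry> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            latest.remove(e.key); // re-insert so the order reflects the last write
            latest.put(e.key, e);
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        // Assumed workload: 3 sensors, 1000 updates each.
        List<Entry> log = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            for (String sensor : new String[] {"sensor-a", "sensor-b", "sensor-c"}) {
                log.add(new Entry(sensor, "reading-" + i));
            }
        }
        System.out.println("delete:  " + retainedWithDelete(log).size() + " records to restore");
        System.out.println("compact: " + retainedWithCompact(log).size() + " records to restore");
    }
}
```

With this made-up workload the delete model has 3000 records to replay on
restore and the compact model has 3, which is the kind of difference I'd
expect for a store that only needs the last value per key.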

--
Alessandro Tagliapietra


On Tue, Jul 9, 2019 at 6:06 AM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> Hi Bruno,
>
> Oh I see, I'll try to add a persistent disk where the local stores are.
> I've other questions then:
>  - why is it also writing that much?
>  - how do I specify the retention period of the data? Just by setting the
> max retention time for the changelog topic?
>  - wouldn't it be possible, for example for my LastValueStore, to compact the
> changelog and keep only the last value stored for each key? That's all I
> would need for my use case.
>
> Thank you very much for your help
>
> On Tue, Jul 9, 2019, 4:00 AM Bruno Cadonna <br...@confluent.io> wrote:
>
>> Hi Alessandro,
>>
>> I am not sure I understand your issue completely. If you start your
>> streams app in a new container without any existing local state, then
>> it is expected that the changelog topics are read from the beginning
>> to restore the local state stores. Am I misunderstanding you?
>>
>> Best,
>> Bruno
>>
>> On Tue, Jul 9, 2019 at 6:42 AM Alessandro Tagliapietra
>> <tagliapietra.alessan...@gmail.com> wrote:
>> >
>> > I think I'm having this issue again, though this time it only happens on
>> > some state stores.
>> >
>> > Here you can find the code and the logs
>> > https://gist.github.com/alex88/1d60f63f0eee9f1568d89d5e1900fffc
>> > We first saw that our Confluent Cloud bill went up 10x, then saw that our
>> > streams processor (a Kubernetes pod) had been restarted 12 times. Checking
>> > Confluent Cloud usage, it seems that the writes/reads went up from the usual
>> > 1-2 KB/s to 12-20 MB/s during app restarts.
>> >
>> > I then deployed a new version on a new container (no local store/state)
>> > to see what happened:
>> >  - first, it logs everything up to line 460 of the log file in the gist
>> >  - at this point Confluent Cloud reports high read usage and the consumer
>> > lag starts to increase; the app is accumulating a backlog of messages
>> >  - after a certain point, writes go up as well
>> >  - when the streams app transitions to the RUNNING state, the final
>> > aggregation function resumes from where it stopped (without reprocessing
>> > old data)
>> >  - consumer lag goes back to 0
>> >
>> > What makes me think it's re-reading everything are these lines:
>> >
>> > Resetting offset for partition
>> > myapp-id-KSTREAM-AGGREGATE-STATE-STORE-0000000004-changelog-2 to offset
>> > 20202847
>> > Restoring task 0_2's state store KSTREAM-AGGREGATE-STATE-STORE-0000000004
>> > from beginning of the changelog
>> > myapp-id-KSTREAM-AGGREGATE-STATE-STORE-0000000004-changelog-2
>> >
>> > At first I thought it was because I don't persist the aggregate store
>> > changelog as I do with the "LastValueStore" store, which has
>> > "withLoggingEnabled()", but even that store has:
>> >
>> > Resetting offset for partition myapp-id-LastValueStore-changelog-0 to
>> > offset 403910
>> > Restoring task 0_0's state store LastValueStore from beginning of the
>> > changelog myapp-id-LastValueStore-changelog-0
>> >
>> > Thank you everyone in advance
>> >
>> > --
>> > Alessandro Tagliapietra
>> >
>> > On Thu, Jun 6, 2019 at 11:37 AM Alessandro Tagliapietra <
>> > tagliapietra.alessan...@gmail.com> wrote:
>> >
>> > > I'm not sure, one thing I know for sure is that on the cloud control
>> > > panel, in the consumer lag page, the offset didn't reset on the input
>> > > topic, so it was probably something after that.
>> > >
>> > > Anyway, thanks a lot for helping. If we experience that again, I'll try
>> > > to add more verbose logging to better understand what's going on.
>> > >
>> > > Have a great day
>> > >
>> > > --
>> > > Alessandro Tagliapietra
>> > >
>> > >
>> > > On Thu, Jun 6, 2019 at 11:27 AM Guozhang Wang <wangg...@gmail.com>
>> wrote:
>> > >
>> > >> Honestly, I cannot think of an issue that was fixed in 2.2.1 but not in
>> > >> 2.2.0 which could be correlated to your observations:
>> > >>
>> > >>
>> > >>
>> https://issues.apache.org/jira/issues/?filter=-1&jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Fixed%20AND%20fixVersion%20%3D%202.2.1%20AND%20component%20%3D%20streams%20order%20by%20updated%20DESC
>> > >>
>> > >> If you observed that, on the cloud, both partitions of the source topic
>> > >> get re-processed from the beginning, then it means the committed offsets
>> > >> were somehow lost, and hence the app has to start reading the source
>> > >> topic from scratch.
>> > >> If this is a reproducible issue, maybe there are some lurking issues in
>> > >> 2.2.0.
>> > >>
>> > >> On Thu, Jun 6, 2019 at 11:10 AM Alessandro Tagliapietra <
>> > >> tagliapietra.alessan...@gmail.com> wrote:
>> > >>
>> > >> > Yes that's right,
>> > >> >
>> > >> > could that be the problem? Anyway, so far, after upgrading from 2.2.0
>> > >> > to 2.2.1, we haven't experienced that problem anymore.
>> > >> >
>> > >> > Regards
>> > >> >
>> > >> > --
>> > >> > Alessandro Tagliapietra
>> > >> >
>> > >> >
>> > >> > On Thu, Jun 6, 2019 at 10:50 AM Guozhang Wang <wangg...@gmail.com>
>> > >> wrote:
>> > >> >
>> > >> > > That's right, but local state is used as a "materialized view"
>> of your
>> > >> > > changelog topics: if you have nothing locally, then it has to
>> > >> bootstrap
>> > >> > > from the beginning of your changelog topic.
>> > >> > >
>> > >> > > But I think your question was about the source "sensors-input"
>> topic,
>> > >> not
>> > >> > > the changelog topic. I looked at the logs from two runs, and it
>> seems
>> > >> > > locally your sensors-input has one partition, but on the cloud
>> your
>> > >> > > sensors-input has two partitions. Is that right?
>> > >> > >
>> > >> > >
>> > >> > > Guozhang
>> > >> > >
>> > >> > >
>> > >> > > On Thu, Jun 6, 2019 at 10:23 AM Alessandro Tagliapietra <
>> > >> > > tagliapietra.alessan...@gmail.com> wrote:
>> > >> > >
>> > >> > > > Isn't the windowing state stored in the additional state store
>> > >> > > > topics that I had to create?
>> > >> > > >
>> > >> > > > Like these I have here:
>> > >> > > >
>> > >> > > >
>> sensors-pipeline-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog
>> > >> > > >
>> sensors-pipeline-KTABLE-SUPPRESS-STATE-STORE-0000000004-changelog
>> > >> > > >
>> > >> > > > Thank you
>> > >> > > >
>> > >> > > > --
>> > >> > > > Alessandro Tagliapietra
>> > >> > > >
>> > >> > > >
>> > >> > > > On Thu, Jun 6, 2019 at 10:13 AM Guozhang Wang <
>> wangg...@gmail.com>
>> > >> > > wrote:
>> > >> > > >
>> > >> > > > > If you deploy your streams app into a docker container, you'd
>> > >> > > > > need to make sure local state directories are preserved, since
>> > >> > > > > otherwise whenever you restart, all the state would be lost and
>> > >> > > > > Streams then has to bootstrap from scratch. E.g. if you are using
>> > >> > > > > K8s for cluster management, you'd better use stateful sets to
>> > >> > > > > make sure local states are preserved across re-deployment.
>> > >> > > > >
>> > >> > > > >
>> > >> > > > > Guozhang
>> > >> > > > >
>> > >> > > > > On Wed, Jun 5, 2019 at 4:52 PM Alessandro Tagliapietra <
>> > >> > > > > tagliapietra.alessan...@gmail.com> wrote:
>> > >> > > > >
>> > >> > > > > > Hi Guozhang,
>> > >> > > > > >
>> > >> > > > > > sorry, by "app" I mean the stream processor app, the one shown
>> > >> > > > > > in pipeline.kt.
>> > >> > > > > >
>> > >> > > > > > The app reads a topic of data sent by a sensor each second and
>> > >> > > > > > generates a 20 second window output to another topic.
>> > >> > > > > > My "problem" is that when running locally with my local kafka
>> > >> > > > > > setup, let's say I stop it and start it again, it continues
>> > >> > > > > > processing the last window.
>> > >> > > > > > When deploying the app into a docker container and using the
>> > >> > > > > > confluent cloud as broker, every time I restart the app it
>> > >> > > > > > starts processing again from the beginning of the input topic
>> > >> > > > > > and regenerates old windows it already processed.
>> > >> > > > > >
>> > >> > > > > > In the meantime I'm trying to upgrade to kafka 2.2.1 to
>> see if I
>> > >> > get
>> > >> > > > any
>> > >> > > > > > improvement.
>> > >> > > > > >
>> > >> > > > > > --
>> > >> > > > > > Alessandro Tagliapietra
>> > >> > > > > >
>> > >> > > > > >
>> > >> > > > > > On Wed, Jun 5, 2019 at 4:45 PM Guozhang Wang <
>> > >> wangg...@gmail.com>
>> > >> > > > wrote:
>> > >> > > > > >
>> > >> > > > > > > Hello Alessandro,
>> > >> > > > > > >
>> > >> > > > > > > What did you do for `restarting the app online`? I'm not
>> sure
>> > >> I
>> > >> > > > follow
>> > >> > > > > > the
>> > >> > > > > > > difference between "restart the streams app" and
>> "restart the
>> > >> app
>> > >> > > > > online"
>> > >> > > > > > > from your description.
>> > >> > > > > > >
>> > >> > > > > > >
>> > >> > > > > > > Guozhang
>> > >> > > > > > >
>> > >> > > > > > >
>> > >> > > > > > > On Wed, Jun 5, 2019 at 10:42 AM Alessandro Tagliapietra <
>> > >> > > > > > > tagliapietra.alessan...@gmail.com> wrote:
>> > >> > > > > > > >
>> > >> > > > > > > > Hello everyone,
>> > >> > > > > > > >
>> > >> > > > > > > > I've a small streams app, the configuration and part
>> of the
>> > >> > code
>> > >> > > > I'm
>> > >> > > > > > > using
>> > >> > > > > > > > can be found here
>> > >> > > > > > > >
>> > >> > https://gist.github.com/alex88/6b7b31c2b008817a24f63246557099bc
>> > >> > > > > > > > There's also the log when the app is started locally
>> and
>> > >> when
>> > >> > the
>> > >> > > > app
>> > >> > > > > > is
>> > >> > > > > > > > started on our servers connecting to the confluent
>> cloud
>> > >> kafka
>> > >> > > > > broker.
>> > >> > > > > > > >
>> > >> > > > > > > > The problem is that locally everything works properly: if I
>> > >> > > > > > > > restart the streams app, it just continues where it left off,
>> > >> > > > > > > > but if I restart the app online, it reprocesses the whole
>> > >> > > > > > > > topic.
>> > >> > > > > > > >
>> > >> > > > > > > > That shouldn't happen, right?
>> > >> > > > > > > >
>> > >> > > > > > > > Thanks in advance
>> > >> > > > > > > >
>> > >> > > > > > > > --
>> > >> > > > > > > > Alessandro Tagliapietra
>> > >> > > > > > >
>> > >> > > > > > >
>> > >> > > > > > >
>> > >> > > > > > > --
>> > >> > > > > > > -- Guozhang
>> > >> > > > > > >
>> > >> > > > > >
>> > >> > > > >
>> > >> > > > >
>> > >> > > > > --
>> > >> > > > > -- Guozhang
>> > >> > > > >
>> > >> > > >
>> > >> > >
>> > >> > >
>> > >> > > --
>> > >> > > -- Guozhang
>> > >> > >
>> > >> >
>> > >>
>> > >>
>> > >> --
>> > >> -- Guozhang
>> > >>
>> > >
>>
>
