I've just noticed that the store topics created automatically by our Streams app have a different cleanup.policy. I think that's the main reason for the heavy read/write IO I'm seeing: with a compact policy instead of delete, the topics would be much smaller. I'll try that, also to see the impact on our storage usage.
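One way to see why the policy matters for restore traffic is to simulate a changelog in plain Python. This models the broker's behaviour; it is not Kafka code. Under `delete`, every record younger than `retention.ms` stays in the topic. Under `compact`, the broker keeps at least the latest record per key, which is all a last-value store needs to rebuild itself. The key count, update rate and one-week retention below are assumptions for illustration. The model is also idealized: real brokers delete whole segments and compact in the background without touching the active segment, so real topics are larger than these numbers.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class Record:
    offset: int
    key: str
    value: float
    timestamp_ms: int


def apply_delete_policy(log: List[Record], now_ms: int, retention_ms: int) -> List[Record]:
    """Idealized cleanup.policy=delete: keep every record younger than the retention period."""
    return [r for r in log if now_ms - r.timestamp_ms < retention_ms]


def apply_compact_policy(log: List[Record]) -> List[Record]:
    """Idealized cleanup.policy=compact: keep only the latest record for each key."""
    latest: Dict[str, Record] = {}
    for record in log:  # offset order, so later updates overwrite earlier ones
        latest[record.key] = record
    return sorted(latest.values(), key=lambda r: r.offset)


def build_changelog(num_keys: int, seconds: int) -> List[Record]:
    """Hypothetical changelog where every key is updated once per second."""
    log: List[Record] = []
    for second in range(seconds):
        for k in range(num_keys):
            log.append(Record(len(log), f"sensor-{k}", float(second), second * 1000))
    return log


if __name__ == "__main__":
    log = build_changelog(num_keys=10, seconds=3600)  # one hour of updates
    week_ms = 7 * 24 * 3600 * 1000
    print(len(log))  # 36000 records written
    print(len(apply_delete_policy(log, 3600 * 1000, week_ms)))  # 36000 still retained
    print(len(apply_compact_policy(log)))  # 10, one per key
```

A store that only needs the last value per key has to replay the whole retained log after losing its local state. Under `delete`, that replay grows with time. Under `compact`, it grows only with the number of keys.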
--
Alessandro Tagliapietra

On Tue, Jul 9, 2019 at 6:06 AM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

Hi Bruno,

Oh I see, I'll try to add a persistent disk where the local stores are.
I have some other questions, then:
- Why is it also writing that much?
- How do I specify the retention period of the data? Just by setting the max retention time for the changelog topic?
- Wouldn't it be possible, for example for my LastValueStore, to compact the changelog and keep only the last value stored for each key? That's all I would need for my use case.

Thank you very much for your help

On Tue, Jul 9, 2019, 4:00 AM Bruno Cadonna <br...@confluent.io> wrote:

Hi Alessandro,

I am not sure I understand your issue completely. If you start your streams app in a new container without any existing local state, then it is expected that the changelog topics are read from the beginning to restore the local state stores. Am I misunderstanding you?

Best,
Bruno

On Tue, Jul 9, 2019 at 6:42 AM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

I think I'm having this issue again, though this time it only happens on some state stores.

Here you can find the code and the logs:
https://gist.github.com/alex88/1d60f63f0eee9f1568d89d5e1900fffc
We first saw that our Confluent Cloud bill went up 10x, then saw that our streams processor (a Kubernetes pod) had been restarted 12 times. Checking Confluent Cloud usage, it seems that writes/reads went up from the usual 1-2 KB/s to 12-20 MB/s during app restarts.
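Bruno's explanation can be modelled in a few lines of Python. This is a simplified sketch, not the Kafka Streams API. Kafka Streams keeps a checkpoint of restored changelog offsets in its local state directory. A pod that starts with an empty disk has no checkpoint, so every store is replayed from the start of its changelog. Here `checkpoint` is assumed to be the next offset to read, and the changelog contents are made up. The 12 restarts mirror the count reported above.

```python
from typing import Dict, List, Optional, Tuple

# (key, value) pairs in offset order; a value of None is a tombstone (deletion)
ChangelogEntry = Tuple[str, Optional[float]]


def restore_store(
    changelog: List[ChangelogEntry],
    local_state: Dict[str, float],
    checkpoint: Optional[int],
) -> int:
    """Replay the changelog into local_state; return the number of records read.

    `checkpoint` is the next changelog offset to read, as remembered on local
    disk. None models a pod that starts with an empty disk, so restoration has
    to start from the beginning of the changelog.
    """
    start = 0 if checkpoint is None else checkpoint
    for key, value in changelog[start:]:
        if value is None:
            local_state.pop(key, None)
        else:
            local_state[key] = value
    return len(changelog) - start


if __name__ == "__main__":
    # Hypothetical changelog: 5 sensors, 1,000 updates each (5,000 records)
    changelog = [(f"sensor-{i % 5}", float(i)) for i in range(5000)]

    # Every restart on a fresh container replays the full changelog
    reads_without_volume = sum(restore_store(changelog, {}, None) for _ in range(12))

    # With a persistent volume, state and checkpoint survive each restart
    state: Dict[str, float] = {}
    restore_store(changelog, state, None)  # initial bootstrap
    reads_with_volume = sum(restore_store(changelog, state, len(changelog)) for _ in range(12))

    print(reads_without_volume, reads_with_volume)  # 60000 0
```

Every record replayed is read from the broker, so repeated restarts on ephemeral disks multiply read traffic. A smaller changelog or a persistent state directory both shrink the replay.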

I then deployed a new version in a new container (no local store/state) to see what happened:
- First, it logs everything up to line 460 of the log file in the gist.
- At this point, Confluent Cloud reports high read usage and the consumer lag starts to increase: the app falls behind and accumulates messages.
- After a certain point, writes go up as well.
- When the streams app transitions to the RUNNING state, the final aggregation function resumes from where it stopped (without reprocessing old data).
- Consumer lag goes back to 0.

What makes me think it's re-reading everything are these lines:

    Resetting offset for partition myapp-id-KSTREAM-AGGREGATE-STATE-STORE-0000000004-changelog-2 to offset 20202847
    Restoring task 0_2's state store KSTREAM-AGGREGATE-STATE-STORE-0000000004 from beginning of the changelog myapp-id-KSTREAM-AGGREGATE-STATE-STORE-0000000004-changelog-2

At first I thought it was because I don't persist the aggregate store changelog as I do with the "LastValueStore" store, which has "withLoggingEnabled()", but even that store has:

    Resetting offset for partition myapp-id-LastValueStore-changelog-0 to offset 403910
    Restoring task 0_0's state store LastValueStore from beginning of the changelog myapp-id-LastValueStore-changelog-0

Thank you everyone in advance

--
Alessandro Tagliapietra

On Thu, Jun 6, 2019 at 11:37 AM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

I'm not sure. One thing I know for sure is that on the cloud control panel, in the consumer lag page, the offset didn't reset on the input topic, so it was probably something after that.

Anyway, thanks a lot for helping. If we experience that again, I'll try to add more verbose logging to better understand what's going on.
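To compare restarts, it can help to extract the full-restore events from the logs automatically. The helper below is hypothetical and is not part of Kafka. Its regular expressions match only the "Resetting offset for partition" and "Restoring task" messages exactly as quoted in the July 9 message; other Kafka versions may word them differently.

```python
import re
from typing import Dict

RESET_RE = re.compile(
    r"Resetting offset for partition (?P<partition>\S+) to offset (?P<offset>\d+)"
)
RESTORE_RE = re.compile(
    r"Restoring task (?P<task>\S+)'s state store (?P<store>\S+) "
    r"from beginning of the changelog (?P<partition>\S+)"
)


def summarize_restores(log_text: str) -> Dict[str, dict]:
    """Map each changelog partition to its reset offset and full-restore details."""
    summary: Dict[str, dict] = {}
    # Log lines may be wrapped, so collapse every whitespace run into one space first.
    text = " ".join(log_text.split())
    for m in RESET_RE.finditer(text):
        summary.setdefault(m["partition"], {})["reset_offset"] = int(m["offset"])
    for m in RESTORE_RE.finditer(text):
        entry = summary.setdefault(m["partition"], {})
        entry["task"] = m["task"]
        entry["store"] = m["store"]
        entry["from_beginning"] = True
    return summary


if __name__ == "__main__":
    sample = """
    Resetting offset for partition myapp-id-LastValueStore-changelog-0 to
    offset 403910
    Restoring task 0_0's state store LastValueStore from beginning of the
    changelog myapp-id-LastValueStore-changelog-0
    """
    for partition, info in summarize_restores(sample).items():
        print(partition, info)
```

Running it over the logs of each restart shows which stores were rebuilt from scratch and how far each changelog had to be replayed.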

Have a great day

--
Alessandro Tagliapietra

On Thu, Jun 6, 2019 at 11:27 AM Guozhang Wang <wangg...@gmail.com> wrote:

Honestly, I cannot think of an issue that was fixed in 2.2.1 but not in 2.2.0 which could be correlated with your observations:

https://issues.apache.org/jira/issues/?filter=-1&jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Fixed%20AND%20fixVersion%20%3D%202.2.1%20AND%20component%20%3D%20streams%20order%20by%20updated%20DESC

If you observed on the cloud that both partitions of the source topic get re-processed from the beginning, then the committed offsets were somehow lost, and hence the app had to start reading the source topic from scratch. If this is a reproducible issue, maybe there is something lurking in 2.2.0.

On Thu, Jun 6, 2019 at 11:10 AM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

Yes, that's right.

Could that be the problem? Anyway, so far, after upgrading from 2.2.0 to 2.2.1, we haven't experienced that problem anymore.

Regards

--
Alessandro Tagliapietra

On Thu, Jun 6, 2019 at 10:50 AM Guozhang Wang <wangg...@gmail.com> wrote:

That's right, but local state is used as a "materialized view" of your changelog topics: if you have nothing locally, then it has to bootstrap from the beginning of your changelog topic.

But I think your question was about the source "sensors-input" topic, not the changelog topic.
I looked at the logs from two runs, and it seems that locally your sensors-input topic has one partition, but on the cloud it has two partitions. Is that right?

Guozhang

On Thu, Jun 6, 2019 at 10:23 AM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

Isn't the windowing state stored in the additional state store topics that I had to create?

Like these ones I have here:

    sensors-pipeline-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog
    sensors-pipeline-KTABLE-SUPPRESS-STATE-STORE-0000000004-changelog

Thank you

--
Alessandro Tagliapietra

On Thu, Jun 6, 2019 at 10:13 AM Guozhang Wang <wangg...@gmail.com> wrote:

If you deploy your streams app into a Docker container, you need to make sure the local state directories are preserved. Otherwise, whenever you restart, all the state is lost and Streams has to bootstrap from scratch. For example, if you are using K8s for cluster management, you'd better use StatefulSets to make sure local state is preserved across re-deployments.

Guozhang

On Wed, Jun 5, 2019 at 4:52 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

Hi Guozhang,

Sorry, by "app" I mean the stream processor app, the one shown in pipeline.kt.
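Guozhang's observation about partition counts also explains the task names in the July 9 logs. Kafka Streams creates one task per input partition for each sub-topology and names it `<sub-topology>_<partition>`. Each task holds its own shard of the state stores, backed by the changelog partition with the same number. That is why task `0_2` restores `myapp-id-KSTREAM-AGGREGATE-STATE-STORE-0000000004-changelog-2`. The simplified sketch below ignores repartition topics and co-partitioning rules. The mapping of sub-topologies to topics in the example is hypothetical.

```python
from typing import Dict, List


def task_ids(source_partitions: Dict[int, List[int]]) -> List[str]:
    """Simplified Kafka Streams task naming.

    source_partitions maps a sub-topology id to the partition counts of its
    source topics. One task is created per partition of the widest input
    topic, and each task id is "<sub-topology>_<partition>".
    """
    ids: List[str] = []
    for subtopology in sorted(source_partitions):
        num_tasks = max(source_partitions[subtopology])
        ids.extend(f"{subtopology}_{p}" for p in range(num_tasks))
    return ids


if __name__ == "__main__":
    print(task_ids({0: [1]}))  # local broker, 1 partition:  ['0_0']
    print(task_ids({0: [2]}))  # cloud broker, 2 partitions: ['0_0', '0_1']
```

A different partition count between the local and cloud clusters therefore means a different set of tasks, state directories and changelog partitions, even with identical application code.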

The app reads a topic of data sent by a sensor every second and outputs 20-second windows to another topic.
My "problem" is that when running locally with my local Kafka setup, if I stop the app and start it again, it continues processing the last window. When I deploy the app into a Docker container and use Confluent Cloud as the broker, every time I restart the app it starts processing again from the beginning of the input topic and regenerates old windows it has already processed.

In the meantime, I'm trying to upgrade to Kafka 2.2.1 to see if I get any improvement.

--
Alessandro Tagliapietra

On Wed, Jun 5, 2019 at 4:45 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Alessandro,

What did you do for `restarting the app online`? I'm not sure I follow the difference between "restart the streams app" and "restart the app online" from your description.
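For the 20-second windows described above, Kafka Streams aligns time windows to the epoch. A tumbling window's start is therefore the timestamp rounded down to a multiple of the window size. The minimal Python sketch below assumes non-overlapping (tumbling) windows and uses a simple count/sum aggregate as a stand-in for the real aggregation in pipeline.kt. It also shows why reprocessing the input from the beginning regenerates exactly the same old windows.

```python
from typing import Dict, Iterable, Tuple

WINDOW_SIZE_MS = 20_000  # 20-second windows, as described in the thread


def window_start(timestamp_ms: int, size_ms: int = WINDOW_SIZE_MS) -> int:
    """Start of the epoch-aligned tumbling window that contains timestamp_ms."""
    return timestamp_ms - (timestamp_ms % size_ms)


def aggregate(
    readings: Iterable[Tuple[int, float]], size_ms: int = WINDOW_SIZE_MS
) -> Dict[int, Tuple[int, float]]:
    """Group (timestamp_ms, value) readings into windows: {window_start: (count, sum)}."""
    windows: Dict[int, Tuple[int, float]] = {}
    for ts, value in readings:
        start = window_start(ts, size_ms)
        count, total = windows.get(start, (0, 0.0))
        windows[start] = (count + 1, total + value)
    return windows


if __name__ == "__main__":
    # One hypothetical reading per second for one minute
    readings = [(second * 1000, 1.0) for second in range(60)]
    first_run = aggregate(readings)
    print(sorted(first_run))  # [0, 20000, 40000]

    # Replaying the topic from offset 0 produces exactly the same windows again
    print(aggregate(readings) == first_run)  # True
```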

Guozhang

On Wed, Jun 5, 2019 at 10:42 AM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

Hello everyone,

I have a small streams app. The configuration and part of the code I'm using can be found here:
https://gist.github.com/alex88/6b7b31c2b008817a24f63246557099bc
It also includes the logs from when the app is started locally and from when it is started on our servers, connecting to the Confluent Cloud Kafka broker.

The problem is that locally everything works properly: if I restart the streams app, it just continues where it left off. If I restart the app online, it reprocesses the whole topic.

That shouldn't happen, right?

Thanks in advance

--
Alessandro Tagliapietra
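The symptom in this original question fits Guozhang's explanation about lost committed offsets: a local restart resumes, while a restart against the cloud broker reprocesses the whole input topic. A consumer resumes from its group's committed offset when one exists and is still in range. Otherwise it falls back to `auto.offset.reset`, which Kafka Streams sets to `earliest` by default. The simplified Python model below illustrates that decision; the offsets are hypothetical.

```python
from typing import Optional


def starting_offset(
    committed: Optional[int],
    log_start_offset: int,
    log_end_offset: int,
    auto_offset_reset: str = "earliest",
) -> int:
    """Where a consumer begins reading a partition after a (re)start.

    A committed offset that is still within the log always wins. Without one,
    the reset policy decides: "earliest" reprocesses everything still in the
    topic, "latest" skips to new data only, and anything else is an error.
    """
    if committed is not None and log_start_offset <= committed <= log_end_offset:
        return committed
    if auto_offset_reset == "earliest":
        return log_start_offset
    if auto_offset_reset == "latest":
        return log_end_offset
    raise ValueError(f"no valid committed offset and reset policy is {auto_offset_reset!r}")


if __name__ == "__main__":
    # Hypothetical sensors-input partition holding offsets 0..9999
    print(starting_offset(committed=9_990, log_start_offset=0, log_end_offset=10_000))  # 9990
    print(starting_offset(committed=None, log_start_offset=0, log_end_offset=10_000))  # 0
```

With the Streams default of `earliest`, a missing commit turns into the "reprocesses the whole topic" behaviour described above rather than silently skipping data.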