Just an update since it has been happening again and I now have some more metrics to show. The topology is this:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [sensors])
      --> KSTREAM-TRANSFORMVALUES-0000000001
    Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [new-data-store])
      --> KSTREAM-FLATMAPVALUES-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FLATMAPVALUES-0000000002 (stores: [])
      --> KSTREAM-TRANSFORMVALUES-0000000003
      <-- KSTREAM-TRANSFORMVALUES-0000000001
    Processor: KSTREAM-TRANSFORMVALUES-0000000003 (stores: [LastValueStore])
      --> KSTREAM-FILTER-0000000004
      <-- KSTREAM-FLATMAPVALUES-0000000002
    Processor: KSTREAM-FILTER-0000000004 (stores: [])
      --> KSTREAM-AGGREGATE-0000000005
      <-- KSTREAM-TRANSFORMVALUES-0000000003
    Processor: KSTREAM-AGGREGATE-0000000005 (stores: [aggregate-store])
      --> KTABLE-TOSTREAM-0000000006
      <-- KSTREAM-FILTER-0000000004
    Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
      --> KSTREAM-TRANSFORM-0000000007
      <-- KSTREAM-AGGREGATE-0000000005
    Processor: KSTREAM-TRANSFORM-0000000007 (stores: [suppress-store])
      --> KSTREAM-MAP-0000000008
      <-- KTABLE-TOSTREAM-0000000006
    Processor: KSTREAM-MAP-0000000008 (stores: [])
      --> KSTREAM-PRINTER-0000000009, KSTREAM-SINK-0000000010
      <-- KSTREAM-TRANSFORM-0000000007
    Processor: KSTREAM-PRINTER-0000000009 (stores: [])
      --> none
      <-- KSTREAM-MAP-0000000008
    Sink: KSTREAM-SINK-0000000010 (topic: sensors-output)
      <-- KSTREAM-MAP-0000000008

- https://imgur.com/R3Pqypo this shows that the input source topic has the same rate of messages
- https://imgur.com/BTwq09p this is the number of records processed by each processor node; at first there are 3 processor nodes (kstream-transformvalues-3, kstream-filter-4, kstream-aggregate-5) processing 4-5k records/min, then ktable-tostream-6 and kstream-transform-7 ramp up and the previous ones slow down due to the higher load
- https://imgur.com/5eXpf8l the state stores' cache rate starts to decrease
- https://imgur.com/dwFOb2g put and fetch operations of the window store remain almost the same (maybe lower due to the higher load)
- https://imgur.com/1XZmMW5 commit latency increases
- https://imgur.com/UdBpOVU commit rate stays almost the same
- https://imgur.com/UJ3JB4f process latency increases
- https://imgur.com/55YVmy2 process rate stays the same
- https://imgur.com/GMJ3eGV sent records increase because of the aggregate and suppress store changelog records
- https://imgur.com/XDm2kX6 sent bytes for those changelog topics increase

(full album https://imgur.com/a/tXlJJEO)

Any other metric that might be important? It seems that the issue is between the aggregate and KTable.toStream().
After a restart, as expected, usage goes back to normal values.

--
Alessandro Tagliapietra

On Mon, Dec 9, 2019 at 7:22 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

> You're saying that with a 100ms commit interval, caching won't help
> because it would still send the compacted changes to the changelog every
> 100ms?
>
> Regarding the custom state store I'll look into that because I didn't go
> much further than transformers and stores in my kafka experience so I'll
> need to understand better what that implies.
>
> Yeah I only have one window per key in the store.
>
> The only thing I don't understand is why cache works 80% of the time and
> then suddenly the changelog sent bytes increase 90x.
> I mean, if cache wasn't working, why enabling it in our pipeline decreased
> the sent bytes from 30-40MB/minute to 400KB/minute?
>
> I'll look into the custom state store tho.
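On the "any other metric that might be important" question in the update above: one low-effort option is to dump the candidate metrics straight from the running application. This is only a hypothetical helper, not code from this thread; it assumes `streams` is the already-started KafkaStreams instance, and the name fragments are matched loosely on purpose.

    import org.apache.kafka.streams.KafkaStreams

    // Print the cache, commit/process latency and changelog-producer metrics
    // discussed in the update above.
    fun dumpSuspectMetrics(streams: KafkaStreams) {
        val suspects = listOf(
            "hit-ratio",          // record cache effectiveness (needs DEBUG recording level)
            "commit-latency",     // rises in the graphs above
            "process-latency",
            "record-send-rate",   // embedded producer -> changelog topics
            "byte-rate"
        )
        streams.metrics()
            .filterKeys { name -> suspects.any { name.name().contains(it) } }
            .forEach { (name, metric) ->
                println("${name.group()}/${name.name()} ${name.tags()} = ${metric.metricValue()}")
            }
    }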
> > Thanks > > -- > Alessandro Tagliapietra > > > > On Mon, Dec 9, 2019 at 7:02 PM Sophie Blee-Goldman <sop...@confluent.io> > wrote: > >> Alright, well I see why you have so much data being sent to the changelog >> if each >> update involves appending to a list and then writing in the whole list. >> And >> with 340 >> records/minute I'm actually not sure how the cache could really help at >> all >> when it's >> being flushed every 100ms. >> >> Here's kind of a wild idea, if you really only need append semantics: what >> if you wrote >> a custom StateStore that wrapped the normal RocksDBStore (or >> RocksDBWindowStore) >> and did the append for you under the hood? The changelogging layer sits >> between the >> layer that you would call #put on in your transformer and the final layer >> that actually writes >> to the underlying storage engine. If you insert an extra layer and modify >> your transformer >> to only call put on the new data (rather than the entire list) then only >> this new data will get >> sent to the changelog. Your custom storage layer will know it's actually >> append semantics, >> and add the new data to the existing list before sending it on to RocksDB. >> >> Since you only ever have one window per key in the store (right?) you just >> need to make >> sure that nothing from the current window gets deleted prematurely. You'd >> want to turn off >> compaction on the changelog and caching on the store of course, and maybe >> give the >> changelog some extra retention time to be safe. >> >> Obviously I haven't thoroughly verified this alternative, but it seems >> like >> this approach (or >> something to its effect) could help you cut down on the changelog data. >> WDYT? >> >> On Mon, Dec 9, 2019 at 4:35 PM Alessandro Tagliapietra < >> tagliapietra.alessan...@gmail.com> wrote: >> >> > Hi Sophie, >> > >> > Just to give a better context, yes we use EOS and the problem happens in >> > our aggregation store. >> > Basically when windowing data we append each record into a list that's >> > stored in the aggregation store. >> > We have 2 versions, in production we use the kafka streams windowing >> API, >> > in staging we manually calculate the window end timestamp and aggregate >> > using that timestamp. >> > >> > To give you an example of the staging code, it's a simple transformer >> that: >> > - if incoming data fits in the same window as the data in store, append >> > the data to the existing store list overwriting the same key and >> nothing is >> > sent downstream >> > - if incoming data has a timestamp smaller than the existing store >> data, >> > discard the record >> > - if incoming data has a timestamp bigger than the existing store data, >> > send the stored list downstream and store the new window data into the >> > store >> > >> > This way we don't use multiple keys (kafka streams instead uses a store >> > where each key is stream-key + window key) as we overwrite the store >> data >> > using the same key over and over. >> > So what I would expect is that since we're overwriting the same keys >> there >> > isn't more and more data to be cached as the number of keys are always >> the >> > same and we don't really need to cache more data over time. >> > >> > To respond to your questions: >> > - yes when I say that cache "stopped/started" working I mean that at >> some >> > point the store started sending more and more data to che changelog >> topic >> > and then suddenly stopped again even without a restart (a restart always >> > fixes the problem). 
>> > - Yes there are no density changes in the input stream, I've checked >> the >> > number of records sent to the stream input topic and there is a >> variation >> > of ~10-20 records per minute on an average of 340 records per minute. >> Most >> > of the records are also generated by simulators with very predictable >> > output rate. >> > >> > In the meantime I've enabled reporting of debug metrics (so including >> cache >> > hit ratio) to hopefully get better insights the next time it happens. >> > >> > Thank you in advance >> > >> > -- >> > Alessandro Tagliapietra >> > >> > On Mon, Dec 9, 2019 at 3:57 PM Sophie Blee-Goldman <sop...@confluent.io >> > >> > wrote: >> > >> > > It's an LRU cache, so once it gets full new records will cause older >> ones >> > > to be evicted (and thus sent >> > > downstream). Of course this should only apply to records of a >> different >> > > key, otherwise it will just cause >> > > an update of that key in the cache. >> > > >> > > I missed that you were using EOS, given the short commit interval it's >> > hard >> > > to see those effects. >> > > When you say that it stopped working and then appeared to start >> working >> > > again, are you just >> > > referring to the amount of data being sent to the changelog? And you >> can >> > > definitely rule out differences >> > > in the density of updates in the input stream? >> > > >> > > >> > > >> > > On Mon, Dec 9, 2019 at 12:26 PM Alessandro Tagliapietra < >> > > tagliapietra.alessan...@gmail.com> wrote: >> > > >> > > > Hi Sophie, >> > > > >> > > > thanks fo helping. >> > > > >> > > > By eviction of older records you mean they get flushed to the >> changelog >> > > > topic? >> > > > Or the cache is just full and so all new records go to the changelog >> > > topic >> > > > until the old ones are evicted? >> > > > >> > > > Regarding the timing, what timing do you mean? Between when the >> cache >> > > stops >> > > > and starts working again? We're using EOS os I believe the commit >> > > interval >> > > > is every 100ms. >> > > > >> > > > Regards >> > > > >> > > > -- >> > > > Alessandro Tagliapietra >> > > > >> > > > >> > > > >> > > > On Mon, Dec 9, 2019 at 12:15 PM Sophie Blee-Goldman < >> > sop...@confluent.io >> > > > >> > > > wrote: >> > > > >> > > > > It might be that the cache appears to "stop working" because it >> gets >> > > > full, >> > > > > and each >> > > > > new update causes an eviction (of some older record). This would >> also >> > > > > explain the >> > > > > opposite behavior, that it "starts working" again after some time >> > > without >> > > > > being restarted, >> > > > > since the cache is completely flushed on commit. Does the timing >> seem >> > > to >> > > > > align with your >> > > > > commit interval (default is 30s)? >> > > > > >> > > > > On Mon, Dec 9, 2019 at 12:03 AM Alessandro Tagliapietra < >> > > > > tagliapietra.alessan...@gmail.com> wrote: >> > > > > >> > > > > > And it seems that for some reason after a while caching works >> again >> > > > > > without a restart of the streams application >> > > > > > >> > > > > > [image: Screen Shot 2019-12-08 at 11.59.30 PM.png] >> > > > > > >> > > > > > I'll try to enable debug metrics and see if I can find something >> > > useful >> > > > > > there. 
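Enabling the debug-level metrics mentioned here (the record cache hit ratio is only recorded at DEBUG level) is just a config change; a minimal sketch, with the cache size included purely as an illustrative knob:

    import java.util.Properties
    import org.apache.kafka.streams.StreamsConfig

    val props = Properties()
    // Record DEBUG-level metrics such as the record cache hit-ratio
    props[StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG] = "DEBUG"
    // Illustrative only: total record cache size across threads (default is 10 MB)
    props[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 10 * 1024 * 1024L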
>> > > > > > Any idea is appreciated in the meantime :) >> > > > > > >> > > > > > -- >> > > > > > Alessandro Tagliapietra >> > > > > > >> > > > > > On Sun, Dec 8, 2019 at 12:54 PM Alessandro Tagliapietra < >> > > > > > tagliapietra.alessan...@gmail.com> wrote: >> > > > > > >> > > > > >> It seems that even with caching enabled, after a while the sent >> > > bytes >> > > > > >> stil go up >> > > > > >> >> > > > > >> [image: Screen Shot 2019-12-08 at 12.52.31 PM.png] >> > > > > >> >> > > > > >> you can see the deploy when I've enabled caching but it looks >> like >> > > > it's >> > > > > >> still a temporary solution. >> > > > > >> >> > > > > >> -- >> > > > > >> Alessandro Tagliapietra >> > > > > >> >> > > > > >> >> > > > > >> On Sat, Dec 7, 2019 at 10:08 AM Alessandro Tagliapietra < >> > > > > >> tagliapietra.alessan...@gmail.com> wrote: >> > > > > >> >> > > > > >>> Could be, but since we have a limite amount of input keys >> (~30), >> > > > > >>> windowing generates new keys but old ones are never touched >> again >> > > > > since the >> > > > > >>> data per key is in order, I assume it shouldn't be a big deal >> for >> > > it >> > > > to >> > > > > >>> handle 30 keys >> > > > > >>> I'll have a look at cache metrics and see if something pops >> out >> > > > > >>> >> > > > > >>> Thanks >> > > > > >>> >> > > > > >>> -- >> > > > > >>> Alessandro Tagliapietra >> > > > > >>> >> > > > > >>> >> > > > > >>> On Sat, Dec 7, 2019 at 10:02 AM John Roesler < >> > vvcep...@apache.org> >> > > > > >>> wrote: >> > > > > >>> >> > > > > >>>> Hmm, that’s a good question. Now that we’re talking about >> > > caching, I >> > > > > >>>> wonder if the cache was just too small. It’s not very big by >> > > > default. >> > > > > >>>> >> > > > > >>>> On Sat, Dec 7, 2019, at 11:16, Alessandro Tagliapietra wrote: >> > > > > >>>> > Ok I'll check on that! >> > > > > >>>> > >> > > > > >>>> > Now I can see that with caching we went from 3-4MB/s to >> > 400KB/s, >> > > > > that >> > > > > >>>> will >> > > > > >>>> > help with the bill. >> > > > > >>>> > >> > > > > >>>> > Last question, any reason why after a while the regular >> > windowed >> > > > > >>>> stream >> > > > > >>>> > starts sending every update instead of caching? >> > > > > >>>> > Could it be because it doesn't have any more memory >> available? >> > > Any >> > > > > >>>> other >> > > > > >>>> > possible reason? >> > > > > >>>> > >> > > > > >>>> > Thank you so much for your help >> > > > > >>>> > >> > > > > >>>> > -- >> > > > > >>>> > Alessandro Tagliapietra >> > > > > >>>> > >> > > > > >>>> > >> > > > > >>>> > On Sat, Dec 7, 2019 at 9:14 AM John Roesler < >> > > vvcep...@apache.org> >> > > > > >>>> wrote: >> > > > > >>>> > >> > > > > >>>> > > Ah, yes. Glad you figured it out! >> > > > > >>>> > > >> > > > > >>>> > > Caching does not reduce EOS guarantees at all. I highly >> > > > recommend >> > > > > >>>> using >> > > > > >>>> > > it. You might even want to take a look at the caching >> > metrics >> > > to >> > > > > >>>> make sure >> > > > > >>>> > > you have a good hit ratio. >> > > > > >>>> > > >> > > > > >>>> > > -John >> > > > > >>>> > > >> > > > > >>>> > > On Sat, Dec 7, 2019, at 10:51, Alessandro Tagliapietra >> > wrote: >> > > > > >>>> > > > Never mind I've found out I can use >> `.withCachingEnabled` >> > on >> > > > the >> > > > > >>>> store >> > > > > >>>> > > > builder to achieve the same thing as the windowing >> example >> > > as >> > > > > >>>> > > > `Materialized.as` turns that on by default. 
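A minimal sketch of that store-builder change (the serde helper is the Settings class that appears in the code quoted further down the thread; the rest is standard Kafka Streams API):

    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.streams.state.Stores

    // Hand-built stores need caching enabled explicitly;
    // Materialized.`as` turns it on by default.
    val suppressStoreSupplier = Stores
        .keyValueStoreBuilder(
            Stores.persistentKeyValueStore("suppress-store"),
            Serdes.String(),
            Settings.getValueSpecificavroSerde()
        )
        .withCachingEnabled()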
>> > > > > >>>> > > > >> > > > > >>>> > > > Does caching in any way reduces the EOS guarantees? >> > > > > >>>> > > > >> > > > > >>>> > > > -- >> > > > > >>>> > > > Alessandro Tagliapietra >> > > > > >>>> > > > >> > > > > >>>> > > > >> > > > > >>>> > > > On Sat, Dec 7, 2019 at 1:12 AM Alessandro Tagliapietra >> < >> > > > > >>>> > > > tagliapietra.alessan...@gmail.com> wrote: >> > > > > >>>> > > > >> > > > > >>>> > > > > Seems my journey with this isn't done just yet, >> > > > > >>>> > > > > >> > > > > >>>> > > > > This seems very complicated to me but I'll try to >> > explain >> > > it >> > > > > as >> > > > > >>>> best I >> > > > > >>>> > > can. >> > > > > >>>> > > > > To better understand the streams network usage I've >> used >> > > > > >>>> prometheus >> > > > > >>>> > > with >> > > > > >>>> > > > > the JMX exporter to export kafka metrics. >> > > > > >>>> > > > > To check the amount of data we use I'm looking at the >> > > > > increments >> > > > > >>>> > > > > of kafka_producer_topic_metrics_byte_total and >> > > > > >>>> > > > > >> kafka_producer_producer_topic_metrics_record_send_total, >> > > > > >>>> > > > > >> > > > > >>>> > > > > Our current (before the change mentioned above) code >> > looks >> > > > > like >> > > > > >>>> this: >> > > > > >>>> > > > > >> > > > > >>>> > > > > // This transformers just pairs a value with the >> > previous >> > > > one >> > > > > >>>> storing >> > > > > >>>> > > the >> > > > > >>>> > > > > temporary one in a store >> > > > > >>>> > > > > val pairsStream = metricStream >> > > > > >>>> > > > > .transformValues(ValueTransformerWithKeySupplier { >> > > > > >>>> PairTransformer() >> > > > > >>>> > > }, >> > > > > >>>> > > > > "LastValueStore") >> > > > > >>>> > > > > .filter { _, value: MetricSequence? -> value != >> null } >> > > > > >>>> > > > > >> > > > > >>>> > > > > // Create a store to store suppressed windows until a >> > new >> > > > one >> > > > > is >> > > > > >>>> > > received >> > > > > >>>> > > > > val suppressStoreSupplier = >> > > > > >>>> > > > > >> > > > > >>>> > > >> > > > > >>>> >> > > > > >> > > > >> > > >> > >> Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("suppress-store"), >> > > > > >>>> > > > > ...... 
>> > > > > >>>> > > > > >> > > > > >>>> > > > > // Window and aggregate data in 1 minute intervals >> > > > > >>>> > > > > val aggregatedStream = pairsStream >> > > > > >>>> > > > > .groupByKey() >> > > > > >>>> > > > > >> > > > > .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1))) >> > > > > >>>> > > > > .aggregate( >> > > > > >>>> > > > > { MetricSequenceList(ArrayList()) }, >> > > > > >>>> > > > > { key, value, aggregate -> >> > > > > >>>> > > > > aggregate.getRecords().add(value) >> > > > > >>>> > > > > aggregate >> > > > > >>>> > > > > }, >> > > > > >>>> > > > > Materialized.`as`<String, >> MetricSequenceList, >> > > > > >>>> > > WindowStore<Bytes, >> > > > > >>>> > > > > >> > > > > >>>> > > >> > > > > >>>> >> > > > > >> > > > >> > > >> > >> ByteArray>>("aggregate-store").withKeySerde(Serdes.String()).withValueSerde(Settings.getValueSpecificavroSerde()) >> > > > > >>>> > > > > ) >> > > > > >>>> > > > > .toStream() >> > > > > >>>> > > > > .flatTransform(TransformerSupplier { >> > > > > >>>> > > > > // This transformer basically waits until a new >> > > window >> > > > > is >> > > > > >>>> > > received >> > > > > >>>> > > > > to emit the previous one >> > > > > >>>> > > > > }, "suppress-store") >> > > > > >>>> > > > > .map { sensorId: String, suppressedOutput: >> > > > SuppressedOutput >> > > > > -> >> > > > > >>>> > > > > .... etc .... >> > > > > >>>> > > > > >> > > > > >>>> > > > > >> > > > > >>>> > > > > Basically: >> > > > > >>>> > > > > - all data goes through LastValueStore store that >> > stores >> > > > each >> > > > > >>>> message >> > > > > >>>> > > and >> > > > > >>>> > > > > emits a pair with the previous one >> > > > > >>>> > > > > - the aggregate-store is used to store the >> per-window >> > > list >> > > > of >> > > > > >>>> > > messages in >> > > > > >>>> > > > > the aggregate method >> > > > > >>>> > > > > - the suppress store is used to store each received >> > > window >> > > > > >>>> which is >> > > > > >>>> > > > > emitted only after a newer one is received >> > > > > >>>> > > > > >> > > > > >>>> > > > > What I'm experiencing is that: >> > > > > >>>> > > > > - during normal execution, the streams app sends to >> the >> > > > > >>>> lastvalue >> > > > > >>>> > > store >> > > > > >>>> > > > > changelog topic 5k messages/min, the aggregate and >> > > suppress >> > > > > >>>> store >> > > > > >>>> > > changelog >> > > > > >>>> > > > > topics only about 100 >> > > > > >>>> > > > > - at some point (after many hours of operation), the >> > > > streams >> > > > > >>>> app >> > > > > >>>> > > starts >> > > > > >>>> > > > > sending to the aggregate and suppress store changelog >> > > topic >> > > > > the >> > > > > >>>> same >> > > > > >>>> > > amount >> > > > > >>>> > > > > of messages going through the lastvaluestore >> > > > > >>>> > > > > - if I restart the streams app it goes back to the >> > > initial >> > > > > >>>> behavior >> > > > > >>>> > > > > >> > > > > >>>> > > > > You can see the behavior in this graph >> > > > > >>>> https://imgur.com/dJcUNSf >> > > > > >>>> > > > > You can also see that after a restart everything goes >> > back >> > > > to >> > > > > >>>> normal >> > > > > >>>> > > > > levels. >> > > > > >>>> > > > > Regarding other metrics, process latency increases, >> poll >> > > > > latency >> > > > > >>>> > > > > decreases, poll rate decreases, commit rate stays the >> > same >> > > > > >>>> while commit >> > > > > >>>> > > > > latency increases. 
>> > > > > >>>> > > > > >> > > > > >>>> > > > > Now, I've these questions: >> > > > > >>>> > > > > - why isn't the aggregate/suppress store changelog >> > topic >> > > > > >>>> throughput >> > > > > >>>> > > the >> > > > > >>>> > > > > same as the LastValueStore? Shouldn't every time it >> > > > aggregates >> > > > > >>>> send a >> > > > > >>>> > > > > record to the changelog? >> > > > > >>>> > > > > - is the windowing doing some internal caching like >> not >> > > > > >>>> sending every >> > > > > >>>> > > > > aggregation record until the window time is passed? >> (if >> > > so, >> > > > > >>>> where can I >> > > > > >>>> > > > > find that code since I would like to use that also >> for >> > our >> > > > new >> > > > > >>>> > > > > implementation) >> > > > > >>>> > > > > >> > > > > >>>> > > > > Thank you in advance >> > > > > >>>> > > > > >> > > > > >>>> > > > > -- >> > > > > >>>> > > > > Alessandro Tagliapietra >> > > > > >>>> > > > > >> > > > > >>>> > > > > >> > > > > >>>> > > > > On Wed, Dec 4, 2019 at 7:57 AM John Roesler < >> > > > > >>>> vvcep...@apache.org> >> > > > > >>>> > > wrote: >> > > > > >>>> > > > > >> > > > > >>>> > > > >> Oh, good! >> > > > > >>>> > > > >> >> > > > > >>>> > > > >> On Tue, Dec 3, 2019, at 23:29, Alessandro >> Tagliapietra >> > > > wrote: >> > > > > >>>> > > > >> > Testing on staging shows that a restart on >> exception >> > is >> > > > > much >> > > > > >>>> faster >> > > > > >>>> > > and >> > > > > >>>> > > > >> the >> > > > > >>>> > > > >> > stream starts right away which I think means we're >> > > > reading >> > > > > >>>> way less >> > > > > >>>> > > data >> > > > > >>>> > > > >> > than before! >> > > > > >>>> > > > >> > >> > > > > >>>> > > > >> > What I was referring to is that, in Streams, the >> keys >> > > for >> > > > > >>>> window >> > > > > >>>> > > > >> > > aggregation state is actually composed of both >> the >> > > > window >> > > > > >>>> itself >> > > > > >>>> > > and >> > > > > >>>> > > > >> the >> > > > > >>>> > > > >> > > key. In the DSL, it looks like "Windowed<K>". >> That >> > > > > results >> > > > > >>>> in the >> > > > > >>>> > > > >> store >> > > > > >>>> > > > >> > > having a unique key per window for each K, >> which is >> > > why >> > > > > we >> > > > > >>>> need >> > > > > >>>> > > > >> retention >> > > > > >>>> > > > >> > > as well as compaction for our changelogs. But >> for >> > > you, >> > > > if >> > > > > >>>> you just >> > > > > >>>> > > > >> make the >> > > > > >>>> > > > >> > > key "K", then compaction alone should do the >> trick. >> > > > > >>>> > > > >> > >> > > > > >>>> > > > >> > Yes we had compact,delete as cleanup policy but >> > > probably >> > > > it >> > > > > >>>> still >> > > > > >>>> > > had a >> > > > > >>>> > > > >> too >> > > > > >>>> > > > >> > long retention value, also the rocksdb store is >> > > probably >> > > > > much >> > > > > >>>> > > faster now >> > > > > >>>> > > > >> > having only one key per key instead of one key per >> > > window >> > > > > >>>> per key. >> > > > > >>>> > > > >> > >> > > > > >>>> > > > >> > Thanks a lot for helping! 
I'm now going to setup a >> > > > > >>>> prometheus-jmx >> > > > > >>>> > > > >> > monitoring so we can keep better track of what's >> > going >> > > on >> > > > > :) >> > > > > >>>> > > > >> > >> > > > > >>>> > > > >> > -- >> > > > > >>>> > > > >> > Alessandro Tagliapietra >> > > > > >>>> > > > >> > >> > > > > >>>> > > > >> > >> > > > > >>>> > > > >> > On Tue, Dec 3, 2019 at 9:12 PM John Roesler < >> > > > > >>>> vvcep...@apache.org> >> > > > > >>>> > > > >> wrote: >> > > > > >>>> > > > >> > >> > > > > >>>> > > > >> > > Oh, yeah, I remember that conversation! >> > > > > >>>> > > > >> > > >> > > > > >>>> > > > >> > > Yes, then, I agree, if you're only storing >> state of >> > > the >> > > > > >>>> most >> > > > > >>>> > > recent >> > > > > >>>> > > > >> window >> > > > > >>>> > > > >> > > for each key, and the key you use for that >> state is >> > > > > >>>> actually the >> > > > > >>>> > > key >> > > > > >>>> > > > >> of the >> > > > > >>>> > > > >> > > records, then an aggressive compaction policy >> plus >> > > your >> > > > > >>>> custom >> > > > > >>>> > > > >> transformer >> > > > > >>>> > > > >> > > seems like a good way forward. >> > > > > >>>> > > > >> > > >> > > > > >>>> > > > >> > > What I was referring to is that, in Streams, the >> > keys >> > > > for >> > > > > >>>> window >> > > > > >>>> > > > >> > > aggregation state is actually composed of both >> the >> > > > window >> > > > > >>>> itself >> > > > > >>>> > > and >> > > > > >>>> > > > >> the >> > > > > >>>> > > > >> > > key. In the DSL, it looks like "Windowed<K>". >> That >> > > > > results >> > > > > >>>> in the >> > > > > >>>> > > > >> store >> > > > > >>>> > > > >> > > having a unique key per window for each K, >> which is >> > > why >> > > > > we >> > > > > >>>> need >> > > > > >>>> > > > >> retention >> > > > > >>>> > > > >> > > as well as compaction for our changelogs. But >> for >> > > you, >> > > > if >> > > > > >>>> you just >> > > > > >>>> > > > >> make the >> > > > > >>>> > > > >> > > key "K", then compaction alone should do the >> trick. >> > > > > >>>> > > > >> > > >> > > > > >>>> > > > >> > > And yes, if you manage the topic yourself, then >> > > Streams >> > > > > >>>> won't >> > > > > >>>> > > adjust >> > > > > >>>> > > > >> the >> > > > > >>>> > > > >> > > retention time. I think it might validate that >> the >> > > > > >>>> retention >> > > > > >>>> > > isn't too >> > > > > >>>> > > > >> > > short, but I don't remember offhand. >> > > > > >>>> > > > >> > > >> > > > > >>>> > > > >> > > Cheers, and let me know how it goes! >> > > > > >>>> > > > >> > > -John >> > > > > >>>> > > > >> > > >> > > > > >>>> > > > >> > > On Tue, Dec 3, 2019, at 23:03, Alessandro >> > > Tagliapietra >> > > > > >>>> wrote: >> > > > > >>>> > > > >> > > > Hi John, >> > > > > >>>> > > > >> > > > >> > > > > >>>> > > > >> > > > afaik grace period uses stream time >> > > > > >>>> > > > >> > > > >> > > > > >>>> > > > >> > > >> > > > > >>>> > > > >> >> > > > > >>>> > > >> > > > > >>>> >> > > > > >> > > > >> > > >> > >> https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Windows.html >> > > > > >>>> > > > >> > > > which is >> > > > > >>>> > > > >> > > > per partition, unfortunately we process data >> > that's >> > > > not >> > > > > >>>> in sync >> > > > > >>>> > > > >> between >> > > > > >>>> > > > >> > > > keys so each key needs to be independent and a >> > key >> > > > can >> > > > > >>>> have much >> > > > > >>>> > > > >> older >> > > > > >>>> > > > >> > > > data >> > > > > >>>> > > > >> > > > than the other. 
>> > > > > >>>> > > > >> > > > >> > > > > >>>> > > > >> > > > Having a small grace period would probably >> close >> > > old >> > > > > >>>> windows >> > > > > >>>> > > sooner >> > > > > >>>> > > > >> than >> > > > > >>>> > > > >> > > > expected. That's also why in my use case a >> custom >> > > > store >> > > > > >>>> that >> > > > > >>>> > > just >> > > > > >>>> > > > >> stores >> > > > > >>>> > > > >> > > > the last window data for each key might work >> > > better. >> > > > I >> > > > > >>>> had the >> > > > > >>>> > > same >> > > > > >>>> > > > >> issue >> > > > > >>>> > > > >> > > > with suppression and it has been reported here >> > > > > >>>> > > > >> > > > >> https://issues.apache.org/jira/browse/KAFKA-8769 >> > > > > >>>> > > > >> > > > Oh I just saw that you're the one that helped >> me >> > on >> > > > > >>>> slack and >> > > > > >>>> > > > >> created the >> > > > > >>>> > > > >> > > > issue (thanks again for that). >> > > > > >>>> > > > >> > > > >> > > > > >>>> > > > >> > > > The behavior that you mention about streams >> > setting >> > > > > >>>> changelog >> > > > > >>>> > > > >> retention >> > > > > >>>> > > > >> > > > time is something they do on creation of the >> > topic >> > > > when >> > > > > >>>> the >> > > > > >>>> > > broker >> > > > > >>>> > > > >> has >> > > > > >>>> > > > >> > > auto >> > > > > >>>> > > > >> > > > creation enabled? Because we're using >> confluent >> > > cloud >> > > > > >>>> and I had >> > > > > >>>> > > to >> > > > > >>>> > > > >> create >> > > > > >>>> > > > >> > > > it manually. >> > > > > >>>> > > > >> > > > Regarding the change in the recovery behavior, >> > with >> > > > > >>>> compact >> > > > > >>>> > > cleanup >> > > > > >>>> > > > >> > > policy >> > > > > >>>> > > > >> > > > shouldn't the changelog only keep the last >> value? >> > > > That >> > > > > >>>> would >> > > > > >>>> > > make >> > > > > >>>> > > > >> the >> > > > > >>>> > > > >> > > > recovery faster and "cheaper" as it would only >> > need >> > > > to >> > > > > >>>> read a >> > > > > >>>> > > single >> > > > > >>>> > > > >> > > value >> > > > > >>>> > > > >> > > > per key (if the cleanup just happened) right? >> > > > > >>>> > > > >> > > > >> > > > > >>>> > > > >> > > > -- >> > > > > >>>> > > > >> > > > Alessandro Tagliapietra >> > > > > >>>> > > > >> > > > >> > > > > >>>> > > > >> > > > >> > > > > >>>> > > > >> > > > On Tue, Dec 3, 2019 at 8:51 PM John Roesler < >> > > > > >>>> > > vvcep...@apache.org> >> > > > > >>>> > > > >> wrote: >> > > > > >>>> > > > >> > > > >> > > > > >>>> > > > >> > > > > Hey Alessandro, >> > > > > >>>> > > > >> > > > > >> > > > > >>>> > > > >> > > > > That sounds also like it would work. I'm >> > > wondering >> > > > if >> > > > > >>>> it would >> > > > > >>>> > > > >> actually >> > > > > >>>> > > > >> > > > > change what you observe w.r.t. recovery >> > behavior, >> > > > > >>>> though. >> > > > > >>>> > > Streams >> > > > > >>>> > > > >> > > already >> > > > > >>>> > > > >> > > > > sets the retention time on the changelog to >> > equal >> > > > the >> > > > > >>>> > > retention >> > > > > >>>> > > > >> time >> > > > > >>>> > > > >> > > of the >> > > > > >>>> > > > >> > > > > windows, for windowed aggregations, so you >> > > > shouldn't >> > > > > be >> > > > > >>>> > > loading a >> > > > > >>>> > > > >> lot >> > > > > >>>> > > > >> > > of >> > > > > >>>> > > > >> > > > > window data for old windows you no longer >> care >> > > > about. 
>> > > > > >>>> > > > >> > > > > >> > > > > >>>> > > > >> > > > > Have you set the "grace period" on your >> window >> > > > > >>>> definition? By >> > > > > >>>> > > > >> default, >> > > > > >>>> > > > >> > > it >> > > > > >>>> > > > >> > > > > is set to 24 hours, but you can set it as >> low >> > as >> > > > you >> > > > > >>>> like. >> > > > > >>>> > > E.g., >> > > > > >>>> > > > >> if you >> > > > > >>>> > > > >> > > > > want to commit to having in-order data only, >> > then >> > > > you >> > > > > >>>> can set >> > > > > >>>> > > the >> > > > > >>>> > > > >> grace >> > > > > >>>> > > > >> > > > > period to zero. This _should_ let the broker >> > > clean >> > > > up >> > > > > >>>> the >> > > > > >>>> > > > >> changelog >> > > > > >>>> > > > >> > > records >> > > > > >>>> > > > >> > > > > as soon as the window ends. >> > > > > >>>> > > > >> > > > > >> > > > > >>>> > > > >> > > > > Of course, the log cleaner doesn't run all >> the >> > > > time, >> > > > > so >> > > > > >>>> > > there's >> > > > > >>>> > > > >> some >> > > > > >>>> > > > >> > > extra >> > > > > >>>> > > > >> > > > > delay in which "expired" data would still be >> > > > visible >> > > > > >>>> in the >> > > > > >>>> > > > >> changelog, >> > > > > >>>> > > > >> > > but >> > > > > >>>> > > > >> > > > > it would actually be just the same as if you >> > > manage >> > > > > >>>> the store >> > > > > >>>> > > > >> yourself. >> > > > > >>>> > > > >> > > > > >> > > > > >>>> > > > >> > > > > Hope this helps! >> > > > > >>>> > > > >> > > > > -John >> > > > > >>>> > > > >> > > > > >> > > > > >>>> > > > >> > > > > On Tue, Dec 3, 2019, at 22:22, Alessandro >> > > > > Tagliapietra >> > > > > >>>> wrote: >> > > > > >>>> > > > >> > > > > > Thanks John for the explanation, >> > > > > >>>> > > > >> > > > > > >> > > > > >>>> > > > >> > > > > > I thought that with EOS enabled (which we >> > have) >> > > > it >> > > > > >>>> would in >> > > > > >>>> > > the >> > > > > >>>> > > > >> worst >> > > > > >>>> > > > >> > > > > case >> > > > > >>>> > > > >> > > > > > find a valid checkpoint and start the >> restore >> > > > from >> > > > > >>>> there >> > > > > >>>> > > until >> > > > > >>>> > > > >> it >> > > > > >>>> > > > >> > > reached >> > > > > >>>> > > > >> > > > > > the last committed status, not completely >> > from >> > > > > >>>> scratch. What >> > > > > >>>> > > > >> you say >> > > > > >>>> > > > >> > > > > > definitely makes sense now. >> > > > > >>>> > > > >> > > > > > Since we don't really need old time >> windows >> > and >> > > > we >> > > > > >>>> ensure >> > > > > >>>> > > data >> > > > > >>>> > > > >> is >> > > > > >>>> > > > >> > > ordered >> > > > > >>>> > > > >> > > > > > when processed I think I"ll just write a >> > custom >> > > > > >>>> transformer >> > > > > >>>> > > to >> > > > > >>>> > > > >> keep >> > > > > >>>> > > > >> > > only >> > > > > >>>> > > > >> > > > > > the last window, store intermediate >> > aggregation >> > > > > >>>> results in >> > > > > >>>> > > the >> > > > > >>>> > > > >> store >> > > > > >>>> > > > >> > > and >> > > > > >>>> > > > >> > > > > > emit a new value only when we receive data >> > > > > belonging >> > > > > >>>> to a >> > > > > >>>> > > new >> > > > > >>>> > > > >> window. 
>> > > > > >>>> > > > >> > > > > > That with a compact only changelog topic >> > should >> > > > > keep >> > > > > >>>> the >> > > > > >>>> > > rebuild >> > > > > >>>> > > > >> > > data to >> > > > > >>>> > > > >> > > > > > the minimum as it would have only the last >> > > value >> > > > > for >> > > > > >>>> each >> > > > > >>>> > > key. >> > > > > >>>> > > > >> > > > > > >> > > > > >>>> > > > >> > > > > > Hope that makes sense >> > > > > >>>> > > > >> > > > > > >> > > > > >>>> > > > >> > > > > > Thanks again >> > > > > >>>> > > > >> > > > > > >> > > > > >>>> > > > >> > > > > > -- >> > > > > >>>> > > > >> > > > > > Alessandro Tagliapietra >> > > > > >>>> > > > >> > > > > > >> > > > > >>>> > > > >> > > > > > >> > > > > >>>> > > > >> > > > > > On Tue, Dec 3, 2019 at 3:04 PM John >> Roesler < >> > > > > >>>> > > > >> vvcep...@apache.org> >> > > > > >>>> > > > >> > > wrote: >> > > > > >>>> > > > >> > > > > > >> > > > > >>>> > > > >> > > > > > > Hi Alessandro, >> > > > > >>>> > > > >> > > > > > > >> > > > > >>>> > > > >> > > > > > > To take a stab at your question, maybe >> it >> > > first >> > > > > >>>> doesn't >> > > > > >>>> > > find >> > > > > >>>> > > > >> it, >> > > > > >>>> > > > >> > > but >> > > > > >>>> > > > >> > > > > then >> > > > > >>>> > > > >> > > > > > > restores some data, writes the >> checkpoint, >> > > and >> > > > > >>>> then later >> > > > > >>>> > > on, >> > > > > >>>> > > > >> it >> > > > > >>>> > > > >> > > has to >> > > > > >>>> > > > >> > > > > > > re-initialize the task for some reason, >> and >> > > > > that's >> > > > > >>>> why it >> > > > > >>>> > > does >> > > > > >>>> > > > >> > > find a >> > > > > >>>> > > > >> > > > > > > checkpoint then? >> > > > > >>>> > > > >> > > > > > > >> > > > > >>>> > > > >> > > > > > > More to the heart of the issue, if you >> have >> > > EOS >> > > > > >>>> enabled, >> > > > > >>>> > > > >> Streams >> > > > > >>>> > > > >> > > _only_ >> > > > > >>>> > > > >> > > > > > > records the checkpoint when the store is >> > in a >> > > > > >>>> > > known-consistent >> > > > > >>>> > > > >> > > state. >> > > > > >>>> > > > >> > > > > For >> > > > > >>>> > > > >> > > > > > > example, if you have a graceful >> shutdown, >> > > > Streams >> > > > > >>>> will >> > > > > >>>> > > flush >> > > > > >>>> > > > >> all >> > > > > >>>> > > > >> > > the >> > > > > >>>> > > > >> > > > > > > stores, commit all the transactions, and >> > then >> > > > > >>>> write the >> > > > > >>>> > > > >> checkpoint >> > > > > >>>> > > > >> > > > > file. >> > > > > >>>> > > > >> > > > > > > Then, on re-start, it will pick up from >> > that >> > > > > >>>> checkpoint. >> > > > > >>>> > > > >> > > > > > > >> > > > > >>>> > > > >> > > > > > > But as soon as it starts processing >> > records, >> > > it >> > > > > >>>> removes >> > > > > >>>> > > the >> > > > > >>>> > > > >> > > checkpoint >> > > > > >>>> > > > >> > > > > > > file, so if it crashes while it was >> > > processing, >> > > > > >>>> there is >> > > > > >>>> > > no >> > > > > >>>> > > > >> > > checkpoint >> > > > > >>>> > > > >> > > > > file >> > > > > >>>> > > > >> > > > > > > there, and it will have to restore from >> the >> > > > > >>>> beginning of >> > > > > >>>> > > the >> > > > > >>>> > > > >> > > changelog. >> > > > > >>>> > > > >> > > > > > > >> > > > > >>>> > > > >> > > > > > > This design is there on purpose, because >> > > > > otherwise >> > > > > >>>> we >> > > > > >>>> > > cannot >> > > > > >>>> > > > >> > > actually >> > > > > >>>> > > > >> > > > > > > guarantee correctness... 
For example, if >> > you >> > > > are >> > > > > >>>> > > maintaining a >> > > > > >>>> > > > >> > > count >> > > > > >>>> > > > >> > > > > > > operation, and we process an input >> record >> > i, >> > > > > >>>> increment the >> > > > > >>>> > > > >> count >> > > > > >>>> > > > >> > > and >> > > > > >>>> > > > >> > > > > write >> > > > > >>>> > > > >> > > > > > > it to the state store, and to the >> changelog >> > > > > topic. >> > > > > >>>> But we >> > > > > >>>> > > > >> crash >> > > > > >>>> > > > >> > > before >> > > > > >>>> > > > >> > > > > we >> > > > > >>>> > > > >> > > > > > > commit that transaction. Then, the >> write to >> > > the >> > > > > >>>> changelog >> > > > > >>>> > > > >> would be >> > > > > >>>> > > > >> > > > > aborted, >> > > > > >>>> > > > >> > > > > > > and we would re-process record i . >> However, >> > > > we've >> > > > > >>>> already >> > > > > >>>> > > > >> updated >> > > > > >>>> > > > >> > > the >> > > > > >>>> > > > >> > > > > local >> > > > > >>>> > > > >> > > > > > > state store, so when we increment it >> again, >> > > it >> > > > > >>>> results in >> > > > > >>>> > > > >> > > > > double-counting >> > > > > >>>> > > > >> > > > > > > i. The key point here is that there's no >> > way >> > > to >> > > > > do >> > > > > >>>> an >> > > > > >>>> > > atomic >> > > > > >>>> > > > >> > > operation >> > > > > >>>> > > > >> > > > > > > across two different systems (local >> state >> > > store >> > > > > >>>> and the >> > > > > >>>> > > > >> changelog >> > > > > >>>> > > > >> > > > > topic). >> > > > > >>>> > > > >> > > > > > > Since we can't guarantee that we roll >> back >> > > the >> > > > > >>>> incremented >> > > > > >>>> > > > >> count >> > > > > >>>> > > > >> > > when >> > > > > >>>> > > > >> > > > > the >> > > > > >>>> > > > >> > > > > > > changelog transaction is aborted, we >> can't >> > > keep >> > > > > >>>> the local >> > > > > >>>> > > > >> store >> > > > > >>>> > > > >> > > > > consistent >> > > > > >>>> > > > >> > > > > > > with the changelog. >> > > > > >>>> > > > >> > > > > > > >> > > > > >>>> > > > >> > > > > > > After a crash, the only way to ensure >> the >> > > local >> > > > > >>>> store is >> > > > > >>>> > > > >> consistent >> > > > > >>>> > > > >> > > > > with >> > > > > >>>> > > > >> > > > > > > the changelog is to discard the entire >> > thing >> > > > and >> > > > > >>>> rebuild >> > > > > >>>> > > it. >> > > > > >>>> > > > >> This >> > > > > >>>> > > > >> > > is >> > > > > >>>> > > > >> > > > > why we >> > > > > >>>> > > > >> > > > > > > have an invariant that the checkpoint >> file >> > > only >> > > > > >>>> exists >> > > > > >>>> > > when we >> > > > > >>>> > > > >> > > _know_ >> > > > > >>>> > > > >> > > > > that >> > > > > >>>> > > > >> > > > > > > the local store is consistent with the >> > > > changelog, >> > > > > >>>> and >> > > > > >>>> > > this is >> > > > > >>>> > > > >> why >> > > > > >>>> > > > >> > > > > you're >> > > > > >>>> > > > >> > > > > > > seeing so much bandwidth when >> re-starting >> > > from >> > > > an >> > > > > >>>> unclean >> > > > > >>>> > > > >> shutdown. >> > > > > >>>> > > > >> > > > > > > >> > > > > >>>> > > > >> > > > > > > Note that it's definitely possible to do >> > > better >> > > > > >>>> than this, >> > > > > >>>> > > > >> and we >> > > > > >>>> > > > >> > > would >> > > > > >>>> > > > >> > > > > > > very much like to improve it in the >> future. 
>> > > > > >>>> > > > >> > > > > > > >> > > > > >>>> > > > >> > > > > > > Thanks, >> > > > > >>>> > > > >> > > > > > > -John >> > > > > >>>> > > > >> > > > > > > >> > > > > >>>> > > > >> > > > > > > On Tue, Dec 3, 2019, at 16:16, >> Alessandro >> > > > > >>>> Tagliapietra >> > > > > >>>> > > wrote: >> > > > > >>>> > > > >> > > > > > > > Hi John, >> > > > > >>>> > > > >> > > > > > > > >> > > > > >>>> > > > >> > > > > > > > thanks a lot for helping, regarding >> your >> > > > > message: >> > > > > >>>> > > > >> > > > > > > > - no we only have 1 instance of the >> > stream >> > > > > >>>> application, >> > > > > >>>> > > > >> and it >> > > > > >>>> > > > >> > > > > always >> > > > > >>>> > > > >> > > > > > > > re-uses the same state folder >> > > > > >>>> > > > >> > > > > > > > - yes we're seeing most issues when >> > > > restarting >> > > > > >>>> not >> > > > > >>>> > > > >> gracefully >> > > > > >>>> > > > >> > > due >> > > > > >>>> > > > >> > > > > > > exception >> > > > > >>>> > > > >> > > > > > > > >> > > > > >>>> > > > >> > > > > > > > I've enabled trace logging and >> filtering >> > > by a >> > > > > >>>> single >> > > > > >>>> > > state >> > > > > >>>> > > > >> store >> > > > > >>>> > > > >> > > the >> > > > > >>>> > > > >> > > > > > > > StoreChangelogReader messages are: >> > > > > >>>> > > > >> > > > > > > > >> > > > > >>>> > > > >> > > > > > > > Added restorer for changelog >> > > > > >>>> > > > >> > > > > sensors-stream-aggregate-store-changelog-0 >> > > > > >>>> > > > >> > > > > > > > Added restorer for changelog >> > > > > >>>> > > > >> > > > > sensors-stream-aggregate-store-changelog-1 >> > > > > >>>> > > > >> > > > > > > > Added restorer for changelog >> > > > > >>>> > > > >> > > > > sensors-stream-aggregate-store-changelog-2 >> > > > > >>>> > > > >> > > > > > > > Did not find checkpoint from changelog >> > > > > >>>> > > > >> > > > > > > > >> > sensors-stream-aggregate-store-changelog-2 >> > > > for >> > > > > >>>> store >> > > > > >>>> > > > >> > > aggregate-store, >> > > > > >>>> > > > >> > > > > > > > rewinding to beginning. >> > > > > >>>> > > > >> > > > > > > > Did not find checkpoint from changelog >> > > > > >>>> > > > >> > > > > > > > >> > sensors-stream-aggregate-store-changelog-1 >> > > > for >> > > > > >>>> store >> > > > > >>>> > > > >> > > aggregate-store, >> > > > > >>>> > > > >> > > > > > > > rewinding to beginning. >> > > > > >>>> > > > >> > > > > > > > Did not find checkpoint from changelog >> > > > > >>>> > > > >> > > > > > > > >> > sensors-stream-aggregate-store-changelog-0 >> > > > for >> > > > > >>>> store >> > > > > >>>> > > > >> > > aggregate-store, >> > > > > >>>> > > > >> > > > > > > > rewinding to beginning. >> > > > > >>>> > > > >> > > > > > > > No checkpoint found for task 0_2 state >> > > store >> > > > > >>>> > > aggregate-store >> > > > > >>>> > > > >> > > > > changelog >> > > > > >>>> > > > >> > > > > > > > >> > sensors-stream-aggregate-store-changelog-2 >> > > > with >> > > > > >>>> EOS >> > > > > >>>> > > turned >> > > > > >>>> > > > >> on. >> > > > > >>>> > > > >> > > > > > > > Reinitializing the task and restore >> its >> > > state >> > > > > >>>> from the >> > > > > >>>> > > > >> beginning. >> > > > > >>>> > > > >> > > > > > > > No checkpoint found for task 0_1 state >> > > store >> > > > > >>>> > > aggregate-store >> > > > > >>>> > > > >> > > > > changelog >> > > > > >>>> > > > >> > > > > > > > >> > sensors-stream-aggregate-store-changelog-1 >> > > > with >> > > > > >>>> EOS >> > > > > >>>> > > turned >> > > > > >>>> > > > >> on. 
>> > > > > >>>> > > > >> > > > > > > > Reinitializing the task and restore >> its >> > > state >> > > > > >>>> from the >> > > > > >>>> > > > >> beginning. >> > > > > >>>> > > > >> > > > > > > > No checkpoint found for task 0_0 state >> > > store >> > > > > >>>> > > aggregate-store >> > > > > >>>> > > > >> > > > > changelog >> > > > > >>>> > > > >> > > > > > > > >> > sensors-stream-aggregate-store-changelog-0 >> > > > with >> > > > > >>>> EOS >> > > > > >>>> > > turned >> > > > > >>>> > > > >> on. >> > > > > >>>> > > > >> > > > > > > > Reinitializing the task and restore >> its >> > > state >> > > > > >>>> from the >> > > > > >>>> > > > >> beginning. >> > > > > >>>> > > > >> > > > > > > > Found checkpoint 709937 from changelog >> > > > > >>>> > > > >> > > > > > > > >> > sensors-stream-aggregate-store-changelog-2 >> > > > for >> > > > > >>>> store >> > > > > >>>> > > > >> > > aggregate-store. >> > > > > >>>> > > > >> > > > > > > > Restoring partition >> > > > > >>>> > > > >> sensors-stream-aggregate-store-changelog-2 >> > > > > >>>> > > > >> > > from >> > > > > >>>> > > > >> > > > > > > offset >> > > > > >>>> > > > >> > > > > > > > 709937 to endOffset 742799 >> > > > > >>>> > > > >> > > > > > > > Found checkpoint 3024234 from >> changelog >> > > > > >>>> > > > >> > > > > > > > >> > sensors-stream-aggregate-store-changelog-1 >> > > > for >> > > > > >>>> store >> > > > > >>>> > > > >> > > aggregate-store. >> > > > > >>>> > > > >> > > > > > > > Restoring partition >> > > > > >>>> > > > >> sensors-stream-aggregate-store-changelog-1 >> > > > > >>>> > > > >> > > from >> > > > > >>>> > > > >> > > > > > > offset >> > > > > >>>> > > > >> > > > > > > > 3024234 to endOffset 3131513 >> > > > > >>>> > > > >> > > > > > > > Found checkpoint 14514072 from >> changelog >> > > > > >>>> > > > >> > > > > > > > >> > sensors-stream-aggregate-store-changelog-0 >> > > > for >> > > > > >>>> store >> > > > > >>>> > > > >> > > aggregate-store. 
>> > > > > >>>> > > > >> > > > > > > > Restoring partition >> > > > > >>>> > > > >> sensors-stream-aggregate-store-changelog-0 >> > > > > >>>> > > > >> > > from >> > > > > >>>> > > > >> > > > > > > offset >> > > > > >>>> > > > >> > > > > > > > 14514072 to endOffset 17116574 >> > > > > >>>> > > > >> > > > > > > > Restored from >> > > > > >>>> > > sensors-stream-aggregate-store-changelog-2 to >> > > > > >>>> > > > >> > > > > > > aggregate-store >> > > > > >>>> > > > >> > > > > > > > with 966 records, ending offset is >> > 711432, >> > > > next >> > > > > >>>> starting >> > > > > >>>> > > > >> > > position is >> > > > > >>>> > > > >> > > > > > > 711434 >> > > > > >>>> > > > >> > > > > > > > Restored from >> > > > > >>>> > > sensors-stream-aggregate-store-changelog-2 to >> > > > > >>>> > > > >> > > > > > > aggregate-store >> > > > > >>>> > > > >> > > > > > > > with 914 records, ending offset is >> > 712711, >> > > > next >> > > > > >>>> starting >> > > > > >>>> > > > >> > > position is >> > > > > >>>> > > > >> > > > > > > 712713 >> > > > > >>>> > > > >> > > > > > > > Restored from >> > > > > >>>> > > sensors-stream-aggregate-store-changelog-1 to >> > > > > >>>> > > > >> > > > > > > aggregate-store >> > > > > >>>> > > > >> > > > > > > > with 18 records, ending offset is >> > 3024261, >> > > > next >> > > > > >>>> starting >> > > > > >>>> > > > >> > > position is >> > > > > >>>> > > > >> > > > > > > 3024262 >> > > > > >>>> > > > >> > > > > > > > >> > > > > >>>> > > > >> > > > > > > > >> > > > > >>>> > > > >> > > > > > > > why it first says it didn't find the >> > > > checkpoint >> > > > > >>>> and >> > > > > >>>> > > then it >> > > > > >>>> > > > >> does >> > > > > >>>> > > > >> > > > > find it? >> > > > > >>>> > > > >> > > > > > > > It seems it loaded about 2.7M records >> > (sum >> > > > of >> > > > > >>>> offset >> > > > > >>>> > > > >> difference >> > > > > >>>> > > > >> > > in >> > > > > >>>> > > > >> > > > > the >> > > > > >>>> > > > >> > > > > > > > "restorting partition ...." messages) >> > > right? >> > > > > >>>> > > > >> > > > > > > > Maybe should I try to reduce the >> > checkpoint >> > > > > >>>> interval? >> > > > > >>>> > > > >> > > > > > > > >> > > > > >>>> > > > >> > > > > > > > Regards >> > > > > >>>> > > > >> > > > > > > > >> > > > > >>>> > > > >> > > > > > > > -- >> > > > > >>>> > > > >> > > > > > > > Alessandro Tagliapietra >> > > > > >>>> > > > >> > > > > > > > >> > > > > >>>> > > > >> > > > > > > > >> > > > > >>>> > > > >> > > > > > > > On Mon, Dec 2, 2019 at 9:18 AM John >> > > Roesler < >> > > > > >>>> > > > >> vvcep...@apache.org >> > > > > >>>> > > > >> > > > >> > > > > >>>> > > > >> > > > > wrote: >> > > > > >>>> > > > >> > > > > > > > >> > > > > >>>> > > > >> > > > > > > > > Hi Alessandro, >> > > > > >>>> > > > >> > > > > > > > > >> > > > > >>>> > > > >> > > > > > > > > I'm sorry to hear that. >> > > > > >>>> > > > >> > > > > > > > > >> > > > > >>>> > > > >> > > > > > > > > The restore process only takes one >> > factor >> > > > > into >> > > > > >>>> > > account: >> > > > > >>>> > > > >> the >> > > > > >>>> > > > >> > > current >> > > > > >>>> > > > >> > > > > > > offset >> > > > > >>>> > > > >> > > > > > > > > position of the changelog topic is >> > stored >> > > > in >> > > > > a >> > > > > >>>> local >> > > > > >>>> > > file >> > > > > >>>> > > > >> > > > > alongside the >> > > > > >>>> > > > >> > > > > > > > > state stores. 
On startup, the app >> > checks >> > > if >> > > > > the >> > > > > >>>> > > recorded >> > > > > >>>> > > > >> > > position >> > > > > >>>> > > > >> > > > > lags >> > > > > >>>> > > > >> > > > > > > the >> > > > > >>>> > > > >> > > > > > > > > latest offset in the changelog. If >> so, >> > > then >> > > > > it >> > > > > >>>> reads >> > > > > >>>> > > the >> > > > > >>>> > > > >> > > missing >> > > > > >>>> > > > >> > > > > > > changelog >> > > > > >>>> > > > >> > > > > > > > > records before starting processing. >> > > > > >>>> > > > >> > > > > > > > > >> > > > > >>>> > > > >> > > > > > > > > Thus, it would not restore any old >> > window >> > > > > data. >> > > > > >>>> > > > >> > > > > > > > > >> > > > > >>>> > > > >> > > > > > > > > There might be a few different >> things >> > > going >> > > > > on >> > > > > >>>> to >> > > > > >>>> > > explain >> > > > > >>>> > > > >> your >> > > > > >>>> > > > >> > > > > > > observation: >> > > > > >>>> > > > >> > > > > > > > > * if there is more than one >> instance in >> > > > your >> > > > > >>>> Streams >> > > > > >>>> > > > >> cluster, >> > > > > >>>> > > > >> > > > > maybe the >> > > > > >>>> > > > >> > > > > > > > > task is "flopping" between >> instances, >> > so >> > > > each >> > > > > >>>> instance >> > > > > >>>> > > > >> still >> > > > > >>>> > > > >> > > has to >> > > > > >>>> > > > >> > > > > > > recover >> > > > > >>>> > > > >> > > > > > > > > state, since it wasn't the last one >> > > > actively >> > > > > >>>> > > processing >> > > > > >>>> > > > >> it. >> > > > > >>>> > > > >> > > > > > > > > * if the application isn't stopped >> > > > > gracefully, >> > > > > >>>> it >> > > > > >>>> > > might >> > > > > >>>> > > > >> not >> > > > > >>>> > > > >> > > get a >> > > > > >>>> > > > >> > > > > > > chance >> > > > > >>>> > > > >> > > > > > > > > to record its offset in that local >> > file, >> > > so >> > > > > on >> > > > > >>>> > > restart it >> > > > > >>>> > > > >> has >> > > > > >>>> > > > >> > > to >> > > > > >>>> > > > >> > > > > > > restore >> > > > > >>>> > > > >> > > > > > > > > some or all of the state store from >> > > > > changelog. >> > > > > >>>> > > > >> > > > > > > > > >> > > > > >>>> > > > >> > > > > > > > > Or it could be something else; >> that's >> > > just >> > > > > >>>> what comes >> > > > > >>>> > > to >> > > > > >>>> > > > >> mind. >> > > > > >>>> > > > >> > > > > > > > > >> > > > > >>>> > > > >> > > > > > > > > If you want to get to the bottom of >> it, >> > > you >> > > > > >>>> can take a >> > > > > >>>> > > > >> look at >> > > > > >>>> > > > >> > > the >> > > > > >>>> > > > >> > > > > > > logs, >> > > > > >>>> > > > >> > > > > > > > > paying close attention to which >> tasks >> > are >> > > > > >>>> assigned to >> > > > > >>>> > > > >> which >> > > > > >>>> > > > >> > > > > instances >> > > > > >>>> > > > >> > > > > > > after >> > > > > >>>> > > > >> > > > > > > > > each restart. You can also look into >> > the >> > > > logs >> > > > > >>>> from >> > > > > >>>> > > > >> > > > > > > > > >> > > > > >>>> > > > >> > > >> > > > > >>>> > > >> > > > > >> `org.apache.kafka.streams.processor.internals.StoreChangelogReader` >> > > > > >>>> > > > >> > > > > > > (might >> > > > > >>>> > > > >> > > > > > > > > want to set it to DEBUG or TRACE >> level >> > to >> > > > > >>>> really see >> > > > > >>>> > > > >> what's >> > > > > >>>> > > > >> > > > > happening). >> > > > > >>>> > > > >> > > > > > > > > >> > > > > >>>> > > > >> > > > > > > > > I hope this helps! 
>> > > > > >>>> > > > >> > > > > > > > > -John >> > > > > >>>> > > > >> > > > > > > > > >> > > > > >>>> > > > >> > > > > > > > > On Sun, Dec 1, 2019, at 21:25, >> > Alessandro >> > > > > >>>> Tagliapietra >> > > > > >>>> > > > >> wrote: >> > > > > >>>> > > > >> > > > > > > > > > Hello everyone, >> > > > > >>>> > > > >> > > > > > > > > > >> > > > > >>>> > > > >> > > > > > > > > > we're having a problem with >> bandwidth >> > > > usage >> > > > > >>>> on >> > > > > >>>> > > streams >> > > > > >>>> > > > >> > > > > application >> > > > > >>>> > > > >> > > > > > > > > startup, >> > > > > >>>> > > > >> > > > > > > > > > our current setup does this: >> > > > > >>>> > > > >> > > > > > > > > > >> > > > > >>>> > > > >> > > > > > > > > > ... >> > > > > >>>> > > > >> > > > > > > > > > .groupByKey() >> > > > > >>>> > > > >> > > > > > > > > > >> > > > > >>>> > > > >> > > >> > > > > >>>> >> .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1))) >> > > > > >>>> > > > >> > > > > > > > > > .aggregate( >> > > > > >>>> > > > >> > > > > > > > > > { >> > > MetricSequenceList(ArrayList()) >> > > > > }, >> > > > > >>>> > > > >> > > > > > > > > > { key, value, aggregate -> >> > > > > >>>> > > > >> > > > > > > > > > >> > > > > aggregate.getRecords().add(value) >> > > > > >>>> > > > >> > > > > > > > > > aggregate >> > > > > >>>> > > > >> > > > > > > > > > }, >> > > > > >>>> > > > >> > > > > > > > > > Materialized.`as`<String, >> > > > > >>>> > > MetricSequenceList, >> > > > > >>>> > > > >> > > > > > > WindowStore<Bytes, >> > > > > >>>> > > > >> > > > > > > > > > >> > > > > >>>> > > > >> > > > > > > > > >> > > > > >>>> > > > >> > > > > > > >> > > > > >>>> > > > >> > > > > >> > > > > >>>> > > > >> > > >> > > > > >>>> > > > >> >> > > > > >>>> > > >> > > > > >>>> >> > > > > >> > > > >> > > >> > >> ByteArray>>("aggregate-store").withKeySerde(Serdes.String()).withValueSerde(Settings.getValueSpecificavroSerde()) >> > > > > >>>> > > > >> > > > > > > > > > ) >> > > > > >>>> > > > >> > > > > > > > > > .toStream() >> > > > > >>>> > > > >> > > > > > > > > > >> .flatTransform(TransformerSupplier { >> > > > > >>>> > > > >> > > > > > > > > > ... >> > > > > >>>> > > > >> > > > > > > > > > >> > > > > >>>> > > > >> > > > > > > > > > basically in each window we append >> > the >> > > > new >> > > > > >>>> values >> > > > > >>>> > > and >> > > > > >>>> > > > >> then do >> > > > > >>>> > > > >> > > > > some >> > > > > >>>> > > > >> > > > > > > other >> > > > > >>>> > > > >> > > > > > > > > > logic with the array of windowed >> > > values. >> > > > > >>>> > > > >> > > > > > > > > > The aggregate-store changelog >> topic >> > > > > >>>> configuration >> > > > > >>>> > > uses >> > > > > >>>> > > > >> > > > > > > compact,delete as >> > > > > >>>> > > > >> > > > > > > > > > cleanup policy and has 12 hours of >> > > > > retention. >> > > > > >>>> > > > >> > > > > > > > > > >> > > > > >>>> > > > >> > > > > > > > > > What we've seen is that on >> > application >> > > > > >>>> startup it >> > > > > >>>> > > takes >> > > > > >>>> > > > >> a >> > > > > >>>> > > > >> > > couple >> > > > > >>>> > > > >> > > > > > > minutes >> > > > > >>>> > > > >> > > > > > > > > to >> > > > > >>>> > > > >> > > > > > > > > > rebuild the state store, even if >> the >> > > > state >> > > > > >>>> store >> > > > > >>>> > > > >> directory is >> > > > > >>>> > > > >> > > > > > > persisted >> > > > > >>>> > > > >> > > > > > > > > > across restarts. 
That along with >> an >> > > > > >>>> exception that >> > > > > >>>> > > > >> caused the >> > > > > >>>> > > > >> > > > > docker >> > > > > >>>> > > > >> > > > > > > > > > container to be restarted a couple >> > > > hundreds >> > > > > >>>> times >> > > > > >>>> > > > >> caused a >> > > > > >>>> > > > >> > > big >> > > > > >>>> > > > >> > > > > > > confluent >> > > > > >>>> > > > >> > > > > > > > > > cloud bill compared to what we >> > usually >> > > > > spend >> > > > > >>>> (1/4 >> > > > > >>>> > > of a >> > > > > >>>> > > > >> full >> > > > > >>>> > > > >> > > > > month in >> > > > > >>>> > > > >> > > > > > > 1 >> > > > > >>>> > > > >> > > > > > > > > day). >> > > > > >>>> > > > >> > > > > > > > > > >> > > > > >>>> > > > >> > > > > > > > > > What I think is happening is that >> the >> > > > topic >> > > > > >>>> is >> > > > > >>>> > > keeping >> > > > > >>>> > > > >> all >> > > > > >>>> > > > >> > > the >> > > > > >>>> > > > >> > > > > > > previous >> > > > > >>>> > > > >> > > > > > > > > > windows even with the compacting >> > policy >> > > > > >>>> because each >> > > > > >>>> > > > >> key is >> > > > > >>>> > > > >> > > the >> > > > > >>>> > > > >> > > > > > > original >> > > > > >>>> > > > >> > > > > > > > > > key + the timestamp not just the >> key. >> > > > Since >> > > > > >>>> we don't >> > > > > >>>> > > > >> care >> > > > > >>>> > > > >> > > about >> > > > > >>>> > > > >> > > > > > > previous >> > > > > >>>> > > > >> > > > > > > > > > windows as the flatTransform after >> > the >> > > > > >>>> toStream() >> > > > > >>>> > > makes >> > > > > >>>> > > > >> sure >> > > > > >>>> > > > >> > > > > that we >> > > > > >>>> > > > >> > > > > > > > > don't >> > > > > >>>> > > > >> > > > > > > > > > process old windows (a custom >> > > suppressor >> > > > > >>>> basically) >> > > > > >>>> > > is >> > > > > >>>> > > > >> there >> > > > > >>>> > > > >> > > a >> > > > > >>>> > > > >> > > > > way to >> > > > > >>>> > > > >> > > > > > > > > only >> > > > > >>>> > > > >> > > > > > > > > > keep the last window so that the >> > store >> > > > > >>>> rebuilding >> > > > > >>>> > > goes >> > > > > >>>> > > > >> > > faster and >> > > > > >>>> > > > >> > > > > > > without >> > > > > >>>> > > > >> > > > > > > > > > rebuilding old windows too? Or >> > should I >> > > > > >>>> create a >> > > > > >>>> > > custom >> > > > > >>>> > > > >> > > window >> > > > > >>>> > > > >> > > > > using >> > > > > >>>> > > > >> > > > > > > the >> > > > > >>>> > > > >> > > > > > > > > > original key as key so that the >> > > > compaction >> > > > > >>>> keeps >> > > > > >>>> > > only >> > > > > >>>> > > > >> the >> > > > > >>>> > > > >> > > last >> > > > > >>>> > > > >> > > > > window >> > > > > >>>> > > > >> > > > > > > > > data? >> > > > > >>>> > > > >> > > > > > > > > > >> > > > > >>>> > > > >> > > > > > > > > > Thank you >> > > > > >>>> > > > >> > > > > > > > > > >> > > > > >>>> > > > >> > > > > > > > > > -- >> > > > > >>>> > > > >> > > > > > > > > > Alessandro Tagliapietra >> > > > > >>>> > > > >> > > > > > > > > > >> > > > > >>>> > > > >> > > > > > > > > >> > > > > >>>> > > > >> > > > > > > > >> > > > > >>>> > > > >> > > > > > > >> > > > > >>>> > > > >> > > > > > >> > > > > >>>> > > > >> > > > > >> > > > > >>>> > > > >> > > > >> > > > > >>>> > > > >> > > >> > > > > >>>> > > > >> > >> > > > > >>>> > > > >> >> > > > > >>>> > > > > >> > > > > >>>> > > > >> > > > > >>>> > > >> > > > > >>>> > >> > > > > >>>> >> > > > > >>> >> > > > > >> > > > >> > > >> > >> >