It seems that even with caching enabled, after a while the sent bytes still go up.

[image: Screen Shot 2019-12-08 at 12.52.31 PM.png]

You can see the deploy when I enabled caching, but it looks like it's still a temporary solution.

--
Alessandro Tagliapietra

On Sat, Dec 7, 2019 at 10:08 AM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

> Could be, but since we have a limited amount of input keys (~30), windowing generates new keys but old ones are never touched again since the data per key is in order, I assume it shouldn't be a big deal for it to handle 30 keys.
> I'll have a look at cache metrics and see if something pops out.
>
> Thanks
>
> --
> Alessandro Tagliapietra
>
> On Sat, Dec 7, 2019 at 10:02 AM John Roesler <vvcep...@apache.org> wrote:
>
>> Hmm, that’s a good question. Now that we’re talking about caching, I wonder if the cache was just too small. It’s not very big by default.
>>
>> On Sat, Dec 7, 2019, at 11:16, Alessandro Tagliapietra wrote:
>>> Ok I'll check on that!
>>>
>>> Now I can see that with caching we went from 3-4MB/s to 400KB/s, that will help with the bill.
>>>
>>> Last question, any reason why after a while the regular windowed stream starts sending every update instead of caching?
>>> Could it be because it doesn't have any more memory available? Any other possible reason?
>>>
>>> Thank you so much for your help
>>>
>>> --
>>> Alessandro Tagliapietra
>>>
>>> On Sat, Dec 7, 2019 at 9:14 AM John Roesler <vvcep...@apache.org> wrote:
>>>
>>>> Ah, yes. Glad you figured it out!
>>>>
>>>> Caching does not reduce EOS guarantees at all. I highly recommend using it. You might even want to take a look at the caching metrics to make sure you have a good hit ratio.
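
Editor's sketch of the cache-related settings discussed above, in Kotlin like the rest of the thread. The config names are written as plain strings so the snippet compiles without the Kafka dependency; to the best of my knowledge `cache.max.bytes.buffering` (10 MB by default, shared by all threads of an instance) and `commit.interval.ms` (100 ms by default when exactly-once is enabled) are the relevant Kafka Streams settings for the versions in use here, and the record cache is flushed on every commit. The function name and the example values are hypothetical and untuned.

```kotlin
import java.util.Properties

// Hypothetical helper: builds the cache-related part of a Streams configuration.
// Keys are given as strings; the values passed in are illustrative only.
fun streamsCacheProps(cacheBytes: Long, commitIntervalMs: Long): Properties {
    val props = Properties()
    // Total record-cache budget for one application instance.
    props.setProperty("cache.max.bytes.buffering", cacheBytes.toString())
    // The cache is flushed on commit, so a very short interval limits how many
    // updates per key can be collapsed before they reach the changelog.
    props.setProperty("commit.interval.ms", commitIntervalMs.toString())
    // Assumption: the application runs with exactly-once enabled, as stated in the thread.
    props.setProperty("processing.guarantee", "exactly_once")
    return props
}

fun main() {
    val props = streamsCacheProps(cacheBytes = 64L * 1024 * 1024, commitIntervalMs = 1_000L)
    for (name in props.stringPropertyNames().sorted()) {
        println("$name=${props.getProperty(name)}")
    }
}
```

A longer commit interval lets the cache absorb more updates but also delays downstream output, so it is a trade-off rather than a free win.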
>>>>
>>>> -John
>>>>
>>>> On Sat, Dec 7, 2019, at 10:51, Alessandro Tagliapietra wrote:
>>>>> Never mind, I've found out I can use `.withCachingEnabled` on the store builder to achieve the same thing as the windowing example, as `Materialized.as` turns that on by default.
>>>>>
>>>>> Does caching in any way reduce the EOS guarantees?
>>>>>
>>>>> --
>>>>> Alessandro Tagliapietra
>>>>>
>>>>> On Sat, Dec 7, 2019 at 1:12 AM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:
>>>>>
>>>>>> Seems my journey with this isn't done just yet,
>>>>>>
>>>>>> This seems very complicated to me but I'll try to explain it as best I can.
>>>>>> To better understand the streams network usage I've used prometheus with the JMX exporter to export kafka metrics.
>>>>>> To check the amount of data we use I'm looking at the increments of kafka_producer_topic_metrics_byte_total and kafka_producer_producer_topic_metrics_record_send_total,
>>>>>>
>>>>>> Our current (before the change mentioned above) code looks like this:
>>>>>>
>>>>>> // This transformer just pairs a value with the previous one, storing the temporary one in a store
>>>>>> val pairsStream = metricStream
>>>>>>     .transformValues(ValueTransformerWithKeySupplier { PairTransformer() }, "LastValueStore")
>>>>>>     .filter { _, value: MetricSequence? -> value != null }
>>>>>>
>>>>>> // Create a store to store suppressed windows until a new one is received
>>>>>> val suppressStoreSupplier = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("suppress-store"),
>>>>>> ......
>>>>>>
>>>>>> // Window and aggregate data in 1 minute intervals
>>>>>> val aggregatedStream = pairsStream
>>>>>>     .groupByKey()
>>>>>>     .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
>>>>>>     .aggregate(
>>>>>>         { MetricSequenceList(ArrayList()) },
>>>>>>         { key, value, aggregate ->
>>>>>>             aggregate.getRecords().add(value)
>>>>>>             aggregate
>>>>>>         },
>>>>>>         Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store").withKeySerde(Serdes.String()).withValueSerde(Settings.getValueSpecificavroSerde())
>>>>>>     )
>>>>>>     .toStream()
>>>>>>     .flatTransform(TransformerSupplier {
>>>>>>         // This transformer basically waits until a new window is received to emit the previous one
>>>>>>     }, "suppress-store")
>>>>>>     .map { sensorId: String, suppressedOutput: SuppressedOutput ->
>>>>>>         .... etc ....
>>>>>>
>>>>>> Basically:
>>>>>> - all data goes through the LastValueStore store, which stores each message and emits a pair with the previous one
>>>>>> - the aggregate-store is used to store the per-window list of messages in the aggregate method
>>>>>> - the suppress store is used to store each received window, which is emitted only after a newer one is received
>>>>>>
>>>>>> What I'm experiencing is that:
>>>>>> - during normal execution, the streams app sends to the lastvalue store changelog topic 5k messages/min, the aggregate and suppress store changelog topics only about 100
>>>>>> - at some point (after many hours of operation), the streams app starts sending to the aggregate and suppress store changelog topic the same amount of messages going through the lastvaluestore
>>>>>> - if I restart the streams app it goes back to the initial behavior
>>>>>>
>>>>>> You can see the behavior in this graph https://imgur.com/dJcUNSf
>>>>>> You can also see that after a restart everything goes back to normal levels.
>>>>>> Regarding other metrics, process latency increases, poll latency decreases, poll rate decreases, commit rate stays the same while commit latency increases.
>>>>>>
>>>>>> Now, I have these questions:
>>>>>> - why isn't the aggregate/suppress store changelog topic throughput the same as the LastValueStore? Shouldn't it send a record to the changelog every time it aggregates?
>>>>>> - is the windowing doing some internal caching, like not sending every aggregation record until the window time has passed? (if so, where can I find that code since I would like to use that also for our new implementation)
>>>>>>
>>>>>> Thank you in advance
>>>>>>
>>>>>> --
>>>>>> Alessandro Tagliapietra
>>>>>>
>>>>>> On Wed, Dec 4, 2019 at 7:57 AM John Roesler <vvcep...@apache.org> wrote:
>>>>>>
>>>>>>> Oh, good!
>>>>>>>
>>>>>>> On Tue, Dec 3, 2019, at 23:29, Alessandro Tagliapietra wrote:
>>>>>>>> Testing on staging shows that a restart on exception is much faster and the stream starts right away which I think means we're reading way less data than before!
>>>>>>>>
>>>>>>>>> What I was referring to is that, in Streams, the keys for window aggregation state are actually composed of both the window itself and the key. In the DSL, it looks like "Windowed<K>". That results in the store having a unique key per window for each K, which is why we need retention as well as compaction for our changelogs. But for you, if you just make the key "K", then compaction alone should do the trick.
>>>>>>>>
>>>>>>>> Yes we had compact,delete as cleanup policy but probably it still had a too long retention value, also the rocksdb store is probably much faster now having only one key per key instead of one key per window per key.
>>>>>>>>
>>>>>>>> Thanks a lot for helping! I'm now going to set up prometheus-jmx monitoring so we can keep better track of what's going on :)
>>>>>>>>
>>>>>>>> --
>>>>>>>> Alessandro Tagliapietra
>>>>>>>>
>>>>>>>> On Tue, Dec 3, 2019 at 9:12 PM John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Oh, yeah, I remember that conversation!
>>>>>>>>>
>>>>>>>>> Yes, then, I agree, if you're only storing state of the most recent window for each key, and the key you use for that state is actually the key of the records, then an aggressive compaction policy plus your custom transformer seems like a good way forward.
>>>>>>>>>
>>>>>>>>> What I was referring to is that, in Streams, the keys for window aggregation state are actually composed of both the window itself and the key. In the DSL, it looks like "Windowed<K>". That results in the store having a unique key per window for each K, which is why we need retention as well as compaction for our changelogs. But for you, if you just make the key "K", then compaction alone should do the trick.
>>>>>>>>>
>>>>>>>>> And yes, if you manage the topic yourself, then Streams won't adjust the retention time. I think it might validate that the retention isn't too short, but I don't remember offhand.
>>>>>>>>>
>>>>>>>>> Cheers, and let me know how it goes!
>>>>>>>>> -John
>>>>>>>>>
>>>>>>>>> On Tue, Dec 3, 2019, at 23:03, Alessandro Tagliapietra wrote:
>>>>>>>>>> Hi John,
>>>>>>>>>>
>>>>>>>>>> afaik grace period uses stream time https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Windows.html which is per partition, unfortunately we process data that's not in sync between keys so each key needs to be independent and a key can have much older data than the other.
>>>>>>>>>>
>>>>>>>>>> Having a small grace period would probably close old windows sooner than expected. That's also why in my use case a custom store that just stores the last window data for each key might work better. I had the same issue with suppression and it has been reported here https://issues.apache.org/jira/browse/KAFKA-8769
>>>>>>>>>> Oh I just saw that you're the one that helped me on slack and created the issue (thanks again for that).
>>>>>>>>>>
>>>>>>>>>> The behavior that you mention about streams setting changelog retention time is something they do on creation of the topic when the broker has auto creation enabled? Because we're using confluent cloud and I had to create it manually.
>>>>>>>>>> Regarding the change in the recovery behavior, with compact cleanup policy shouldn't the changelog only keep the last value? That would make the recovery faster and "cheaper" as it would only need to read a single value per key (if the cleanup just happened) right?
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Alessandro Tagliapietra
>>>>>>>>>>
>>>>>>>>>> On Tue, Dec 3, 2019 at 8:51 PM John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey Alessandro,
>>>>>>>>>>>
>>>>>>>>>>> That sounds also like it would work. I'm wondering if it would actually change what you observe w.r.t. recovery behavior, though. Streams already sets the retention time on the changelog to equal the retention time of the windows, for windowed aggregations, so you shouldn't be loading a lot of window data for old windows you no longer care about.
>>>>>>>>>>>
>>>>>>>>>>> Have you set the "grace period" on your window definition? By default, it is set to 24 hours, but you can set it as low as you like. E.g., if you want to commit to having in-order data only, then you can set the grace period to zero. This _should_ let the broker clean up the changelog records as soon as the window ends.
>>>>>>>>>>>
>>>>>>>>>>> Of course, the log cleaner doesn't run all the time, so there's some extra delay in which "expired" data would still be visible in the changelog, but it would actually be just the same as if you manage the store yourself.
>>>>>>>>>>>
>>>>>>>>>>> Hope this helps!
>>>>>>>>>>> -John
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Dec 3, 2019, at 22:22, Alessandro Tagliapietra wrote:
>>>>>>>>>>>> Thanks John for the explanation,
>>>>>>>>>>>>
>>>>>>>>>>>> I thought that with EOS enabled (which we have) it would in the worst case find a valid checkpoint and start the restore from there until it reached the last committed status, not completely from scratch. What you say definitely makes sense now.
>>>>>>>>>>>> Since we don't really need old time windows and we ensure data is ordered when processed, I think I'll just write a custom transformer to keep only the last window, store intermediate aggregation results in the store and emit a new value only when we receive data belonging to a new window.
>>>>>>>>>>>> That, with a compact-only changelog topic, should keep the rebuild data to the minimum as it would have only the last value for each key.
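
Editor's sketch of the "keep only the last window per key" idea described in the message above, written as plain Kotlin with no Kafka dependency. All names (`LastWindowBuffer`, `WindowState`, `ClosedWindow`) are hypothetical, and the `HashMap` merely stands in for a key-value state store keyed by the plain record key. It assumes, as the thread states, that records for a given key arrive in timestamp order.

```kotlin
import java.time.Duration

// Hypothetical types: the window currently being aggregated for one key,
// and the finished window that gets emitted downstream.
data class WindowState<V>(val windowStart: Long, val values: MutableList<V>)
data class ClosedWindow<K, V>(val key: K, val windowStart: Long, val values: List<V>)

class LastWindowBuffer<K, V>(windowSize: Duration) {
    private val sizeMs = windowSize.toMillis()
    // Stand-in for a state store: exactly one entry per record key.
    private val store = HashMap<K, WindowState<V>>()

    /** Adds one record; returns the previous window for [key] if this record opens a new one. */
    fun add(key: K, timestampMs: Long, value: V): ClosedWindow<K, V>? {
        val windowStart = timestampMs - (timestampMs % sizeMs)
        val current = store[key]
        if (current != null && current.windowStart == windowStart) {
            current.values.add(value) // same window: keep aggregating, emit nothing
            return null
        }
        // First record for this key, or (given in-order input) a newer window:
        // overwrite the single stored entry and emit the window it replaces.
        store[key] = WindowState(windowStart, mutableListOf(value))
        return current?.let { ClosedWindow(key, it.windowStart, it.values.toList()) }
    }
}

fun main() {
    val buffer = LastWindowBuffer<String, Int>(Duration.ofMinutes(1))
    println(buffer.add("sensor-1", 1_000L, 10))  // null
    println(buffer.add("sensor-1", 30_000L, 11)) // null
    println(buffer.add("sensor-2", 5_000L, 99))  // null
    println(buffer.add("sensor-1", 61_000L, 12)) // emits sensor-1's first window with [10, 11]
}
```

Because the stored entry is overwritten in place, a compacted changelog for such a store only ever needs the latest value per record key.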
>>>>>>>>>>>>
>>>>>>>>>>>> Hope that makes sense
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks again
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Alessandro Tagliapietra
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Dec 3, 2019 at 3:04 PM John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Alessandro,
>>>>>>>>>>>>>
>>>>>>>>>>>>> To take a stab at your question, maybe it first doesn't find it, but then restores some data, writes the checkpoint, and then later on, it has to re-initialize the task for some reason, and that's why it does find a checkpoint then?
>>>>>>>>>>>>>
>>>>>>>>>>>>> More to the heart of the issue, if you have EOS enabled, Streams _only_ records the checkpoint when the store is in a known-consistent state. For example, if you have a graceful shutdown, Streams will flush all the stores, commit all the transactions, and then write the checkpoint file. Then, on re-start, it will pick up from that checkpoint.
>>>>>>>>>>>>>
>>>>>>>>>>>>> But as soon as it starts processing records, it removes the checkpoint file, so if it crashes while it was processing, there is no checkpoint file there, and it will have to restore from the beginning of the changelog.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This design is there on purpose, because otherwise we cannot actually guarantee correctness... For example, suppose you are maintaining a count operation, and we process an input record i, increment the count and write it to the state store, and to the changelog topic. But we crash before we commit that transaction. Then, the write to the changelog would be aborted, and we would re-process record i. However, we've already updated the local state store, so when we increment it again, it results in double-counting i. The key point here is that there's no way to do an atomic operation across two different systems (local state store and the changelog topic). Since we can't guarantee that we roll back the incremented count when the changelog transaction is aborted, we can't keep the local store consistent with the changelog.
>>>>>>>>>>>>>
>>>>>>>>>>>>> After a crash, the only way to ensure the local store is consistent with the changelog is to discard the entire thing and rebuild it. This is why we have an invariant that the checkpoint file only exists when we _know_ that the local store is consistent with the changelog, and this is why you're seeing so much bandwidth when re-starting from an unclean shutdown.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Note that it's definitely possible to do better than this, and we would very much like to improve it in the future.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> -John
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Dec 3, 2019, at 16:16, Alessandro Tagliapietra wrote:
>>>>>>>>>>>>>> Hi John,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> thanks a lot for helping, regarding your message:
>>>>>>>>>>>>>> - no, we only have 1 instance of the stream application, and it always re-uses the same state folder
>>>>>>>>>>>>>> - yes, we're seeing most issues when restarting ungracefully due to an exception
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I've enabled trace logging and, filtering by a single state store, the StoreChangelogReader messages are:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Added restorer for changelog sensors-stream-aggregate-store-changelog-0
>>>>>>>>>>>>>> Added restorer for changelog sensors-stream-aggregate-store-changelog-1
>>>>>>>>>>>>>> Added restorer for changelog sensors-stream-aggregate-store-changelog-2
>>>>>>>>>>>>>> Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-2 for store aggregate-store, rewinding to beginning.
>>>>>>>>>>>>>> Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-1 for store aggregate-store, rewinding to beginning.
>>>>>>>>>>>>>> Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-0 for store aggregate-store, rewinding to beginning.
>>>>>>>>>>>>>> No checkpoint found for task 0_2 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-2 with EOS turned on. Reinitializing the task and restore its state from the beginning.
>>>>>>>>>>>>>> No checkpoint found for task 0_1 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-1 with EOS turned on. Reinitializing the task and restore its state from the beginning.
>>>>>>>>>>>>>> No checkpoint found for task 0_0 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-0 with EOS turned on. Reinitializing the task and restore its state from the beginning.
>>>>>>>>>>>>>> Found checkpoint 709937 from changelog sensors-stream-aggregate-store-changelog-2 for store aggregate-store.
>>>>>>>>>>>>>> Restoring partition sensors-stream-aggregate-store-changelog-2 from offset 709937 to endOffset 742799
>>>>>>>>>>>>>> Found checkpoint 3024234 from changelog sensors-stream-aggregate-store-changelog-1 for store aggregate-store.
>>>>>>>>>>>>>> Restoring partition sensors-stream-aggregate-store-changelog-1 from offset 3024234 to endOffset 3131513
>>>>>>>>>>>>>> Found checkpoint 14514072 from changelog sensors-stream-aggregate-store-changelog-0 for store aggregate-store.
>>>>>>>>>>>>>> Restoring partition sensors-stream-aggregate-store-changelog-0 from offset 14514072 to endOffset 17116574
>>>>>>>>>>>>>> Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store with 966 records, ending offset is 711432, next starting position is 711434
>>>>>>>>>>>>>> Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store with 914 records, ending offset is 712711, next starting position is 712713
>>>>>>>>>>>>>> Restored from sensors-stream-aggregate-store-changelog-1 to aggregate-store with 18 records, ending offset is 3024261, next starting position is 3024262
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Why does it first say it didn't find the checkpoint and then it does find it?
>>>>>>>>>>>>>> It seems it loaded about 2.7M records (sum of the offset differences in the "Restoring partition ...." messages), right?
>>>>>>>>>>>>>> Maybe I should try to reduce the checkpoint interval?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Alessandro Tagliapietra
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Dec 2, 2019 at 9:18 AM John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Alessandro,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm sorry to hear that.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The restore process only takes one factor into account: the current offset position of the changelog topic is stored in a local file alongside the state stores. On startup, the app checks if the recorded position lags the latest offset in the changelog. If so, then it reads the missing changelog records before starting processing.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thus, it would not restore any old window data.
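
Editor's worked example of the restore check described above: the amount to replay is the gap between the checkpointed offset and the changelog end offset. The offsets are copied from the "Restoring partition ..." trace lines quoted further up in this thread; the `RestoreRange` type is hypothetical. Note that this counts offsets, which is an upper bound on records if, as I would expect with EOS, transaction markers also occupy offsets.

```kotlin
// Hypothetical holder for one changelog partition's restore range.
data class RestoreRange(val partition: Int, val checkpoint: Long, val endOffset: Long) {
    // Offsets that must be read before processing can start.
    val lag: Long get() = endOffset - checkpoint
}

// Values taken from the trace log quoted in the thread.
val ranges = listOf(
    RestoreRange(2, 709_937L, 742_799L),
    RestoreRange(1, 3_024_234L, 3_131_513L),
    RestoreRange(0, 14_514_072L, 17_116_574L)
)

fun main() {
    for (r in ranges) {
        println("partition ${r.partition}: ${r.lag} offsets to replay")
    }
    // 32862 + 107279 + 2602502 = 2742643, i.e. the "about 2.7M" mentioned in the thread
    println("total: ${ranges.sumOf { it.lag }}")
}
```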
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> There might be a few different things going on to explain your observation:
>>>>>>>>>>>>>>> * if there is more than one instance in your Streams cluster, maybe the task is "flopping" between instances, so each instance still has to recover state, since it wasn't the last one actively processing it.
>>>>>>>>>>>>>>> * if the application isn't stopped gracefully, it might not get a chance to record its offset in that local file, so on restart it has to restore some or all of the state store from changelog.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Or it could be something else; that's just what comes to mind.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If you want to get to the bottom of it, you can take a look at the logs, paying close attention to which tasks are assigned to which instances after each restart. You can also look into the logs from `org.apache.kafka.streams.processor.internals.StoreChangelogReader` (might want to set it to DEBUG or TRACE level to really see what's happening).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I hope this helps!
>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Dec 1, 2019, at 21:25, Alessandro Tagliapietra wrote:
>>>>>>>>>>>>>>>> Hello everyone,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> we're having a problem with bandwidth usage on streams application startup, our current setup does this:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>>>> .groupByKey()
>>>>>>>>>>>>>>>> .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
>>>>>>>>>>>>>>>> .aggregate(
>>>>>>>>>>>>>>>>     { MetricSequenceList(ArrayList()) },
>>>>>>>>>>>>>>>>     { key, value, aggregate ->
>>>>>>>>>>>>>>>>         aggregate.getRecords().add(value)
>>>>>>>>>>>>>>>>         aggregate
>>>>>>>>>>>>>>>>     },
>>>>>>>>>>>>>>>>     Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store").withKeySerde(Serdes.String()).withValueSerde(Settings.getValueSpecificavroSerde())
>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>> .toStream()
>>>>>>>>>>>>>>>> .flatTransform(TransformerSupplier {
>>>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> basically in each window we append the new values and then do some other logic with the array of windowed values.
>>>>>>>>>>>>>>>> The aggregate-store changelog topic configuration uses compact,delete as cleanup policy and has 12 hours of retention.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What we've seen is that on application startup it takes a couple of minutes to rebuild the state store, even if the state store directory is persisted across restarts. That, along with an exception that caused the docker container to be restarted a couple hundred times, caused a big confluent cloud bill compared to what we usually spend (1/4 of a full month in 1 day).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What I think is happening is that the topic is keeping all the previous windows even with the compacting policy because each key is the original key + the timestamp, not just the key. Since we don't care about previous windows, as the flatTransform after the toStream() makes sure that we don't process old windows (a custom suppressor basically), is there a way to only keep the last window so that the store rebuilding goes faster and without rebuilding old windows too? Or should I create a custom window using the original key as key so that the compaction keeps only the last window data?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thank you
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Alessandro Tagliapietra
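
Editor's toy model of the key question raised in the original message: how many records a compacted changelog has to retain when the key is "original key + window start" versus the original key alone. It uses the figures from the thread (about 30 keys, 1-minute windows, 12 hours of retention) and models compaction simply as "latest value per distinct key". The `compact` helper is hypothetical and ignores cleaner timing and retention-based deletion.

```kotlin
// Simplified model of log compaction: eventually only the latest value
// for each distinct key survives.
fun <K, V> compact(log: List<Pair<K, V>>): Map<K, V> {
    val latest = LinkedHashMap<K, V>()
    for ((key, value) in log) latest[key] = value
    return latest
}

fun main() {
    val sensors = (1..30).map { "sensor-$it" }
    val windowStarts = (0 until 720).map { it * 60_000L } // 12 hours of 1-minute windows

    // Changelog keyed like a windowed store: (record key, window start).
    val windowedLog = windowStarts.flatMap { start ->
        sensors.map { sensor -> (sensor to start) to "agg@$start" }
    }
    // Changelog keyed by the record key only, always overwritten by the newest window.
    val plainLog = windowStarts.flatMap { start ->
        sensors.map { sensor -> sensor to "agg@$start" }
    }

    println(compact(windowedLog).size) // 21600: one entry per key per window
    println(compact(plainLog).size)    // 30: one entry per key
}
```

In this model the windowed keyspace grows with every new window until retention removes old entries, while the plain-key changelog stays at one entry per sensor.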