Re: Reducing streams startup bandwidth usage

2019-12-14 Thread Alessandro Tagliapietra
Just an update: on staging with the new code the issue might have been on our side. I'm still not sure, but I'm now monitoring the aggregation list size to confirm whether incoming data, even without a change in rate, was the real issue. (This however wasn't happening in production and the cache

Re: Reducing streams startup bandwidth usage

2019-12-13 Thread Alessandro Tagliapietra
Hi Sophie, thanks for explaining that. So yeah, it seems that since I'm using the default grace period of 24 hours, that might cause the records to be sent to the changelog after ~24 hours. However, I'm switching the regular windowing system to the custom one, and while for the regular windows
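For context, the grace period is configured on the window definition itself; a minimal sketch of shortening it from the 24-hour default (the five-minute value is purely illustrative, not from the thread):

    import java.time.Duration
    import org.apache.kafka.streams.kstream.TimeWindows

    // With the default 24-hour grace, a window only closes (and cleanup/late handling kicks in)
    // a full day after the window ends; a shorter grace bounds that delay.
    val windows: TimeWindows = TimeWindows.of(Duration.ofMinutes(1))
        .grace(Duration.ofMinutes(5))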

Re: Reducing streams startup bandwidth usage

2019-12-12 Thread Sophie Blee-Goldman
Thanks for collecting all these metrics. It might be that as the length of the lists increases over time, the cache is able to hold fewer unique keys and eventually has to start evicting things. This would explain why the cache hit rate starts to decrease, and likely why latency starts to go up.
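The cache hit rate mentioned here is exposed as a DEBUG-level record-cache metric; a minimal sketch of enabling that recording level (the property name is from the Streams config, the surrounding setup is assumed):

    import java.util.Properties
    import org.apache.kafka.streams.StreamsConfig

    val props = Properties()
    // Record-cache metrics (group "stream-record-cache-metrics", e.g. hit-ratio-avg per store)
    // are only recorded when the recording level is DEBUG.
    props[StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG] = "DEBUG"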

Re: Reducing streams startup bandwidth usage

2019-12-10 Thread Alessandro Tagliapietra
Just an update since it has been happening again now and I have some more metrics to show. The topology is this:

Topologies:
  Sub-topology: 0
    Source: KSTREAM-SOURCE-00 (topics: [sensors])
      --> KSTREAM-TRANSFORMVALUES-01
    Processor: KSTREAM-TRANSFORMVALUES-01
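A topology description like the one quoted above can be printed from the builder; a minimal sketch (the actual source/transform/aggregation wiring is elided):

    import org.apache.kafka.streams.StreamsBuilder

    val builder = StreamsBuilder()
    // ... stream("sensors"), transformValues(...), aggregation, etc. elided ...
    val topology = builder.build()
    println(topology.describe())  // prints the "Topologies: Sub-topology: 0 ..." text shown above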

Re: Reducing streams startup bandwidth usage

2019-12-09 Thread Alessandro Tagliapietra
You're saying that with a 100ms commit interval, caching won't help because it would still send the compacted changes to the changelog every 100ms? Regarding the custom state store, I'll look into that, because I didn't go much further than transformers and stores in my Kafka experience, so I'll
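For reference, 100 ms is the default commit interval under exactly-once; a sketch of where that knob lives (raising it to 1000 ms is only an illustration of the trade-off, not a recommendation made in the thread):

    import java.util.Properties
    import org.apache.kafka.streams.StreamsConfig

    val props = Properties()
    props[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = StreamsConfig.EXACTLY_ONCE
    // Every commit flushes the record cache, so compacted updates reach the changelog
    // at least once per commit interval. A larger interval means fewer changelog writes
    // at the cost of higher latency and larger transactions.
    props[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = 1000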

Re: Reducing streams startup bandwidth usage

2019-12-09 Thread Sophie Blee-Goldman
Alright, well I see why you have so much data being sent to the changelog if each update involves appending to a list and then writing the whole list back. And with 340 records/minute I'm actually not sure how the cache could really help at all when it's being flushed every 100ms. Here's kind of a

Re: Reducing streams startup bandwidth usage

2019-12-09 Thread Alessandro Tagliapietra
Hi Sophie, just to give a better context: yes, we use EOS, and the problem happens in our aggregation store. Basically, when windowing data we append each record to a list that's stored in the aggregation store. We have 2 versions: in production we use the Kafka Streams windowing API, in staging

Re: Reducing streams startup bandwidth usage

2019-12-09 Thread Sophie Blee-Goldman
It's an LRU cache, so once it gets full new records will cause older ones to be evicted (and thus sent downstream). Of course this should only apply to records of a different key, otherwise it will just cause an update of that key in the cache. I missed that you were using EOS, given the short

Re: Reducing streams startup bandwidth usage

2019-12-09 Thread Alessandro Tagliapietra
Hi Sophie, thanks for helping. By eviction of older records you mean they get flushed to the changelog topic? Or the cache is just full and so all new records go to the changelog topic until the old ones are evicted? Regarding the timing, what timing do you mean? Between when the cache stops and

Re: Reducing streams startup bandwidth usage

2019-12-09 Thread Sophie Blee-Goldman
It might be that the cache appears to "stop working" because it gets full, and each new update causes an eviction (of some older record). This would also explain the opposite behavior, that it "starts working" again after some time without being restarted, since the cache is completely flushed on

Re: Reducing streams startup bandwidth usage

2019-12-09 Thread Alessandro Tagliapietra
And it seems that for some reason after a while caching works again without a restart of the streams application [image: Screen Shot 2019-12-08 at 11.59.30 PM.png] I'll try to enable debug metrics and see if I can find something useful there. Any idea is appreciated in the meantime :) --

Re: Reducing streams startup bandwidth usage

2019-12-08 Thread Alessandro Tagliapietra
It seems that even with caching enabled, after a while the sent bytes still go up [image: Screen Shot 2019-12-08 at 12.52.31 PM.png]. You can see the deploy when I've enabled caching, but it looks like it's still a temporary solution. -- Alessandro Tagliapietra On Sat, Dec 7, 2019 at 10:08 AM

Re: Reducing streams startup bandwidth usage

2019-12-07 Thread Alessandro Tagliapietra
Could be, but since we have a limited number of input keys (~30), and windowing generates new keys but old ones are never touched again since the data per key is in order, I assume it shouldn't be a big deal for it to handle 30 keys. I'll have a look at cache metrics and see if something pops out

Re: Reducing streams startup bandwidth usage

2019-12-07 Thread John Roesler
Hmm, that’s a good question. Now that we’re talking about caching, I wonder if the cache was just too small. It’s not very big by default. On Sat, Dec 7, 2019, at 11:16, Alessandro Tagliapietra wrote: > Ok I'll check on that! > > Now I can see that with caching we went from 3-4MB/s to 400KB/s,
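The cache John refers to is the record cache, which defaults to 10 MB in total shared across all threads and stores of the instance; a minimal sketch of raising it (the 100 MB figure is just an example):

    import java.util.Properties
    import org.apache.kafka.streams.StreamsConfig

    val props = Properties()
    // Total record-cache size for the whole application instance (default is 10 MB).
    props[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 100 * 1024 * 1024L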

Re: Reducing streams startup bandwidth usage

2019-12-07 Thread Alessandro Tagliapietra
Ok I'll check on that! Now I can see that with caching we went from 3-4MB/s to 400KB/s, that will help with the bill. Last question, any reason why after a while the regular windowed stream starts sending every update instead of caching? Could it be because it doesn't have any more memory

Re: Reducing streams startup bandwidth usage

2019-12-07 Thread Alessandro Tagliapietra
Never mind, I've found out I can use `.withCachingEnabled` on the store builder to achieve the same thing as the windowing example, since `Materialized.as` turns that on by default. Does caching in any way reduce the EOS guarantees? -- Alessandro Tagliapietra On Sat, Dec 7, 2019 at 1:12 AM
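A minimal sketch of what `.withCachingEnabled` on a hand-built store looks like (the store name and serdes are placeholders, not taken from the thread):

    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.streams.state.Stores

    val storeBuilder = Stores
        .keyValueStoreBuilder(
            Stores.persistentKeyValueStore("aggregation-store"),  // hypothetical store name
            Serdes.String(),                                      // placeholder key serde
            Serdes.String())                                      // placeholder value serde
        .withCachingEnabled()  // buffer writes so the changelog sees compacted updates per commit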

Re: Reducing streams startup bandwidth usage

2019-12-07 Thread Alessandro Tagliapietra
Seems my journey with this isn't done just yet. This seems very complicated to me, but I'll try to explain it as best I can. To better understand the streams network usage I've used Prometheus with the JMX exporter to export Kafka metrics. To check the amount of data we use I'm looking at the

Re: Reducing streams startup bandwidth usage

2019-12-04 Thread John Roesler
Oh, good! On Tue, Dec 3, 2019, at 23:29, Alessandro Tagliapietra wrote: > Testing on staging shows that a restart on exception is much faster and the > stream starts right away which I think means we're reading way less data > than before! > > What I was referring to is that, in Streams, the

Re: Reducing streams startup bandwidth usage

2019-12-03 Thread Alessandro Tagliapietra
Testing on staging shows that a restart on exception is much faster and the stream starts right away, which I think means we're reading way less data than before! What I was referring to is that, in Streams, the keys for window aggregation state are actually composed of both the window itself and

Re: Reducing streams startup bandwidth usage

2019-12-03 Thread John Roesler
Oh, yeah, I remember that conversation! Yes, then, I agree, if you're only storing state of the most recent window for each key, and the key you use for that state is actually the key of the records, then an aggressive compaction policy plus your custom transformer seems like a good way
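A sketch of what an "aggressive compaction policy" on the changelog of a hand-built store could look like; the specific topic-config values are illustrative assumptions, not figures from the thread:

    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.streams.state.Stores

    // Changelog topic overrides: compact eagerly so superseded values are cleaned up sooner.
    val changelogConfig = mapOf(
        "cleanup.policy" to "compact",
        "min.cleanable.dirty.ratio" to "0.1",  // broker default is 0.5; lower compacts sooner
        "segment.ms" to "600000")              // roll segments more often so they become compactable

    val storeBuilder = Stores
        .keyValueStoreBuilder(
            Stores.persistentKeyValueStore("custom-window-store"),  // hypothetical store name
            Serdes.String(),
            Serdes.String())
        .withLoggingEnabled(changelogConfig)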

Re: Reducing streams startup bandwidth usage

2019-12-03 Thread Alessandro Tagliapietra
Hi John, afaik the grace period uses stream time (https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Windows.html), which is per partition. Unfortunately we process data that's not in sync between keys, so each key needs to be independent and a key can have much older data than the

Re: Reducing streams startup bandwidth usage

2019-12-03 Thread John Roesler
Hey Alessandro, That sounds also like it would work. I'm wondering if it would actually change what you observe w.r.t. recovery behavior, though. Streams already sets the retention time on the changelog to equal the retention time of the windows, for windowed aggregations, so you shouldn't be
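If one did want to control the windowed-store (and hence changelog) retention explicitly rather than rely on the default John describes, the knob is on Materialized; a sketch with placeholder types, store name and duration:

    import java.time.Duration
    import org.apache.kafka.common.utils.Bytes
    import org.apache.kafka.streams.kstream.Materialized
    import org.apache.kafka.streams.state.WindowStore

    // Retention must cover at least the window size plus the grace period.
    val materialized = Materialized
        .`as`<String, String, WindowStore<Bytes, ByteArray>>("agg-store")
        .withRetention(Duration.ofHours(1))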

Re: Reducing streams startup bandwidth usage

2019-12-03 Thread Alessandro Tagliapietra
Thanks John for the explanation. I thought that with EOS enabled (which we have), it would in the worst case find a valid checkpoint and start the restore from there until it reached the last committed state, not restore completely from scratch. What you say definitely makes sense now. Since we don't

Re: Reducing streams startup bandwidth usage

2019-12-03 Thread John Roesler
Hi Alessandro, To take a stab at your question, maybe it first doesn't find it, but then restores some data, writes the checkpoint, and then later on, it has to re-initialize the task for some reason, and that's why it does find a checkpoint then? More to the heart of the issue, if you have

Re: Reducing streams startup bandwidth usage

2019-12-03 Thread Alessandro Tagliapietra
Hi John, thanks a lot for helping. Regarding your message:
- no, we only have 1 instance of the stream application, and it always re-uses the same state folder
- yes, we're seeing most issues when restarting non-gracefully due to an exception
I've enabled trace logging and filtering by a single state

Re: Reducing streams startup bandwidth usage

2019-12-02 Thread John Roesler
Hi Alessandro, I'm sorry to hear that. The restore process only takes one factor into account: the current offset position of the changelog topic is stored in a local file alongside the state stores. On startup, the app checks if the recorded position lags the latest offset in the changelog.

Reducing streams startup bandwidth usage

2019-12-01 Thread Alessandro Tagliapietra
Hello everyone, we're having a problem with bandwidth usage on streams application startup. Our current setup does this:

    ...
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .aggregate(
        { MetricSequenceList(ArrayList()) },
        { key, value, aggregate ->
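Filling out the truncated snippet, a minimal self-contained sketch of this kind of list-building windowed aggregation; the Metric and MetricSequenceList types, the topic name and the store name are assumptions, not taken verbatim from the post:

    import java.time.Duration
    import org.apache.kafka.common.utils.Bytes
    import org.apache.kafka.streams.StreamsBuilder
    import org.apache.kafka.streams.kstream.Materialized
    import org.apache.kafka.streams.kstream.TimeWindows
    import org.apache.kafka.streams.state.WindowStore

    data class Metric(val sensorId: String, val value: Double)     // hypothetical record type
    data class MetricSequenceList(val metrics: ArrayList<Metric>)  // the list kept in the window store

    val builder = StreamsBuilder()
    builder.stream<String, Metric>("sensors")
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
        .aggregate(
            { MetricSequenceList(ArrayList()) },  // initializer: empty list per window
            { _, value, aggregate ->              // aggregator: append, then store the whole list
                aggregate.metrics.add(value)
                aggregate                         // the entire (growing) list is written on every update
            },
            // In practice a custom serde for MetricSequenceList would be supplied via withValueSerde(...).
            Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("metric-agg-store"))

Because the whole list is the aggregate value, each incoming record rewrites the full list to the store and its changelog, which is why the changelog traffic grows as the lists get longer.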