It might be that the cache appears to "stop working" because it gets full,
and each
new update causes an eviction (of some older record). This would also
explain the
opposite behavior, that it "starts working" again after some time without
being restarted,
since the cache is completely flushed on commit. Does the timing seem to
align with your
commit interval (default is 30s)?

On Mon, Dec 9, 2019 at 12:03 AM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> And it seems that for some reason after a while caching works again
> without a restart of the streams application
>
> [image: Screen Shot 2019-12-08 at 11.59.30 PM.png]
>
> I'll try to enable debug metrics and see if I can find something useful
> there.
> Any idea is appreciated in the meantime :)
>
> --
> Alessandro Tagliapietra
>
> On Sun, Dec 8, 2019 at 12:54 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
>> It seems that even with caching enabled, after a while the sent bytes
>> stil go up
>>
>> [image: Screen Shot 2019-12-08 at 12.52.31 PM.png]
>>
>> you can see the deploy when I've enabled caching but it looks like it's
>> still a temporary solution.
>>
>> --
>> Alessandro Tagliapietra
>>
>>
>> On Sat, Dec 7, 2019 at 10:08 AM Alessandro Tagliapietra <
>> tagliapietra.alessan...@gmail.com> wrote:
>>
>>> Could be, but since we have a limite amount of input keys (~30),
>>> windowing generates new keys but old ones are never touched again since the
>>> data per key is in order, I assume it shouldn't be a big deal for it to
>>> handle 30 keys
>>> I'll have a look at cache metrics and see if something pops out
>>>
>>> Thanks
>>>
>>> --
>>> Alessandro Tagliapietra
>>>
>>>
>>> On Sat, Dec 7, 2019 at 10:02 AM John Roesler <vvcep...@apache.org>
>>> wrote:
>>>
>>>> Hmm, that’s a good question. Now that we’re talking about caching, I
>>>> wonder if the cache was just too small. It’s not very big by default.
>>>>
>>>> On Sat, Dec 7, 2019, at 11:16, Alessandro Tagliapietra wrote:
>>>> > Ok I'll check on that!
>>>> >
>>>> > Now I can see that with caching we went from 3-4MB/s to 400KB/s, that
>>>> will
>>>> > help with the bill.
>>>> >
>>>> > Last question, any reason why after a while the regular windowed
>>>> stream
>>>> > starts sending every update instead of caching?
>>>> > Could it be because it doesn't have any more memory available? Any
>>>> other
>>>> > possible reason?
>>>> >
>>>> > Thank you so much for your help
>>>> >
>>>> > --
>>>> > Alessandro Tagliapietra
>>>> >
>>>> >
>>>> > On Sat, Dec 7, 2019 at 9:14 AM John Roesler <vvcep...@apache.org>
>>>> wrote:
>>>> >
>>>> > > Ah, yes. Glad you figured it out!
>>>> > >
>>>> > > Caching does not reduce EOS guarantees at all. I highly recommend
>>>> using
>>>> > > it. You might even want to take a look at the caching metrics to
>>>> make sure
>>>> > > you have a good hit ratio.
>>>> > >
>>>> > > -John
>>>> > >
>>>> > > On Sat, Dec 7, 2019, at 10:51, Alessandro Tagliapietra wrote:
>>>> > > > Never mind I've found out I can use `.withCachingEnabled` on the
>>>> store
>>>> > > > builder to achieve the same thing as the windowing example as
>>>> > > > `Materialized.as` turns that on by default.
>>>> > > >
>>>> > > > Does caching in any way reduces the EOS guarantees?
>>>> > > >
>>>> > > > --
>>>> > > > Alessandro Tagliapietra
>>>> > > >
>>>> > > >
>>>> > > > On Sat, Dec 7, 2019 at 1:12 AM Alessandro Tagliapietra <
>>>> > > > tagliapietra.alessan...@gmail.com> wrote:
>>>> > > >
>>>> > > > > Seems my journey with this isn't done just yet,
>>>> > > > >
>>>> > > > > This seems very complicated to me but I'll try to explain it as
>>>> best I
>>>> > > can.
>>>> > > > > To better understand the streams network usage I've used
>>>> prometheus
>>>> > > with
>>>> > > > > the JMX exporter to export kafka metrics.
>>>> > > > > To check the amount of data we use I'm looking at the increments
>>>> > > > > of kafka_producer_topic_metrics_byte_total and
>>>> > > > > kafka_producer_producer_topic_metrics_record_send_total,
>>>> > > > >
>>>> > > > > Our current (before the change mentioned above) code looks like
>>>> this:
>>>> > > > >
>>>> > > > > // This transformers just pairs a value with the previous one
>>>> storing
>>>> > > the
>>>> > > > > temporary one in a store
>>>> > > > > val pairsStream = metricStream
>>>> > > > >   .transformValues(ValueTransformerWithKeySupplier {
>>>> PairTransformer()
>>>> > > },
>>>> > > > > "LastValueStore")
>>>> > > > >   .filter { _, value: MetricSequence? -> value != null }
>>>> > > > >
>>>> > > > > // Create a store to store suppressed windows until a new one is
>>>> > > received
>>>> > > > > val suppressStoreSupplier =
>>>> > > > >
>>>> > >
>>>> Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("suppress-store"),
>>>> > > > > ......
>>>> > > > >
>>>> > > > > // Window and aggregate data in 1 minute intervals
>>>> > > > > val aggregatedStream = pairsStream
>>>> > > > >   .groupByKey()
>>>> > > > >   .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
>>>> > > > >   .aggregate(
>>>> > > > >           { MetricSequenceList(ArrayList()) },
>>>> > > > >           { key, value, aggregate ->
>>>> > > > >               aggregate.getRecords().add(value)
>>>> > > > >               aggregate
>>>> > > > >           },
>>>> > > > >           Materialized.`as`<String, MetricSequenceList,
>>>> > > WindowStore<Bytes,
>>>> > > > >
>>>> > >
>>>> ByteArray>>("aggregate-store").withKeySerde(Serdes.String()).withValueSerde(Settings.getValueSpecificavroSerde())
>>>> > > > >   )
>>>> > > > >   .toStream()
>>>> > > > >   .flatTransform(TransformerSupplier {
>>>> > > > >       // This transformer basically waits until a new window is
>>>> > > received
>>>> > > > > to emit the previous one
>>>> > > > >   }, "suppress-store")
>>>> > > > >   .map { sensorId: String, suppressedOutput: SuppressedOutput ->
>>>> > > > >       .... etc ....
>>>> > > > >
>>>> > > > >
>>>> > > > > Basically:
>>>> > > > >  - all data goes through LastValueStore store that stores each
>>>> message
>>>> > > and
>>>> > > > > emits a pair with the previous one
>>>> > > > >  - the aggregate-store is used to store the per-window list of
>>>> > > messages in
>>>> > > > > the aggregate method
>>>> > > > >  - the suppress store is used to store each received window
>>>> which is
>>>> > > > > emitted only after a newer one is received
>>>> > > > >
>>>> > > > > What I'm experiencing is that:
>>>> > > > >  - during normal execution, the streams app sends to the
>>>> lastvalue
>>>> > > store
>>>> > > > > changelog topic 5k messages/min, the aggregate and suppress
>>>> store
>>>> > > changelog
>>>> > > > > topics only about 100
>>>> > > > >  - at some point (after many hours of operation), the streams
>>>> app
>>>> > > starts
>>>> > > > > sending to the aggregate and suppress store changelog topic the
>>>> same
>>>> > > amount
>>>> > > > > of messages going through the lastvaluestore
>>>> > > > >  - if I restart the streams app it goes back to the initial
>>>> behavior
>>>> > > > >
>>>> > > > > You can see the behavior in this graph
>>>> https://imgur.com/dJcUNSf
>>>> > > > > You can also see that after a restart everything goes back to
>>>> normal
>>>> > > > > levels.
>>>> > > > > Regarding other metrics, process latency increases, poll latency
>>>> > > > > decreases, poll rate decreases, commit rate stays the same
>>>> while commit
>>>> > > > > latency increases.
>>>> > > > >
>>>> > > > > Now, I've these questions:
>>>> > > > >  - why isn't the aggregate/suppress store changelog topic
>>>> throughput
>>>> > > the
>>>> > > > > same as the LastValueStore? Shouldn't every time it aggregates
>>>> send a
>>>> > > > > record to the changelog?
>>>> > > > >  - is the windowing doing some internal caching like not
>>>> sending every
>>>> > > > > aggregation record until the window time is passed? (if so,
>>>> where can I
>>>> > > > > find that code since I would like to use that also for our new
>>>> > > > > implementation)
>>>> > > > >
>>>> > > > > Thank you in advance
>>>> > > > >
>>>> > > > > --
>>>> > > > > Alessandro Tagliapietra
>>>> > > > >
>>>> > > > >
>>>> > > > > On Wed, Dec 4, 2019 at 7:57 AM John Roesler <
>>>> vvcep...@apache.org>
>>>> > > wrote:
>>>> > > > >
>>>> > > > >> Oh, good!
>>>> > > > >>
>>>> > > > >> On Tue, Dec 3, 2019, at 23:29, Alessandro Tagliapietra wrote:
>>>> > > > >> > Testing on staging shows that a restart on exception is much
>>>> faster
>>>> > > and
>>>> > > > >> the
>>>> > > > >> > stream starts right away which I think means we're reading
>>>> way less
>>>> > > data
>>>> > > > >> > than before!
>>>> > > > >> >
>>>> > > > >> > What I was referring to is that, in Streams, the keys for
>>>> window
>>>> > > > >> > > aggregation state is actually composed of both the window
>>>> itself
>>>> > > and
>>>> > > > >> the
>>>> > > > >> > > key. In the DSL, it looks like "Windowed<K>". That results
>>>> in the
>>>> > > > >> store
>>>> > > > >> > > having a unique key per window for each K, which is why we
>>>> need
>>>> > > > >> retention
>>>> > > > >> > > as well as compaction for our changelogs. But for you, if
>>>> you just
>>>> > > > >> make the
>>>> > > > >> > > key "K", then compaction alone should do the trick.
>>>> > > > >> >
>>>> > > > >> > Yes we had compact,delete as cleanup policy but probably it
>>>> still
>>>> > > had a
>>>> > > > >> too
>>>> > > > >> > long retention value, also the rocksdb store is probably much
>>>> > > faster now
>>>> > > > >> > having only one key per key instead of one key per window
>>>> per key.
>>>> > > > >> >
>>>> > > > >> > Thanks a lot for helping! I'm now going to setup a
>>>> prometheus-jmx
>>>> > > > >> > monitoring so we can keep better track of what's going on :)
>>>> > > > >> >
>>>> > > > >> > --
>>>> > > > >> > Alessandro Tagliapietra
>>>> > > > >> >
>>>> > > > >> >
>>>> > > > >> > On Tue, Dec 3, 2019 at 9:12 PM John Roesler <
>>>> vvcep...@apache.org>
>>>> > > > >> wrote:
>>>> > > > >> >
>>>> > > > >> > > Oh, yeah, I remember that conversation!
>>>> > > > >> > >
>>>> > > > >> > > Yes, then, I agree, if you're only storing state of the
>>>> most
>>>> > > recent
>>>> > > > >> window
>>>> > > > >> > > for each key, and the key you use for that state is
>>>> actually the
>>>> > > key
>>>> > > > >> of the
>>>> > > > >> > > records, then an aggressive compaction policy plus your
>>>> custom
>>>> > > > >> transformer
>>>> > > > >> > > seems like a good way forward.
>>>> > > > >> > >
>>>> > > > >> > > What I was referring to is that, in Streams, the keys for
>>>> window
>>>> > > > >> > > aggregation state is actually composed of both the window
>>>> itself
>>>> > > and
>>>> > > > >> the
>>>> > > > >> > > key. In the DSL, it looks like "Windowed<K>". That results
>>>> in the
>>>> > > > >> store
>>>> > > > >> > > having a unique key per window for each K, which is why we
>>>> need
>>>> > > > >> retention
>>>> > > > >> > > as well as compaction for our changelogs. But for you, if
>>>> you just
>>>> > > > >> make the
>>>> > > > >> > > key "K", then compaction alone should do the trick.
>>>> > > > >> > >
>>>> > > > >> > > And yes, if you manage the topic yourself, then Streams
>>>> won't
>>>> > > adjust
>>>> > > > >> the
>>>> > > > >> > > retention time. I think it might validate that the
>>>> retention
>>>> > > isn't too
>>>> > > > >> > > short, but I don't remember offhand.
>>>> > > > >> > >
>>>> > > > >> > > Cheers, and let me know how it goes!
>>>> > > > >> > > -John
>>>> > > > >> > >
>>>> > > > >> > > On Tue, Dec 3, 2019, at 23:03, Alessandro Tagliapietra
>>>> wrote:
>>>> > > > >> > > > Hi John,
>>>> > > > >> > > >
>>>> > > > >> > > > afaik grace period uses stream time
>>>> > > > >> > > >
>>>> > > > >> > >
>>>> > > > >>
>>>> > >
>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Windows.html
>>>> > > > >> > > > which is
>>>> > > > >> > > > per partition, unfortunately we process data that's not
>>>> in sync
>>>> > > > >> between
>>>> > > > >> > > > keys so each key needs to be independent and a key can
>>>> have much
>>>> > > > >> older
>>>> > > > >> > > > data
>>>> > > > >> > > > than the other.
>>>> > > > >> > > >
>>>> > > > >> > > > Having a small grace period would probably close old
>>>> windows
>>>> > > sooner
>>>> > > > >> than
>>>> > > > >> > > > expected. That's also why in my use case a custom store
>>>> that
>>>> > > just
>>>> > > > >> stores
>>>> > > > >> > > > the last window data for each key might work better. I
>>>> had the
>>>> > > same
>>>> > > > >> issue
>>>> > > > >> > > > with suppression and it has been reported here
>>>> > > > >> > > > https://issues.apache.org/jira/browse/KAFKA-8769
>>>> > > > >> > > > Oh I just saw that you're the one that helped me on
>>>> slack and
>>>> > > > >> created the
>>>> > > > >> > > > issue (thanks again for that).
>>>> > > > >> > > >
>>>> > > > >> > > > The behavior that you mention about streams setting
>>>> changelog
>>>> > > > >> retention
>>>> > > > >> > > > time is something they do on creation of the topic when
>>>> the
>>>> > > broker
>>>> > > > >> has
>>>> > > > >> > > auto
>>>> > > > >> > > > creation enabled? Because we're using confluent cloud
>>>> and I had
>>>> > > to
>>>> > > > >> create
>>>> > > > >> > > > it manually.
>>>> > > > >> > > > Regarding the change in the recovery behavior, with
>>>> compact
>>>> > > cleanup
>>>> > > > >> > > policy
>>>> > > > >> > > > shouldn't the changelog only keep the last value? That
>>>> would
>>>> > > make
>>>> > > > >> the
>>>> > > > >> > > > recovery faster and "cheaper" as it would only need to
>>>> read a
>>>> > > single
>>>> > > > >> > > value
>>>> > > > >> > > > per key (if the cleanup just happened) right?
>>>> > > > >> > > >
>>>> > > > >> > > > --
>>>> > > > >> > > > Alessandro Tagliapietra
>>>> > > > >> > > >
>>>> > > > >> > > >
>>>> > > > >> > > > On Tue, Dec 3, 2019 at 8:51 PM John Roesler <
>>>> > > vvcep...@apache.org>
>>>> > > > >> wrote:
>>>> > > > >> > > >
>>>> > > > >> > > > > Hey Alessandro,
>>>> > > > >> > > > >
>>>> > > > >> > > > > That sounds also like it would work. I'm wondering if
>>>> it would
>>>> > > > >> actually
>>>> > > > >> > > > > change what you observe w.r.t. recovery behavior,
>>>> though.
>>>> > > Streams
>>>> > > > >> > > already
>>>> > > > >> > > > > sets the retention time on the changelog to equal the
>>>> > > retention
>>>> > > > >> time
>>>> > > > >> > > of the
>>>> > > > >> > > > > windows, for windowed aggregations, so you shouldn't be
>>>> > > loading a
>>>> > > > >> lot
>>>> > > > >> > > of
>>>> > > > >> > > > > window data for old windows you no longer care about.
>>>> > > > >> > > > >
>>>> > > > >> > > > > Have you set the "grace period" on your window
>>>> definition? By
>>>> > > > >> default,
>>>> > > > >> > > it
>>>> > > > >> > > > > is set to 24 hours, but you can set it as low as you
>>>> like.
>>>> > > E.g.,
>>>> > > > >> if you
>>>> > > > >> > > > > want to commit to having in-order data only, then you
>>>> can set
>>>> > > the
>>>> > > > >> grace
>>>> > > > >> > > > > period to zero. This _should_ let the broker clean up
>>>> the
>>>> > > > >> changelog
>>>> > > > >> > > records
>>>> > > > >> > > > > as soon as the window ends.
>>>> > > > >> > > > >
>>>> > > > >> > > > > Of course, the log cleaner doesn't run all the time, so
>>>> > > there's
>>>> > > > >> some
>>>> > > > >> > > extra
>>>> > > > >> > > > > delay in which "expired" data would still be visible
>>>> in the
>>>> > > > >> changelog,
>>>> > > > >> > > but
>>>> > > > >> > > > > it would actually be just the same as if you manage
>>>> the store
>>>> > > > >> yourself.
>>>> > > > >> > > > >
>>>> > > > >> > > > > Hope this helps!
>>>> > > > >> > > > > -John
>>>> > > > >> > > > >
>>>> > > > >> > > > > On Tue, Dec 3, 2019, at 22:22, Alessandro Tagliapietra
>>>> wrote:
>>>> > > > >> > > > > > Thanks John for the explanation,
>>>> > > > >> > > > > >
>>>> > > > >> > > > > > I thought that with EOS enabled (which we have) it
>>>> would in
>>>> > > the
>>>> > > > >> worst
>>>> > > > >> > > > > case
>>>> > > > >> > > > > > find a valid checkpoint and start the restore from
>>>> there
>>>> > > until
>>>> > > > >> it
>>>> > > > >> > > reached
>>>> > > > >> > > > > > the last committed status, not completely from
>>>> scratch. What
>>>> > > > >> you say
>>>> > > > >> > > > > > definitely makes sense now.
>>>> > > > >> > > > > > Since we don't really need old time windows and we
>>>> ensure
>>>> > > data
>>>> > > > >> is
>>>> > > > >> > > ordered
>>>> > > > >> > > > > > when processed I think I"ll just write a custom
>>>> transformer
>>>> > > to
>>>> > > > >> keep
>>>> > > > >> > > only
>>>> > > > >> > > > > > the last window, store intermediate aggregation
>>>> results in
>>>> > > the
>>>> > > > >> store
>>>> > > > >> > > and
>>>> > > > >> > > > > > emit a new value only when we receive data belonging
>>>> to a
>>>> > > new
>>>> > > > >> window.
>>>> > > > >> > > > > > That with a compact only changelog topic should keep
>>>> the
>>>> > > rebuild
>>>> > > > >> > > data to
>>>> > > > >> > > > > > the minimum as it would have only the last value for
>>>> each
>>>> > > key.
>>>> > > > >> > > > > >
>>>> > > > >> > > > > > Hope that makes sense
>>>> > > > >> > > > > >
>>>> > > > >> > > > > > Thanks again
>>>> > > > >> > > > > >
>>>> > > > >> > > > > > --
>>>> > > > >> > > > > > Alessandro Tagliapietra
>>>> > > > >> > > > > >
>>>> > > > >> > > > > >
>>>> > > > >> > > > > > On Tue, Dec 3, 2019 at 3:04 PM John Roesler <
>>>> > > > >> vvcep...@apache.org>
>>>> > > > >> > > wrote:
>>>> > > > >> > > > > >
>>>> > > > >> > > > > > > Hi Alessandro,
>>>> > > > >> > > > > > >
>>>> > > > >> > > > > > > To take a stab at your question, maybe it first
>>>> doesn't
>>>> > > find
>>>> > > > >> it,
>>>> > > > >> > > but
>>>> > > > >> > > > > then
>>>> > > > >> > > > > > > restores some data, writes the checkpoint, and
>>>> then later
>>>> > > on,
>>>> > > > >> it
>>>> > > > >> > > has to
>>>> > > > >> > > > > > > re-initialize the task for some reason, and that's
>>>> why it
>>>> > > does
>>>> > > > >> > > find a
>>>> > > > >> > > > > > > checkpoint then?
>>>> > > > >> > > > > > >
>>>> > > > >> > > > > > > More to the heart of the issue, if you have EOS
>>>> enabled,
>>>> > > > >> Streams
>>>> > > > >> > > _only_
>>>> > > > >> > > > > > > records the checkpoint when the store is in a
>>>> > > known-consistent
>>>> > > > >> > > state.
>>>> > > > >> > > > > For
>>>> > > > >> > > > > > > example, if you have a graceful shutdown, Streams
>>>> will
>>>> > > flush
>>>> > > > >> all
>>>> > > > >> > > the
>>>> > > > >> > > > > > > stores, commit all the transactions, and then
>>>> write the
>>>> > > > >> checkpoint
>>>> > > > >> > > > > file.
>>>> > > > >> > > > > > > Then, on re-start, it will pick up from that
>>>> checkpoint.
>>>> > > > >> > > > > > >
>>>> > > > >> > > > > > > But as soon as it starts processing records, it
>>>> removes
>>>> > > the
>>>> > > > >> > > checkpoint
>>>> > > > >> > > > > > > file, so if it crashes while it was processing,
>>>> there is
>>>> > > no
>>>> > > > >> > > checkpoint
>>>> > > > >> > > > > file
>>>> > > > >> > > > > > > there, and it will have to restore from the
>>>> beginning of
>>>> > > the
>>>> > > > >> > > changelog.
>>>> > > > >> > > > > > >
>>>> > > > >> > > > > > > This design is there on purpose, because otherwise
>>>> we
>>>> > > cannot
>>>> > > > >> > > actually
>>>> > > > >> > > > > > > guarantee correctness... For example, if you are
>>>> > > maintaining a
>>>> > > > >> > > count
>>>> > > > >> > > > > > > operation, and we process an input record i,
>>>> increment the
>>>> > > > >> count
>>>> > > > >> > > and
>>>> > > > >> > > > > write
>>>> > > > >> > > > > > > it to the state store, and to the changelog topic.
>>>> But we
>>>> > > > >> crash
>>>> > > > >> > > before
>>>> > > > >> > > > > we
>>>> > > > >> > > > > > > commit that transaction. Then, the write to the
>>>> changelog
>>>> > > > >> would be
>>>> > > > >> > > > > aborted,
>>>> > > > >> > > > > > > and we would re-process record i . However, we've
>>>> already
>>>> > > > >> updated
>>>> > > > >> > > the
>>>> > > > >> > > > > local
>>>> > > > >> > > > > > > state store, so when we increment it again, it
>>>> results in
>>>> > > > >> > > > > double-counting
>>>> > > > >> > > > > > > i. The key point here is that there's no way to do
>>>> an
>>>> > > atomic
>>>> > > > >> > > operation
>>>> > > > >> > > > > > > across two different systems (local state store
>>>> and the
>>>> > > > >> changelog
>>>> > > > >> > > > > topic).
>>>> > > > >> > > > > > > Since we can't guarantee that we roll back the
>>>> incremented
>>>> > > > >> count
>>>> > > > >> > > when
>>>> > > > >> > > > > the
>>>> > > > >> > > > > > > changelog transaction is aborted, we can't keep
>>>> the local
>>>> > > > >> store
>>>> > > > >> > > > > consistent
>>>> > > > >> > > > > > > with the changelog.
>>>> > > > >> > > > > > >
>>>> > > > >> > > > > > > After a crash, the only way to ensure the local
>>>> store is
>>>> > > > >> consistent
>>>> > > > >> > > > > with
>>>> > > > >> > > > > > > the changelog is to discard the entire thing and
>>>> rebuild
>>>> > > it.
>>>> > > > >> This
>>>> > > > >> > > is
>>>> > > > >> > > > > why we
>>>> > > > >> > > > > > > have an invariant that the checkpoint file only
>>>> exists
>>>> > > when we
>>>> > > > >> > > _know_
>>>> > > > >> > > > > that
>>>> > > > >> > > > > > > the local store is consistent with the changelog,
>>>> and
>>>> > > this is
>>>> > > > >> why
>>>> > > > >> > > > > you're
>>>> > > > >> > > > > > > seeing so much bandwidth when re-starting from an
>>>> unclean
>>>> > > > >> shutdown.
>>>> > > > >> > > > > > >
>>>> > > > >> > > > > > > Note that it's definitely possible to do better
>>>> than this,
>>>> > > > >> and we
>>>> > > > >> > > would
>>>> > > > >> > > > > > > very much like to improve it in the future.
>>>> > > > >> > > > > > >
>>>> > > > >> > > > > > > Thanks,
>>>> > > > >> > > > > > > -John
>>>> > > > >> > > > > > >
>>>> > > > >> > > > > > > On Tue, Dec 3, 2019, at 16:16, Alessandro
>>>> Tagliapietra
>>>> > > wrote:
>>>> > > > >> > > > > > > > Hi John,
>>>> > > > >> > > > > > > >
>>>> > > > >> > > > > > > > thanks a lot for helping, regarding your message:
>>>> > > > >> > > > > > > >  - no we only have 1 instance of the stream
>>>> application,
>>>> > > > >> and it
>>>> > > > >> > > > > always
>>>> > > > >> > > > > > > > re-uses the same state folder
>>>> > > > >> > > > > > > >  - yes we're seeing most issues when restarting
>>>> not
>>>> > > > >> gracefully
>>>> > > > >> > > due
>>>> > > > >> > > > > > > exception
>>>> > > > >> > > > > > > >
>>>> > > > >> > > > > > > > I've enabled trace logging and filtering by a
>>>> single
>>>> > > state
>>>> > > > >> store
>>>> > > > >> > > the
>>>> > > > >> > > > > > > > StoreChangelogReader messages are:
>>>> > > > >> > > > > > > >
>>>> > > > >> > > > > > > > Added restorer for changelog
>>>> > > > >> > > > > sensors-stream-aggregate-store-changelog-0
>>>> > > > >> > > > > > > > Added restorer for changelog
>>>> > > > >> > > > > sensors-stream-aggregate-store-changelog-1
>>>> > > > >> > > > > > > > Added restorer for changelog
>>>> > > > >> > > > > sensors-stream-aggregate-store-changelog-2
>>>> > > > >> > > > > > > > Did not find checkpoint from changelog
>>>> > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-2 for
>>>> store
>>>> > > > >> > > aggregate-store,
>>>> > > > >> > > > > > > > rewinding to beginning.
>>>> > > > >> > > > > > > > Did not find checkpoint from changelog
>>>> > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-1 for
>>>> store
>>>> > > > >> > > aggregate-store,
>>>> > > > >> > > > > > > > rewinding to beginning.
>>>> > > > >> > > > > > > > Did not find checkpoint from changelog
>>>> > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-0 for
>>>> store
>>>> > > > >> > > aggregate-store,
>>>> > > > >> > > > > > > > rewinding to beginning.
>>>> > > > >> > > > > > > > No checkpoint found for task 0_2 state store
>>>> > > aggregate-store
>>>> > > > >> > > > > changelog
>>>> > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-2 with
>>>> EOS
>>>> > > turned
>>>> > > > >> on.
>>>> > > > >> > > > > > > > Reinitializing the task and restore its state
>>>> from the
>>>> > > > >> beginning.
>>>> > > > >> > > > > > > > No checkpoint found for task 0_1 state store
>>>> > > aggregate-store
>>>> > > > >> > > > > changelog
>>>> > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-1 with
>>>> EOS
>>>> > > turned
>>>> > > > >> on.
>>>> > > > >> > > > > > > > Reinitializing the task and restore its state
>>>> from the
>>>> > > > >> beginning.
>>>> > > > >> > > > > > > > No checkpoint found for task 0_0 state store
>>>> > > aggregate-store
>>>> > > > >> > > > > changelog
>>>> > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-0 with
>>>> EOS
>>>> > > turned
>>>> > > > >> on.
>>>> > > > >> > > > > > > > Reinitializing the task and restore its state
>>>> from the
>>>> > > > >> beginning.
>>>> > > > >> > > > > > > > Found checkpoint 709937 from changelog
>>>> > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-2 for
>>>> store
>>>> > > > >> > > aggregate-store.
>>>> > > > >> > > > > > > > Restoring partition
>>>> > > > >> sensors-stream-aggregate-store-changelog-2
>>>> > > > >> > > from
>>>> > > > >> > > > > > > offset
>>>> > > > >> > > > > > > > 709937 to endOffset 742799
>>>> > > > >> > > > > > > > Found checkpoint 3024234 from changelog
>>>> > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-1 for
>>>> store
>>>> > > > >> > > aggregate-store.
>>>> > > > >> > > > > > > > Restoring partition
>>>> > > > >> sensors-stream-aggregate-store-changelog-1
>>>> > > > >> > > from
>>>> > > > >> > > > > > > offset
>>>> > > > >> > > > > > > > 3024234 to endOffset 3131513
>>>> > > > >> > > > > > > > Found checkpoint 14514072 from changelog
>>>> > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-0 for
>>>> store
>>>> > > > >> > > aggregate-store.
>>>> > > > >> > > > > > > > Restoring partition
>>>> > > > >> sensors-stream-aggregate-store-changelog-0
>>>> > > > >> > > from
>>>> > > > >> > > > > > > offset
>>>> > > > >> > > > > > > > 14514072 to endOffset 17116574
>>>> > > > >> > > > > > > > Restored from
>>>> > > sensors-stream-aggregate-store-changelog-2 to
>>>> > > > >> > > > > > > aggregate-store
>>>> > > > >> > > > > > > > with 966 records, ending offset is 711432, next
>>>> starting
>>>> > > > >> > > position is
>>>> > > > >> > > > > > > 711434
>>>> > > > >> > > > > > > > Restored from
>>>> > > sensors-stream-aggregate-store-changelog-2 to
>>>> > > > >> > > > > > > aggregate-store
>>>> > > > >> > > > > > > > with 914 records, ending offset is 712711, next
>>>> starting
>>>> > > > >> > > position is
>>>> > > > >> > > > > > > 712713
>>>> > > > >> > > > > > > > Restored from
>>>> > > sensors-stream-aggregate-store-changelog-1 to
>>>> > > > >> > > > > > > aggregate-store
>>>> > > > >> > > > > > > > with 18 records, ending offset is 3024261, next
>>>> starting
>>>> > > > >> > > position is
>>>> > > > >> > > > > > > 3024262
>>>> > > > >> > > > > > > >
>>>> > > > >> > > > > > > >
>>>> > > > >> > > > > > > > why it first says it didn't find the checkpoint
>>>> and
>>>> > > then it
>>>> > > > >> does
>>>> > > > >> > > > > find it?
>>>> > > > >> > > > > > > > It seems it loaded about  2.7M records (sum of
>>>> offset
>>>> > > > >> difference
>>>> > > > >> > > in
>>>> > > > >> > > > > the
>>>> > > > >> > > > > > > > "restorting partition ...." messages) right?
>>>> > > > >> > > > > > > > Maybe should I try to reduce the checkpoint
>>>> interval?
>>>> > > > >> > > > > > > >
>>>> > > > >> > > > > > > > Regards
>>>> > > > >> > > > > > > >
>>>> > > > >> > > > > > > > --
>>>> > > > >> > > > > > > > Alessandro Tagliapietra
>>>> > > > >> > > > > > > >
>>>> > > > >> > > > > > > >
>>>> > > > >> > > > > > > > On Mon, Dec 2, 2019 at 9:18 AM John Roesler <
>>>> > > > >> vvcep...@apache.org
>>>> > > > >> > > >
>>>> > > > >> > > > > wrote:
>>>> > > > >> > > > > > > >
>>>> > > > >> > > > > > > > > Hi Alessandro,
>>>> > > > >> > > > > > > > >
>>>> > > > >> > > > > > > > > I'm sorry to hear that.
>>>> > > > >> > > > > > > > >
>>>> > > > >> > > > > > > > > The restore process only takes one factor into
>>>> > > account:
>>>> > > > >> the
>>>> > > > >> > > current
>>>> > > > >> > > > > > > offset
>>>> > > > >> > > > > > > > > position of the changelog topic is stored in a
>>>> local
>>>> > > file
>>>> > > > >> > > > > alongside the
>>>> > > > >> > > > > > > > > state stores. On startup, the app checks if the
>>>> > > recorded
>>>> > > > >> > > position
>>>> > > > >> > > > > lags
>>>> > > > >> > > > > > > the
>>>> > > > >> > > > > > > > > latest offset in the changelog. If so, then it
>>>> reads
>>>> > > the
>>>> > > > >> > > missing
>>>> > > > >> > > > > > > changelog
>>>> > > > >> > > > > > > > > records before starting processing.
>>>> > > > >> > > > > > > > >
>>>> > > > >> > > > > > > > > Thus, it would not restore any old window data.
>>>> > > > >> > > > > > > > >
>>>> > > > >> > > > > > > > > There might be a few different things going on
>>>> to
>>>> > > explain
>>>> > > > >> your
>>>> > > > >> > > > > > > observation:
>>>> > > > >> > > > > > > > > * if there is more than one instance in your
>>>> Streams
>>>> > > > >> cluster,
>>>> > > > >> > > > > maybe the
>>>> > > > >> > > > > > > > > task is "flopping" between instances, so each
>>>> instance
>>>> > > > >> still
>>>> > > > >> > > has to
>>>> > > > >> > > > > > > recover
>>>> > > > >> > > > > > > > > state, since it wasn't the last one actively
>>>> > > processing
>>>> > > > >> it.
>>>> > > > >> > > > > > > > > * if the application isn't stopped gracefully,
>>>> it
>>>> > > might
>>>> > > > >> not
>>>> > > > >> > > get a
>>>> > > > >> > > > > > > chance
>>>> > > > >> > > > > > > > > to record its offset in that local file, so on
>>>> > > restart it
>>>> > > > >> has
>>>> > > > >> > > to
>>>> > > > >> > > > > > > restore
>>>> > > > >> > > > > > > > > some or all of the state store from changelog.
>>>> > > > >> > > > > > > > >
>>>> > > > >> > > > > > > > > Or it could be something else; that's just
>>>> what comes
>>>> > > to
>>>> > > > >> mind.
>>>> > > > >> > > > > > > > >
>>>> > > > >> > > > > > > > > If you want to get to the bottom of it, you
>>>> can take a
>>>> > > > >> look at
>>>> > > > >> > > the
>>>> > > > >> > > > > > > logs,
>>>> > > > >> > > > > > > > > paying close attention to which tasks are
>>>> assigned to
>>>> > > > >> which
>>>> > > > >> > > > > instances
>>>> > > > >> > > > > > > after
>>>> > > > >> > > > > > > > > each restart. You can also look into the logs
>>>> from
>>>> > > > >> > > > > > > > >
>>>> > > > >> > >
>>>> > > `org.apache.kafka.streams.processor.internals.StoreChangelogReader`
>>>> > > > >> > > > > > > (might
>>>> > > > >> > > > > > > > > want to set it to DEBUG or TRACE level to
>>>> really see
>>>> > > > >> what's
>>>> > > > >> > > > > happening).
>>>> > > > >> > > > > > > > >
>>>> > > > >> > > > > > > > > I hope this helps!
>>>> > > > >> > > > > > > > > -John
>>>> > > > >> > > > > > > > >
>>>> > > > >> > > > > > > > > On Sun, Dec 1, 2019, at 21:25, Alessandro
>>>> Tagliapietra
>>>> > > > >> wrote:
>>>> > > > >> > > > > > > > > > Hello everyone,
>>>> > > > >> > > > > > > > > >
>>>> > > > >> > > > > > > > > > we're having a problem with bandwidth usage
>>>> on
>>>> > > streams
>>>> > > > >> > > > > application
>>>> > > > >> > > > > > > > > startup,
>>>> > > > >> > > > > > > > > > our current setup does this:
>>>> > > > >> > > > > > > > > >
>>>> > > > >> > > > > > > > > > ...
>>>> > > > >> > > > > > > > > > .groupByKey()
>>>> > > > >> > > > > > > > > >
>>>> > > > >> > >
>>>> .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
>>>> > > > >> > > > > > > > > > .aggregate(
>>>> > > > >> > > > > > > > > >         { MetricSequenceList(ArrayList()) },
>>>> > > > >> > > > > > > > > >         { key, value, aggregate ->
>>>> > > > >> > > > > > > > > >             aggregate.getRecords().add(value)
>>>> > > > >> > > > > > > > > >             aggregate
>>>> > > > >> > > > > > > > > >         },
>>>> > > > >> > > > > > > > > >         Materialized.`as`<String,
>>>> > > MetricSequenceList,
>>>> > > > >> > > > > > > WindowStore<Bytes,
>>>> > > > >> > > > > > > > > >
>>>> > > > >> > > > > > > > >
>>>> > > > >> > > > > > >
>>>> > > > >> > > > >
>>>> > > > >> > >
>>>> > > > >>
>>>> > >
>>>> ByteArray>>("aggregate-store").withKeySerde(Serdes.String()).withValueSerde(Settings.getValueSpecificavroSerde())
>>>> > > > >> > > > > > > > > > )
>>>> > > > >> > > > > > > > > > .toStream()
>>>> > > > >> > > > > > > > > > .flatTransform(TransformerSupplier {
>>>> > > > >> > > > > > > > > > ...
>>>> > > > >> > > > > > > > > >
>>>> > > > >> > > > > > > > > > basically in each window we append the new
>>>> values
>>>> > > and
>>>> > > > >> then do
>>>> > > > >> > > > > some
>>>> > > > >> > > > > > > other
>>>> > > > >> > > > > > > > > > logic with the array of windowed values.
>>>> > > > >> > > > > > > > > > The aggregate-store changelog topic
>>>> configuration
>>>> > > uses
>>>> > > > >> > > > > > > compact,delete as
>>>> > > > >> > > > > > > > > > cleanup policy and has 12 hours of retention.
>>>> > > > >> > > > > > > > > >
>>>> > > > >> > > > > > > > > > What we've seen is that on application
>>>> startup it
>>>> > > takes
>>>> > > > >> a
>>>> > > > >> > > couple
>>>> > > > >> > > > > > > minutes
>>>> > > > >> > > > > > > > > to
>>>> > > > >> > > > > > > > > > rebuild the state store, even if the state
>>>> store
>>>> > > > >> directory is
>>>> > > > >> > > > > > > persisted
>>>> > > > >> > > > > > > > > > across restarts. That along with an
>>>> exception that
>>>> > > > >> caused the
>>>> > > > >> > > > > docker
>>>> > > > >> > > > > > > > > > container to be restarted a couple hundreds
>>>> times
>>>> > > > >> caused a
>>>> > > > >> > > big
>>>> > > > >> > > > > > > confluent
>>>> > > > >> > > > > > > > > > cloud bill compared to what we usually spend
>>>> (1/4
>>>> > > of a
>>>> > > > >> full
>>>> > > > >> > > > > month in
>>>> > > > >> > > > > > > 1
>>>> > > > >> > > > > > > > > day).
>>>> > > > >> > > > > > > > > >
>>>> > > > >> > > > > > > > > > What I think is happening is that the topic
>>>> is
>>>> > > keeping
>>>> > > > >> all
>>>> > > > >> > > the
>>>> > > > >> > > > > > > previous
>>>> > > > >> > > > > > > > > > windows even with the compacting policy
>>>> because each
>>>> > > > >> key is
>>>> > > > >> > > the
>>>> > > > >> > > > > > > original
>>>> > > > >> > > > > > > > > > key + the timestamp not just the key. Since
>>>> we don't
>>>> > > > >> care
>>>> > > > >> > > about
>>>> > > > >> > > > > > > previous
>>>> > > > >> > > > > > > > > > windows as the flatTransform after the
>>>> toStream()
>>>> > > makes
>>>> > > > >> sure
>>>> > > > >> > > > > that we
>>>> > > > >> > > > > > > > > don't
>>>> > > > >> > > > > > > > > > process old windows (a custom suppressor
>>>> basically)
>>>> > > is
>>>> > > > >> there
>>>> > > > >> > > a
>>>> > > > >> > > > > way to
>>>> > > > >> > > > > > > > > only
>>>> > > > >> > > > > > > > > > keep the last window so that the store
>>>> rebuilding
>>>> > > goes
>>>> > > > >> > > faster and
>>>> > > > >> > > > > > > without
>>>> > > > >> > > > > > > > > > rebuilding old windows too? Or should I
>>>> create a
>>>> > > custom
>>>> > > > >> > > window
>>>> > > > >> > > > > using
>>>> > > > >> > > > > > > the
>>>> > > > >> > > > > > > > > > original key as key so that the compaction
>>>> keeps
>>>> > > only
>>>> > > > >> the
>>>> > > > >> > > last
>>>> > > > >> > > > > window
>>>> > > > >> > > > > > > > > data?
>>>> > > > >> > > > > > > > > >
>>>> > > > >> > > > > > > > > > Thank you
>>>> > > > >> > > > > > > > > >
>>>> > > > >> > > > > > > > > > --
>>>> > > > >> > > > > > > > > > Alessandro Tagliapietra
>>>> > > > >> > > > > > > > > >
>>>> > > > >> > > > > > > > >
>>>> > > > >> > > > > > > >
>>>> > > > >> > > > > > >
>>>> > > > >> > > > > >
>>>> > > > >> > > > >
>>>> > > > >> > > >
>>>> > > > >> > >
>>>> > > > >> >
>>>> > > > >>
>>>> > > > >
>>>> > > >
>>>> > >
>>>> >
>>>>
>>>

Reply via email to