Never mind, I've found out I can use `.withCachingEnabled` on the store
builder to achieve the same thing as the windowing example, since
`Materialized.as` turns that on by default.
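
For reference, a minimal sketch of what I mean (reusing our existing
"suppress-store" name and Avro serde helper; untested):

// Store builder with caching enabled explicitly; this should behave like
// the cache the DSL already enables by default for Materialized.as() stores.
val suppressStoreSupplier = Stores
    .keyValueStoreBuilder(
        Stores.persistentKeyValueStore("suppress-store"),
        Serdes.String(),
        Settings.getValueSpecificavroSerde()
    )
    .withCachingEnabled()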

Does caching in any way reduce the EOS guarantees?

--
Alessandro Tagliapietra


On Sat, Dec 7, 2019 at 1:12 AM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> Seems my journey with this isn't done just yet.
>
> This seems very complicated to me, but I'll try to explain it as best I can.
> To better understand the streams network usage I've used Prometheus with
> the JMX exporter to export Kafka metrics.
> To check the amount of data we use I'm looking at the increments
> of kafka_producer_topic_metrics_byte_total and
> kafka_producer_producer_topic_metrics_record_send_total.
>
> Our current (before the change mentioned above) code looks like this:
>
> // This transformer just pairs each value with the previous one, storing
> // the temporary one in a store
> val pairsStream = metricStream
>   .transformValues(ValueTransformerWithKeySupplier { PairTransformer() }, "LastValueStore")
>   .filter { _, value: MetricSequence? -> value != null }
>
> // Create a store to store suppressed windows until a new one is received
> val suppressStoreSupplier = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("suppress-store"), ......
>
> // Window and aggregate data in 1 minute intervals
> val aggregatedStream = pairsStream
>   .groupByKey()
>   .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
>   .aggregate(
>           { MetricSequenceList(ArrayList()) },
>           { key, value, aggregate ->
>               aggregate.getRecords().add(value)
>               aggregate
>           },
>           Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store").withKeySerde(Serdes.String()).withValueSerde(Settings.getValueSpecificavroSerde())
>   )
>   .toStream()
>   .flatTransform(TransformerSupplier {
>       // This transformer basically waits until a new window is received
>       // to emit the previous one
>   }, "suppress-store")
>   .map { sensorId: String, suppressedOutput: SuppressedOutput ->
>       .... etc ....
>
>
> Basically:
>  - all data goes through the LastValueStore, which stores each message and
> emits a pair with the previous one
>  - the aggregate-store is used to store the per-window list of messages in
> the aggregate method
>  - the suppress store is used to store each received window, which is
> emitted only after a newer one is received
>
> What I'm experiencing is that:
>  - during normal execution, the streams app sends about 5k messages/min to
> the LastValueStore changelog topic, but only about 100 to the aggregate and
> suppress store changelog topics
>  - at some point (after many hours of operation), the streams app starts
> sending to the aggregate and suppress store changelog topics the same
> amount of messages that goes through the LastValueStore
>  - if I restart the streams app it goes back to the initial behavior
>
> You can see the behavior in this graph https://imgur.com/dJcUNSf
> You can also see that after a restart everything goes back to normal
> levels.
> Regarding other metrics, process latency increases, poll latency
> decreases, poll rate decreases, commit rate stays the same while commit
> latency increases.
>
> Now, I have these questions:
>  - why isn't the aggregate/suppress store changelog topic throughput the
> same as the LastValueStore one? Shouldn't every aggregation send a record
> to the changelog?
>  - is the windowing doing some internal caching, like not sending every
> aggregation record until the window time has passed? (if so, where can I
> find that code, since I would like to use that also for our new
> implementation -- my current guess is sketched below)
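>
> My working guess for the second question is that the DSL record cache is
> batching aggregation updates before they reach the store and its changelog.
> A rough sketch of the knobs I believe are involved (standard Streams
> configs, not something already in our code):
>
> // Total cache size shared by all stream threads; setting it to 0 should
> // disable caching so every aggregation update reaches the changelog.
> props[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 10 * 1024 * 1024
> // The cache is flushed at least on every commit, so a shorter commit
> // interval should also mean more changelog records.
> props[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = 30_000
> // Per-store control is also possible, e.g.
> // Materialized.`as`<...>("aggregate-store").withCachingDisabled()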
>
> Thank you in advance
>
> --
> Alessandro Tagliapietra
>
>
> On Wed, Dec 4, 2019 at 7:57 AM John Roesler <vvcep...@apache.org> wrote:
>
>> Oh, good!
>>
>> On Tue, Dec 3, 2019, at 23:29, Alessandro Tagliapietra wrote:
>> > Testing on staging shows that a restart on exception is much faster and
>> > the stream starts right away, which I think means we're reading way less
>> > data than before!
>> >
>> > > What I was referring to is that, in Streams, the key for window
>> > > aggregation state is actually composed of both the window itself and
>> > > the key. In the DSL, it looks like "Windowed<K>". That results in the
>> > > store having a unique key per window for each K, which is why we need
>> > > retention as well as compaction for our changelogs. But for you, if
>> > > you just make the key "K", then compaction alone should do the trick.
>> >
>> > Yes, we had compact,delete as the cleanup policy, but it probably still
>> > had too long a retention value. Also, the RocksDB store is probably much
>> > faster now, having only one key per key instead of one key per window
>> > per key.
>> >
>> > Thanks a lot for helping! I'm now going to set up prometheus-jmx
>> > monitoring so we can keep better track of what's going on :)
>> >
>> > --
>> > Alessandro Tagliapietra
>> >
>> >
>> > On Tue, Dec 3, 2019 at 9:12 PM John Roesler <vvcep...@apache.org> wrote:
>> >
>> > > Oh, yeah, I remember that conversation!
>> > >
>> > > Yes, then, I agree, if you're only storing state of the most recent
>> > > window for each key, and the key you use for that state is actually
>> > > the key of the records, then an aggressive compaction policy plus your
>> > > custom transformer seems like a good way forward.
>> > >
>> > > What I was referring to is that, in Streams, the key for window
>> > > aggregation state is actually composed of both the window itself and
>> > > the key. In the DSL, it looks like "Windowed<K>". That results in the
>> > > store having a unique key per window for each K, which is why we need
>> > > retention as well as compaction for our changelogs. But for you, if
>> > > you just make the key "K", then compaction alone should do the trick.
>> > >
>> > > And yes, if you manage the topic yourself, then Streams won't adjust
>> > > the retention time. I think it might validate that the retention isn't
>> > > too short, but I don't remember offhand.
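>> > >
>> > > To illustrate (just a sketch, untested; "last-window-store" is a
>> > > made-up name and the serdes are whatever you already use for that
>> > > state):
>> > >
>> > > val lastWindowStore = Stores
>> > >     .keyValueStoreBuilder(
>> > >         Stores.persistentKeyValueStore("last-window-store"),
>> > >         Serdes.String(),                      // plain key "K", no window in it
>> > >         Settings.getValueSpecificavroSerde()  // your value serde
>> > >     )
>> > >     // compact-only changelog: cleanup keeps just the latest value per key
>> > >     .withLoggingEnabled(mapOf("cleanup.policy" to "compact"))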
>> > >
>> > > Cheers, and let me know how it goes!
>> > > -John
>> > >
>> > > On Tue, Dec 3, 2019, at 23:03, Alessandro Tagliapietra wrote:
>> > > > Hi John,
>> > > >
>> > > > AFAIK the grace period uses stream time
>> > > > (https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Windows.html),
>> > > > which is per partition. Unfortunately we process data that's not in
>> > > > sync between keys, so each key needs to be independent and one key
>> > > > can have much older data than the others.
>> > > >
>> > > > Having a small grace period would probably close old windows sooner
>> > > > than expected. That's also why in my use case a custom store that
>> > > > just stores the last window data for each key might work better. I
>> > > > had the same issue with suppression and it has been reported here:
>> > > > https://issues.apache.org/jira/browse/KAFKA-8769
>> > > > Oh, I just saw that you're the one that helped me on Slack and
>> > > > created the issue (thanks again for that).
>> > > >
>> > > > Is the behavior you mention, Streams setting the changelog retention
>> > > > time, something it does on creation of the topic when the broker has
>> > > > auto-creation enabled? I'm asking because we're using Confluent Cloud
>> > > > and I had to create the topic manually.
>> > > > Regarding the change in the recovery behavior, with a compact cleanup
>> > > > policy shouldn't the changelog only keep the last value? That would
>> > > > make the recovery faster and "cheaper", as it would only need to read
>> > > > a single value per key (if the cleanup just happened), right?
>> > > >
>> > > > --
>> > > > Alessandro Tagliapietra
>> > > >
>> > > >
>> > > > On Tue, Dec 3, 2019 at 8:51 PM John Roesler <vvcep...@apache.org> wrote:
>> > > >
>> > > > > Hey Alessandro,
>> > > > >
>> > > > > That also sounds like it would work. I'm wondering if it would
>> > > > > actually change what you observe w.r.t. recovery behavior, though.
>> > > > > For windowed aggregations, Streams already sets the retention time
>> > > > > on the changelog to equal the retention time of the windows, so
>> > > > > you shouldn't be loading a lot of window data for old windows you
>> > > > > no longer care about.
>> > > > >
>> > > > > Have you set the "grace period" on your window definition? By
>> > > > > default, it is set to 24 hours, but you can set it as low as you
>> > > > > like. E.g., if you want to commit to having in-order data only,
>> > > > > then you can set the grace period to zero. This _should_ let the
>> > > > > broker clean up the changelog records as soon as the window ends.
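>> > > > >
>> > > > > For example (just a sketch; grace() is available on TimeWindows
>> > > > > since 2.1):
>> > > > >
>> > > > > // in-order data only: windows close as soon as stream time passes
>> > > > > // their end, so their changelog records become eligible for cleanup
>> > > > > val windows = TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ZERO)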
>> > > > >
>> > > > > Of course, the log cleaner doesn't run all the time, so there's
>> > > > > some extra delay in which "expired" data would still be visible in
>> > > > > the changelog, but it would actually be just the same as if you
>> > > > > manage the store yourself.
>> > > > >
>> > > > > Hope this helps!
>> > > > > -John
>> > > > >
>> > > > > On Tue, Dec 3, 2019, at 22:22, Alessandro Tagliapietra wrote:
>> > > > > > Thanks John for the explanation,
>> > > > > >
>> > > > > > I thought that with EOS enabled (which we have) it would, in the
>> > > > > > worst case, find a valid checkpoint and start the restore from
>> > > > > > there until it reached the last committed state, not completely
>> > > > > > from scratch. What you say definitely makes sense now.
>> > > > > > Since we don't really need old time windows and we ensure data
>> > > > > > is ordered when processed, I think I'll just write a custom
>> > > > > > transformer to keep only the last window, store intermediate
>> > > > > > aggregation results in the store, and emit a new value only when
>> > > > > > we receive data belonging to a new window.
>> > > > > > That, with a compact-only changelog topic, should keep the
>> > > > > > rebuild data to a minimum, as it would have only the last value
>> > > > > > for each key.
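>> > > > > >
>> > > > > > Roughly what I have in mind (just a sketch, untested;
>> > > > > > "LastWindowAggregate", "LastWindowTransformer" and the
>> > > > > > "last-window-store" name are made up, and imports are omitted
>> > > > > > like in the other snippets in this thread):
>> > > > > >
>> > > > > > // Hypothetical per-key aggregate: the window start plus the
>> > > > > > // records collected so far for that window.
>> > > > > > data class LastWindowAggregate(
>> > > > > >     val windowStart: Long,
>> > > > > >     val records: MutableList<MetricSequence> = mutableListOf()
>> > > > > > )
>> > > > > >
>> > > > > > class LastWindowTransformer(private val windowSizeMs: Long) :
>> > > > > >     Transformer<String, MetricSequence, KeyValue<String, LastWindowAggregate>?> {
>> > > > > >
>> > > > > >     private lateinit var context: ProcessorContext
>> > > > > >     private lateinit var store: KeyValueStore<String, LastWindowAggregate>
>> > > > > >
>> > > > > >     override fun init(context: ProcessorContext) {
>> > > > > >         this.context = context
>> > > > > >         @Suppress("UNCHECKED_CAST")
>> > > > > >         store = context.getStateStore("last-window-store")
>> > > > > >             as KeyValueStore<String, LastWindowAggregate>
>> > > > > >     }
>> > > > > >
>> > > > > >     // Assumes per-key in-order data: records for an older window
>> > > > > >     // never arrive after a newer window has started for that key.
>> > > > > >     override fun transform(key: String, value: MetricSequence):
>> > > > > >             KeyValue<String, LastWindowAggregate>? {
>> > > > > >         val windowStart = context.timestamp() / windowSizeMs * windowSizeMs
>> > > > > >         val current = store.get(key)
>> > > > > >         return if (current == null || current.windowStart == windowStart) {
>> > > > > >             // same window: keep aggregating in the store, emit nothing
>> > > > > >             val updated = current ?: LastWindowAggregate(windowStart)
>> > > > > >             updated.records.add(value)
>> > > > > >             store.put(key, updated)
>> > > > > >             null
>> > > > > >         } else {
>> > > > > >             // a newer window started for this key: emit the closed
>> > > > > >             // window and start a fresh aggregate
>> > > > > >             store.put(key, LastWindowAggregate(windowStart, mutableListOf(value)))
>> > > > > >             KeyValue.pair(key, current)
>> > > > > >         }
>> > > > > >     }
>> > > > > >
>> > > > > >     override fun close() {}
>> > > > > > }
>> > > > > >
>> > > > > > // wired in with something like:
>> > > > > > // .transform(TransformerSupplier { LastWindowTransformer(60_000L) }, "last-window-store")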
>> > > > > >
>> > > > > > Hope that makes sense
>> > > > > >
>> > > > > > Thanks again
>> > > > > >
>> > > > > > --
>> > > > > > Alessandro Tagliapietra
>> > > > > >
>> > > > > >
>> > > > > > On Tue, Dec 3, 2019 at 3:04 PM John Roesler <vvcep...@apache.org> wrote:
>> > > > > >
>> > > > > > > Hi Alessandro,
>> > > > > > >
>> > > > > > > To take a stab at your question, maybe it first doesn't find
>> > > > > > > it, but then restores some data, writes the checkpoint, and
>> > > > > > > then later on, it has to re-initialize the task for some
>> > > > > > > reason, and that's why it does find a checkpoint then?
>> > > > > > >
>> > > > > > > More to the heart of the issue, if you have EOS enabled,
>> > > > > > > Streams _only_ records the checkpoint when the store is in a
>> > > > > > > known-consistent state. For example, if you have a graceful
>> > > > > > > shutdown, Streams will flush all the stores, commit all the
>> > > > > > > transactions, and then write the checkpoint file. Then, on
>> > > > > > > re-start, it will pick up from that checkpoint.
>> > > > > > >
>> > > > > > > But as soon as it starts processing records, it removes the
>> > > > > > > checkpoint file, so if it crashes while it was processing,
>> > > > > > > there is no checkpoint file there, and it will have to restore
>> > > > > > > from the beginning of the changelog.
>> > > > > > >
>> > > > > > > This design is there on purpose, because otherwise we cannot
>> > > > > > > actually guarantee correctness... For example, suppose you are
>> > > > > > > maintaining a count operation, and we process an input record
>> > > > > > > i, increment the count, and write it to the state store and to
>> > > > > > > the changelog topic, but we crash before we commit that
>> > > > > > > transaction. Then, the write to the changelog would be aborted,
>> > > > > > > and we would re-process record i. However, we've already
>> > > > > > > updated the local state store, so when we increment it again,
>> > > > > > > it results in double-counting i. The key point here is that
>> > > > > > > there's no way to do an atomic operation across two different
>> > > > > > > systems (the local state store and the changelog topic). Since
>> > > > > > > we can't guarantee that we roll back the incremented count when
>> > > > > > > the changelog transaction is aborted, we can't keep the local
>> > > > > > > store consistent with the changelog.
>> > > > > > >
>> > > > > > > After a crash, the only way to ensure the local store is
>> > > > > > > consistent with the changelog is to discard the entire thing
>> > > > > > > and rebuild it. This is why we have an invariant that the
>> > > > > > > checkpoint file only exists when we _know_ that the local store
>> > > > > > > is consistent with the changelog, and this is why you're seeing
>> > > > > > > so much bandwidth when re-starting from an unclean shutdown.
>> > > > > > >
>> > > > > > > Note that it's definitely possible to do better than this, and
>> > > > > > > we would very much like to improve it in the future.
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > -John
>> > > > > > >
>> > > > > > > On Tue, Dec 3, 2019, at 16:16, Alessandro Tagliapietra wrote:
>> > > > > > > > Hi John,
>> > > > > > > >
>> > > > > > > > thanks a lot for helping. Regarding your message:
>> > > > > > > >  - no, we only have 1 instance of the stream application, and
>> > > > > > > > it always re-uses the same state folder
>> > > > > > > >  - yes, we're seeing most issues when restarting not
>> > > > > > > > gracefully due to an exception
>> > > > > > > >
>> > > > > > > > I've enabled trace logging, and filtering by a single state
>> > > > > > > > store, the StoreChangelogReader messages are:
>> > > > > > > >
>> > > > > > > > Added restorer for changelog sensors-stream-aggregate-store-changelog-0
>> > > > > > > > Added restorer for changelog sensors-stream-aggregate-store-changelog-1
>> > > > > > > > Added restorer for changelog sensors-stream-aggregate-store-changelog-2
>> > > > > > > > Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-2 for store aggregate-store, rewinding to beginning.
>> > > > > > > > Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-1 for store aggregate-store, rewinding to beginning.
>> > > > > > > > Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-0 for store aggregate-store, rewinding to beginning.
>> > > > > > > > No checkpoint found for task 0_2 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-2 with EOS turned on. Reinitializing the task and restore its state from the beginning.
>> > > > > > > > No checkpoint found for task 0_1 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-1 with EOS turned on. Reinitializing the task and restore its state from the beginning.
>> > > > > > > > No checkpoint found for task 0_0 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-0 with EOS turned on. Reinitializing the task and restore its state from the beginning.
>> > > > > > > > Found checkpoint 709937 from changelog sensors-stream-aggregate-store-changelog-2 for store aggregate-store.
>> > > > > > > > Restoring partition sensors-stream-aggregate-store-changelog-2 from offset 709937 to endOffset 742799
>> > > > > > > > Found checkpoint 3024234 from changelog sensors-stream-aggregate-store-changelog-1 for store aggregate-store.
>> > > > > > > > Restoring partition sensors-stream-aggregate-store-changelog-1 from offset 3024234 to endOffset 3131513
>> > > > > > > > Found checkpoint 14514072 from changelog sensors-stream-aggregate-store-changelog-0 for store aggregate-store.
>> > > > > > > > Restoring partition sensors-stream-aggregate-store-changelog-0 from offset 14514072 to endOffset 17116574
>> > > > > > > > Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store with 966 records, ending offset is 711432, next starting position is 711434
>> > > > > > > > Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store with 914 records, ending offset is 712711, next starting position is 712713
>> > > > > > > > Restored from sensors-stream-aggregate-store-changelog-1 to aggregate-store with 18 records, ending offset is 3024261, next starting position is 3024262
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Why does it first say it didn't find the checkpoint and then
>> > > > > > > > find it?
>> > > > > > > > It seems it loaded about 2.7M records (the sum of the offset
>> > > > > > > > differences in the "Restoring partition ..." messages:
>> > > > > > > > 32862 + 107279 + 2602502), right?
>> > > > > > > > Should I maybe try to reduce the checkpoint interval?
>> > > > > > > >
>> > > > > > > > Regards
>> > > > > > > >
>> > > > > > > > --
>> > > > > > > > Alessandro Tagliapietra
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Mon, Dec 2, 2019 at 9:18 AM John Roesler <vvcep...@apache.org> wrote:
>> > > > > > > >
>> > > > > > > > > Hi Alessandro,
>> > > > > > > > >
>> > > > > > > > > I'm sorry to hear that.
>> > > > > > > > >
>> > > > > > > > > The restore process only takes one factor into account:
>> > > > > > > > > the current offset position of the changelog topic is
>> > > > > > > > > stored in a local file alongside the state stores. On
>> > > > > > > > > startup, the app checks if the recorded position lags the
>> > > > > > > > > latest offset in the changelog. If so, then it reads the
>> > > > > > > > > missing changelog records before starting processing.
>> > > > > > > > >
>> > > > > > > > > Thus, it would not restore any old window data.
>> > > > > > > > >
>> > > > > > > > > There might be a few different things going on to explain
>> > > > > > > > > your observation:
>> > > > > > > > > * if there is more than one instance in your Streams
>> > > > > > > > > cluster, maybe the task is "flopping" between instances, so
>> > > > > > > > > each instance still has to recover state, since it wasn't
>> > > > > > > > > the last one actively processing it.
>> > > > > > > > > * if the application isn't stopped gracefully, it might not
>> > > > > > > > > get a chance to record its offset in that local file, so on
>> > > > > > > > > restart it has to restore some or all of the state store
>> > > > > > > > > from changelog.
>> > > > > > > > >
>> > > > > > > > > Or it could be something else; that's just what comes to
>> > > > > > > > > mind.
>> > > > > > > > >
>> > > > > > > > > If you want to get to the bottom of it, you can take a look
>> > > > > > > > > at the logs, paying close attention to which tasks are
>> > > > > > > > > assigned to which instances after each restart. You can
>> > > > > > > > > also look into the logs from
>> > > > > > > > > `org.apache.kafka.streams.processor.internals.StoreChangelogReader`
>> > > > > > > > > (might want to set it to DEBUG or TRACE level to really see
>> > > > > > > > > what's happening).
>> > > > > > > > >
>> > > > > > > > > I hope this helps!
>> > > > > > > > > -John
>> > > > > > > > >
>> > > > > > > > > On Sun, Dec 1, 2019, at 21:25, Alessandro Tagliapietra wrote:
>> > > > > > > > > > Hello everyone,
>> > > > > > > > > >
>> > > > > > > > > > we're having a problem with bandwidth usage on streams
>> > > > > > > > > > application startup. Our current setup does this:
>> > > > > > > > > >
>> > > > > > > > > > ...
>> > > > > > > > > > .groupByKey()
>> > > > > > > > > > .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
>> > > > > > > > > > .aggregate(
>> > > > > > > > > >         { MetricSequenceList(ArrayList()) },
>> > > > > > > > > >         { key, value, aggregate ->
>> > > > > > > > > >             aggregate.getRecords().add(value)
>> > > > > > > > > >             aggregate
>> > > > > > > > > >         },
>> > > > > > > > > >         Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store").withKeySerde(Serdes.String()).withValueSerde(Settings.getValueSpecificavroSerde())
>> > > > > > > > > > )
>> > > > > > > > > > .toStream()
>> > > > > > > > > > .flatTransform(TransformerSupplier {
>> > > > > > > > > > ...
>> > > > > > > > > >
>> > > > > > > > > > Basically, in each window we append the new values and
>> > > > > > > > > > then do some other logic with the array of windowed
>> > > > > > > > > > values.
>> > > > > > > > > > The aggregate-store changelog topic configuration uses
>> > > > > > > > > > compact,delete as the cleanup policy and has 12 hours of
>> > > > > > > > > > retention.
>> > > > > > > > > >
>> > > > > > > > > > What we've seen is that on application startup it takes a
>> > > > > > > > > > couple of minutes to rebuild the state store, even though
>> > > > > > > > > > the state store directory is persisted across restarts.
>> > > > > > > > > > That, along with an exception that caused the Docker
>> > > > > > > > > > container to be restarted a couple hundred times, caused
>> > > > > > > > > > a big Confluent Cloud bill compared to what we usually
>> > > > > > > > > > spend (1/4 of a full month in 1 day).
>> > > > > > > > > >
>> > > > > > > > > > What I think is happening is that the topic is keeping
>> > > > > > > > > > all the previous windows even with the compacting policy,
>> > > > > > > > > > because each key is the original key + the timestamp, not
>> > > > > > > > > > just the key. Since we don't care about previous windows
>> > > > > > > > > > (the flatTransform after the toStream() makes sure that
>> > > > > > > > > > we don't process old windows; a custom suppressor,
>> > > > > > > > > > basically), is there a way to only keep the last window,
>> > > > > > > > > > so that the store rebuilding goes faster and without
>> > > > > > > > > > rebuilding old windows too? Or should I create a custom
>> > > > > > > > > > window using the original key as the key, so that
>> > > > > > > > > > compaction keeps only the last window's data?
>> > > > > > > > > >
>> > > > > > > > > > Thank you
>> > > > > > > > > >
>> > > > > > > > > > --
>> > > > > > > > > > Alessandro Tagliapietra
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
