Hi John,

AFAIK the grace period uses stream time
https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Windows.html
which is tracked per partition. Unfortunately, we process data that's not in
sync between keys, so each key needs to be independent, and one key can have
much older data than another.

Having a small grace period would probably close old windows sooner than
expected. That's also why in my use case a custom store that just stores
the last window data for each key might work better. I had the same issue
with suppression and it has been reported here
https://issues.apache.org/jira/browse/KAFKA-8769
Oh, I just saw that you're the one who helped me on Slack and created the
issue (thanks again for that).
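
To make the concern concrete, here is a minimal sketch in plain Kotlin. It is
a simplified model, not Kafka Streams code: the class name, the sensor keys and
the timestamps are made up for illustration, and it assumes a window stops
accepting records once the partition's stream time reaches window end + grace.

```kotlin
// Simplified model, not Kafka Streams code. Assumption: stream time is the
// highest timestamp seen so far on the partition, regardless of key, and a
// window accepts records only while streamTime < windowEnd + grace.
class PartitionWindowModel(private val windowSizeMs: Long, private val graceMs: Long) {
    private var streamTime = Long.MIN_VALUE

    // Returns true if the record is accepted, false if its window is already closed.
    // The key is deliberately ignored: stream time is shared by all keys.
    fun offer(key: String, timestampMs: Long): Boolean {
        streamTime = maxOf(streamTime, timestampMs)
        val windowStart = timestampMs - (timestampMs % windowSizeMs)
        val windowEnd = windowStart + windowSizeMs
        return streamTime < windowEnd + graceMs
    }
}

fun main() {
    val model = PartitionWindowModel(windowSizeMs = 60_000, graceMs = 0)
    println(model.offer("sensor-a", 600_000)) // true: sensor-a is up to date
    println(model.offer("sensor-b", 120_000)) // false: sensor-b lags, its window is closed
}
```

With a zero grace period, the lagging key is rejected only because another key
on the same partition already advanced the stream time.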

Is the behavior you mention, Streams setting the changelog retention time,
something it does when the topic is created, if the broker has auto creation
enabled? I ask because we're using Confluent Cloud and I had to create the
topic manually.
Regarding the change in the recovery behavior, with a compact cleanup policy
shouldn't the changelog only keep the last value? That would make the
recovery faster and "cheaper", as it would only need to read a single value
per key (if the cleanup just happened), right?
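
A small sketch of what I mean, again in plain Kotlin and only as a model of
compaction (keep the latest value per record key), not of the real log cleaner.
The `compact` helper and the sample updates are hypothetical; the windowed case
assumes the changelog key is the original key plus the window timestamp, as I
described in my first message.

```kotlin
// Simplified model of log compaction: keep only the latest value per record key.
fun <K, V> compact(log: List<Pair<K, V>>): Map<K, V> {
    val latest = LinkedHashMap<K, V>()
    for ((k, v) in log) latest[k] = v
    return latest
}

fun main() {
    // (original key, window start in ms, aggregate) updates for a single sensor
    val updates = listOf(
        Triple("sensor-a", 0L, "a1"),
        Triple("sensor-a", 60_000L, "a2"),
        Triple("sensor-a", 120_000L, "a3"),
    )
    // Changelog keyed by (key, window start): every window is a distinct key.
    val windowedKeyed = compact(updates.map { (k, w, v) -> (k to w) to v })
    // Changelog keyed by the original key only: one entry survives per key.
    val plainKeyed = compact(updates.map { (k, _, v) -> k to v })
    println(windowedKeyed.size) // 3
    println(plainKeyed.size)    // 1
}
```

So with the windowed key, compaction alone never drops old windows; with the
plain key, a restore after a cleanup would read one value per key.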

--
Alessandro Tagliapietra


On Tue, Dec 3, 2019 at 8:51 PM John Roesler <vvcep...@apache.org> wrote:

> Hey Alessandro,
>
> That sounds also like it would work. I'm wondering if it would actually
> change what you observe w.r.t. recovery behavior, though. Streams already
> sets the retention time on the changelog to equal the retention time of the
> windows, for windowed aggregations, so you shouldn't be loading a lot of
> window data for old windows you no longer care about.
>
> Have you set the "grace period" on your window definition? By default, it
> is set to 24 hours, but you can set it as low as you like. E.g., if you
> want to commit to having in-order data only, then you can set the grace
> period to zero. This _should_ let the broker clean up the changelog records
> as soon as the window ends.
>
> Of course, the log cleaner doesn't run all the time, so there's some extra
> delay in which "expired" data would still be visible in the changelog, but
> it would actually be just the same as if you manage the store yourself.
>
> Hope this helps!
> -John
>
> On Tue, Dec 3, 2019, at 22:22, Alessandro Tagliapietra wrote:
> > Thanks John for the explanation,
> >
> > I thought that with EOS enabled (which we have) it would, in the worst
> > case, find a valid checkpoint and start the restore from there until it
> > reached the last committed state, not completely from scratch. What you say
> > definitely makes sense now.
> > Since we don't really need old time windows and we ensure data is ordered
> > when processed, I think I'll just write a custom transformer to keep only
> > the last window, store intermediate aggregation results in the store, and
> > emit a new value only when we receive data belonging to a new window.
> > That, with a compact-only changelog topic, should keep the rebuild data to
> > a minimum, as it would have only the last value for each key.
> >
> > Hope that makes sense
> >
> > Thanks again
> >
> > --
> > Alessandro Tagliapietra
> >
> >
> > On Tue, Dec 3, 2019 at 3:04 PM John Roesler <vvcep...@apache.org> wrote:
> >
> > > Hi Alessandro,
> > >
> > > To take a stab at your question, maybe it first doesn't find it, but then
> > > restores some data, writes the checkpoint, and then later on, it has to
> > > re-initialize the task for some reason, and that's why it does find a
> > > checkpoint then?
> > >
> > > More to the heart of the issue, if you have EOS enabled, Streams _only_
> > > records the checkpoint when the store is in a known-consistent state. For
> > > example, if you have a graceful shutdown, Streams will flush all the
> > > stores, commit all the transactions, and then write the checkpoint file.
> > > Then, on re-start, it will pick up from that checkpoint.
> > >
> > > But as soon as it starts processing records, it removes the checkpoint
> > > file, so if it crashes while it was processing, there is no checkpoint file
> > > there, and it will have to restore from the beginning of the changelog.
> > >
> > > This design is there on purpose, because otherwise we cannot actually
> > > guarantee correctness... For example, suppose you are maintaining a count
> > > operation, and we process an input record i, increment the count, and write
> > > it to the state store and to the changelog topic, but we crash before we
> > > commit that transaction. Then, the write to the changelog would be aborted,
> > > and we would re-process record i. However, we've already updated the local
> > > state store, so when we increment it again, it results in double-counting
> > > i. The key point here is that there's no way to do an atomic operation
> > > across two different systems (local state store and the changelog topic).
> > > Since we can't guarantee that we roll back the incremented count when the
> > > changelog transaction is aborted, we can't keep the local store consistent
> > > with the changelog.
> > >
> > > After a crash, the only way to ensure the local store is consistent with
> > > the changelog is to discard the entire thing and rebuild it. This is why we
> > > have an invariant that the checkpoint file only exists when we _know_ that
> > > the local store is consistent with the changelog, and this is why you're
> > > seeing so much bandwidth when re-starting from an unclean shutdown.
> > >
> > > Note that it's definitely possible to do better than this, and we would
> > > very much like to improve it in the future.
> > >
> > > Thanks,
> > > -John
> > >
> > > On Tue, Dec 3, 2019, at 16:16, Alessandro Tagliapietra wrote:
> > > > Hi John,
> > > >
> > > > thanks a lot for helping. Regarding your message:
> > > >  - no, we only have 1 instance of the stream application, and it always
> > > >    re-uses the same state folder
> > > >  - yes, we're seeing most issues when restarting ungracefully due to an
> > > >    exception
> > > >
> > > > I've enabled trace logging and, filtering by a single state store, the
> > > > StoreChangelogReader messages are:
> > > >
> > > > Added restorer for changelog sensors-stream-aggregate-store-changelog-0
> > > > Added restorer for changelog sensors-stream-aggregate-store-changelog-1
> > > > Added restorer for changelog sensors-stream-aggregate-store-changelog-2
> > > > Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-2 for store aggregate-store, rewinding to beginning.
> > > > Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-1 for store aggregate-store, rewinding to beginning.
> > > > Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-0 for store aggregate-store, rewinding to beginning.
> > > > No checkpoint found for task 0_2 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-2 with EOS turned on. Reinitializing the task and restore its state from the beginning.
> > > > No checkpoint found for task 0_1 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-1 with EOS turned on. Reinitializing the task and restore its state from the beginning.
> > > > No checkpoint found for task 0_0 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-0 with EOS turned on. Reinitializing the task and restore its state from the beginning.
> > > > Found checkpoint 709937 from changelog sensors-stream-aggregate-store-changelog-2 for store aggregate-store.
> > > > Restoring partition sensors-stream-aggregate-store-changelog-2 from offset 709937 to endOffset 742799
> > > > Found checkpoint 3024234 from changelog sensors-stream-aggregate-store-changelog-1 for store aggregate-store.
> > > > Restoring partition sensors-stream-aggregate-store-changelog-1 from offset 3024234 to endOffset 3131513
> > > > Found checkpoint 14514072 from changelog sensors-stream-aggregate-store-changelog-0 for store aggregate-store.
> > > > Restoring partition sensors-stream-aggregate-store-changelog-0 from offset 14514072 to endOffset 17116574
> > > > Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store with 966 records, ending offset is 711432, next starting position is 711434
> > > > Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store with 914 records, ending offset is 712711, next starting position is 712713
> > > > Restored from sensors-stream-aggregate-store-changelog-1 to aggregate-store with 18 records, ending offset is 3024261, next starting position is 3024262
> > > >
> > > >
> > > > Why does it first say it didn't find the checkpoint and then find it?
> > > > It seems it loaded about 2.7M records (the sum of the offset differences
> > > > in the "Restoring partition ..." messages), right?
> > > > Should I maybe try to reduce the checkpoint interval?
> > > >
> > > > Regards
> > > >
> > > > --
> > > > Alessandro Tagliapietra
> > > >
> > > >
> > > > On Mon, Dec 2, 2019 at 9:18 AM John Roesler <vvcep...@apache.org> wrote:
> > > >
> > > > > Hi Alessandro,
> > > > >
> > > > > I'm sorry to hear that.
> > > > >
> > > > > The restore process only takes one factor into account: the current offset
> > > > > position of the changelog topic is stored in a local file alongside the
> > > > > state stores. On startup, the app checks if the recorded position lags the
> > > > > latest offset in the changelog. If so, then it reads the missing changelog
> > > > > records before starting processing.
> > > > >
> > > > > Thus, it would not restore any old window data.
> > > > >
> > > > > There might be a few different things going on to explain your observation:
> > > > > * if there is more than one instance in your Streams cluster, maybe the
> > > > >   task is "flopping" between instances, so each instance still has to recover
> > > > >   state, since it wasn't the last one actively processing it.
> > > > > * if the application isn't stopped gracefully, it might not get a chance
> > > > >   to record its offset in that local file, so on restart it has to restore
> > > > >   some or all of the state store from changelog.
> > > > >
> > > > > Or it could be something else; that's just what comes to mind.
> > > > >
> > > > > If you want to get to the bottom of it, you can take a look at the logs,
> > > > > paying close attention to which tasks are assigned to which instances after
> > > > > each restart. You can also look into the logs from
> > > > > `org.apache.kafka.streams.processor.internals.StoreChangelogReader` (might
> > > > > want to set it to DEBUG or TRACE level to really see what's happening).
> > > > >
> > > > > I hope this helps!
> > > > > -John
> > > > >
> > > > > On Sun, Dec 1, 2019, at 21:25, Alessandro Tagliapietra wrote:
> > > > > > Hello everyone,
> > > > > >
> > > > > > we're having a problem with bandwidth usage on streams application
> > > > > > startup, our current setup does this:
> > > > > >
> > > > > > ...
> > > > > > .groupByKey()
> > > > > > .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
> > > > > > .aggregate(
> > > > > >         { MetricSequenceList(ArrayList()) },
> > > > > >         { key, value, aggregate ->
> > > > > >             aggregate.getRecords().add(value)
> > > > > >             aggregate
> > > > > >         },
> > > > > >         Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store")
> > > > > >             .withKeySerde(Serdes.String())
> > > > > >             .withValueSerde(Settings.getValueSpecificavroSerde())
> > > > > > )
> > > > > > .toStream()
> > > > > > .flatTransform(TransformerSupplier {
> > > > > > ...
> > > > > >
> > > > > > basically in each window we append the new values and then do some other
> > > > > > logic with the array of windowed values.
> > > > > > The aggregate-store changelog topic configuration uses compact,delete as
> > > > > > cleanup policy and has 12 hours of retention.
> > > > > >
> > > > > > What we've seen is that on application startup it takes a couple of minutes
> > > > > > to rebuild the state store, even if the state store directory is persisted
> > > > > > across restarts. That, along with an exception that caused the docker
> > > > > > container to be restarted a couple hundred times, caused a big Confluent
> > > > > > Cloud bill compared to what we usually spend (1/4 of a full month in 1
> > > > > > day).
> > > > > >
> > > > > > What I think is happening is that the topic is keeping all the previous
> > > > > > windows even with the compacting policy because each key is the original
> > > > > > key + the timestamp, not just the key. Since we don't care about previous
> > > > > > windows, as the flatTransform after the toStream() makes sure that we don't
> > > > > > process old windows (a custom suppressor basically), is there a way to only
> > > > > > keep the last window so that the store rebuilding goes faster and without
> > > > > > rebuilding old windows too? Or should I create a custom window using the
> > > > > > original key as key so that the compaction keeps only the last window
> > > > > > data?
> > > > > >
> > > > > > Thank you
> > > > > >
> > > > > > --
> > > > > > Alessandro Tagliapietra
> > > > > >
> > > > >
> > > >
> > >
> >
>
