Hello Matthias,

Thanks for the feedback on the KIP.

It seems we had a slight misunderstanding about the cleanup logic, but
after revisiting the ticket and the existing codebase, your suggestion to
wipe only those task directories whose age exceeds `state.cleanup.delay.ms`
makes perfect sense. I have updated the KIP accordingly, and it is now
ready for a second round of review.
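
For concreteness, below is a rough sketch of the startup pass I have in
mind. This is illustrative only, not the implementation in the KIP: it
assumes the usual <state.dir>/<application.id>/<taskId> directory layout,
and the class and method names are made up.

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;

public class StartupStateCleaner {

    // Before the instance starts processing, delete every task directory
    // whose last modification is older than the state.cleanup.delay.ms value.
    public static void wipeStaleTaskDirs(final Path appStateDir,
                                         final long cleanupDelayMs) throws IOException {
        final long cutoff = System.currentTimeMillis() - cleanupDelayMs;
        try (DirectoryStream<Path> taskDirs = Files.newDirectoryStream(appStateDir)) {
            for (final Path taskDir : taskDirs) {
                if (Files.getLastModifiedTime(taskDir).toMillis() < cutoff) {
                    // Recursively delete the stale directory, children first.
                    try (var paths = Files.walk(taskDir)) {
                        paths.sorted(Comparator.reverseOrder())
                             .forEach(p -> p.toFile().delete());
                    }
                }
            }
        }
    }
}

The intent is the same age-based decision the background cleaner thread
makes, just executed once before startup instead of periodically.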

I would like to highlight two specific points for further discussion:

   - This proposal might cause global stores to be deleted if they aren't
     updated often. Currently, we check the last modification time of the
     directory, so a global table that hasn't changed could be cleaned up
     even though its data is still valid. Since these tables are usually
     small, this might not be a major issue. What do you think? (See the
     sketch after this list.)

   - We previously discussed increasing the default value of
     `state.cleanup.delay.ms` to make the cleanup less aggressive. Do we
     have any consensus on a reasonable default, or a recommended
     methodology for measuring what this value should be?
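
To make the first point concrete, the same age check applied to the
global store directory would flag a dormant-but-valid global table. Again
just a sketch, assuming global state lives under a "global" subdirectory
of the application state dir:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class GlobalStoreAgeCheck {

    // A global table that simply hasn't received updates fails the same
    // last-modified check and would be wiped, even though its contents
    // are still valid.
    public static boolean wouldBeWiped(final Path appStateDir,
                                       final long cleanupDelayMs) throws IOException {
        final Path globalDir = appStateDir.resolve("global");
        return Files.getLastModifiedTime(globalDir).toMillis()
            < System.currentTimeMillis() - cleanupDelayMs;
    }
}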

Regards,
Uladzislau Blok

On Mon, Jan 12, 2026 at 2:55 AM Matthias J. Sax <[email protected]> wrote:

> Thanks for the KIP Uladzislau.
>
> Given that you propose to wipe the entire state if this config is set, I
> am wondering if we would need such a config to begin with, or if users
> could implement this themselves (via some custom config the application
> code uses) and call `KafkaStreams#cleanUp()` to wipe out all local
> state if this custom config is set?
>
> I seem to remember from the original ticket discussion that the idea
> was not to blindly wipe the entire state, but to still do it based on
> task directory age, similar to what the background cleaner thread does
> (based on the `state.cleanup.delay.ms` config), and to trigger a cleanup
> run before startup. Thoughts?
>
>
> -Matthias
>
> On 12/21/25 6:37 AM, Uladzislau Blok wrote:
> > Hi everyone,
> >
> > I'd like to start a discussion on *KIP-1259: Add configuration to wipe
> > local state on startup*.
> > Problem
> >
> > Currently, Kafka Streams can encounter a "zombie data" issue when an
> > instance restarts using stale local files after a period exceeding the
> > changelog topic's delete.retention.ms. If the local checkpoint offset is
> > still within the broker's available log range (due to long-lived
> > entities), an automatic reset isn't triggered. However, since the broker
> > has already purged deletion tombstones, the state store is rehydrated
> > without the "delete" instructions, causing previously deleted entities to
> > unexpectedly reappear in the local RocksDB.
> > Proposed Solution
> >
> > I propose introducing a new configuration, state.cleanup.on.start
> > (Boolean, default: false). When enabled, this property forces the
> > deletion of all
> > local state directories and checkpoint files during application
> > initialization. This ensures the state is rebuilt entirely from the
> > changelog—the broker's "source of truth"—effectively purging any expired
> > zombie records.
> >
> > This is particularly useful for environments with persistent volumes
> > where
> > instances might remain dormant for long periods (e.g., multi-region
> > failover).
> >
> > *KIP Link:*
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1259%3A+Add+configuration+to+wipe+Kafka+Streams+local+state+on+startup
> >
> >
> > I look forward to your feedback and suggestions.
> >
> >
> > Best regards,
> >
> > Uladzislau Blok
> >
>
>
