Hello Matthias,

Thanks for the feedback on the KIP.
It seems we had a slight misunderstanding about the cleanup logic, but after
revisiting the ticket and the existing codebase, your suggestion to wipe only
stores older than state.cleanup.delay.ms makes perfect sense. I have updated
the KIP accordingly, and it is now ready for a second round of review.

I would like to highlight two points for further discussion (to make the
options concrete, I have added two rough sketches at the very bottom of this
mail, below the quoted thread):

- The age-based check could delete global stores that simply aren't updated
  often. Since we look at the last modification time of the state directory,
  a global table whose data is still valid but hasn't changed recently could
  be wiped and would have to be rebuilt. Given that global tables are usually
  small, rebuilding them should be cheap, so this may not be a major issue.
  What do you think?

- We previously discussed increasing the default value of
  state.cleanup.delay.ms to make it less aggressive. Do we have any consensus
  on a reasonable default, or a recommended methodology for determining what
  this value should be?

Regards,
Uladzislau Blok

On Mon, Jan 12, 2026 at 2:55 AM Matthias J. Sax <[email protected]> wrote:

> Thanks for the KIP Uladzislau.
>
> Given that you propose to wipe the entire state if this config is set, I
> am wondering if we would need such a config to begin with, or if users
> could implement this themselves (via some custom config the application
> code uses) and call `KafkaStreams#cleanUp()` to wipe out all local
> state if this custom config is set?
>
> I believe to remember from the original ticket discussion that the idea
> was not to blindly wipe the entire state, but to do it still based on
> task directory age, similar to what the background cleaner thread does
> (based on `state.cleanup.delay.ms` config), and to trigger a cleanup run
> before startup. Thoughts?
>
>
> -Matthias
>
> On 12/21/25 6:37 AM, Uladzislau Blok wrote:
> > Hi everyone,
> >
> > I'd like to start a discussion on *KIP-1259: Add configuration to wipe
> > local state on startup*.
> >
> > Problem
> >
> > Currently, Kafka Streams can encounter a "zombie data" issue when an
> > instance restarts using stale local files after a period exceeding the
> > changelog topic's delete.retention.ms. If the local checkpoint offset is
> > still within the broker's available log range (due to long-lived
> > entities), an automatic reset isn't triggered. However, since the broker
> > has already purged deletion tombstones, the state store is rehydrated
> > without the "delete" instructions, causing previously deleted entities
> > to unexpectedly reappear in the local RocksDB.
> >
> > Proposed Solution
> >
> > I propose introducing a new configuration, state.cleanup.on.start
> > (Boolean, default: false). When enabled, this property forces the
> > deletion of all local state directories and checkpoint files during
> > application initialization. This ensures the state is rebuilt entirely
> > from the changelog—the broker's "source of truth"—effectively purging
> > any expired zombie records.
> >
> > This is particularly useful for environments with persistent volumes
> > where instances might remain dormant for long periods (e.g.,
> > multi-region failover).
> >
> > *KIP Link:*
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1259%3A+Add+configuration+to+wipe+Kafka+Streams+local+state+on+startup
> >
> > I look forward to your feedback and suggestions.
> >
> > Best regards,
> >
> > Uladzislau Blok
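P.S. As promised, two rough sketches. The first shows the manual workaround
Matthias describes: an application-level flag that triggers
KafkaStreams#cleanUp() before start(). The property name
"wipe.state.on.start" and the topic name are made up for illustration; only
cleanUp(), which must be called while the instance is not running, is actual
Kafka Streams API.

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class WipeOnStart {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic"); // placeholder source topic

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Custom, application-defined flag; Kafka Streams only logs
        // unknown config keys as unused, it does not reject them.
        if (Boolean.parseBoolean(props.getProperty("wipe.state.on.start", "false"))) {
            // cleanUp() deletes the instance's local state directory; it may
            // only be called while the instance is not running, i.e. before
            // start() (or after close()).
            streams.cleanUp();
        }
        streams.start();
    }
}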

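The second sketches the age-based variant: before startup, wipe only those
task directories whose last modification time is older than
state.cleanup.delay.ms. The directory scanning and helper names below are my
simplification for the sake of discussion, not the actual StateDirectory
implementation.

import java.io.File;
import java.time.Duration;

public class StartupStateCleaner {

    // Deletes task directories under the state dir that have not been
    // modified within the given delay, mirroring what the background
    // cleaner thread does based on state.cleanup.delay.ms, but run once
    // before the application starts.
    public static void cleanUpStaleTaskDirs(final File stateDir, final Duration cleanupDelay) {
        final File[] taskDirs = stateDir.listFiles(File::isDirectory);
        if (taskDirs == null) {
            return;
        }
        final long now = System.currentTimeMillis();
        for (final File taskDir : taskDirs) {
            if (now - taskDir.lastModified() > cleanupDelay.toMillis()) {
                deleteRecursively(taskDir);
            }
        }
    }

    private static void deleteRecursively(final File file) {
        final File[] children = file.listFiles();
        if (children != null) {
            for (final File child : children) {
                deleteRecursively(child);
            }
        }
        file.delete();
    }
}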