[
https://issues.apache.org/jira/browse/KAFKA-19759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18027912#comment-18027912
]
Ankur Sinha edited comment on KAFKA-19759 at 10/6/25 8:53 PM:
--------------------------------------------------------------
Hello [~mjsax]
This ticket is clearly related to KAFKA-4212, and I went through the code and discussions pushed there, but the concept is not a duplicate. KAFKA-4212 introduced a specific TTL store, whereas this proposal is for *general TTL support for all state stores* with automatic eviction and changelog tombstones.
Currently, I implement TTL manually using a TimestampedKeyValueStore:
{code:java}
// Wall-clock punctuator that periodically scans the store and deletes expired entries.
context.schedule(Duration.ofDays(scheduledFrequencyDays),
    PunctuationType.WALL_CLOCK_TIME, timestamp -> {
        try (var iterator = stateStore.all()) {
            while (iterator.hasNext()) {
                var entry = iterator.next();
                // Expired once the stored timestamp plus the retention window falls behind "now".
                if (entry.value.timestamp() + Duration.ofDays(retentionDays).toMillis() <= timestamp) {
                    stateStore.delete(entry.key);
                }
            }
        }
    });
{code}
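For context, this is roughly how that punctuator is wired up with the Processor API today; a minimal, self-contained sketch in which the class name, store name, and concrete intervals are illustrative placeholders, not anything defined by Kafka Streams:
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

public class TtlCleanupProcessor implements Processor<String, String, String, String> {

    private static final Duration SCAN_INTERVAL = Duration.ofDays(1); // scheduledFrequencyDays
    private static final Duration RETENTION = Duration.ofDays(7);     // retentionDays

    private TimestampedKeyValueStore<String, String> stateStore;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        stateStore = context.getStateStore("ttl-state-store"); // illustrative store name
        // Same wall-clock punctuator as above: scan the store and delete expired entries.
        context.schedule(SCAN_INTERVAL, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (var iterator = stateStore.all()) {
                while (iterator.hasNext()) {
                    var entry = iterator.next();
                    if (entry.value.timestamp() + RETENTION.toMillis() <= timestamp) {
                        stateStore.delete(entry.key);
                    }
                }
            }
        });
    }

    @Override
    public void process(final Record<String, String> record) {
        // Store the value together with the record timestamp so the punctuator can age it out later.
        stateStore.put(record.key(), ValueAndTimestamp.make(record.value(), record.timestamp()));
    }
}
{code}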
Right now I use WALL_CLOCK_TIME only, so cleanup is triggered at a fixed interval irrespective of incoming records, rather than punctuating on STREAM_TIME, which only advances when new records arrive.
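For clarity, the two modes differ only in the PunctuationType passed to schedule(); a minimal sketch, where evictExpired(long) stands in for the eviction loop above:
{code:java}
// Fires on wall-clock time, even when no records arrive (what I use today).
context.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME, this::evictExpired);

// Fires on stream time, which only advances as new records are processed.
context.schedule(Duration.ofHours(1), PunctuationType.STREAM_TIME, this::evictExpired);
{code}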
The idea with this proposal is to simplify things to something like the below (a usage sketch of the proposed API follows the list):
{{Stores.persistentKeyValueStore("state-store")
    .withTtl(Duration.ofDays(retentionDays));}}
* TTL eviction would be {*}automatic{*}.
* Expired entries would generate {*}changelog tombstones{*}.
* No manual scheduling or iteration would be required.
* It would work consistently for all state stores, not just a specific store type.
* {{withTtl(duration)}} is {*}optional at the API level{*}: passing null would mean {*}no TTL is applied{*}, and the store behaves like a normal persistent store.
* Any store without a TTL set will still function normally, maintaining {*}backward compatibility{*}.
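For illustration, a minimal sketch of how the proposed API could attach to a StoreBuilder, following the ticket's suggested StoreBuilder#withTTL; {{withTtl(...)}} does not exist in Kafka Streams today, and the store/serde choices and the existing StreamsBuilder {{builder}} are just placeholders:
{code:java}
// Hypothetical usage of the proposed TTL API -- withTtl(...) is not part of Kafka Streams yet.
StoreBuilder<KeyValueStore<String, String>> storeBuilder =
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("state-store"),
            Serdes.String(),
            Serdes.String())
        .withTtl(Duration.ofDays(retentionDays)); // proposed: expired entries are evicted automatically
                                                  // and tombstones are written to the changelog

builder.addStateStore(storeBuilder); // attach to the topology as usual
{code}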
was (Author: JIRAUSER311142):
Hello [~mjsax]
This ticket is clearly related to KAFKA-4212, and I went through the code and discussions pushed there, but the concept is not a duplicate. KAFKA-4212 introduced a specific TTL store, whereas this proposal is for *general TTL support for all state stores* with automatic eviction and changelog tombstones.
Currently, I implement TTL manually using a TimestampedKeyValueStore:
{code:java}
// Wall-clock punctuator that periodically scans the store and deletes expired entries.
context.schedule(Duration.ofDays(scheduledFrequencyDays),
    PunctuationType.WALL_CLOCK_TIME, timestamp -> {
        try (var iterator = stateStore.all()) {
            while (iterator.hasNext()) {
                var entry = iterator.next();
                // Expired once the stored timestamp plus the retention window falls behind "now".
                if (entry.value.timestamp() + Duration.ofDays(retentionDays).toMillis() <= timestamp) {
                    stateStore.delete(entry.key);
                }
            }
        }
    });
{code}
Right now I use WALL_CLOCK_TIME only, so cleanup is triggered at a fixed interval irrespective of incoming records, rather than punctuating on STREAM_TIME, which only advances when new records arrive.
Every time the state gets updated, a timestamp (either the record timestamp or stream time) is stored with it, so either of the behaviors below would be acceptable:
* If a key is updated before the TTL expires, the behavior depends on how the timestamp is stored:
* *Not updating the timestamp:* TTL is counted from the original insertion → the key could be deleted even if it was updated recently.
* *Updating the timestamp on change:* TTL resets from the latest update → the key survives the full retention period after each update.
* What I do today: I *update the timestamp on every key update* to ensure TTL reflects the {*}last activity{*}, so records aren't prematurely deleted (see the sketch after this list).
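A minimal sketch of the two options against a TimestampedKeyValueStore; {{stateStore}} matches the snippet above and {{record}} stands for the incoming record in a processor:
{code:java}
// Option 1: keep the original insertion timestamp -> TTL counts from the first insert.
ValueAndTimestamp<String> existing = stateStore.get(record.key());
long timestampToStore = (existing != null) ? existing.timestamp() : record.timestamp();
stateStore.put(record.key(), ValueAndTimestamp.make(record.value(), timestampToStore));

// Option 2 (what I do today): always store the latest timestamp -> TTL resets on every update.
stateStore.put(record.key(), ValueAndTimestamp.make(record.value(), record.timestamp()));
{code}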
The idea with this proposal is to simplify things to something like the below:
{{Stores.persistentKeyValueStore("state-store")
    .withTtl(Duration.ofDays(retentionDays));}}
* TTL eviction would be {*}automatic{*}.
* Expired entries would generate {*}changelog tombstones{*}.
* No manual scheduling or iteration would be required.
* It would work consistently for all state stores, not just a specific store type.
> Add built-in TTL (Time-to-Live) support for Kafka Streams State Stores
> ----------------------------------------------------------------------
>
> Key: KAFKA-19759
> URL: https://issues.apache.org/jira/browse/KAFKA-19759
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Ankur Sinha
> Priority: Minor
> Labels: needs-kip
>
> In business use cases, Kafka Streams users frequently need *per-key Time-To-Live
> (TTL)* behavior for state stores such as KeyValueStores or KTables, typically
> to model cache-like or deduplication scenarios.
> Today, achieving this requires *manual handling* using:
> * Custom timestamp tracking per key,
> * Punctuators to periodically scan and remove expired entries, and
> * Manual emission of tombstones to maintain changelog consistency.
> These workarounds are:
> * {*}Inconsistent across applications{*}, and
> * {*}Operationally costly{*}, as each developer must reimplement the same
> logic.
> The *proposal* is to introduce a *built-in TTL mechanism* for Kafka Streams state
> stores, allowing automatic expiration of records after a configured duration.
> This would introduce new APIs such as:
> StoreBuilder<T> withTTL(Duration ttl);
> Materialized<K, V, S> withTtl(Duration ttl);
> When configured:
> * Each record’s timestamp (from event-time or processing-time) is tracked.
> * Expired keys are automatically evicted by a background task (via
> ProcessorContext.schedule()).
> * Corresponding tombstones are flushed to the changelog.
> This feature can provide a *TTL abstraction* that simplifies common use cases such as:
> * Maintaining cache-like state (e.g., last-seen values with limited lifespan)
> * Automatically purging inactive or stale keys without manual cleanup.
> Risks and benefits I considered it can bring:
> * Consistency: automatic changelog tombstones will preserve correctness
> across rebalances and restores.
> * It will help avoid boilerplate punctuator code for manual expiration.
> * TTL is optional and opt-in; existing stores remain unaffected, so backward
> compatibility would be maintained.
> Example with the StateStore / KTable interface:
> KTable<String, UserSession> sessions = builder
>     .table("sessions", Materialized.<String, UserSession, KeyValueStore<Bytes, byte[]>>as("session-store")
>         .withTtl(Duration.ofHours(1))
>         .withValueSerde(userSessionSerde));
> Here, session entries older than 1 hour would be automatically expired and
> deleted from the local RocksDB store, with a corresponding tombstone flushed
> to the changelog topic.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)