[ https://issues.apache.org/jira/browse/KAFKA-19759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ankur Sinha updated KAFKA-19759:
--------------------------------
    Description: 
Kafka Streams users frequently need *per-key Time-To-Live (TTL)* behavior for state stores such as KeyValueStores or KTables, typically to model cache-like or deduplication scenarios.

Today, achieving this requires *manual handling* (a sketch of this boilerplate follows the list below) using:
 * Custom timestamp tracking per key,

 * Punctuators to periodically scan and remove expired entries, and

 * Manual emission of tombstones to maintain changelog consistency.
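
For illustration, a minimal sketch of that boilerplate, assuming a TimestampedKeyValueStore named "ttl-store" is registered on the topology and expiry is based on wall-clock time (all names here are hypothetical):

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

// Manual TTL: track a timestamp per key and periodically delete expired entries.
public class ManualTtlProcessor implements Processor<String, String, String, String> {

    private static final Duration TTL = Duration.ofHours(1);

    private ProcessorContext<String, String> context;
    private TimestampedKeyValueStore<String, String> store;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        this.store = context.getStateStore("ttl-store");

        // Punctuator: scan the store and remove entries older than the TTL.
        context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, now -> {
            final List<String> expired = new ArrayList<>();
            try (KeyValueIterator<String, ValueAndTimestamp<String>> iter = store.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, ValueAndTimestamp<String>> entry = iter.next();
                    if (now - entry.value.timestamp() > TTL.toMillis()) {
                        expired.add(entry.key);
                    }
                }
            }
            // delete() also writes a tombstone to the store's changelog topic.
            expired.forEach(store::delete);
        });
    }

    @Override
    public void process(final Record<String, String> record) {
        // Keep the record timestamp alongside the value so the punctuator can expire it.
        store.put(record.key(), ValueAndTimestamp.make(record.value(), record.timestamp()));
        context.forward(record);
    }
}

Each application that needs expiry ends up maintaining a variant of this class, plus its punctuation interval, by hand.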

These workarounds are:
 * {*}Inconsistent across applications{*}, and

 * {*}Operationally costly{*}, as each developer must reimplement the same 
logic.

The *proposal* is to introduce a *built-in TTL mechanism* for Kafka Streams state stores, allowing automatic expiration of records after a configured duration.

This would introduce new APIs such as:

StoreBuilder<T> withTtl(Duration ttl);
Materialized<K, V, S> withTtl(Duration ttl);
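
As a rough illustration, the StoreBuilder variant might be wired up like this (withTtl() is the proposed method, not an existing Kafka Streams API; the store name and serdes are made up for the example):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

StoreBuilder<KeyValueStore<String, Long>> seenKeysStoreBuilder =
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("seen-keys"),
            Serdes.String(),
            Serdes.Long())
        .withTtl(Duration.ofMinutes(30));   // proposed API: entries expire 30 minutes after insertion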

When configured:
 * Each record’s timestamp (event-time or processing-time) is tracked.

 * Expired keys are automatically evicted by a background task (via ProcessorContext.schedule()).

 * Corresponding tombstones are written to the changelog topic.

This feature would provide a *TTL abstraction* that simplifies common use cases such as:
 * Maintaining cache-like state (e.g., last-seen values with a limited lifespan), and

 * Automatically purging inactive or stale keys without manual cleanup.

Benefits and points of risk considered:
 * Consistency: automatic changelog tombstones preserve correctness across rebalances and restores.

 * Avoids boilerplate punctuator code for manual expiration.

 * TTL is optional and opt-in; existing stores remain unaffected, so backward compatibility is maintained.

Example using the KTable/Materialized interface:

KTable<String, UserSession> sessions = builder
    .table("sessions",
        Materialized.<String, UserSession, KeyValueStore<Bytes, byte[]>>as("session-store")
            .withTtl(Duration.ofHours(1))
            .withValueSerde(userSessionSerde));

Here, session entries older than 1 hour would be automatically expired and tombstoned in both the local RocksDB store and the changelog topic.
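
As a second sketch, the deduplication use case mentioned above becomes a plain lookup once the store itself expires old keys; this assumes the proposed withTtl() API and the hypothetical "seen-keys" store built as shown earlier, with no punctuator needed:

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

// Drop records whose key has already been seen within the TTL window.
public class DedupProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;
    private KeyValueStore<String, Long> seen;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        this.seen = context.getStateStore("seen-keys"); // TTL-enabled, old keys expire automatically
    }

    @Override
    public void process(final Record<String, String> record) {
        if (seen.get(record.key()) == null) {           // not seen recently, or already expired
            seen.put(record.key(), record.timestamp());
            context.forward(record);
        }
        // else: duplicate within the TTL window, drop it
    }
}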


> Add built-in TTL (Time-to-Live) support for Kafka Streams State Stores
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-19759
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19759
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Ankur Sinha
>            Priority: Minor
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
