[
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17909411#comment-17909411
]
Matthias J. Sax commented on KAFKA-4273:
----------------------------------------
I just found
[https://github.com/apache/kafka/commit/88823c6016ea2e306340938994d9e122abf3c6c0#diff-6d7c3538dbdac56edfc18b1dc1d7b865c2a7850971ec4215d684dde80c8ac831]
– seems it does actually address this ticket (merge in AK 2.1)...
It's of course still not really recommended to enable TTL, but it seems it
should be possible. – Of course, the underlying changelog should be configured
with cleanup.policy "compact,delete" with a retention time similar to the TTL.
And there is also a difference in semantics: RocksDB TTL is purely wall-clock
time based, while retention time uses record timestamps which are compared to
broker wall-clock time.
But nevertheless, it seems we can resolve this ticket?
> Streams DSL - Add TTL / retention period support for intermediate topics and
> state stores
> -----------------------------------------------------------------------------------------
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.0.1
> Reporter: Davor Poldrugo
> Priority: Major
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state
> as far as I know - it's not configurable.
> In my use case my data has TTL / retnetion period. It's 48 hours. After that
> - data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
> * messages-prices-join-this-changelog
> * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't
> wan't to keep data forever, I have altered them to not use compaction. Right
> now my RocksDB state stores grow indefinitely, and I don't have any options
> to define TTL, or somehow periodically clean the older data.
> A "hack" that I use to keep my disk usage low - I have schedulled a job to
> periodically stop Kafka Streams instances - one at the time. This triggers a
> rebalance, and partitions migrate to other instances. When the instance is
> started again, there's another rebalance, and sometimes this instance starts
> processing partitions that wasn't processing before the stop - which leads to
> deletion of the RocksDB state store for those partitions
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated
> with a restore consumer - which reads data from - as previously mentioned - a
> non compacted topic. And this effectively leads to a "hacked TTL support" in
> Kafka Streams DSL.
> Questions:
> * Do you think would be reasonable to add support in the DSL api to define
> TTL for local store?
> * Which opens another question - there are use cases which don't need the
> intermediate topics to be created as "compact". Could also this be added to
> the DSL api? Maybe only this could be added, and this flag should also be
> used for the RocksDB TTL. Of course in this case another config would be
> mandatory - the retention period or TTL for the intermediate topics and the
> state stores. I saw there is a new cleanup.policy - compact_and_delete -
> added with KAFKA-4015.
> * Which also leads to another question, maybe some intermediate topics /
> state stores need different TTL, so a it's not as simple as that. But after
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
> *
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
> * https://github.com/facebook/rocksdb/wiki/Time-to-Live
> *
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somehow similar issue: KAFKA-4212
--
This message was sent by Atlassian Jira
(v8.20.10#820010)