[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152866#comment-17152866
 ] 

Di Campo edited comment on KAFKA-4273 at 7/7/20, 4:27 PM:
----------------------------------------------------------

Thanks Matthias. 
In my case I want emit-on-change behaviour, so keeping just the latest 
element works for me. 
I am trying a windowed store. However, WindowStoreIterator doesn't have the 
best semantics for keeping only a single element per key. 
I will try to overcome that by removing the old element before inserting the 
updated one. Is that performant and safe enough? 
Code example: 

{code:scala}
def transform(key: K, value: V): V = {
  if (value == null) {
    null.asInstanceOf[V]
  } else {
    val eventTime = context.timestamp()
    // Declared outside the try block so it can be closed in finally.
    // Note: assign below without `var`, otherwise a new local variable
    // shadows this one and the fetched iterator is never closed.
    var timeIterator: WindowStoreIterator[V] = null
    try {
      timeIterator = eventIdStore.fetch(
        key,
        eventTime - leftDurationMs,
        eventTime + rightDurationMs)
      if (timeIterator.hasNext) {
        val old = timeIterator.next()
        if (old != null && value.equalsRelation(old.value)) {
          // Unchanged relation: suppress the record (emit-on-change).
          null.asInstanceOf[V]
        } else {
          // Changed: remove the stored element before inserting the
          // update - the "remove before update" step asked about above.
          timeIterator.remove()
          insert(key, value)
          value
        }
      } else {
        // First sighting of this key in the window: store and forward.
        insert(key, value)
        value
      }
    } finally {
      if (timeIterator != null) {
        timeIterator.close()
      }
    }
  }
}
{code}
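
For reference, a minimal sketch of how a window store like {{eventIdStore}} 
could be registered with a bounded retention, so old entries are purged by 
segment expiry rather than kept forever (the store name, serdes, and 
durations here are assumptions, not part of this ticket):

{code:scala}
import java.time.Duration
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.state.Stores

val builder = new StreamsBuilder()

// Hypothetical backing store for the transformer above. Entries older
// than the retention period are dropped with their segment, which keeps
// the store bounded without per-record deletes.
val eventIdStoreBuilder = Stores.windowStoreBuilder(
  Stores.persistentWindowStore(
    "eventId-store",        // assumed store name
    Duration.ofHours(48),   // retention: how far back fetch() can read
    Duration.ofHours(48),   // window size
    false),                 // retainDuplicates = false: one entry per key+timestamp
  Serdes.String(),          // assumed key serde
  Serdes.String())          // assumed value serde

builder.addStateStore(eventIdStoreBuilder)
{code}

With retainDuplicates set to false, a put for the same key and timestamp 
overwrites in place, and expired segments are dropped wholesale, which is 
cheaper than removing records one by one.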





> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4273
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4273
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.1
>            Reporter: Davor Poldrugo
>            Priority: Major
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has a TTL / retention period of 48 hours. After 
> that, the data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any option 
> to define a TTL or to periodically clean out older data.
> A "hack" that I use to keep my disk usage low - I have schedulled a job to 
> periodically stop Kafka Streams instances - one at the time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think it would be reasonable to add support in the DSL API to 
> define a TTL for local stores?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could this also be added to 
> the DSL API? Maybe only this could be added, and this flag should also be 
> used for the RocksDB TTL. Of course in this case another config would be 
> mandatory - the retention period or TTL for the intermediate topics and the 
> state stores. I saw there is a new cleanup.policy - compact_and_delete - 
> added with KAFKA-4015.
>  * Which also leads to another question: maybe some intermediate topics / 
> state stores need different TTLs, so it's not as simple as that. But after 
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
>  * 
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
>  * https://github.com/facebook/rocksdb/wiki/Time-to-Live
>  * 
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somewhat similar issue: KAFKA-4212
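
Regarding the question above about not creating the intermediate/changelog 
topics as compacted: the Streams API has since gained per-store changelog 
config overrides, so the cleanup policy and retention of these internal 
topics can be set when the store is materialized. A minimal sketch (the 
store name, serde types, and 48-hour retention are illustrative assumptions, 
not from this ticket):

{code:scala}
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.state.WindowStore

// Changelog topic overrides: KAFKA-4015's policy is exposed as the topic
// config value "compact,delete"; retention.ms bounds how long old records
// stay on the changelog.
val changelogConfig = new java.util.HashMap[String, String]()
changelogConfig.put("cleanup.policy", "compact,delete")
changelogConfig.put("retention.ms", (48L * 60 * 60 * 1000).toString)

// Hypothetical windowed store; the same map also works with
// StoreBuilder#withLoggingEnabled for Processor API stores.
val materialized = Materialized
  .as[String, java.lang.Long, WindowStore[Bytes, Array[Byte]]]("counts-store")
  .withLoggingEnabled(changelogConfig)
{code}

And on the RocksDB side, the TTL support linked above is exposed through 
org.rocksdb.TtlDB; note that stale entries are only dropped lazily, during 
compaction. A standalone sketch (path and TTL are placeholders):

{code:scala}
import org.rocksdb.{Options, RocksDB, TtlDB}

RocksDB.loadLibrary()
val options = new Options().setCreateIfMissing(true)
// 48-hour TTL; expired entries are removed when compaction touches them.
val db = TtlDB.open(options, "/tmp/ttl-db", 48 * 60 * 60, false)
{code}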



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
