[ https://issues.apache.org/jira/browse/KAFKA-7368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16867770#comment-16867770 ]
John Roesler edited comment on KAFKA-7368 at 6/20/19 5:23 PM:
--------------------------------------------------------------

Also, note that the same issue limits the utility of aggregation on windowed tables, for the same reason.

I recently spent some time reflecting on the ideal solution to this problem.

There's a simple solution and a good solution. The simple solution is to address KAFKA-4212. There's no semantic problem with using a KeyValueStore in the current API, just the problem that it's doomed to run out of space for most windowed tables. Adding a TTL on the KeyValueStore API would directly address this problem. The risk is that, for high-volume stores, the expiration itself could become expensive, in CPU, memory, and disk.

As an alternative, WindowStores already support an efficient expiration mechanism (they store the records in segments and then drop entire segments at a time). After some analysis, it seems we could allow swapping in a WindowStore instead of a KeyValueStore for such operations, but it requires:

* Replace the "bytes store" layer's interface with a new one. It's currently (e.g.) KeyValueStore<Bytes, byte[]>, and we'd just create an independent interface KeyValueBytesStore. This would let us also create a WindowBytesStore that extends KeyValueBytesStore. In turn, this allows us to supply the expiration-efficient store implementation when we know that the data is windowed.
* To resolve some related ergonomic concerns, we'd probably also want to create a new WindowStore<K,V> interface that extends KeyValueStore<Windowed<K>,V>.

These two points sound simple, but their implications will spider out through the whole code base. Likely, the change would be in the neighborhood of 10,000 lines of code.

was (Author: vvcephei):
Also, note that the same issue limits the utility of aggregation on windowed tables, for the same reason.

I recently spent some time reflecting on the ideal solution to this problem.
There's a simple solution and a good solution. The simple solution is to address KAFKA-4212. There's no semantic problem with using a KeyValueStore in the current API, just the problem that it's doomed to run out of space for most windowed tables. Adding a TTL on the KeyValueStore API would directly address this problem. The risk is that, for high-volume stores, the expiration itself could become expensive, in CPU, memory, and disk.

As an alternative, WindowStores already support an efficient expiration mechanism (they store the records in segments and then drop entire segments at a time). After some analysis, it seems we could allow swapping in a WindowStore instead of a KeyValueStore for such operations, but it requires:

* Replace the "bytes store" layer's interface with a new one. It's currently (e.g.) KeyValueStore<Bytes, byte[]>, and we'd just create an independent interface KeyValueBytesStore. This would let us also create a WindowBytesStore that extends KeyValueBytesStore. In turn, this allows us to supply the expiration-efficient store implementation when we know that the data is windowed.
* To resolve some related ergonomic concerns, we'd probably also want to create a new WindowStore<K,V> interface that extends KeyValueStore<Windowed<K>,V>.

These two points sound simple, but their implications will spider out through the whole code base. Likely, the change would be in the neighborhood of 20,000 lines of code.

> Support joining Windowed KTables
> --------------------------------
>
>                 Key: KAFKA-7368
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7368
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Priority: Major
>
> Currently, there is no good way to join two `KTable<Windowed<K>, V>`, aka
> windowed KTables.
> They are KTables, so they have a `join` operator available, but it currently
> will use a regular KeyValue store instead of a Windowed store, so it will
> grow without bound as new windows enter.
>
> One option is to convert both KTables into KStreams and join them (which is a
> windowed join), and then convert the result back into a KTable for further
> processing, but this is an awkward way to accomplish an apparently
> straightforward task.
> It should instead be possible to support this directly, but the trick will be
> to make it impossible to accidentally use a window store for normal (aka
> non-windowed) KTables.
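The comment contrasts two ways to expire old data: a per-key TTL (KAFKA-4212) and the segment-based expiration WindowStores already use. A minimal, self-contained Java sketch of why they differ in cost follows. It uses plain collections with no Kafka dependency, and `TtlStore`, `SegmentedStore`, and their methods are hypothetical illustrations, not Kafka Streams APIs. The TTL store must visit every key on each expiration pass. The segmented store, in the spirit of a WindowStore, buckets records by time and drops whole buckets.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical per-key TTL store: every entry keeps its own timestamp. */
final class TtlStore {
    private final long ttlMs;
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> timestamps = new HashMap<>();
    /** Number of keys examined by expire(); grows with the total number of live keys. */
    long keysVisited = 0;

    TtlStore(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    void put(String key, String value, long timestamp) {
        values.put(key, value);
        timestamps.put(key, timestamp);
    }

    String get(String key) {
        return values.get(key);
    }

    /** Removes entries older than streamTime - ttlMs; it must scan every key to find them. */
    void expire(long streamTime) {
        Iterator<Map.Entry<String, Long>> it = timestamps.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            keysVisited++;
            if (entry.getValue() < streamTime - ttlMs) {
                values.remove(entry.getKey());
                it.remove();
            }
        }
    }
}

/** Hypothetical segmented store: records are bucketed by time, and expiry drops whole buckets. */
final class SegmentedStore {
    private final long segmentIntervalMs;
    private final long retentionMs;
    // segment id -> records whose timestamps fall in [id * interval, (id + 1) * interval)
    private final TreeMap<Long, Map<String, String>> segments = new TreeMap<>();
    /** Number of segments removed by expire(); independent of how many records they held. */
    long segmentsDropped = 0;

    SegmentedStore(long segmentIntervalMs, long retentionMs) {
        this.segmentIntervalMs = segmentIntervalMs;
        this.retentionMs = retentionMs;
    }

    void put(String key, String value, long timestamp) {
        long segmentId = Math.floorDiv(timestamp, segmentIntervalMs);
        segments.computeIfAbsent(segmentId, id -> new HashMap<>()).put(key, value);
    }

    String get(String key, long timestamp) {
        Map<String, String> segment = segments.get(Math.floorDiv(timestamp, segmentIntervalMs));
        return segment == null ? null : segment.get(key);
    }

    /** Drops every segment whose entire time range lies before streamTime - retentionMs. */
    void expire(long streamTime) {
        long oldestLiveSegment = Math.floorDiv(streamTime - retentionMs, segmentIntervalMs);
        Map<Long, Map<String, String>> expired = segments.headMap(oldestLiveSegment);
        segmentsDropped += expired.size();
        expired.clear(); // removes whole segments; the records inside them are not visited
    }
}
```

The trade-off is granularity. A segment is dropped only after its whole time range has passed retention, so a segmented store can keep some expired records for up to one segment interval longer than a per-key TTL would. In exchange, the expiration work scales with the number of segments rather than the number of keys.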
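The second bullet of the comment proposes a `WindowStore<K,V>` that extends `KeyValueStore<Windowed<K>,V>`. With that relationship, an operator written against the key-value interface, such as a table join, could be handed an expiration-efficient window store. A minimal sketch under drastically simplified assumptions follows. The `KeyValueStore`, `WindowStore`, and `Windowed` types below are stand-ins, NOT the real `org.apache.kafka.streams` interfaces. `InMemoryWindowStore`, `expireBefore`, and `TableJoinSketch.joinOnUpdate` are hypothetical names used only for illustration. The same substitution idea would apply one layer down, to the proposed KeyValueBytesStore and WindowBytesStore.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

/** Simplified stand-in for Kafka's Windowed<K>: a record key plus its window start time. */
final class Windowed<K> {
    final K key;
    final long windowStart;

    Windowed(K key, long windowStart) {
        this.key = key;
        this.windowStart = windowStart;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Windowed)) {
            return false;
        }
        Windowed<?> other = (Windowed<?>) o;
        return windowStart == other.windowStart && Objects.equals(key, other.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, windowStart);
    }
}

/** Simplified stand-in for a typed key-value store interface. */
interface KeyValueStore<K, V> {
    void put(K key, V value);

    V get(K key);
}

/** The proposed shape: a window store is a key-value store whose keys are windowed. */
interface WindowStore<K, V> extends KeyValueStore<Windowed<K>, V> {
    /** Hypothetical bulk expiration: forget every window that started before `time`. */
    void expireBefore(long time);
}

/** Map-backed store for non-windowed data; it never forgets anything. */
final class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
    private final Map<K, V> map = new HashMap<>();

    public void put(K key, V value) { map.put(key, value); }

    public V get(K key) { return map.get(key); }
}

/** Window store that groups entries by window start, so old windows are dropped in bulk. */
final class InMemoryWindowStore<K, V> implements WindowStore<K, V> {
    private final TreeMap<Long, Map<K, V>> byWindowStart = new TreeMap<>();

    public void put(Windowed<K> key, V value) {
        byWindowStart.computeIfAbsent(key.windowStart, start -> new HashMap<>()).put(key.key, value);
    }

    public V get(Windowed<K> key) {
        Map<K, V> window = byWindowStart.get(key.windowStart);
        return window == null ? null : window.get(key.key);
    }

    public void expireBefore(long time) {
        byWindowStart.headMap(time).clear();
    }
}

/** Hypothetical join step written only against KeyValueStore, so either store type fits. */
final class TableJoinSketch {
    /** Stores the new left value, then joins it with the right side's current value, if any. */
    static <K, V1, V2> String joinOnUpdate(
            K key, V1 newLeft, KeyValueStore<K, V1> left, KeyValueStore<K, V2> right) {
        left.put(key, newLeft);
        V2 other = right.get(key);
        return other == null ? null : newLeft + "+" + other;
    }
}
```

Because `WindowStore<K,V>` is only a `KeyValueStore<Windowed<K>,V>`, an assignment such as `KeyValueStore<String, Long> s = new InMemoryWindowStore<String, Long>();` fails to compile. This shows one way the type system could rule out accidentally using a window store for a non-windowed table.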