[ 
https://issues.apache.org/jira/browse/KAFKA-7368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16867770#comment-16867770
 ] 

John Roesler edited comment on KAFKA-7368 at 6/20/19 5:23 PM:
--------------------------------------------------------------

Note that the same issue also limits the usefulness of aggregations on 
windowed tables.

I recently spent some time reflecting on the ideal solution to this problem. 
There's a simple solution and a good solution.

The simple solution is to address KAFKA-4212. There's no semantic problem with 
using a KeyValueStore in the current API, just the problem that it's doomed to 
run out of space for most windowed tables. Adding a TTL on the KeyValueStore 
API would directly address this problem. The risk is that, for high volume 
stores, the expiration itself could become expensive, in CPU, memory, and disk.
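
To make the cost concern concrete, here is a minimal sketch of a per-record TTL, 
assuming a plain in-memory map. TtlKeyValueSketch and all of its methods are 
hypothetical names made up for illustration; this is not the KAFKA-4212 design 
and not any Kafka Streams API. The point is only that expire() has to visit 
every entry, so its cost grows with the size of the store.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical illustration only; not part of Kafka Streams.
final class TtlKeyValueSketch<K, V> {
    // A value together with the time it was written.
    private static final class Timestamped<V> {
        final V value;
        final long timestampMs;

        Timestamped(V value, long timestampMs) {
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    private final Map<K, Timestamped<V>> entries = new HashMap<>();
    private final long ttlMs;

    TtlKeyValueSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    void put(K key, V value, long nowMs) {
        entries.put(key, new Timestamped<>(value, nowMs));
    }

    // Returns null for missing entries and for entries older than the TTL.
    V get(K key, long nowMs) {
        Timestamped<V> e = entries.get(key);
        if (e == null || nowMs - e.timestampMs >= ttlMs) {
            return null;
        }
        return e.value;
    }

    // Scans every entry and removes the expired ones: O(size of the store).
    int expire(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<K, Timestamped<V>>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue().timestampMs >= ttlMs) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return entries.size();
    }
}
```

A real store could keep a secondary index ordered by timestamp to avoid the full 
scan, but that index is itself extra memory and disk to maintain.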

As an alternative, WindowStores already support an efficient expiration 
mechanism: they store records in segments and drop entire segments at a time. 
After some analysis, it seems we could allow swapping in a WindowStore instead 
of a KeyValueStore for such operations, but doing so would require two changes:
* Replacing the "bytes store" layer's interface with a new one. It's currently 
(e.g.) KeyValueStore<Bytes, byte[]>, and we'd create an independent interface, 
KeyValueBytesStore. This would let us also create a WindowBytesStore that 
extends KeyValueBytesStore. In turn, this would allow us to supply the 
expiration-efficient store implementation when we know that the data is 
windowed.
* To resolve some related ergonomic concerns, we'd probably also want to create 
a new WindowStore<K,V> interface that extends KeyValueStore<Windowed<K>,V>.
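
As a rough sketch of the first point: the interface names KeyValueBytesStore and 
WindowBytesStore come from the proposal above, but their methods, the 
SegmentedWindowBytesStore class, the String keys (standing in for Bytes), and 
the "key@windowStartMs" encoding are all assumptions made up for illustration. 
None of this exists in Kafka Streams. It shows how a caller that only knows 
about KeyValueBytesStore could be handed a segment-backed store whose 
expiration drops whole segments instead of scanning records.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical interfaces named after the proposal; method shapes are assumed.
interface KeyValueBytesStore {
    void put(String serializedKey, byte[] value);

    byte[] get(String serializedKey);
}

interface WindowBytesStore extends KeyValueBytesStore {
    // Drops every segment that lies entirely before the given timestamp.
    int dropSegmentsBefore(long timestampMs);
}

// Assumes keys are encoded as "<key>@<windowStartMs>" with non-negative starts.
final class SegmentedWindowBytesStore implements WindowBytesStore {
    private final long segmentIntervalMs;
    private final TreeMap<Long, Map<String, byte[]>> segments = new TreeMap<>();

    SegmentedWindowBytesStore(long segmentIntervalMs) {
        this.segmentIntervalMs = segmentIntervalMs;
    }

    static String windowedKey(String key, long windowStartMs) {
        return key + "@" + windowStartMs;
    }

    // The window start embedded in the key decides which segment holds it.
    private long segmentId(String serializedKey) {
        int at = serializedKey.lastIndexOf('@');
        long windowStartMs = Long.parseLong(serializedKey.substring(at + 1));
        return windowStartMs / segmentIntervalMs;
    }

    @Override
    public void put(String serializedKey, byte[] value) {
        segments.computeIfAbsent(segmentId(serializedKey), id -> new HashMap<>())
                .put(serializedKey, value);
    }

    @Override
    public byte[] get(String serializedKey) {
        Map<String, byte[]> segment = segments.get(segmentId(serializedKey));
        return segment == null ? null : segment.get(serializedKey);
    }

    @Override
    public int dropSegmentsBefore(long timestampMs) {
        // headMap is a live view, so clearing it removes the whole segments
        // from the backing map without touching individual records.
        Map<Long, Map<String, byte[]>> expired =
                segments.headMap(timestampMs / segmentIntervalMs);
        int dropped = expired.size();
        expired.clear();
        return dropped;
    }
}
```

The second point (a WindowStore<K,V> that extends KeyValueStore<Windowed<K>,V>) 
would sit one layer above this and is not shown here.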

These two points sound simple, but their implications will spider out through 
the whole code base. Likely, the change would be in the neighborhood of 10,000 
lines of code.


was (Author: vvcephei):
Also, note that the same issue limits the utility of aggregation on windowed 
tables, for the same reason.

I recently spent some time reflecting on the ideal solution to this problem. 
There's a simple solution and a good solution.

The simple solution is to address KAFKA-4212. There's no semantic problem with 
using a KeyValueStore in the current API, just the problem that it's doomed to 
run out of space for most windowed tables. Adding a TTL on the KeyValueStore 
API would directly address this problem. The risk is that, for high volume 
stores, the expiration itself could become expensive, in CPU, memory, and disk.

As an alternative, WindowStores already support an efficient expiration 
mechanism (they store the records in segments and then drop entire segments at 
a time). After some analysis, it seems we could allow swapping in a WindowStore 
instead of a KeyValueStore for such operations, but it requires:
* Replace the "bytes store" layer's interface with a new one. It's currently 
(e.g.) KeyValueStore<Bytes, byte[]>, and we'd just create an independent 
interface KeyValueBytesStore. This would let us also create a WindowBytesStore 
that extends KeyValueBytesStore. In turn, this allows us to supply the 
expiration-efficient store implementation when we know that the data is 
windowed.
* To resolve some related ergonomic concerns, we'd probably also want to create 
a new WindowStore<K,V> interface that extends KeyValueStore<Windowed<K>,V>.

These two points sound simple, but their implications will spider out through 
the whole code base. Likely, the change would be in the neighborhood of 20,000 
lines of code.

> Support joining Windowed KTables
> --------------------------------
>
>                 Key: KAFKA-7368
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7368
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Priority: Major
>
> Currently, there is no good way to join two `KTable<Windowed<K>, V>`, aka 
> windowed KTables.
> They are KTables, so they have a `join` operator available, but it currently 
> uses a regular KeyValue store instead of a Windowed store, so the store will 
> grow without bound as new windows enter.
> One option is to convert both KTables into KStreams, join them (which is a 
> windowed join), and then convert the result back into a KTable for further 
> processing, but this is an awkward way to accomplish an apparently 
> straightforward task.
> It should instead be possible to directly support it, but the trick will be 
> to make it impossible to accidentally use a window store for normal (aka 
> non-windowed) KTables.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
