[ 
https://issues.apache.org/jira/browse/KAFKA-6840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481825#comment-16481825
 ] 

ASF GitHub Bot commented on KAFKA-6840:
---------------------------------------

abbccdda opened a new pull request #5044: KAFKA-6840: windowed ktable API
URL: https://github.com/apache/kafka/pull/5044
 
 
   This PR proposes a new API, `windowedTable`, in `StreamsBuilder.java` that 
materializes a windowed topic into a local windowed store. Related links:
   KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-300%3A+Add+Windowed+KTable+API+in+StreamsBuilder
   Jira: https://issues.apache.org/jira/browse/KAFKA-6840



> support windowing in ktable API
> -------------------------------
>
>                 Key: KAFKA-6840
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6840
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>              Labels: api, needs-kip
>
> StreamsBuilder provides the table() API to materialize a changelog topic into 
> a local key-value store (KTable), which is very convenient. However, the 
> current implementation does not support materializing a topic into a 
> windowed key-value store, which would be very useful in certain cases. 
> To close this gap, we propose a new API in StreamsBuilder that returns a 
> windowed KTable.
> The table() API in StreamsBuilder looks like this:
> public synchronized <K, V> KTable<K, V> table(final String topic,
>                                               final Consumed<K, V> consumed,
>                                               final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
>     Objects.requireNonNull(topic, "topic can't be null");
>     Objects.requireNonNull(consumed, "consumed can't be null");
>     Objects.requireNonNull(materialized, "materialized can't be null");
>     materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
>     return internalStreamsBuilder.table(topic,
>                                         new ConsumedInternal<>(consumed),
>                                         new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
> }
>  
> Here the store type is fixed to KeyValueStore, and there is no way to 
> change it to WindowStore.
>  
> To stay compatible with the existing API, we have two options for defining 
> the new API:
> 1. Reuse the existing KTable type
> public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic,
>                                                                 final Consumed<K, V> consumed,
>                                                                 final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
>  
> This gives developers the option of using a windowed table instead. 
> However, it means all existing KTable logic, such as joins and 
> aggregations, must still work as expected, and verifying that is the main 
> challenge.
>  
> 2. Define a new type called WindowedKTable
> public synchronized <K, V> WindowedKTable<K, V> windowedTable(final String topic,
>                                                               final Consumed<K, V> consumed,
>                                                               final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
> The benefit is that we don't need to worry about the existing functionality 
> of KTable. The cost is duplicated logic for common operations: whenever 
> common functionality changes, both types must be updated.
> We can fill in more details in the KIP. For now, I would like to hear 
> feedback on the two approaches. Thank you!
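
To make the difference between the two store shapes concrete, here is a minimal, self-contained sketch. It does not use Kafka Streams: `WindowedTableSketch`, `WindowedKey`, and `windowStartFor` are hypothetical stand-ins invented for illustration, and it assumes fixed-size (tumbling) windows derived from a non-negative record timestamp, which may differ from how the KIP reads windowed keys from the topic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Illustrative only: hypothetical stand-in types, not Kafka Streams classes. */
public class WindowedTableSketch {

    /** Hypothetical windowed key: the record key plus the start of its window. */
    static final class WindowedKey {
        final String key;
        final long windowStartMs;

        WindowedKey(String key, long windowStartMs) {
            this.key = key;
            this.windowStartMs = windowStartMs;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof WindowedKey)) return false;
            WindowedKey other = (WindowedKey) o;
            return windowStartMs == other.windowStartMs && key.equals(other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, windowStartMs);
        }
    }

    /** Aligns a non-negative timestamp to the start of its tumbling window. */
    static long windowStartFor(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Key-value view: a single latest value per key.
    final Map<String, String> keyValueView = new HashMap<>();
    // Windowed view: a latest value per (key, window) pair.
    final Map<WindowedKey, String> windowedView = new HashMap<>();
    final long windowSizeMs;

    WindowedTableSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Applies one changelog-style update to both views. */
    void apply(String key, String value, long timestampMs) {
        keyValueView.put(key, value);
        long start = windowStartFor(timestampMs, windowSizeMs);
        windowedView.put(new WindowedKey(key, start), value);
    }

    public static void main(String[] args) {
        WindowedTableSketch sketch = new WindowedTableSketch(1000L);
        sketch.apply("user-1", "a", 100L);
        sketch.apply("user-1", "b", 900L);   // same window as "a", overwrites it
        sketch.apply("user-1", "c", 1500L);  // next window, kept separately
        System.out.println("key-value entries: " + sketch.keyValueView.size());
        System.out.println("windowed entries:  " + sketch.windowedView.size());
    }
}
```

In this sketch the key-value view keeps only the latest value for `user-1`, while the windowed view keeps one value per window. That per-window history is what a plain KeyValueStore-backed table cannot hold.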


