[ 
https://issues.apache.org/jira/browse/KAFKA-9321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Bocutiu updated KAFKA-9321:
----------------------------------
    Description: 
One of the methods on the StreamsBuilder to create a KTable takes 3 arguments:
{code:java}
// code placeholder

public synchronized <K, V> KTable<K, V> table(final String topic,
                                              final Consumed<K, V> consumed,
                                              final Materialized<K, V, 
KeyValueStore<Bytes, byte[]>> materialized)
{code}
The method code, however, overwrites the materialized Key & Value serde with 
the ones coming from consumed. There is already  _.table(String, Consumed)_  
which constructs the table this way. 

 
{code:java}
// code placeholder
...
final ConsumedInternal<K, V> consumedInternal = new 
ConsumedInternal<>(consumed);
materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());{code}
This stops use cases where a table should consume let's say Json but 
materialize it as Avro. 

 

*Suggestion*

Drop the line overwriting the Key&Value serdes and ideally remove the 
`withKeySerde` and `withValueSerde` from Materialized - it could help avoid 
scenarios like this where state is changed unexpectedly and incorrectly.

 

  was:
One of the methods on the StreamsBuilder to create a KTable takes 3 arguments:
{code:java}
// code placeholder

public synchronized <K, V> KTable<K, V> table(final String topic,
                                              final Consumed<K, V> consumed,
                                              final Materialized<K, V, 
KeyValueStore<Bytes, byte[]>> materialized)
{code}
The method code, however, overwrites the materialized Key & Value serde with 
the ones coming from consumed. There is already  _.table(String, Consumed)_  
which constructs the table this way. 

 
{code:java}
// code placeholder
...
final ConsumedInternal<K, V> consumedInternal = new 
ConsumedInternal<>(consumed);
materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());{code}
This stops use cases where a table should consume let's say Json but 
materialize it as Avro. 

 

 


> StreamsBuilder table method overwrites the materialized parameter
> -----------------------------------------------------------------
>
>                 Key: KAFKA-9321
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9321
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.3.0, 2.4.0
>            Reporter: Stefan Bocutiu
>            Priority: Major
>
> One of the methods on the StreamsBuilder to create a KTable takes 3 arguments:
> {code:java}
> // code placeholder
> public synchronized <K, V> KTable<K, V> table(final String topic,
>                                               final Consumed<K, V> consumed,
>                                               final Materialized<K, V, 
> KeyValueStore<Bytes, byte[]>> materialized)
> {code}
> The method code, however, overwrites the materialized Key & Value serde with 
> the ones coming from consumed. There is already  _.table(String, Consumed)_  
> which constructs the table this way. 
>  
> {code:java}
> // code placeholder
> ...
> final ConsumedInternal<K, V> consumedInternal = new 
> ConsumedInternal<>(consumed);
> materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());{code}
> This stops use cases where a table should consume let's say Json but 
> materialize it as Avro. 
>  
> *Suggestion*
> Drop the line overwriting the Key&Value serdes and ideally remove the 
> `withKeySerde` and `withValueSerde` from Materialized - it could help avoid 
> scenarios like this where state is changed unexpectedly and incorrectly.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to