[ 
https://issues.apache.org/jira/browse/KAFKA-9321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17000233#comment-17000233
 ] 

Matthias J. Sax commented on KAFKA-9321:
----------------------------------------

The behavior you describe is know and documented in the JavaDocs: 
[http://kafka.apache.org/24/javadoc/org/apache/kafka/streams/StreamsBuilder.html#table-java.lang.String-org.apache.kafka.streams.kstream.Consumed-org.apache.kafka.streams.kstream.Materialized-]
{quote}You should only specify serdes in the 
[{{Consumed}}|http://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/Consumed.html]
 instance as these will also be used to overwrite the serdes in 
[{{Materialized}}|http://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/Materialized.html]
{quote}
 

There is already a ticket about this, and thus I will close this as a duplicate.

> StreamsBuilder table method overwrites the materialized parameter
> -----------------------------------------------------------------
>
>                 Key: KAFKA-9321
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9321
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.3.0, 2.4.0
>            Reporter: Stefan Bocutiu
>            Priority: Major
>
> One of the methods on the StreamsBuilder to create a KTable takes 3 arguments:
> {code:java}
> // code placeholder
> public synchronized <K, V> KTable<K, V> table(final String topic,
>                                               final Consumed<K, V> consumed,
>                                               final Materialized<K, V, 
> KeyValueStore<Bytes, byte[]>> materialized)
> {code}
> The method code, however, overwrites the materialized Key & Value serde with 
> the ones coming from consumed. There is already  _.table(String, Consumed)_  
> which constructs the table this way. 
>  
> {code:java}
> // code placeholder
> ...
> final ConsumedInternal<K, V> consumedInternal = new 
> ConsumedInternal<>(consumed);
> materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());{code}
> This stops use cases where a table should consume let's say Json but 
> materialize it as Avro. 
>  
> *Suggestion*
> Drop the line overwriting the Key&Value serdes and ideally remove the 
> `withKeySerde` and `withValueSerde` from Materialized - it could help avoid 
> scenarios like this where state is changed unexpectedly and incorrectly.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to