[ 
https://issues.apache.org/jira/browse/KAFKA-16573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838672#comment-17838672
 ] 

Ayoub Omari commented on KAFKA-16573:
-------------------------------------

[~ableegoldman] [~mjsax] I looked a bit into this, and found that this error 
may appear only in +three+ cases:
 * Source node serde absent
 * Sink node serde absent
 * Store serde absent

For the first two cases it is easy to improve the error by showing the type of 
node (source or sink) and its name.

For the third case, which is the case in the description, kafka streams doesn't 
know the processor node behind the error when it is checking serdes because 
this check happens during stores initialization. At that moment, it only knows 
the name of the store.

But I think showing the name of the store may help (even for internal stores) ? 
We could say something like "The serdes of the store 
KTABLE-AGGREGATE-STATE-STORE-0000000004 are not specified". WDYT ?

> Streams does not specify where a Serde is needed
> ------------------------------------------------
>
>                 Key: KAFKA-16573
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16573
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 3.7.0
>            Reporter: Ayoub Omari
>            Priority: Major
>
> Example topology:
> {code:java}
>  builder
>    .table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
>    .groupBy((key, value) => new KeyValue(value, key))
>    .count()
>    .toStream()
>    .to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
>  {code}
> At runtime, we get the following exception 
> {code:java}
> Please specify a key serde or set one through 
> StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
> org.apache.kafka.common.config.ConfigException: Please specify a key serde or 
> set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
>     at 
> org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
>     at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
>     at 
> org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>     at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code}
> The error does not give information about the line or the processor causing 
> the issue.
> Here a Grouped was missing inside the groupBy, but because the groupBy api 
> doesn't force to define Grouped, this one can be missed, and it could be 
> difficult to spot on a more complex topology. 
> Also, for someone who needs control over serdes in the topology and doesn't 
> want to define default serdes.
>  
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to