[ https://issues.apache.org/jira/browse/KAFKA-16573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839274#comment-17839274 ]
Ayoub Omari commented on KAFKA-16573: ------------------------------------- Thanks [~mjsax], I am going to work on it in this case. > Streams does not specify where a Serde is needed > ------------------------------------------------ > > Key: KAFKA-16573 > URL: https://issues.apache.org/jira/browse/KAFKA-16573 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 3.7.0 > Reporter: Ayoub Omari > Assignee: Ayoub Omari > Priority: Minor > > Example topology: > {code:java} > builder > .table("input", Consumed.`with`(Serdes.String(), Serdes.String())) > .groupBy((key, value) => new KeyValue(value, key)) > .count() > .toStream() > .to("output", Produced.`with`(Serdes.String(), Serdes.Long())) > {code} > At runtime, we get the following exception > {code:java} > Please specify a key serde or set one through > StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG > org.apache.kafka.common.config.ConfigException: Please specify a key serde or > set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG > at > org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857) > at > org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92) > at > org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47) > at > org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63) > at > org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143) > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232) > at > org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102) > at > org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code} > The error does not give information about the line or the processor causing > the issue. > Here a Grouped was missing inside the groupBy, but because the groupBy api > doesn't force to define Grouped, this one can be missed, and it could be > difficult to spot on a more complex topology. > Also, for someone who needs control over serdes in the topology and doesn't > want to define default serdes. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)