Ayoub Omari created KAFKA-16573: ----------------------------------- Summary: Streams does not specify where a Serde is needed Key: KAFKA-16573 URL: https://issues.apache.org/jira/browse/KAFKA-16573 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 3.7.0 Reporter: Ayoub Omari
Example topology: {code:java} builder .table("input", Consumed.`with`(Serdes.String(), Serdes.String())) .groupBy((key, value) => new KeyValue(value, key)) .count() .toStream() .to("output", Produced.`with`(Serdes.String(), Serdes.Long())) {code} At runtime, we get the following exception {code:java} Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG at org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857) at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92) at org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47) at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63) at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232) at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102) at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code} The error does not give information about the line or the processor causing the issue. Here a Grouped was missing inside the groupBy, but because the groupBy api doesn't force to define Grouped, this one can be missed, and it could be difficult to spot on a more complex topology. Also, for someone who needs control over serdes in the topology and doesn't want to define default serdes. -- This message was sent by Atlassian Jira (v8.20.10#820010)