Ayoub Omari created KAFKA-16573:
-----------------------------------

             Summary: Streams does not specify where a Serde is needed
                 Key: KAFKA-16573
                 URL: https://issues.apache.org/jira/browse/KAFKA-16573
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 3.7.0
            Reporter: Ayoub Omari


Example topology:
{code:java}
 builder
   .table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
   .groupBy((key, value) => new KeyValue(value, key))
   .count()
   .toStream()
   .to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
 {code}
At runtime, we get the following exception 
{code:java}
Please specify a key serde or set one through 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
org.apache.kafka.common.config.ConfigException: Please specify a key serde or 
set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
    at 
org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
    at 
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
    at 
org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
    at 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
    at 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
    at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
    at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
    at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
    at 
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code}
The error does not give information about the line or the processor causing the 
issue.

Here a Grouped was missing inside the groupBy, but because the groupBy api 
doesn't force to define Grouped, this one can be missed, and it could be 
difficult to spot on a more complex topology. 

Also, for someone who needs control over serdes in the topology and doesn't 
want to define default serdes.

 

  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to