I'm using .map to convert my (k/v) String/Object stream to Object/Object, but
when I chain this to an aggregation step I'm getting this exception:

Exception in thread "StreamThread-1" java.lang.ClassCastException: com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to java.lang.String
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
    at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
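
As far as I can tell from the trace, a String-typed serializer is being handed an RtDetailLogLine at the sink. Here is a minimal sketch of that failure mode as I understand it. It does not use Kafka at all: Serializer, StringOnlySerializer, LogLine and send are hypothetical stand-ins I made up for illustration, not the real Kafka classes, and I am only assuming the real code path behaves similarly.

```java
import java.nio.charset.StandardCharsets;

public class SerdeMismatchSketch {

    // Hypothetical stand-in for a serializer interface; NOT the Kafka type.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Stand-in for a String-only serializer, such as a configured default.
    static class StringOnlySerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Hypothetical type standing in for RtDetailLogLine.
    static class LogLine {
        final String text;
        LogLine(String text) { this.text = text; }
    }

    // Stand-in for a sink that only knows one serializer and applies it
    // to whatever object arrives, without a compile-time type check.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static byte[] send(Serializer serializer, String topic, Object value) {
        return serializer.serialize(topic, value);
    }

    // Returns true if sending the value through the String-only
    // serializer fails with a ClassCastException.
    static boolean failsWithClassCast(Object value) {
        try {
            send(new StringOnlySerializer(), "some-topic", value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWithClassCast("plain string"));   // prints false
        System.out.println(failsWithClassCast(new LogLine("x"))); // prints true
    }
}
```

In the sketch the cast fails inside the serializer itself, which matches where the trace above ends (StringSerializer.serialize called from SinkNode.process).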

My key object implements Serde and returns a JsonSerializer from the
serializer() override.

In the config for the topology I'm setting:

    config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());

Where else do I need to specify the (de)serializer for my key class?
