I'm using .map to convert my (k/v) String/Object records to Object/Object, but when I chain this to an aggregation step I get this exception:

    Exception in thread "StreamThread-1" java.lang.ClassCastException: com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to java.lang.String
        at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
        at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

My key object implements Serde and returns a JsonSerializer from the serializer() override. In the config for the topology I'm setting:

    config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());

Where else do I need to specify the (de)serializer for my key class?
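
To make the failure mode in the trace concrete, here is a minimal sketch of what appears to be happening at the sink: a String serializer is handed an object that is not a String. It uses only the JDK. The `Serializer` interface, `StringSerializer`, `LogLine`, and `trySend` below are hypothetical stand-ins written for illustration, not the Kafka classes or my real types.

```java
import java.nio.charset.StandardCharsets;

public class SerializerMismatchSketch {

    // Hypothetical stand-in for a generic serializer interface.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Hypothetical stand-in for a serializer that only accepts Strings.
    static class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Hypothetical stand-in for the object type produced by the map step.
    static class LogLine {
        final String host;
        LogLine(String host) { this.host = host; }
    }

    // The serializer is used through its raw type, so the compiler cannot
    // check that the record matches it. The cast to String happens at runtime
    // inside the compiler-generated bridge method of StringSerializer.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String trySend(Serializer serializer, Object record) {
        try {
            serializer.serialize("some-topic", record);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // A String record matches the serializer.
        System.out.println(trySend(new StringSerializer(), "plain string"));
        // A mapped object does not, which mirrors the exception above.
        System.out.println(trySend(new StringSerializer(), new LogLine("web-1")));
    }
}
```

If this sketch matches the real behavior, the serializer in effect where the mapped records are written is still a String one, which is why I'm asking where else the serde has to be supplied.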