Hi Jon, A couple of things: Which version are you using? Can you share the code you are using to the build the topology?
Thanks, Damian On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <jon.yearg...@cedexis.com> wrote: > Im using .map to convert my (k/v) string/Object to Object/Object but when I > chain this to an aggregation step Im getting this exception: > > Exception in thread "StreamThread-1" java.lang.ClassCastException: > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to > java.lang.String > at > > org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24) > at > > org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73) > at > > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > at > > org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) > at > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > at > > org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) > at > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > at > > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) > at > > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) > at > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) > at > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) > > My key object implements Serde and returns a JsonSerializer for the > 'Serializer()' override. > > In the config for the topology Im > setting: config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > AggKey.class.getName()); > > Where else do I need to specify the (de)serializer for my key class? >