Hi Jon,

A couple of things: which version are you using?
Can you share the code you are using to build the topology?
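
In the meantime, a guess from the trace: the failure is at a SinkNode where StringSerializer is handed an RtDetailLogLine. That is what I would expect if a topic written after the .map is still using a String serde for that type. The sketch below reproduces only that mechanism in plain Java, without Kafka. The Serializer interface, the LogLine class and the send helper are stand-ins made up for illustration; they are not the Streams API.

```java
import java.nio.charset.StandardCharsets;

public class SerdeMismatchDemo {
    // Hypothetical stand-in for a serializer interface (not Kafka's own).
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    static class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Hypothetical stand-in for the type produced by the map step.
    static class LogLine {
        final String text;

        LogLine(String text) {
            this.text = text;
        }
    }

    static class LogLineSerializer implements Serializer<LogLine> {
        @Override
        public byte[] serialize(LogLine data) {
            return data.text.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Mimics a sink that only learns its serializer at runtime (for example
    // from a configured class name), so the compiler cannot check the types.
    @SuppressWarnings("unchecked")
    static byte[] send(Object record, Serializer<?> serializer) {
        return ((Serializer<Object>) serializer).serialize(record);
    }

    // Returns true if sending the record with this serializer throws
    // ClassCastException.
    static boolean failsWithClassCast(Object record, Serializer<?> serializer) {
        try {
            send(record, serializer);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        LogLine line = new LogLine("hello");
        // Mismatched serializer: the cast to String fails inside serialize().
        System.out.println(failsWithClassCast(line, new StringSerializer()));
        // Matching serializer: no exception.
        System.out.println(failsWithClassCast(line, new LogLineSerializer()));
    }
}
```

If that is what is happening, it is worth checking which value serde is in effect (StreamsConfig.VALUE_SERDE_CLASS_CONFIG) and whether the step that writes the topic is given serdes matching the new key and value types. Where exactly to pass them depends on the version, which is why I am asking.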

Thanks,
Damian

On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <jon.yearg...@cedexis.com> wrote:

> I'm using .map to convert my (k/v) String/Object to Object/Object, but when I
> chain this to an aggregation step I'm getting this exception:
>
> Exception in thread "StreamThread-1" java.lang.ClassCastException: com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to java.lang.String
>     at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
>     at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>     at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
>
> My key object implements Serde and returns a JsonSerializer from the
> serializer() override.
>
> In the config for the topology I'm setting:
> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
>
> Where else do I need to specify the (de)serializer for my key class?
>
