Using 0.10.1.0. This is my topology:
Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg");
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

JsonSerializer<BqRtDetailLogLine_aggregate> sumRecordsSerializer = new JsonSerializer<>();
JsonDeserializer<BqRtDetailLogLine_aggregate> sumRecordsDeserializer = new JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
Serde<BqRtDetailLogLine_aggregate> collectorSerde = Serdes.serdeFrom(sumRecordsSerializer, sumRecordsDeserializer);

StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();
Serde<String> stringSerde = Serdes.serdeFrom(stringSerializer, stringDeserializer);

JsonDeserializer<RtDetailLogLine> prtRecordDeserializer = new JsonDeserializer<>(RtDetailLogLine.class);
JsonSerializer<RtDetailLogLine> prtRecordJsonSerializer = new JsonSerializer<>();
Serde<RtDetailLogLine> prtRecordSerde = Serdes.serdeFrom(prtRecordJsonSerializer, prtRecordDeserializer);

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, RtDetailLogLine> rtDetailLines = kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);

// change the keying
KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value) -> new KeyValue<>(new AggKey(value), value));

KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail = rtRekey.groupByKey().aggregate(
        BqRtDetailLogLine_aggregate::new,
        new PRTAggregate(),
        TimeWindows.of(60 * 60 * 1000L),
        collectorSerde,
        "prt_minute_agg_stream");

ktRtDetail.toStream().print();

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
kafkaStreams.start();

On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy <damian....@gmail.com> wrote:
> Hi Jon,
>
> A couple of things:
> Which version are you using?
> Can you share the code you are using to build the topology?
>
> Thanks,
> Damian
>
> On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>
> > I'm using .map to convert my (k/v) string/Object to Object/Object, but when I
> > chain this to an aggregation step I'm getting this exception:
> >
> > Exception in thread "StreamThread-1" java.lang.ClassCastException:
> > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to java.lang.String
> >     at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
> >     at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
> >     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >     at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> >
> > My key object implements Serde and returns a JsonSerializer for the
> > 'Serializer()' override.
> >
> > In the config for the topology I'm setting:
> > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
> >
> > Where else do I need to specify the (de)serializer for my key class?
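[Editor's note] The stack trace shows the failure inside RecordCollector.send, i.e. while writing to the internal repartition topic that groupByKey() creates after the map() step. Since no serdes are passed to groupByKey(), Streams falls back to the configured defaults, and the RtDetailLogLine value hits the default StringSerializer. A minimal sketch of the likely fix, under the assumption that the JsonSerializer/JsonDeserializer helpers from the topology above also work for AggKey (the 0.10.1.0 API provides groupByKey(Serde, Serde) for exactly this case):

```java
// Build an explicit Serde for the new key type (hypothetical: assumes the
// same JsonSerializer/JsonDeserializer helpers used elsewhere in the topology
// can serialize AggKey).
JsonSerializer<AggKey> aggKeySerializer = new JsonSerializer<>();
JsonDeserializer<AggKey> aggKeyDeserializer = new JsonDeserializer<>(AggKey.class);
Serde<AggKey> aggKeySerde = Serdes.serdeFrom(aggKeySerializer, aggKeyDeserializer);

// Pass the key and value serdes explicitly so the internal repartition topic
// created by groupByKey() does not fall back to the default String serdes.
KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
        rtRekey.groupByKey(aggKeySerde, prtRecordSerde)
               .aggregate(
                       BqRtDetailLogLine_aggregate::new,
                       new PRTAggregate(),
                       TimeWindows.of(60 * 60 * 1000L),
                       collectorSerde,
                       "prt_minute_agg_stream");
```

A related point, also hedged as an observation: StreamsConfig.KEY_SERDE_CLASS_CONFIG expects a class implementing Serde (with a public no-arg constructor), so setting it to AggKey.class only works if AggKey itself implements Serde<AggKey>; even then, passing the serdes explicitly at the groupByKey/aggregate call sites keeps the rest of the topology (which uses String keys before the map) on the String default.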