If I comment out the aggregation step and just .print the .map step I don't hit the error. It's coming from aggregating the non-String key.
On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski <ra...@gruchalski.com> wrote: > Jon, > > Looking at your code: > > config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > Serdes.String().getClass().getName()); > > and later: > > KStream<String, RtDetailLogLine> rtDetailLines = > kStreamBuilder.stream(stringSerde, > prtRecordSerde, TOPIC); > > Is RtDetailLogLine inheriting from String? It is not, as the error > suggests. > You may have to write your own Serializer / Deserializer for > RtDetailLogLine. > > – > Best regards, > Radek Gruchalski > ra...@gruchalski.com > > > On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com) > wrote: > > Using 0.10.1.0 > > This is my topology: > > Properties config = new Properties(); > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP); > config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP); > config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" ); > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName()); > config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > Serdes.String().getClass().getName()); > > JsonSerializer<BqRtDetailLogLine_aggregate> sumRecordsSerializer = new > JsonSerializer<>(); > JsonDeserializer<BqRtDetailLogLine_aggregate> sumRecordsDeserializer = > new > JsonDeserializer<>(BqRtDetailLogLine_aggregate.class); > Serde<BqRtDetailLogLine_aggregate> collectorSerde = > Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer); > > StringSerializer stringSerializer = new StringSerializer(); > StringDeserializer stringDeserializer = new StringDeserializer(); > Serde<String> stringSerde = > Serdes.serdeFrom(stringSerializer,stringDeserializer); > > JsonDeserializer<RtDetailLogLine> prtRecordDeserializer = new > JsonDeserializer<>(RtDetailLogLine.class); > JsonSerializer<RtDetailLogLine> prtRecordJsonSerializer = new > JsonSerializer<>(); > Serde<RtDetailLogLine> prtRecordSerde = > Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer); > > KStreamBuilder kStreamBuilder = new KStreamBuilder(); > > KStream<String, RtDetailLogLine> rtDetailLines = > kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC); > > // change the keying > KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value) > -> new KeyValue<>(new AggKey(value), value)); > > KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail = > rtRekey.groupByKey().aggregate( > BqRtDetailLogLine_aggregate::new, > new PRTAggregate(), > TimeWindows.of(60 * 60 * 1000L), > collectorSerde, "prt_minute_agg_stream"); > > ktRtDetail.toStream().print(); > > KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config); > > kafkaStreams.start(); > > > On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy <damian....@gmail.com> wrote: > > > Hi Jon, > > > > A couple of things: Which version are you using? > > Can you share the code you are using to the build the topology? > > > > Thanks, > > Damian > > > > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <jon.yearg...@cedexis.com> > wrote: > > > > > Im using .map to convert my (k/v) string/Object to Object/Object but > > when I > > > chain this to an aggregation step Im getting this exception: > > > > > > Exception in thread "StreamThread-1" java.lang.ClassCastException: > > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to > > > java.lang.String > > > at > > > > > > org.apache.kafka.common.serialization.StringSerializer.serialize( > > StringSerializer.java:24) > > > at > > > > > > org.apache.kafka.streams.processor.internals.RecordCollector.send( > > RecordCollector.java:73) > > > at > > > > > > org.apache.kafka.streams.processor.internals.SinkNode. > > process(SinkNode.java:72) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KStreamFilter$ > > KStreamFilterProcessor.process(KStreamFilter.java:44) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > > ProcessorNode.java:82) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KStreamMap$ > > KStreamMapProcessor.process(KStreamMap.java:43) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > > ProcessorNode.java:82) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > SourceNode.process(SourceNode.java:66) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > StreamTask.process(StreamTask.java:181) > > > at > > > > > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > > StreamThread.java:436) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > StreamThread.run(StreamThread.java:242) > > > > > > My key object implements Serde and returns a JsonSerializer for the > > > 'Serializer()' override. > > > > > > In the config for the topology Im > > > setting: config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > > > AggKey.class.getName()); > > > > > > Where else do I need to specify the (de)serializer for my key class? > > > > > > >