The 2nd. On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <ra...@gruchalski.com> wrote:
> Is the error happening at this stage?
>
> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value) ->
>     new KeyValue<>(new AggKey(value), value));
>
> or here:
>
> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
>     rtRekey.groupByKey().aggregate(
>         BqRtDetailLogLine_aggregate::new,
>         new PRTAggregate(),
>         TimeWindows.of(60 * 60 * 1000L),
>         collectorSerde, "prt_minute_agg_stream");
>
> –
>
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote:
>
> If I comment out the aggregation step and just .print the .map step I
> don't hit the error. It's coming from aggregating the non-String key.
>
> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski <ra...@gruchalski.com> wrote:
>
>> Jon,
>>
>> Looking at your code:
>>
>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>     Serdes.String().getClass().getName());
>>
>> and later:
>>
>> KStream<String, RtDetailLogLine> rtDetailLines =
>>     kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>
>> Is RtDetailLogLine inheriting from String? It is not, as the error suggests.
>> You may have to write your own Serializer / Deserializer for RtDetailLogLine.
>>
>> –
>> Best regards,
>> Radek Gruchalski
>> ra...@gruchalski.com
>>
>>
>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote:
>>
>> Using 0.10.1.0
>>
>> This is my topology:
>>
>> Properties config = new Properties();
>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>     Serdes.String().getClass().getName());
>>
>> JsonSerializer<BqRtDetailLogLine_aggregate> sumRecordsSerializer =
>>     new JsonSerializer<>();
>> JsonDeserializer<BqRtDetailLogLine_aggregate> sumRecordsDeserializer =
>>     new JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
>> Serde<BqRtDetailLogLine_aggregate> collectorSerde =
>>     Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);
>>
>> StringSerializer stringSerializer = new StringSerializer();
>> StringDeserializer stringDeserializer = new StringDeserializer();
>> Serde<String> stringSerde =
>>     Serdes.serdeFrom(stringSerializer,stringDeserializer);
>>
>> JsonDeserializer<RtDetailLogLine> prtRecordDeserializer =
>>     new JsonDeserializer<>(RtDetailLogLine.class);
>> JsonSerializer<RtDetailLogLine> prtRecordJsonSerializer =
>>     new JsonSerializer<>();
>> Serde<RtDetailLogLine> prtRecordSerde =
>>     Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);
>>
>> KStreamBuilder kStreamBuilder = new KStreamBuilder();
>>
>> KStream<String, RtDetailLogLine> rtDetailLines =
>>     kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>
>> // change the keying
>> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value)
>>     -> new KeyValue<>(new AggKey(value), value));
>>
>> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
>>     rtRekey.groupByKey().aggregate(
>>         BqRtDetailLogLine_aggregate::new,
>>         new PRTAggregate(),
>>         TimeWindows.of(60 * 60 * 1000L),
>>         collectorSerde, "prt_minute_agg_stream");
>>
>> ktRtDetail.toStream().print();
>>
>> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
>>
>> kafkaStreams.start();
>>
>>
>> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy <damian....@gmail.com> wrote:
>>
>> > Hi Jon,
>> >
>> > A couple of things: Which version are you using?
>> > Can you share the code you are using to build the topology?
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>> >
>> > > I'm using .map to convert my (k/v) string/Object to Object/Object but
>> > > when I chain this to an aggregation step I'm getting this exception:
>> > >
>> > > Exception in thread "StreamThread-1" java.lang.ClassCastException:
>> > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
>> > > java.lang.String
>> > > at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
>> > > at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
>> > > at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>> > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
>> > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>> > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
>> > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>> > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
>> > > at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
>> > > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
>> > > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
>> > >
>> > > My key object implements Serde and returns a JsonSerializer for the
>> > > 'Serializer()' override.
>> > >
>> > > In the config for the topology I'm setting:
>> > > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
>> > >
>> > > Where else do I need to specify the (de)serializer for my key class?
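
For anyone reading this thread later: the exception in the original report is what you would expect when a serializer declared for String is handed some other type at runtime. Generics are erased, so nothing complains at compile time and the cast only fails inside serialize(). Below is a minimal JDK-only sketch of that mechanism. It does not use Kafka at all; Serializer, StringSerializer, LogLine, LogLineSerializer, send and trySend are hypothetical stand-ins written for this illustration, not Kafka's classes.

```java
import java.nio.charset.StandardCharsets;

// JDK-only illustration; every type here is a hypothetical stand-in, not Kafka's API.
class SerdeMismatchDemo {

    // Stand-in for a typed serializer interface.
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    // Serializer that only understands String values.
    static class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Stand-in for a custom value type such as a parsed log line.
    static class LogLine {
        final String host;

        LogLine(String host) {
            this.host = host;
        }
    }

    // Serializer that matches the custom value type.
    static class LogLineSerializer implements Serializer<LogLine> {
        @Override
        public byte[] serialize(LogLine data) {
            return data.host.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Mimics a framework that picks a serializer from configuration and applies
    // it to whatever value flows through. The unchecked cast compiles because
    // generic type information is erased; a mismatch surfaces only at runtime.
    @SuppressWarnings("unchecked")
    static <T> byte[] send(Serializer<?> configured, T value) {
        return ((Serializer<T>) configured).serialize(value);
    }

    // Reports the outcome as a string so both cases can be compared.
    static String trySend(Serializer<?> configured, Object value) {
        try {
            return "ok:" + send(configured, value).length;
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        LogLine line = new LogLine("edge-1");
        // Default serializer is for String, value is a LogLine: fails at runtime.
        System.out.println(trySend(new StringSerializer(), line));
        // Serializer matches the value type: succeeds.
        System.out.println(trySend(new LogLineSerializer(), line));
    }
}
```

The first call prints "ClassCastException" and the second prints "ok:6". Applied to the topology quoted above, this would suggest that the write feeding the aggregation is falling back to the String value serde from the config. If I recall the 0.10.1.0 API correctly, KStream.groupByKey also has an overload that takes a key serde and a value serde, which may be the place to pass the AggKey and RtDetailLogLine serdes explicitly; please check that against the javadoc for your version.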