Hi Jon, At a glance the code looks ok, i.e, i believe the aggregate() should have picked up the default Serde set in your StreamsConfig. However, you could try adding the Serdes to the groupBy(..)
i.e., rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...) Thanks, Damian On Tue, 6 Dec 2016 at 18:42 Jon Yeargers <[email protected]> wrote: > It's just a bunch of public 'int' and 'String' values. There's an empty > constructor and a copy constructor. > > For functions I override 'equals' and the requirements for 'serde' (close, > configure, serializer and deserializer). > > @Override > public Serializer serializer() { > JsonSerializer<AggKey> jsonSerializer = new JsonSerializer<>(); > return jsonSerializer; > } > > @Override > public Deserializer deserializer() { > JsonDeserializer<AggKey> jsonDeserializer = new > JsonDeserializer<>(); > return jsonDeserializer; > } > > > > > Which relates to: > > public class JsonSerializer<T> implements Serializer<T> { > > private Gson gson = new Gson(); > > @Override > public void configure(Map<String, ?> map, boolean b) { > > } > > @Override > public byte[] serialize(String topic, T t) { > return gson.toJson(t).getBytes(Charset.forName("UTF-8")); > } > > @Override > public void close() { > > } > } > > > > public class JsonDeserializer<T> implements Deserializer<T> { > > private Gson gson = new Gson(); > private Class<T> deserializedClass; > > public JsonDeserializer(Class<T> deserializedClass) { > this.deserializedClass = deserializedClass; > } > > public JsonDeserializer() { > } > > @Override > @SuppressWarnings("unchecked") > public void configure(Map<String, ?> map, boolean b) { > if(deserializedClass == null) { > deserializedClass = (Class<T>) map.get("serializedClass"); > } > } > > @Override > public T deserialize(String s, byte[] bytes) { > if(bytes == null){ > return null; > } > > return gson.fromJson(new String(bytes),deserializedClass); > > } > > @Override > public void close() { > > } > } > > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski <[email protected]> > wrote: > > > Do you mind sharing the code of AggKey class? > > > > – > > Best regards, > > Radek Gruchalski > > [email protected] > > > > > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers ( > [email protected]) > > wrote: > > > > The 2nd. > > > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <[email protected]> > > wrote: > > > >> Is the error happening at this stage? > >> > >> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, > value) > >> -> new KeyValue<>(new AggKey(value), value)); > >> > >> or here: > >> > >> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail = > >> rtRekey.groupByKey().aggregate( > >> BqRtDetailLogLine_aggregate::new, > >> new PRTAggregate(), > >> TimeWindows.of(60 * 60 * 1000L), > >> collectorSerde, "prt_minute_agg_stream"); > >> > >> – > >> > >> Best regards, > >> Radek Gruchalski > >> [email protected] > >> > >> > >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers ( > [email protected]) > >> wrote: > >> > >> If I comment out the aggregation step and just .print the .map step I > >> don't hit the error. It's coming from aggregating the non-String key. > >> > >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski <[email protected]> > >> wrote: > >> > >>> Jon, > >>> > >>> Looking at your code: > >>> > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > >>> Serdes.String().getClass().getName()); > >>> > >>> and later: > >>> > >>> KStream<String, RtDetailLogLine> rtDetailLines = > >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC); > >>> > >>> Is RtDetailLogLine inheriting from String? It is not, as the error > >>> suggests. > >>> You may have to write your own Serializer / Deserializer for > >>> RtDetailLogLine. > >>> > >>> – > >>> Best regards, > >>> Radek Gruchalski > >>> [email protected] > >>> > >>> > >>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers ( > >>> [email protected]) wrote: > >>> > >>> Using 0.10.1.0 > >>> > >>> This is my topology: > >>> > >>> Properties config = new Properties(); > >>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP); > >>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP); > >>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" ); > >>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > >>> AggKey.class.getName()); > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > >>> Serdes.String().getClass().getName()); > >>> > >>> JsonSerializer<BqRtDetailLogLine_aggregate> sumRecordsSerializer = new > >>> JsonSerializer<>(); > >>> JsonDeserializer<BqRtDetailLogLine_aggregate> sumRecordsDeserializer = > >>> new > >>> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class); > >>> Serde<BqRtDetailLogLine_aggregate> collectorSerde = > >>> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer); > >>> > >>> StringSerializer stringSerializer = new StringSerializer(); > >>> StringDeserializer stringDeserializer = new StringDeserializer(); > >>> Serde<String> stringSerde = > >>> Serdes.serdeFrom(stringSerializer,stringDeserializer); > >>> > >>> JsonDeserializer<RtDetailLogLine> prtRecordDeserializer = new > >>> JsonDeserializer<>(RtDetailLogLine.class); > >>> JsonSerializer<RtDetailLogLine> prtRecordJsonSerializer = new > >>> JsonSerializer<>(); > >>> Serde<RtDetailLogLine> prtRecordSerde = > >>> Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer); > >>> > >>> KStreamBuilder kStreamBuilder = new KStreamBuilder(); > >>> > >>> KStream<String, RtDetailLogLine> rtDetailLines = > >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC); > >>> > >>> // change the keying > >>> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, > value) > >>> -> new KeyValue<>(new AggKey(value), value)); > >>> > >>> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail = > >>> rtRekey.groupByKey().aggregate( > >>> BqRtDetailLogLine_aggregate::new, > >>> new PRTAggregate(), > >>> TimeWindows.of(60 * 60 * 1000L), > >>> collectorSerde, "prt_minute_agg_stream"); > >>> > >>> ktRtDetail.toStream().print(); > >>> > >>> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config); > >>> > >>> kafkaStreams.start(); > >>> > >>> > >>> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy <[email protected]> > wrote: > >>> > >>> > Hi Jon, > >>> > > >>> > A couple of things: Which version are you using? > >>> > Can you share the code you are using to the build the topology? > >>> > > >>> > Thanks, > >>> > Damian > >>> > > >>> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <[email protected]> > >>> wrote: > >>> > > >>> > > Im using .map to convert my (k/v) string/Object to Object/Object > but > >>> > when I > >>> > > chain this to an aggregation step Im getting this exception: > >>> > > > >>> > > Exception in thread "StreamThread-1" java.lang.ClassCastException: > >>> > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to > >>> > > java.lang.String > >>> > > at > >>> > > > >>> > > org.apache.kafka.common.serialization.StringSerializer.serialize( > >>> > StringSerializer.java:24) > >>> > > at > >>> > > > >>> > > org.apache.kafka.streams.processor.internals.RecordCollector.send( > >>> > RecordCollector.java:73) > >>> > > at > >>> > > > >>> > > org.apache.kafka.streams.processor.internals.SinkNode. > >>> > process(SinkNode.java:72) > >>> > > at > >>> > > > >>> > > org.apache.kafka.streams.processor.internals. > >>> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > >>> > > at > >>> > > > >>> > > org.apache.kafka.streams.kstream.internals.KStreamFilter$ > >>> > KStreamFilterProcessor.process(KStreamFilter.java:44) > >>> > > at > >>> > > > >>> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > >>> > ProcessorNode.java:82) > >>> > > at > >>> > > > >>> > > org.apache.kafka.streams.processor.internals. > >>> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > >>> > > at > >>> > > > >>> > > org.apache.kafka.streams.kstream.internals.KStreamMap$ > >>> > KStreamMapProcessor.process(KStreamMap.java:43) > >>> > > at > >>> > > > >>> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > >>> > ProcessorNode.java:82) > >>> > > at > >>> > > > >>> > > org.apache.kafka.streams.processor.internals. > >>> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > >>> > > at > >>> > > > >>> > > org.apache.kafka.streams.processor.internals. > >>> > SourceNode.process(SourceNode.java:66) > >>> > > at > >>> > > > >>> > > org.apache.kafka.streams.processor.internals. > >>> > StreamTask.process(StreamTask.java:181) > >>> > > at > >>> > > > >>> > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > >>> > StreamThread.java:436) > >>> > > at > >>> > > > >>> > > org.apache.kafka.streams.processor.internals. > >>> > StreamThread.run(StreamThread.java:242) > >>> > > > >>> > > My key object implements Serde and returns a JsonSerializer for the > >>> > > 'Serializer()' override. > >>> > > > >>> > > In the config for the topology Im > >>> > > setting: config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > >>> > > AggKey.class.getName()); > >>> > > > >>> > > Where else do I need to specify the (de)serializer for my key > class? > >>> > > > >>> > > >>> > >>> > >> > > >
