It's just a bunch of public 'int' and 'String' values. There's an empty constructor and a copy constructor.
For functions I override 'equals' and the requirements for 'serde' (close, configure, serializer and deserializer).

@Override
public Serializer serializer() {
    JsonSerializer<AggKey> jsonSerializer = new JsonSerializer<>();
    return jsonSerializer;
}

@Override
public Deserializer deserializer() {
    JsonDeserializer<AggKey> jsonDeserializer = new JsonDeserializer<>();
    return jsonDeserializer;
}

Which relates to:

public class JsonSerializer<T> implements Serializer<T> {

    private Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String topic, T t) {
        return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
    }

    @Override
    public void close() {
    }
}

public class JsonDeserializer<T> implements Deserializer<T> {

    private Gson gson = new Gson();
    private Class<T> deserializedClass;

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    public JsonDeserializer() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean b) {
        if(deserializedClass == null) {
            deserializedClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
        if(bytes == null){
            return null;
        }
        return gson.fromJson(new String(bytes),deserializedClass);
    }

    @Override
    public void close() {
    }
}

On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski <ra...@gruchalski.com> wrote:

> Do you mind sharing the code of AggKey class?
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 7:26:51 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote:
>
> The 2nd.
>
> On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <ra...@gruchalski.com> wrote:
>
>> Is the error happening at this stage?
>>
>> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value)
>>     -> new KeyValue<>(new AggKey(value), value));
>>
>> or here:
>>
>> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
>>     rtRekey.groupByKey().aggregate(
>>         BqRtDetailLogLine_aggregate::new,
>>         new PRTAggregate(),
>>         TimeWindows.of(60 * 60 * 1000L),
>>         collectorSerde, "prt_minute_agg_stream");
>>
>> –
>>
>> Best regards,
>> Radek Gruchalski
>> ra...@gruchalski.com
>>
>>
>> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote:
>>
>> If I comment out the aggregation step and just .print the .map step I
>> don't hit the error. It's coming from aggregating the non-String key.
>>
>> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski <ra...@gruchalski.com> wrote:
>>
>>> Jon,
>>>
>>> Looking at your code:
>>>
>>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>     Serdes.String().getClass().getName());
>>>
>>> and later:
>>>
>>> KStream<String, RtDetailLogLine> rtDetailLines =
>>>     kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>>
>>> Is RtDetailLogLine inheriting from String? It is not, as the error
>>> suggests.
>>> You may have to write your own Serializer / Deserializer for
>>> RtDetailLogLine.
>>>
>>> –
>>> Best regards,
>>> Radek Gruchalski
>>> ra...@gruchalski.com
>>>
>>>
>>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote:
>>>
>>> Using 0.10.1.0
>>>
>>> This is my topology:
>>>
>>> Properties config = new Properties();
>>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
>>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
>>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
>>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
>>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>>>
>>> JsonSerializer<BqRtDetailLogLine_aggregate> sumRecordsSerializer =
>>>     new JsonSerializer<>();
>>> JsonDeserializer<BqRtDetailLogLine_aggregate> sumRecordsDeserializer =
>>>     new JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
>>> Serde<BqRtDetailLogLine_aggregate> collectorSerde =
>>>     Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);
>>>
>>> StringSerializer stringSerializer = new StringSerializer();
>>> StringDeserializer stringDeserializer = new StringDeserializer();
>>> Serde<String> stringSerde =
>>>     Serdes.serdeFrom(stringSerializer,stringDeserializer);
>>>
>>> JsonDeserializer<RtDetailLogLine> prtRecordDeserializer =
>>>     new JsonDeserializer<>(RtDetailLogLine.class);
>>> JsonSerializer<RtDetailLogLine> prtRecordJsonSerializer =
>>>     new JsonSerializer<>();
>>> Serde<RtDetailLogLine> prtRecordSerde =
>>>     Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);
>>>
>>> KStreamBuilder kStreamBuilder = new KStreamBuilder();
>>>
>>> KStream<String, RtDetailLogLine> rtDetailLines =
>>>     kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>>
>>> // change the keying
>>> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value)
>>>     -> new KeyValue<>(new AggKey(value), value));
>>>
>>> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
>>>     rtRekey.groupByKey().aggregate(
>>>         BqRtDetailLogLine_aggregate::new,
>>>         new PRTAggregate(),
>>>         TimeWindows.of(60 * 60 * 1000L),
>>>         collectorSerde, "prt_minute_agg_stream");
>>>
>>> ktRtDetail.toStream().print();
>>>
>>> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
>>>
>>> kafkaStreams.start();
>>>
>>>
>>> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy <damian....@gmail.com> wrote:
>>>
>>> > Hi Jon,
>>> >
>>> > A couple of things: Which version are you using?
>>> > Can you share the code you are using to the build the topology?
>>> >
>>> > Thanks,
>>> > Damian
>>> >
>>> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>>> >
>>> > > Im using .map to convert my (k/v) string/Object to Object/Object but when I
>>> > > chain this to an aggregation step Im getting this exception:
>>> > >
>>> > > Exception in thread "StreamThread-1" java.lang.ClassCastException:
>>> > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
>>> > > java.lang.String
>>> > > at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
>>> > > at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
>>> > > at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>>> > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>>> > > at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
>>> > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>>> > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>>> > > at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
>>> > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>>> > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>>> > > at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
>>> > > at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
>>> > > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
>>> > > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
>>> > >
>>> > > My key object implements Serde and returns a JsonSerializer for the
>>> > > 'Serializer()' override.
>>> > >
>>> > > In the config for the topology Im
>>> > > setting: config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>> > > AggKey.class.getName());
>>> > >
>>> > > Where else do I need to specify the (de)serializer for my key class?
>>> > >
>>> >
>>>
>>
>
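
A note on the question above. The stack trace runs from KStreamMap through KStreamFilter and SinkNode into StringSerializer, which looks consistent with the step created by groupByKey() writing the re-keyed records with the configured default value serde (Serdes.String()) while the value is still an RtDetailLogLine. The sketch below reproduces only that mechanism in plain Java, without Kafka on the classpath. Every type in it (Serializer, StringSerializer, RtDetailLogLine, RtDetailLogLineSerializer, send) is a hypothetical stand-in written for illustration, not the Kafka or application class of the same name.

```java
import java.nio.charset.StandardCharsets;

final class DefaultSerdeDemo {

    // Hypothetical stand-in for a serializer interface; not Kafka's own type.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Stand-in for a String-only serializer, like a configured default value serde.
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Stand-in for the value type from the thread, reduced to a single field.
    static final class RtDetailLogLine {
        final int count;

        RtDetailLogLine(int count) {
            this.count = count;
        }
    }

    // Stand-in for a serializer that actually matches the value type.
    static final class RtDetailLogLineSerializer implements Serializer<RtDetailLogLine> {
        @Override
        public byte[] serialize(String topic, RtDetailLogLine data) {
            return Integer.toString(data.count).getBytes(StandardCharsets.UTF_8);
        }
    }

    // Mimics a sink that receives its serializer without compile-time type checking,
    // as when a serde is chosen from configuration by class name.
    @SuppressWarnings("unchecked")
    static byte[] send(Serializer<?> serializer, Object value) {
        return ((Serializer<Object>) serializer).serialize("some-topic", value);
    }

    // Returns true if the String-only serializer rejects the value at runtime.
    static boolean failsWithDefault() {
        try {
            send(new StringSerializer(), new RtDetailLogLine(1));
            return false;
        } catch (ClassCastException e) {
            // Thrown by the compiler-generated bridge method that casts Object to String.
            return true;
        }
    }

    // Serializes the same kind of value with a serializer that matches its type.
    static String sendWithExplicit(int count) {
        byte[] bytes = send(new RtDetailLogLineSerializer(), new RtDetailLogLine(count));
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println("default serializer fails: " + failsWithDefault());
        System.out.println("explicit serializer output: " + sendWithExplicit(7));
    }
}
```

If that reading is right, the likely change in the topology is to pass the serdes explicitly at the grouping step, for example rtRekey.groupByKey(aggKeySerde, prtRecordSerde), using the two-argument overload of groupByKey in 0.10.1. Here aggKeySerde is an assumed Serde<AggKey> instance that does not appear in the thread.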