I'm not sure why the aggregation works when a String-typed key is used. I agree with Radek that the problem comes from the value, and here is my understanding:
1. The source stream "rtDetailLines", read from the input topic, has type <String, RtDetailLogLine>.
2. After the map call, the resulting stream "rtRekey" has type <AggKey, RtDetailLogLine>.
3. In the aggregation step, when repartitioning is executed automatically (i.e. the filter -> sink operators you saw in the stack trace), the default serdes for type <AggKey, String> are used. The value Serializer<String> therefore fails to serialize an RtDetailLogLine object, which matches the error message:

   Exception in thread "StreamThread-1" java.lang.ClassCastException: com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to java.lang.String

Note that although we are only "grouping by key", the value serde is still used for repartitioning.

I double-checked the source code but could not find any obvious bug that would prevent the default serdes from being used.

Guozhang

On Tue, Dec 6, 2016 at 12:40 PM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:

> Here's the solution (props to Damian G)
>
> JsonSerializer<AggKey> keySerializer = new JsonSerializer<>();
> JsonDeserializer<AggKey> keyDeserializer = new JsonDeserializer<>(AggKey.class);
> Serde<AggKey> keySerde = Serdes.serdeFrom(keySerializer, keyDeserializer);
>
> then for the aggregator call 'groupByKey(keySerde, prtRecordSerde)'.
>
> In the documentation where it says the 'no param' groupByKey will use the
> default serializers - this doesn't seem to be true.
>
> On Tue, Dec 6, 2016 at 12:28 PM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>
> > Hmm. That's odd as the aggregation works ok if I use a String value for
> > the key (and the corresponding String serde).
> >
> > This error only started occurring when I tried to substitute my 'custom'
> > key for the original String.
> >
> > On Tue, Dec 6, 2016 at 12:24 PM, Radek Gruchalski <ra...@gruchalski.com> wrote:
> >
> >> Yeah, I knew that already, this part of the error:
> >>
> >> > > >>> > > org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
> >>
> >> points to this line: https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java#L73
> >>
> >> which means that your error happens on the value, not the key.
> >>
> >> --
> >> Best regards,
> >> Radek Gruchalski
> >> ra...@gruchalski.com
> >>
> >>
> >> On December 6, 2016 at 9:18:53 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote:
> >>
> >> 0.10.1.0
> >>
> >> On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski <ra...@gruchalski.com> wrote:
> >>
> >> > Jon,
> >> >
> >> > Are you using 0.10.1 or 0.10.0.1?
> >> >
> >> > --
> >> > Best regards,
> >> > Radek Gruchalski
> >> > ra...@gruchalski.com
> >> >
> >> >
> >> > On December 6, 2016 at 7:55:30 PM, Damian Guy (damian....@gmail.com) wrote:
> >> >
> >> > Hi Jon,
> >> >
> >> > At a glance the code looks ok, i.e., I believe the aggregate() should have
> >> > picked up the default Serde set in your StreamsConfig. However, you could
> >> > try adding the Serdes to the groupBy(..)
> >> >
> >> > i.e.,
> >> > rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)
> >> >
> >> > Thanks,
> >> > Damian
> >> >
> >> > On Tue, 6 Dec 2016 at 18:42 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> >> >
> >> > > It's just a bunch of public 'int' and 'String' values. There's an empty
> >> > > constructor and a copy constructor.
> >> > >
> >> > > For functions I override 'equals' and the requirements for 'serde' (close,
> >> > > configure, serializer and deserializer).
> >> > >
> >> > > @Override
> >> > > public Serializer serializer() {
> >> > >     JsonSerializer<AggKey> jsonSerializer = new JsonSerializer<>();
> >> > >     return jsonSerializer;
> >> > > }
> >> > >
> >> > > @Override
> >> > > public Deserializer deserializer() {
> >> > >     JsonDeserializer<AggKey> jsonDeserializer = new JsonDeserializer<>();
> >> > >     return jsonDeserializer;
> >> > > }
> >> > >
> >> > > Which relates to:
> >> > >
> >> > > public class JsonSerializer<T> implements Serializer<T> {
> >> > >
> >> > >     private Gson gson = new Gson();
> >> > >
> >> > >     @Override
> >> > >     public void configure(Map<String, ?> map, boolean b) {
> >> > >
> >> > >     }
> >> > >
> >> > >     @Override
> >> > >     public byte[] serialize(String topic, T t) {
> >> > >         return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
> >> > >     }
> >> > >
> >> > >     @Override
> >> > >     public void close() {
> >> > >
> >> > >     }
> >> > > }
> >> > >
> >> > > public class JsonDeserializer<T> implements Deserializer<T> {
> >> > >
> >> > >     private Gson gson = new Gson();
> >> > >     private Class<T> deserializedClass;
> >> > >
> >> > >     public JsonDeserializer(Class<T> deserializedClass) {
> >> > >         this.deserializedClass = deserializedClass;
> >> > >     }
> >> > >
> >> > >     public JsonDeserializer() {
> >> > >     }
> >> > >
> >> > >     @Override
> >> > >     @SuppressWarnings("unchecked")
> >> > >     public void configure(Map<String, ?> map, boolean b) {
> >> > >         if(deserializedClass == null) {
> >> > >             deserializedClass = (Class<T>) map.get("serializedClass");
> >> > >         }
> >> > >     }
> >> > >
> >> > >     @Override
> >> > >     public T deserialize(String s, byte[] bytes) {
> >> > >         if(bytes == null){
> >> > >             return null;
> >> > >         }
> >> > >
> >> > >         return gson.fromJson(new String(bytes),deserializedClass);
> >> > >
> >> > >     }
> >> > >
> >> > >     @Override
> >> > >     public void close() {
> >> > >
> >> > >     }
> >> > > }
> >> > >
> >> > > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski <ra...@gruchalski.com> wrote:
> >> > >
> >> > > > Do you mind sharing the code of AggKey class?
> >> > > >
> >> > > > --
> >> > > > Best regards,
> >> > > > Radek Gruchalski
> >> > > > ra...@gruchalski.com
> >> > > >
> >> > > >
> >> > > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote:
> >> > > >
> >> > > > The 2nd.
> >> > > >
> >> > > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <ra...@gruchalski.com> wrote:
> >> > > >
> >> > > >> Is the error happening at this stage?
> >> > > >>
> >> > > >> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value)
> >> > > >>     -> new KeyValue<>(new AggKey(value), value));
> >> > > >>
> >> > > >> or here:
> >> > > >>
> >> > > >> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
> >> > > >>     rtRekey.groupByKey().aggregate(
> >> > > >>         BqRtDetailLogLine_aggregate::new,
> >> > > >>         new PRTAggregate(),
> >> > > >>         TimeWindows.of(60 * 60 * 1000L),
> >> > > >>         collectorSerde, "prt_minute_agg_stream");
> >> > > >>
> >> > > >> --
> >> > > >>
> >> > > >> Best regards,
> >> > > >> Radek Gruchalski
> >> > > >> ra...@gruchalski.com
> >> > > >>
> >> > > >>
> >> > > >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote:
> >> > > >>
> >> > > >> If I comment out the aggregation step and just .print the .map step I
> >> > > >> don't hit the error. It's coming from aggregating the non-String key.
> >> > > >>
> >> > > >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski <ra...@gruchalski.com> wrote:
> >> > > >>
> >> > > >>> Jon,
> >> > > >>>
> >> > > >>> Looking at your code:
> >> > > >>>
> >> > > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >> > > >>>     Serdes.String().getClass().getName());
> >> > > >>>
> >> > > >>> and later:
> >> > > >>>
> >> > > >>> KStream<String, RtDetailLogLine> rtDetailLines =
> >> > > >>>     kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
> >> > > >>>
> >> > > >>> Is RtDetailLogLine inheriting from String? It is not, as the error
> >> > > >>> suggests.
> >> > > >>> You may have to write your own Serializer / Deserializer for
> >> > > >>> RtDetailLogLine.
> >> > > >>>
> >> > > >>> --
> >> > > >>> Best regards,
> >> > > >>> Radek Gruchalski
> >> > > >>> ra...@gruchalski.com
> >> > > >>>
> >> > > >>>
> >> > > >>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote:
> >> > > >>>
> >> > > >>> Using 0.10.1.0
> >> > > >>>
> >> > > >>> This is my topology:
> >> > > >>>
> >> > > >>> Properties config = new Properties();
> >> > > >>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> >> > > >>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
> >> > > >>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg");
> >> > > >>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
> >> > > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> >> > > >>>
> >> > > >>> JsonSerializer<BqRtDetailLogLine_aggregate> sumRecordsSerializer = new JsonSerializer<>();
> >> > > >>> JsonDeserializer<BqRtDetailLogLine_aggregate> sumRecordsDeserializer =
> >> > > >>>     new JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
> >> > > >>> Serde<BqRtDetailLogLine_aggregate> collectorSerde =
> >> > > >>>     Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);
> >> > > >>>
> >> > > >>> StringSerializer stringSerializer = new StringSerializer();
> >> > > >>> StringDeserializer stringDeserializer = new StringDeserializer();
> >> > > >>> Serde<String> stringSerde = Serdes.serdeFrom(stringSerializer,stringDeserializer);
> >> > > >>>
> >> > > >>> JsonDeserializer<RtDetailLogLine> prtRecordDeserializer = new JsonDeserializer<>(RtDetailLogLine.class);
> >> > > >>> JsonSerializer<RtDetailLogLine> prtRecordJsonSerializer = new JsonSerializer<>();
> >> > > >>> Serde<RtDetailLogLine> prtRecordSerde = Serdes.serdeFrom(prtRecordJsonSerializer, prtRecordDeserializer);
> >> > > >>>
> >> > > >>> KStreamBuilder kStreamBuilder = new KStreamBuilder();
> >> > > >>>
> >> > > >>> KStream<String, RtDetailLogLine> rtDetailLines =
> >> > > >>>     kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
> >> > > >>>
> >> > > >>> // change the keying
> >> > > >>> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value)
> >> > > >>>     -> new KeyValue<>(new AggKey(value), value));
> >> > > >>>
> >> > > >>> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
> >> > > >>>     rtRekey.groupByKey().aggregate(
> >> > > >>>         BqRtDetailLogLine_aggregate::new,
> >> > > >>>         new PRTAggregate(),
> >> > > >>>         TimeWindows.of(60 * 60 * 1000L),
> >> > > >>>         collectorSerde, "prt_minute_agg_stream");
> >> > > >>>
> >> > > >>> ktRtDetail.toStream().print();
> >> > > >>>
> >> > > >>> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
> >> > > >>>
> >> > > >>> kafkaStreams.start();
> >> > > >>>
> >> > > >>>
> >> > > >>> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy <damian....@gmail.com> wrote:
> >> > > >>>
> >> > > >>> > Hi Jon,
> >> > > >>> >
> >> > > >>> > A couple of things: Which version are you using?
> >> > > >>> > Can you share the code you are using to build the topology?
> >> > > >>> >
> >> > > >>> > Thanks,
> >> > > >>> > Damian
> >> > > >>> >
> >> > > >>> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> >> > > >>> >
> >> > > >>> > > I'm using .map to convert my (k/v) string/Object to Object/Object but when I
> >> > > >>> > > chain this to an aggregation step I'm getting this exception:
> >> > > >>> > >
> >> > > >>> > > Exception in thread "StreamThread-1" java.lang.ClassCastException: com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to java.lang.String
> >> > > >>> > >     at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
> >> > > >>> > >     at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
> >> > > >>> > >     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> >> > > >>> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >> > > >>> > >     at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
> >> > > >>> > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> >> > > >>> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >> > > >>> > >     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
> >> > > >>> > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> >> > > >>> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >> > > >>> > >     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
> >> > > >>> > >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
> >> > > >>> > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
> >> > > >>> > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> >> > > >>> > >
> >> > > >>> > > My key object implements Serde and returns a JsonSerializer for the
> >> > > >>> > > 'Serializer()' override.
> >> > > >>> > >
> >> > > >>> > > In the config for the topology I'm setting:
> >> > > >>> > > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
> >> > > >>> > >
> >> > > >>> > > Where else do I need to specify the (de)serializer for my key class?

--
-- Guozhang
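
For readers who want to see the failure mode in isolation: the following is a minimal sketch that reproduces the same kind of ClassCastException without Kafka. Everything in it (the Serializer interface, StringSerializer, RtDetailLogLine and the send helper) is a hypothetical stand-in written for illustration, not the Kafka classes. It only assumes that the configured default value serializer is invoked through an unchecked reference, which is consistent with the stack trace in this thread.

```java
import java.nio.charset.StandardCharsets;

class SerdeMismatchDemo {

    // Hypothetical stand-in for a serializer interface; NOT the Kafka API.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Stand-in for a default value serializer configured as "String".
    static class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Placeholder for the custom value type named in the thread.
    static class RtDetailLogLine {
        final String line;
        RtDetailLogLine(String line) { this.line = line; }
    }

    // Mimics a sink that only holds a raw serializer reference, so the
    // compiler cannot check that the value type matches the serializer.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String send(Serializer valueSerializer, Object value) {
        try {
            valueSerializer.serialize("repartition-topic", value);
            return "ok";
        } catch (ClassCastException e) {
            // The generated bridge method casts the value to String and fails.
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        Serializer<String> defaultValueSerializer = new StringSerializer();
        System.out.println(send(defaultValueSerializer, "plain string"));               // prints "ok"
        System.out.println(send(defaultValueSerializer, new RtDetailLogLine("a line"))); // prints "ClassCastException"
    }
}
```

In the actual topology the corresponding fix is the one Jon reports above: pass serdes that match the stream's key and value types explicitly, i.e. groupByKey(keySerde, prtRecordSerde), rather than relying on a default value serde configured for String.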