I'm not sure why you observed that aggregation works OK when a String-typed
key is used. I think I agree with Radek that the problem comes from the value,
and here is my understanding:

1. The source stream "rtDetailLines", read from the topic, is of type
<String, RtDetailLogLine>
2. After the map call, the resulting stream "rtRekey" is of type
<AggKey, RtDetailLogLine>

3. Then in the groupByKey().aggregate() step, when the repartitioning is
auto-executed (i.e. the filter -> sink operators you saw in the stack trace),
the default serdes for type <AggKey, String> are used, and hence the value
Serializer<String> fails to serialize an RtDetailLogLine object, which
matches the error message

"Exception in thread "StreamThread-1" java.lang.ClassCastException:
com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
java.lang.String"

as well. Note that although we are only "grouping by key", the value serde is
still used for the repartitioning.
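
Passing the serdes explicitly when grouping avoids the default serdes
entirely; a minimal sketch, assuming your AggKey serde is available as, say,
aggKeySerde, and reusing the prtRecordSerde from your topology:

// Sketch only: with explicit serdes, the auto-created repartition topic no
// longer falls back to the String serdes configured in StreamsConfig.
KGroupedStream<AggKey, RtDetailLogLine> grouped =
    rtRekey.groupByKey(aggKeySerde, prtRecordSerde);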


Anyway, I double-checked the source code but cannot find any obvious bug that
would cause the default serdes not to be picked up.

Guozhang


On Tue, Dec 6, 2016 at 12:40 PM, Jon Yeargers <jon.yearg...@cedexis.com>
wrote:

> Here's the solution (props to Damian G)
>
> JsonSerializer<AggKey> keySerializer = new JsonSerializer<>();
> JsonDeserializer<AggKey> keyDeserializer = new JsonDeserializer<>(AggKey.class);
> Serde<AggKey> keySerde = Serdes.serdeFrom(keySerializer, keyDeserializer);
>
> then for the aggregator call 'groupByKey(keySerde, prtRecordSerde)'.
>
> The documentation says the 'no param' groupByKey will use the default
> serializers, but that doesn't seem to be true here.
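>
> Putting it together, the aggregation in my topology now reads roughly like
> this (a sketch combining the keySerde above with the aggregate() call from
> my earlier post):
>
> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
>     rtRekey.groupByKey(keySerde, prtRecordSerde).aggregate(
>         BqRtDetailLogLine_aggregate::new,
>         new PRTAggregate(),
>         TimeWindows.of(60 * 60 * 1000L),
>         collectorSerde, "prt_minute_agg_stream");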
>
> On Tue, Dec 6, 2016 at 12:28 PM, Jon Yeargers <jon.yearg...@cedexis.com>
> wrote:
>
> > Hmm. That's odd, as the aggregation works OK if I use a String for the
> > key (and the corresponding String serde).
> >
> > This error only started occurring when I tried to substitute my 'custom'
> > key for the original String.
> >
> > On Tue, Dec 6, 2016 at 12:24 PM, Radek Gruchalski <ra...@gruchalski.com>
> > wrote:
> >
> >> Yeah, I knew that already, this part of the error:
> >>
> >> > > >>> > > org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
> >>
> >> points to this line:
> >> https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java#L73
> >>
> >> which means that your error happens on the value, not the key.
> >>
> >> –
> >> Best regards,
> >> Radek Gruchalski
> >> ra...@gruchalski.com
> >>
> >>
> >> On December 6, 2016 at 9:18:53 PM, Jon Yeargers (
> jon.yearg...@cedexis.com)
> >> wrote:
> >>
> >> 0.10.1.0
> >>
> >> On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski <ra...@gruchalski.com
> >
> >> wrote:
> >>
> >> > Jon,
> >> >
> >> > Are you using 0.10.1 or 0.10.0.1?
> >> >
> >> > –
> >> > Best regards,
> >> > Radek Gruchalski
> >> > ra...@gruchalski.com
> >> >
> >> >
> >> > On December 6, 2016 at 7:55:30 PM, Damian Guy (damian....@gmail.com)
> >> > wrote:
> >> >
> >> > Hi Jon,
> >> >
> >> > At a glance the code looks ok, i.e., I believe the aggregate() should
> >> > have picked up the default Serde set in your StreamsConfig. However,
> >> > you could try adding the Serdes to the groupByKey(..)
> >> >
> >> > i.e.,
> >> > rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)
> >> >
> >> > Thanks,
> >> > Damian
> >> >
> >> > On Tue, 6 Dec 2016 at 18:42 Jon Yeargers <jon.yearg...@cedexis.com>
> >> wrote:
> >> >
> >> > > It's just a bunch of public 'int' and 'String' values. There's an
> >> > > empty constructor and a copy constructor.
> >> > >
> >> > > For functions I override 'equals' and the requirements for 'serde'
> >> > > (close, configure, serializer and deserializer).
> >> > >
> >> > > @Override
> >> > > public Serializer serializer() {
> >> > >     JsonSerializer<AggKey> jsonSerializer = new JsonSerializer<>();
> >> > >     return jsonSerializer;
> >> > > }
> >> > >
> >> > > @Override
> >> > > public Deserializer deserializer() {
> >> > >     JsonDeserializer<AggKey> jsonDeserializer = new JsonDeserializer<>();
> >> > >     return jsonDeserializer;
> >> > > }
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > Which relates to:
> >> > >
> >> > > public class JsonSerializer<T> implements Serializer<T> {
> >> > >
> >> > >     private Gson gson = new Gson();
> >> > >
> >> > >     @Override
> >> > >     public void configure(Map<String, ?> map, boolean b) {
> >> > >     }
> >> > >
> >> > >     @Override
> >> > >     public byte[] serialize(String topic, T t) {
> >> > >         return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
> >> > >     }
> >> > >
> >> > >     @Override
> >> > >     public void close() {
> >> > >     }
> >> > > }
> >> > >
> >> > >
> >> > >
> >> > > public class JsonDeserializer<T> implements Deserializer<T> {
> >> > >
> >> > >     private Gson gson = new Gson();
> >> > >     private Class<T> deserializedClass;
> >> > >
> >> > >     public JsonDeserializer(Class<T> deserializedClass) {
> >> > >         this.deserializedClass = deserializedClass;
> >> > >     }
> >> > >
> >> > >     public JsonDeserializer() {
> >> > >     }
> >> > >
> >> > >     @Override
> >> > >     @SuppressWarnings("unchecked")
> >> > >     public void configure(Map<String, ?> map, boolean b) {
> >> > >         if (deserializedClass == null) {
> >> > >             deserializedClass = (Class<T>) map.get("serializedClass");
> >> > >         }
> >> > >     }
> >> > >
> >> > >     @Override
> >> > >     public T deserialize(String s, byte[] bytes) {
> >> > >         if (bytes == null) {
> >> > >             return null;
> >> > >         }
> >> > >         return gson.fromJson(new String(bytes), deserializedClass);
> >> > >     }
> >> > >
> >> > >     @Override
> >> > >     public void close() {
> >> > >     }
> >> > > }
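> >> > >
> >> > > (A sketch only, not from my original code: the two classes above can
> >> > > be wired into a single Serde for the key by handing the target class
> >> > > to the deserializer:)
> >> > >
> >> > > // Sketch: build a Serde<AggKey> from the JSON (de)serializers above;
> >> > > // AggKey.class tells the deserializer which type to produce.
> >> > > Serde<AggKey> aggKeySerde = Serdes.serdeFrom(
> >> > >     new JsonSerializer<AggKey>(),
> >> > >     new JsonDeserializer<>(AggKey.class));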
> >> > >
> >> > > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski <
> >> ra...@gruchalski.com>
> >> > > wrote:
> >> > >
> >> > > > Do you mind sharing the code of AggKey class?
> >> > > >
> >> > > > –
> >> > > > Best regards,
> >> > > > Radek Gruchalski
> >> > > > ra...@gruchalski.com
> >> > > >
> >> > > >
> >> > > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (
> >> > > jon.yearg...@cedexis.com)
> >> > > > wrote:
> >> > > >
> >> > > > The 2nd.
> >> > > >
> >> > > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <
> >> > ra...@gruchalski.com>
> >> > > > wrote:
> >> > > >
> >> > > >> Is the error happening at this stage?
> >> > > >>
> >> > > >> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value)
> >> > > >>     -> new KeyValue<>(new AggKey(value), value));
> >> > > >>
> >> > > >> or here:
> >> > > >>
> >> > > >> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
> >> > > >>     rtRekey.groupByKey().aggregate(
> >> > > >>         BqRtDetailLogLine_aggregate::new,
> >> > > >>         new PRTAggregate(),
> >> > > >>         TimeWindows.of(60 * 60 * 1000L),
> >> > > >>         collectorSerde, "prt_minute_agg_stream");
> >> > > >>
> >> > > >> –
> >> > > >>
> >> > > >> Best regards,
> >> > > >> Radek Gruchalski
> >> > > >> ra...@gruchalski.com
> >> > > >>
> >> > > >>
> >> > > >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (
> >> > > jon.yearg...@cedexis.com)
> >> > > >> wrote:
> >> > > >>
> >> > > >> If I comment out the aggregation step and just .print the .map step
> >> > > >> I don't hit the error. It's coming from aggregating the non-String key.
> >> > > >>
> >> > > >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski <
> >> > ra...@gruchalski.com>
> >> > > >> wrote:
> >> > > >>
> >> > > >>> Jon,
> >> > > >>>
> >> > > >>> Looking at your code:
> >> > > >>>
> >> > > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >> > > >>> Serdes.String().getClass().getName());
> >> > > >>>
> >> > > >>> and later:
> >> > > >>>
> >> > > >>> KStream<String, RtDetailLogLine> rtDetailLines =
> >> > > >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
> >> > > >>>
> >> > > >>> Is RtDetailLogLine inheriting from String? It is not, as the error
> >> > > >>> suggests.
> >> > > >>> You may have to write your own Serializer / Deserializer for
> >> > > >>> RtDetailLogLine.
> >> > > >>>
> >> > > >>> –
> >> > > >>> Best regards,
> >> > > >>> Radek Gruchalski
> >> > > >>> ra...@gruchalski.com
> >> > > >>>
> >> > > >>>
> >> > > >>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (
> >> > > >>> jon.yearg...@cedexis.com) wrote:
> >> > > >>>
> >> > > >>> Using 0.10.1.0
> >> > > >>>
> >> > > >>> This is my topology:
> >> > > >>>
> >> > > >>> Properties config = new Properties();
> >> > > >>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> >> > > >>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
> >> > > >>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg");
> >> > > >>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >> > > >>>     AggKey.class.getName());
> >> > > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >> > > >>>     Serdes.String().getClass().getName());
> >> > > >>>
> >> > > >>> JsonSerializer<BqRtDetailLogLine_aggregate> sumRecordsSerializer =
> >> > > >>>     new JsonSerializer<>();
> >> > > >>> JsonDeserializer<BqRtDetailLogLine_aggregate> sumRecordsDeserializer =
> >> > > >>>     new JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
> >> > > >>> Serde<BqRtDetailLogLine_aggregate> collectorSerde =
> >> > > >>>     Serdes.serdeFrom(sumRecordsSerializer, sumRecordsDeserializer);
> >> > > >>>
> >> > > >>> StringSerializer stringSerializer = new StringSerializer();
> >> > > >>> StringDeserializer stringDeserializer = new StringDeserializer();
> >> > > >>> Serde<String> stringSerde =
> >> > > >>>     Serdes.serdeFrom(stringSerializer, stringDeserializer);
> >> > > >>>
> >> > > >>> JsonDeserializer<RtDetailLogLine> prtRecordDeserializer =
> >> > > >>>     new JsonDeserializer<>(RtDetailLogLine.class);
> >> > > >>> JsonSerializer<RtDetailLogLine> prtRecordJsonSerializer =
> >> > > >>>     new JsonSerializer<>();
> >> > > >>> Serde<RtDetailLogLine> prtRecordSerde =
> >> > > >>>     Serdes.serdeFrom(prtRecordJsonSerializer, prtRecordDeserializer);
> >> > > >>>
> >> > > >>> KStreamBuilder kStreamBuilder = new KStreamBuilder();
> >> > > >>>
> >> > > >>> KStream<String, RtDetailLogLine> rtDetailLines =
> >> > > >>>     kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
> >> > > >>>
> >> > > >>> // change the keying
> >> > > >>> KStream<AggKey, RtDetailLogLine> rtRekey =
> >> > > >>>     rtDetailLines.map((key, value) -> new KeyValue<>(new AggKey(value), value));
> >> > > >>>
> >> > > >>> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
> >> > > >>>     rtRekey.groupByKey().aggregate(
> >> > > >>>         BqRtDetailLogLine_aggregate::new,
> >> > > >>>         new PRTAggregate(),
> >> > > >>>         TimeWindows.of(60 * 60 * 1000L),
> >> > > >>>         collectorSerde, "prt_minute_agg_stream");
> >> > > >>>
> >> > > >>> ktRtDetail.toStream().print();
> >> > > >>>
> >> > > >>> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
> >> > > >>>
> >> > > >>> kafkaStreams.start();
> >> > > >>>
> >> > > >>>
> >> > > >>> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy <
> damian....@gmail.com>
> >>
> >> > > wrote:
> >> > > >>>
> >> > > >>> > Hi Jon,
> >> > > >>> >
> >> > > >>> > A couple of things: Which version are you using?
> >> > > >>> > Can you share the code you are using to the build the
> topology?
> >> > > >>> >
> >> > > >>> > Thanks,
> >> > > >>> > Damian
> >> > > >>> >
> >> > > >>> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <
> >> jon.yearg...@cedexis.com
> >> > >
> >> > > >>> wrote:
> >> > > >>> >
> >> > > >>> > > I'm using .map to convert my (k/v) String/Object to Object/Object,
> >> > > >>> > > but when I chain this to an aggregation step I'm getting this
> >> > > >>> > > exception:
> >> > > >>> > >
> >> > > >>> > > Exception in thread "StreamThread-1" java.lang.ClassCastException:
> >> > > >>> > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
> >> > > >>> > > java.lang.String
> >> > > >>> > > at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
> >> > > >>> > > at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
> >> > > >>> > > at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> >> > > >>> > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >> > > >>> > > at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
> >> > > >>> > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> >> > > >>> > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >> > > >>> > > at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
> >> > > >>> > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> >> > > >>> > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >> > > >>> > > at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
> >> > > >>> > > at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
> >> > > >>> > > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
> >> > > >>> > > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> >> > > >>> > >
> >> > > >>> > > My key object implements Serde and returns a JsonSerializer for
> >> > > >>> > > the 'serializer()' override.
> >> > > >>> > >
> >> > > >>> > > In the config for the topology I'm setting:
> >> > > >>> > > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >> > > >>> > >     AggKey.class.getName());
> >> > > >>> > >
> >> > > >>> > > Where else do I need to specify the (de)serializer for my key class?
> >> > > >>> > >
> >> > > >>> >
> >> > > >>>
> >> > > >>>
> >> > > >>
> >> > > >
> >> > >
> >> >
> >>
> >>
> >
>



-- 
-- Guozhang
