Re: Implementing custom key serializer
I'm not sure why you observed that aggregation works ok if String typed key is used. I think I agree with Radek that the problem comes from the value, and here is my understanding: 1. The source stream read from the topic named "rtDetailLines" is in type 2. After the map call, the result stream named "rtRekey" is in type 3. Then in aggregateByKey, when repartitioning is auto executed (i.e. the filter -> sink operators you saw in the stack trace), the default serdes for type < AggKey, String> is used, and hence value Serializer failed to serialize an RtDetailLogLine object, which matches the error message "Exception in thread "StreamThread-1" java.lang.ClassCastException: com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to java.lang.String" as well. Note that although we are only "grouping by key", we will use the value serde anyways for repartitioning. Anyways, I double checked the source code but cannot find any obvious bugs that causes not using the default serdes. Guozhang On Tue, Dec 6, 2016 at 12:40 PM, Jon Yeargers wrote: > Here's the solution (props to Damian G) > > JsonSerializer keySerializer = new JsonSerializer<>(); > JsonDeserializer keyDeserializer = new > JsonDeserializer<>(AggKey.class); > Serde keySerde = Serdes.serdeFrom(keySerializer, keyDeserializer); > > then for the aggregator call 'groupByKey(keySerde, prtRecordSerde)'. > > In the documentation where it says the 'no param' groupByKey will use the > default serializers - this doesn't seem to be true. > > On Tue, Dec 6, 2016 at 12:28 PM, Jon Yeargers > wrote: > > > Hmm. That's odd as the aggregation works ok if I use a String value for > > the key (and the corresponding String serde). > > > > This error only started occurring when I tried to substitute my 'custom' > > key for the original String. > > > > On Tue, Dec 6, 2016 at 12:24 PM, Radek Gruchalski > > wrote: > > > >> Yeah, I knew that already, this part of the error: > >> > >> > > >>> > > org.apache.kafka.streams.processor.internals. > >> > RecordCollector.send( > >> > > >>> > RecordCollector.java:73) > >> > >> points to this line: https://github.com/apache/ > kafka/blob/0.10.1/streams/ > >> src/main/java/org/apache/kafka/streams/processor/ > >> internals/RecordCollector.java#L73 > >> > >> which means that your error happens on the value, not the key. > >> > >> – > >> Best regards, > >> Radek Gruchalski > >> ra...@gruchalski.com > >> > >> > >> On December 6, 2016 at 9:18:53 PM, Jon Yeargers ( > jon.yearg...@cedexis.com) > >> wrote: > >> > >> 0.10.1.0 > >> > >> On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski > > >> wrote: > >> > >> > Jon, > >> > > >> > Are you using 0.10.1 or 0.10.0.1? > >> > > >> > – > >> > Best regards, > >> > Radek Gruchalski > >> > ra...@gruchalski.com > >> > > >> > > >> > On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com) > >> > wrote: > >> > > >> > Hi Jon, > >> > > >> > At a glance the code looks ok, i.e, i believe the aggregate() should > >> have > >> > picked up the default Serde set in your StreamsConfig. However, you > >> could > >> > try adding the Serdes to the groupBy(..) > >> > > >> > i.e., > >> > rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...) > >> > > >> > Thanks, > >> > Damian > >> > > >> > On Tue, 6 Dec 2016 at 18:42 Jon Yeargers > >> wrote: > >> > > >> > > It's just a bunch of public 'int' and 'String' values. There's an > >> empty > >> > > constructor and a copy constructor. > >> > > > >> > > For functions I override 'equals' and the requirements for 'serde' > >> > (close, > >> > > configure, serializer and deserializer). > >> > > > >> > > @Override > >> > > public Serializer serializer() { > >> > > JsonSerializer jsonSerializer = new JsonSerializer<>(); > >> > > return jsonSerializer; > >> > > } > >> > > > >> > > @Override > >> > > public Deserializer deserializer() { > >> > > JsonDeserializer jsonDeserializer = new > >> > > JsonDeserializer<>(); > >> > > return jsonDeserializer; > >> > > } > >> > > > >> > > > >> > > > >> > > > >> > > Which relates to: > >> > > > >> > > public class JsonSerializer implements Serializer { > >> > > > >> > > private Gson gson = new Gson(); > >> > > > >> > > @Override > >> > > public void configure(Map map, boolean b) { > >> > > > >> > > } > >> > > > >> > > @Override > >> > > public byte[] serialize(String topic, T t) { > >> > > return gson.toJson(t).getBytes(Charset.forName("UTF-8")); > >> > > } > >> > > > >> > > @Override > >> > > public void close() { > >> > > > >> > > } > >> > > } > >> > > > >> > > > >> > > > >> > > public class JsonDeserializer implements Deserializer { > >> > > > >> > > private Gson gson = new Gson(); > >> > > private Class deserializedClass; > >> > > > >> > > public JsonDeserializer(Class deserializedClass) { > >> > > this.deserializedClass = deserializedClass; > >> > > } > >> > > > >> > > public JsonDeserializer() { > >> > > } > >> > > > >> > > @Override > >> > > @SuppressWarnings("unchecked") > >> > > public void config
Re: Implementing custom key serializer
Here's the solution (props to Damian G) JsonSerializer keySerializer = new JsonSerializer<>(); JsonDeserializer keyDeserializer = new JsonDeserializer<>(AggKey.class); Serde keySerde = Serdes.serdeFrom(keySerializer, keyDeserializer); then for the aggregator call 'groupByKey(keySerde, prtRecordSerde)'. In the documentation where it says the 'no param' groupByKey will use the default serializers - this doesn't seem to be true. On Tue, Dec 6, 2016 at 12:28 PM, Jon Yeargers wrote: > Hmm. That's odd as the aggregation works ok if I use a String value for > the key (and the corresponding String serde). > > This error only started occurring when I tried to substitute my 'custom' > key for the original String. > > On Tue, Dec 6, 2016 at 12:24 PM, Radek Gruchalski > wrote: > >> Yeah, I knew that already, this part of the error: >> >> > > >>> > > org.apache.kafka.streams.processor.internals. >> > RecordCollector.send( >> > > >>> > RecordCollector.java:73) >> >> points to this line: https://github.com/apache/kafka/blob/0.10.1/streams/ >> src/main/java/org/apache/kafka/streams/processor/ >> internals/RecordCollector.java#L73 >> >> which means that your error happens on the value, not the key. >> >> – >> Best regards, >> Radek Gruchalski >> ra...@gruchalski.com >> >> >> On December 6, 2016 at 9:18:53 PM, Jon Yeargers (jon.yearg...@cedexis.com) >> wrote: >> >> 0.10.1.0 >> >> On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski >> wrote: >> >> > Jon, >> > >> > Are you using 0.10.1 or 0.10.0.1? >> > >> > – >> > Best regards, >> > Radek Gruchalski >> > ra...@gruchalski.com >> > >> > >> > On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com) >> > wrote: >> > >> > Hi Jon, >> > >> > At a glance the code looks ok, i.e, i believe the aggregate() should >> have >> > picked up the default Serde set in your StreamsConfig. However, you >> could >> > try adding the Serdes to the groupBy(..) >> > >> > i.e., >> > rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...) >> > >> > Thanks, >> > Damian >> > >> > On Tue, 6 Dec 2016 at 18:42 Jon Yeargers >> wrote: >> > >> > > It's just a bunch of public 'int' and 'String' values. There's an >> empty >> > > constructor and a copy constructor. >> > > >> > > For functions I override 'equals' and the requirements for 'serde' >> > (close, >> > > configure, serializer and deserializer). >> > > >> > > @Override >> > > public Serializer serializer() { >> > > JsonSerializer jsonSerializer = new JsonSerializer<>(); >> > > return jsonSerializer; >> > > } >> > > >> > > @Override >> > > public Deserializer deserializer() { >> > > JsonDeserializer jsonDeserializer = new >> > > JsonDeserializer<>(); >> > > return jsonDeserializer; >> > > } >> > > >> > > >> > > >> > > >> > > Which relates to: >> > > >> > > public class JsonSerializer implements Serializer { >> > > >> > > private Gson gson = new Gson(); >> > > >> > > @Override >> > > public void configure(Map map, boolean b) { >> > > >> > > } >> > > >> > > @Override >> > > public byte[] serialize(String topic, T t) { >> > > return gson.toJson(t).getBytes(Charset.forName("UTF-8")); >> > > } >> > > >> > > @Override >> > > public void close() { >> > > >> > > } >> > > } >> > > >> > > >> > > >> > > public class JsonDeserializer implements Deserializer { >> > > >> > > private Gson gson = new Gson(); >> > > private Class deserializedClass; >> > > >> > > public JsonDeserializer(Class deserializedClass) { >> > > this.deserializedClass = deserializedClass; >> > > } >> > > >> > > public JsonDeserializer() { >> > > } >> > > >> > > @Override >> > > @SuppressWarnings("unchecked") >> > > public void configure(Map map, boolean b) { >> > > if(deserializedClass == null) { >> > > deserializedClass = (Class) map.get("serializedClass"); >> > > } >> > > } >> > > >> > > @Override >> > > public T deserialize(String s, byte[] bytes) { >> > > if(bytes == null){ >> > > return null; >> > > } >> > > >> > > return gson.fromJson(new String(bytes),deserializedClass); >> > > >> > > } >> > > >> > > @Override >> > > public void close() { >> > > >> > > } >> > > } >> > > >> > > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski < >> ra...@gruchalski.com> >> > > wrote: >> > > >> > > > Do you mind sharing the code of AggKey class? >> > > > >> > > > – >> > > > Best regards, >> > > > Radek Gruchalski >> > > > ra...@gruchalski.com >> > > > >> > > > >> > > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers ( >> > > jon.yearg...@cedexis.com) >> > > > wrote: >> > > > >> > > > The 2nd. >> > > > >> > > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski < >> > ra...@gruchalski.com> >> > > > wrote: >> > > > >> > > >> Is the error happening at this stage? >> > > >> >> > > >> KStream rtRekey = rtDetailLines.map((key, >> > > value) >> > > >> -> new KeyValue<>(new AggKey(value), value)); >> > > >> >> > > >> or here: >> > > >> >> > > >> KTable, BqRtDetailLogLine_aggregate> ktRtDetail = >> > > >> rtRekey.groupByKey().aggregate( >> > > >> BqRtDetailLogLine_aggregate::new, >> > >
Re: Implementing custom key serializer
Hmm. That's odd as the aggregation works ok if I use a String value for the key (and the corresponding String serde). This error only started occurring when I tried to substitute my 'custom' key for the original String. On Tue, Dec 6, 2016 at 12:24 PM, Radek Gruchalski wrote: > Yeah, I knew that already, this part of the error: > > > > >>> > > org.apache.kafka.streams.processor.internals. > > RecordCollector.send( > > > >>> > RecordCollector.java:73) > > points to this line: https://github.com/apache/kafka/blob/0.10.1/ > streams/src/main/java/org/apache/kafka/streams/processor/internals/ > RecordCollector.java#L73 > > which means that your error happens on the value, not the key. > > – > Best regards, > Radek Gruchalski > ra...@gruchalski.com > > > On December 6, 2016 at 9:18:53 PM, Jon Yeargers (jon.yearg...@cedexis.com) > wrote: > > 0.10.1.0 > > On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski > wrote: > > > Jon, > > > > Are you using 0.10.1 or 0.10.0.1? > > > > – > > Best regards, > > Radek Gruchalski > > ra...@gruchalski.com > > > > > > On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com) > > wrote: > > > > Hi Jon, > > > > At a glance the code looks ok, i.e, i believe the aggregate() should > have > > picked up the default Serde set in your StreamsConfig. However, you > could > > try adding the Serdes to the groupBy(..) > > > > i.e., > > rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...) > > > > Thanks, > > Damian > > > > On Tue, 6 Dec 2016 at 18:42 Jon Yeargers > wrote: > > > > > It's just a bunch of public 'int' and 'String' values. There's an > empty > > > constructor and a copy constructor. > > > > > > For functions I override 'equals' and the requirements for 'serde' > > (close, > > > configure, serializer and deserializer). > > > > > > @Override > > > public Serializer serializer() { > > > JsonSerializer jsonSerializer = new JsonSerializer<>(); > > > return jsonSerializer; > > > } > > > > > > @Override > > > public Deserializer deserializer() { > > > JsonDeserializer jsonDeserializer = new > > > JsonDeserializer<>(); > > > return jsonDeserializer; > > > } > > > > > > > > > > > > > > > Which relates to: > > > > > > public class JsonSerializer implements Serializer { > > > > > > private Gson gson = new Gson(); > > > > > > @Override > > > public void configure(Map map, boolean b) { > > > > > > } > > > > > > @Override > > > public byte[] serialize(String topic, T t) { > > > return gson.toJson(t).getBytes(Charset.forName("UTF-8")); > > > } > > > > > > @Override > > > public void close() { > > > > > > } > > > } > > > > > > > > > > > > public class JsonDeserializer implements Deserializer { > > > > > > private Gson gson = new Gson(); > > > private Class deserializedClass; > > > > > > public JsonDeserializer(Class deserializedClass) { > > > this.deserializedClass = deserializedClass; > > > } > > > > > > public JsonDeserializer() { > > > } > > > > > > @Override > > > @SuppressWarnings("unchecked") > > > public void configure(Map map, boolean b) { > > > if(deserializedClass == null) { > > > deserializedClass = (Class) map.get("serializedClass"); > > > } > > > } > > > > > > @Override > > > public T deserialize(String s, byte[] bytes) { > > > if(bytes == null){ > > > return null; > > > } > > > > > > return gson.fromJson(new String(bytes),deserializedClass); > > > > > > } > > > > > > @Override > > > public void close() { > > > > > > } > > > } > > > > > > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski < > ra...@gruchalski.com> > > > wrote: > > > > > > > Do you mind sharing the code of AggKey class? > > > > > > > > – > > > > Best regards, > > > > Radek Gruchalski > > > > ra...@gruchalski.com > > > > > > > > > > > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers ( > > > jon.yearg...@cedexis.com) > > > > wrote: > > > > > > > > The 2nd. > > > > > > > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski < > > ra...@gruchalski.com> > > > > wrote: > > > > > > > >> Is the error happening at this stage? > > > >> > > > >> KStream rtRekey = rtDetailLines.map((key, > > > value) > > > >> -> new KeyValue<>(new AggKey(value), value)); > > > >> > > > >> or here: > > > >> > > > >> KTable, BqRtDetailLogLine_aggregate> ktRtDetail = > > > >> rtRekey.groupByKey().aggregate( > > > >> BqRtDetailLogLine_aggregate::new, > > > >> new PRTAggregate(), > > > >> TimeWindows.of(60 * 60 * 1000L), > > > >> collectorSerde, "prt_minute_agg_stream"); > > > >> > > > >> – > > > >> > > > >> Best regards, > > > >> Radek Gruchalski > > > >> ra...@gruchalski.com > > > >> > > > >> > > > >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers ( > > > jon.yearg...@cedexis.com) > > > >> wrote: > > > >> > > > >> If I comment out the aggregation step and just .print the .map step > I > > > >> don't hit the error. It's coming from aggregating the non-String > key. > > > >> > > > >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski < > > ra...@gruchalski.com> > > > >> wrote: > > > >> > > > >>> Jon, > > > >>> > > > >>> Loo
Re: Implementing custom key serializer
Yeah, I knew that already, this part of the error: > > >>> > > org.apache.kafka.streams.processor.internals. > RecordCollector.send( > > >>> > RecordCollector.java:73) points to this line: https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java#L73 which means that your error happens on the value, not the key. – Best regards, Radek Gruchalski ra...@gruchalski.com On December 6, 2016 at 9:18:53 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote: 0.10.1.0 On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski wrote: > Jon, > > Are you using 0.10.1 or 0.10.0.1? > > – > Best regards, > Radek Gruchalski > ra...@gruchalski.com > > > On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com) > wrote: > > Hi Jon, > > At a glance the code looks ok, i.e, i believe the aggregate() should have > picked up the default Serde set in your StreamsConfig. However, you could > try adding the Serdes to the groupBy(..) > > i.e., > rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...) > > Thanks, > Damian > > On Tue, 6 Dec 2016 at 18:42 Jon Yeargers wrote: > > > It's just a bunch of public 'int' and 'String' values. There's an empty > > constructor and a copy constructor. > > > > For functions I override 'equals' and the requirements for 'serde' > (close, > > configure, serializer and deserializer). > > > > @Override > > public Serializer serializer() { > > JsonSerializer jsonSerializer = new JsonSerializer<>(); > > return jsonSerializer; > > } > > > > @Override > > public Deserializer deserializer() { > > JsonDeserializer jsonDeserializer = new > > JsonDeserializer<>(); > > return jsonDeserializer; > > } > > > > > > > > > > Which relates to: > > > > public class JsonSerializer implements Serializer { > > > > private Gson gson = new Gson(); > > > > @Override > > public void configure(Map map, boolean b) { > > > > } > > > > @Override > > public byte[] serialize(String topic, T t) { > > return gson.toJson(t).getBytes(Charset.forName("UTF-8")); > > } > > > > @Override > > public void close() { > > > > } > > } > > > > > > > > public class JsonDeserializer implements Deserializer { > > > > private Gson gson = new Gson(); > > private Class deserializedClass; > > > > public JsonDeserializer(Class deserializedClass) { > > this.deserializedClass = deserializedClass; > > } > > > > public JsonDeserializer() { > > } > > > > @Override > > @SuppressWarnings("unchecked") > > public void configure(Map map, boolean b) { > > if(deserializedClass == null) { > > deserializedClass = (Class) map.get("serializedClass"); > > } > > } > > > > @Override > > public T deserialize(String s, byte[] bytes) { > > if(bytes == null){ > > return null; > > } > > > > return gson.fromJson(new String(bytes),deserializedClass); > > > > } > > > > @Override > > public void close() { > > > > } > > } > > > > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski > > wrote: > > > > > Do you mind sharing the code of AggKey class? > > > > > > – > > > Best regards, > > > Radek Gruchalski > > > ra...@gruchalski.com > > > > > > > > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers ( > > jon.yearg...@cedexis.com) > > > wrote: > > > > > > The 2nd. > > > > > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski < > ra...@gruchalski.com> > > > wrote: > > > > > >> Is the error happening at this stage? > > >> > > >> KStream rtRekey = rtDetailLines.map((key, > > value) > > >> -> new KeyValue<>(new AggKey(value), value)); > > >> > > >> or here: > > >> > > >> KTable, BqRtDetailLogLine_aggregate> ktRtDetail = > > >> rtRekey.groupByKey().aggregate( > > >> BqRtDetailLogLine_aggregate::new, > > >> new PRTAggregate(), > > >> TimeWindows.of(60 * 60 * 1000L), > > >> collectorSerde, "prt_minute_agg_stream"); > > >> > > >> – > > >> > > >> Best regards, > > >> Radek Gruchalski > > >> ra...@gruchalski.com > > >> > > >> > > >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers ( > > jon.yearg...@cedexis.com) > > >> wrote: > > >> > > >> If I comment out the aggregation step and just .print the .map step I > > >> don't hit the error. It's coming from aggregating the non-String key. > > >> > > >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski < > ra...@gruchalski.com> > > >> wrote: > > >> > > >>> Jon, > > >>> > > >>> Looking at your code: > > >>> > > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > > >>> Serdes.String().getClass().getName()); > > >>> > > >>> and later: > > >>> > > >>> KStream rtDetailLines = > > >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC); > > >>> > > >>> Is RtDetailLogLine inheriting from String? It is not, as the error > > >>> suggests. > > >>> You may have to write your own Serializer / Deserializer for > > >>> RtDetailLogLine. > > >>> > > >>> – > > >>> Best regards, > > >>> Radek Gruchalski > > >>> ra...@gruchalski.com > > >>> > > >>> > > >>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers ( > > >>> jon.yearg...@cedexis.com) wrote: > > >>> > > >>> Using 0.1
Re: Implementing custom key serializer
0.10.1.0 On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski wrote: > Jon, > > Are you using 0.10.1 or 0.10.0.1? > > – > Best regards, > Radek Gruchalski > ra...@gruchalski.com > > > On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com) > wrote: > > Hi Jon, > > At a glance the code looks ok, i.e, i believe the aggregate() should have > picked up the default Serde set in your StreamsConfig. However, you could > try adding the Serdes to the groupBy(..) > > i.e., > rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...) > > Thanks, > Damian > > On Tue, 6 Dec 2016 at 18:42 Jon Yeargers wrote: > > > It's just a bunch of public 'int' and 'String' values. There's an empty > > constructor and a copy constructor. > > > > For functions I override 'equals' and the requirements for 'serde' > (close, > > configure, serializer and deserializer). > > > > @Override > > public Serializer serializer() { > > JsonSerializer jsonSerializer = new JsonSerializer<>(); > > return jsonSerializer; > > } > > > > @Override > > public Deserializer deserializer() { > > JsonDeserializer jsonDeserializer = new > > JsonDeserializer<>(); > > return jsonDeserializer; > > } > > > > > > > > > > Which relates to: > > > > public class JsonSerializer implements Serializer { > > > > private Gson gson = new Gson(); > > > > @Override > > public void configure(Map map, boolean b) { > > > > } > > > > @Override > > public byte[] serialize(String topic, T t) { > > return gson.toJson(t).getBytes(Charset.forName("UTF-8")); > > } > > > > @Override > > public void close() { > > > > } > > } > > > > > > > > public class JsonDeserializer implements Deserializer { > > > > private Gson gson = new Gson(); > > private Class deserializedClass; > > > > public JsonDeserializer(Class deserializedClass) { > > this.deserializedClass = deserializedClass; > > } > > > > public JsonDeserializer() { > > } > > > > @Override > > @SuppressWarnings("unchecked") > > public void configure(Map map, boolean b) { > > if(deserializedClass == null) { > > deserializedClass = (Class) map.get("serializedClass"); > > } > > } > > > > @Override > > public T deserialize(String s, byte[] bytes) { > > if(bytes == null){ > > return null; > > } > > > > return gson.fromJson(new String(bytes),deserializedClass); > > > > } > > > > @Override > > public void close() { > > > > } > > } > > > > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski > > wrote: > > > > > Do you mind sharing the code of AggKey class? > > > > > > – > > > Best regards, > > > Radek Gruchalski > > > ra...@gruchalski.com > > > > > > > > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers ( > > jon.yearg...@cedexis.com) > > > wrote: > > > > > > The 2nd. > > > > > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski < > ra...@gruchalski.com> > > > wrote: > > > > > >> Is the error happening at this stage? > > >> > > >> KStream rtRekey = rtDetailLines.map((key, > > value) > > >> -> new KeyValue<>(new AggKey(value), value)); > > >> > > >> or here: > > >> > > >> KTable, BqRtDetailLogLine_aggregate> ktRtDetail = > > >> rtRekey.groupByKey().aggregate( > > >> BqRtDetailLogLine_aggregate::new, > > >> new PRTAggregate(), > > >> TimeWindows.of(60 * 60 * 1000L), > > >> collectorSerde, "prt_minute_agg_stream"); > > >> > > >> – > > >> > > >> Best regards, > > >> Radek Gruchalski > > >> ra...@gruchalski.com > > >> > > >> > > >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers ( > > jon.yearg...@cedexis.com) > > >> wrote: > > >> > > >> If I comment out the aggregation step and just .print the .map step I > > >> don't hit the error. It's coming from aggregating the non-String key. > > >> > > >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski < > ra...@gruchalski.com> > > >> wrote: > > >> > > >>> Jon, > > >>> > > >>> Looking at your code: > > >>> > > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > > >>> Serdes.String().getClass().getName()); > > >>> > > >>> and later: > > >>> > > >>> KStream rtDetailLines = > > >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC); > > >>> > > >>> Is RtDetailLogLine inheriting from String? It is not, as the error > > >>> suggests. > > >>> You may have to write your own Serializer / Deserializer for > > >>> RtDetailLogLine. > > >>> > > >>> – > > >>> Best regards, > > >>> Radek Gruchalski > > >>> ra...@gruchalski.com > > >>> > > >>> > > >>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers ( > > >>> jon.yearg...@cedexis.com) wrote: > > >>> > > >>> Using 0.10.1.0 > > >>> > > >>> This is my topology: > > >>> > > >>> Properties config = new Properties(); > > >>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP); > > >>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP); > > >>> config.put(StreamsConfig.APPLICATION_ID_CON
Re: Implementing custom key serializer
Jon, Are you using 0.10.1 or 0.10.0.1? – Best regards, Radek Gruchalski ra...@gruchalski.com On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com) wrote: Hi Jon, At a glance the code looks ok, i.e, i believe the aggregate() should have picked up the default Serde set in your StreamsConfig. However, you could try adding the Serdes to the groupBy(..) i.e., rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...) Thanks, Damian On Tue, 6 Dec 2016 at 18:42 Jon Yeargers wrote: > It's just a bunch of public 'int' and 'String' values. There's an empty > constructor and a copy constructor. > > For functions I override 'equals' and the requirements for 'serde' (close, > configure, serializer and deserializer). > > @Override > public Serializer serializer() { > JsonSerializer jsonSerializer = new JsonSerializer<>(); > return jsonSerializer; > } > > @Override > public Deserializer deserializer() { > JsonDeserializer jsonDeserializer = new > JsonDeserializer<>(); > return jsonDeserializer; > } > > > > > Which relates to: > > public class JsonSerializer implements Serializer { > > private Gson gson = new Gson(); > > @Override > public void configure(Map map, boolean b) { > > } > > @Override > public byte[] serialize(String topic, T t) { > return gson.toJson(t).getBytes(Charset.forName("UTF-8")); > } > > @Override > public void close() { > > } > } > > > > public class JsonDeserializer implements Deserializer { > > private Gson gson = new Gson(); > private Class deserializedClass; > > public JsonDeserializer(Class deserializedClass) { > this.deserializedClass = deserializedClass; > } > > public JsonDeserializer() { > } > > @Override > @SuppressWarnings("unchecked") > public void configure(Map map, boolean b) { > if(deserializedClass == null) { > deserializedClass = (Class) map.get("serializedClass"); > } > } > > @Override > public T deserialize(String s, byte[] bytes) { > if(bytes == null){ > return null; > } > > return gson.fromJson(new String(bytes),deserializedClass); > > } > > @Override > public void close() { > > } > } > > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski > wrote: > > > Do you mind sharing the code of AggKey class? > > > > – > > Best regards, > > Radek Gruchalski > > ra...@gruchalski.com > > > > > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers ( > jon.yearg...@cedexis.com) > > wrote: > > > > The 2nd. > > > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski > > wrote: > > > >> Is the error happening at this stage? > >> > >> KStream rtRekey = rtDetailLines.map((key, > value) > >> -> new KeyValue<>(new AggKey(value), value)); > >> > >> or here: > >> > >> KTable, BqRtDetailLogLine_aggregate> ktRtDetail = > >> rtRekey.groupByKey().aggregate( > >> BqRtDetailLogLine_aggregate::new, > >> new PRTAggregate(), > >> TimeWindows.of(60 * 60 * 1000L), > >> collectorSerde, "prt_minute_agg_stream"); > >> > >> – > >> > >> Best regards, > >> Radek Gruchalski > >> ra...@gruchalski.com > >> > >> > >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers ( > jon.yearg...@cedexis.com) > >> wrote: > >> > >> If I comment out the aggregation step and just .print the .map step I > >> don't hit the error. It's coming from aggregating the non-String key. > >> > >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski > >> wrote: > >> > >>> Jon, > >>> > >>> Looking at your code: > >>> > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > >>> Serdes.String().getClass().getName()); > >>> > >>> and later: > >>> > >>> KStream rtDetailLines = > >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC); > >>> > >>> Is RtDetailLogLine inheriting from String? It is not, as the error > >>> suggests. > >>> You may have to write your own Serializer / Deserializer for > >>> RtDetailLogLine. > >>> > >>> – > >>> Best regards, > >>> Radek Gruchalski > >>> ra...@gruchalski.com > >>> > >>> > >>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers ( > >>> jon.yearg...@cedexis.com) wrote: > >>> > >>> Using 0.10.1.0 > >>> > >>> This is my topology: > >>> > >>> Properties config = new Properties(); > >>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP); > >>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP); > >>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" ); > >>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > >>> AggKey.class.getName()); > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > >>> Serdes.String().getClass().getName()); > >>> > >>> JsonSerializer sumRecordsSerializer = new > >>> JsonSerializer<>(); > >>> JsonDeserializer sumRecordsDeserializer = > >>> new > >>> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class); > >>> Serde collectorSerde = > >>> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserial
Re: Implementing custom key serializer
Hi Jon, At a glance the code looks ok, i.e, i believe the aggregate() should have picked up the default Serde set in your StreamsConfig. However, you could try adding the Serdes to the groupBy(..) i.e., rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...) Thanks, Damian On Tue, 6 Dec 2016 at 18:42 Jon Yeargers wrote: > It's just a bunch of public 'int' and 'String' values. There's an empty > constructor and a copy constructor. > > For functions I override 'equals' and the requirements for 'serde' (close, > configure, serializer and deserializer). > > @Override > public Serializer serializer() { > JsonSerializer jsonSerializer = new JsonSerializer<>(); > return jsonSerializer; > } > > @Override > public Deserializer deserializer() { > JsonDeserializer jsonDeserializer = new > JsonDeserializer<>(); > return jsonDeserializer; > } > > > > > Which relates to: > > public class JsonSerializer implements Serializer { > > private Gson gson = new Gson(); > > @Override > public void configure(Map map, boolean b) { > > } > > @Override > public byte[] serialize(String topic, T t) { > return gson.toJson(t).getBytes(Charset.forName("UTF-8")); > } > > @Override > public void close() { > > } > } > > > > public class JsonDeserializer implements Deserializer { > > private Gson gson = new Gson(); > private Class deserializedClass; > > public JsonDeserializer(Class deserializedClass) { > this.deserializedClass = deserializedClass; > } > > public JsonDeserializer() { > } > > @Override > @SuppressWarnings("unchecked") > public void configure(Map map, boolean b) { > if(deserializedClass == null) { > deserializedClass = (Class) map.get("serializedClass"); > } > } > > @Override > public T deserialize(String s, byte[] bytes) { > if(bytes == null){ > return null; > } > > return gson.fromJson(new String(bytes),deserializedClass); > > } > > @Override > public void close() { > > } > } > > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski > wrote: > > > Do you mind sharing the code of AggKey class? > > > > – > > Best regards, > > Radek Gruchalski > > ra...@gruchalski.com > > > > > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers ( > jon.yearg...@cedexis.com) > > wrote: > > > > The 2nd. > > > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski > > wrote: > > > >> Is the error happening at this stage? > >> > >> KStream rtRekey = rtDetailLines.map((key, > value) > >> -> new KeyValue<>(new AggKey(value), value)); > >> > >> or here: > >> > >> KTable, BqRtDetailLogLine_aggregate> ktRtDetail = > >> rtRekey.groupByKey().aggregate( > >> BqRtDetailLogLine_aggregate::new, > >> new PRTAggregate(), > >> TimeWindows.of(60 * 60 * 1000L), > >> collectorSerde, "prt_minute_agg_stream"); > >> > >> – > >> > >> Best regards, > >> Radek Gruchalski > >> ra...@gruchalski.com > >> > >> > >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers ( > jon.yearg...@cedexis.com) > >> wrote: > >> > >> If I comment out the aggregation step and just .print the .map step I > >> don't hit the error. It's coming from aggregating the non-String key. > >> > >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski > >> wrote: > >> > >>> Jon, > >>> > >>> Looking at your code: > >>> > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > >>> Serdes.String().getClass().getName()); > >>> > >>> and later: > >>> > >>> KStream rtDetailLines = > >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC); > >>> > >>> Is RtDetailLogLine inheriting from String? It is not, as the error > >>> suggests. > >>> You may have to write your own Serializer / Deserializer for > >>> RtDetailLogLine. > >>> > >>> – > >>> Best regards, > >>> Radek Gruchalski > >>> ra...@gruchalski.com > >>> > >>> > >>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers ( > >>> jon.yearg...@cedexis.com) wrote: > >>> > >>> Using 0.10.1.0 > >>> > >>> This is my topology: > >>> > >>> Properties config = new Properties(); > >>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP); > >>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP); > >>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" ); > >>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > >>> AggKey.class.getName()); > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > >>> Serdes.String().getClass().getName()); > >>> > >>> JsonSerializer sumRecordsSerializer = new > >>> JsonSerializer<>(); > >>> JsonDeserializer sumRecordsDeserializer = > >>> new > >>> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class); > >>> Serde collectorSerde = > >>> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer); > >>> > >>> StringSerializer stringSerializer = new StringSerializer(); > >>> StringDeserializer stringDeserializer = new StringDeserializer(); > >>> Serde stringSerde
Re: Implementing custom key serializer
It's just a bunch of public 'int' and 'String' values. There's an empty constructor and a copy constructor. For functions I override 'equals' and the requirements for 'serde' (close, configure, serializer and deserializer). @Override public Serializer serializer() { JsonSerializer jsonSerializer = new JsonSerializer<>(); return jsonSerializer; } @Override public Deserializer deserializer() { JsonDeserializer jsonDeserializer = new JsonDeserializer<>(); return jsonDeserializer; } Which relates to: public class JsonSerializer implements Serializer { private Gson gson = new Gson(); @Override public void configure(Map map, boolean b) { } @Override public byte[] serialize(String topic, T t) { return gson.toJson(t).getBytes(Charset.forName("UTF-8")); } @Override public void close() { } } public class JsonDeserializer implements Deserializer { private Gson gson = new Gson(); private Class deserializedClass; public JsonDeserializer(Class deserializedClass) { this.deserializedClass = deserializedClass; } public JsonDeserializer() { } @Override @SuppressWarnings("unchecked") public void configure(Map map, boolean b) { if(deserializedClass == null) { deserializedClass = (Class) map.get("serializedClass"); } } @Override public T deserialize(String s, byte[] bytes) { if(bytes == null){ return null; } return gson.fromJson(new String(bytes),deserializedClass); } @Override public void close() { } } On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski wrote: > Do you mind sharing the code of AggKey class? > > – > Best regards, > Radek Gruchalski > ra...@gruchalski.com > > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (jon.yearg...@cedexis.com) > wrote: > > The 2nd. > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski > wrote: > >> Is the error happening at this stage? >> >> KStream rtRekey = rtDetailLines.map((key, value) >> -> new KeyValue<>(new AggKey(value), value)); >> >> or here: >> >> KTable, BqRtDetailLogLine_aggregate> ktRtDetail = >> rtRekey.groupByKey().aggregate( >> BqRtDetailLogLine_aggregate::new, >> new PRTAggregate(), >> TimeWindows.of(60 * 60 * 1000L), >> collectorSerde, "prt_minute_agg_stream"); >> >> – >> >> Best regards, >> Radek Gruchalski >> ra...@gruchalski.com >> >> >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com) >> wrote: >> >> If I comment out the aggregation step and just .print the .map step I >> don't hit the error. It's coming from aggregating the non-String key. >> >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski >> wrote: >> >>> Jon, >>> >>> Looking at your code: >>> >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, >>> Serdes.String().getClass().getName()); >>> >>> and later: >>> >>> KStream rtDetailLines = >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC); >>> >>> Is RtDetailLogLine inheriting from String? It is not, as the error >>> suggests. >>> You may have to write your own Serializer / Deserializer for >>> RtDetailLogLine. >>> >>> – >>> Best regards, >>> Radek Gruchalski >>> ra...@gruchalski.com >>> >>> >>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers ( >>> jon.yearg...@cedexis.com) wrote: >>> >>> Using 0.10.1.0 >>> >>> This is my topology: >>> >>> Properties config = new Properties(); >>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP); >>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP); >>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" ); >>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, >>> AggKey.class.getName()); >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, >>> Serdes.String().getClass().getName()); >>> >>> JsonSerializer sumRecordsSerializer = new >>> JsonSerializer<>(); >>> JsonDeserializer sumRecordsDeserializer = >>> new >>> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class); >>> Serde collectorSerde = >>> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer); >>> >>> StringSerializer stringSerializer = new StringSerializer(); >>> StringDeserializer stringDeserializer = new StringDeserializer(); >>> Serde stringSerde = >>> Serdes.serdeFrom(stringSerializer,stringDeserializer); >>> >>> JsonDeserializer prtRecordDeserializer = new >>> JsonDeserializer<>(RtDetailLogLine.class); >>> JsonSerializer prtRecordJsonSerializer = new >>> JsonSerializer<>(); >>> Serde prtRecordSerde = >>> Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer); >>> >>> KStreamBuilder kStreamBuilder = new KStreamBuilder(); >>> >>> KStream rtDetailLines = >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC); >>> >>> // change the keying >>> KStream rtRekey = rtDetailLines.map((key, value) >>> -> new KeyValue<>(new AggKey(value), value)); >>> >>> KTable, BqRtDetailLogLine_aggregate> ktRtDetail = >>> r
Re: Implementing custom key serializer
Do you mind sharing the code of AggKey class? – Best regards, Radek Gruchalski ra...@gruchalski.com On December 6, 2016 at 7:26:51 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote: The 2nd. On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski wrote: > Is the error happening at this stage? > > KStream rtRekey = rtDetailLines.map((key, value) -> > new KeyValue<>(new AggKey(value), value)); > > or here: > > KTable, BqRtDetailLogLine_aggregate> ktRtDetail = > rtRekey.groupByKey().aggregate( > BqRtDetailLogLine_aggregate::new, > new PRTAggregate(), > TimeWindows.of(60 * 60 * 1000L), > collectorSerde, "prt_minute_agg_stream"); > > – > > Best regards, > Radek Gruchalski > ra...@gruchalski.com > > > On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com) > wrote: > > If I comment out the aggregation step and just .print the .map step I > don't hit the error. It's coming from aggregating the non-String key. > > On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski > wrote: > >> Jon, >> >> Looking at your code: >> >> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, >> Serdes.String().getClass().getName()); >> >> and later: >> >> KStream rtDetailLines = >> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC); >> >> Is RtDetailLogLine inheriting from String? It is not, as the error >> suggests. >> You may have to write your own Serializer / Deserializer for >> RtDetailLogLine. >> >> – >> Best regards, >> Radek Gruchalski >> ra...@gruchalski.com >> >> >> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com) >> wrote: >> >> Using 0.10.1.0 >> >> This is my topology: >> >> Properties config = new Properties(); >> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP); >> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP); >> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" ); >> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName()); >> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, >> Serdes.String().getClass().getName()); >> >> JsonSerializer sumRecordsSerializer = new >> JsonSerializer<>(); >> JsonDeserializer sumRecordsDeserializer = >> new >> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class); >> Serde collectorSerde = >> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer); >> >> StringSerializer stringSerializer = new StringSerializer(); >> StringDeserializer stringDeserializer = new StringDeserializer(); >> Serde stringSerde = >> Serdes.serdeFrom(stringSerializer,stringDeserializer); >> >> JsonDeserializer prtRecordDeserializer = new >> JsonDeserializer<>(RtDetailLogLine.class); >> JsonSerializer prtRecordJsonSerializer = new >> JsonSerializer<>(); >> Serde prtRecordSerde = >> Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer); >> >> KStreamBuilder kStreamBuilder = new KStreamBuilder(); >> >> KStream rtDetailLines = >> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC); >> >> // change the keying >> KStream rtRekey = rtDetailLines.map((key, value) >> -> new KeyValue<>(new AggKey(value), value)); >> >> KTable, BqRtDetailLogLine_aggregate> ktRtDetail = >> rtRekey.groupByKey().aggregate( >> BqRtDetailLogLine_aggregate::new, >> new PRTAggregate(), >> TimeWindows.of(60 * 60 * 1000L), >> collectorSerde, "prt_minute_agg_stream"); >> >> ktRtDetail.toStream().print(); >> >> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config); >> >> kafkaStreams.start(); >> >> >> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy wrote: >> >> > Hi Jon, >> > >> > A couple of things: Which version are you using? >> > Can you share the code you are using to the build the topology? >> > >> > Thanks, >> > Damian >> > >> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers >> wrote: >> > >> > > Im using .map to convert my (k/v) string/Object to Object/Object but >> > when I >> > > chain this to an aggregation step Im getting this exception: >> > > >> > > Exception in thread "StreamThread-1" java.lang.ClassCastException: >> > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to >> > > java.lang.String >> > > at >> > > >> > > org.apache.kafka.common.serialization.StringSerializer.serialize( >> > StringSerializer.java:24) >> > > at >> > > >> > > org.apache.kafka.streams.processor.internals.RecordCollector.send( >> > RecordCollector.java:73) >> > > at >> > > >> > > org.apache.kafka.streams.processor.internals.SinkNode. >> > process(SinkNode.java:72) >> > > at >> > > >> > > org.apache.kafka.streams.processor.internals. >> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) >> > > at >> > > >> > > org.apache.kafka.streams.kstream.internals.KStreamFilter$ >> > KStreamFilterProcessor.process(KStreamFilter.java:44) >> > > at >> > > >> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( >> > ProcessorNode.java:82) >> > > at >> > > >> > > org.apache.kafka.streams.processor.internals. >> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) >> > > at >> > > >> > > org.apa
Re: Implementing custom key serializer
The 2nd. On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski wrote: > Is the error happening at this stage? > > KStream rtRekey = rtDetailLines.map((key, value) -> > new KeyValue<>(new AggKey(value), value)); > > or here: > > KTable, BqRtDetailLogLine_aggregate> ktRtDetail = > rtRekey.groupByKey().aggregate( > BqRtDetailLogLine_aggregate::new, > new PRTAggregate(), > TimeWindows.of(60 * 60 * 1000L), > collectorSerde, "prt_minute_agg_stream"); > > – > > Best regards, > Radek Gruchalski > ra...@gruchalski.com > > > On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com) > wrote: > > If I comment out the aggregation step and just .print the .map step I > don't hit the error. It's coming from aggregating the non-String key. > > On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski > wrote: > >> Jon, >> >> Looking at your code: >> >> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, >> Serdes.String().getClass().getName()); >> >> and later: >> >> KStream rtDetailLines = >> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC); >> >> Is RtDetailLogLine inheriting from String? It is not, as the error >> suggests. >> You may have to write your own Serializer / Deserializer for >> RtDetailLogLine. >> >> – >> Best regards, >> Radek Gruchalski >> ra...@gruchalski.com >> >> >> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com) >> wrote: >> >> Using 0.10.1.0 >> >> This is my topology: >> >> Properties config = new Properties(); >> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP); >> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP); >> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" ); >> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName()); >> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, >> Serdes.String().getClass().getName()); >> >> JsonSerializer sumRecordsSerializer = new >> JsonSerializer<>(); >> JsonDeserializer sumRecordsDeserializer = >> new >> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class); >> Serde collectorSerde = >> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer); >> >> StringSerializer stringSerializer = new StringSerializer(); >> StringDeserializer stringDeserializer = new StringDeserializer(); >> Serde stringSerde = >> Serdes.serdeFrom(stringSerializer,stringDeserializer); >> >> JsonDeserializer prtRecordDeserializer = new >> JsonDeserializer<>(RtDetailLogLine.class); >> JsonSerializer prtRecordJsonSerializer = new >> JsonSerializer<>(); >> Serde prtRecordSerde = >> Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer); >> >> KStreamBuilder kStreamBuilder = new KStreamBuilder(); >> >> KStream rtDetailLines = >> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC); >> >> // change the keying >> KStream rtRekey = rtDetailLines.map((key, value) >> -> new KeyValue<>(new AggKey(value), value)); >> >> KTable, BqRtDetailLogLine_aggregate> ktRtDetail = >> rtRekey.groupByKey().aggregate( >> BqRtDetailLogLine_aggregate::new, >> new PRTAggregate(), >> TimeWindows.of(60 * 60 * 1000L), >> collectorSerde, "prt_minute_agg_stream"); >> >> ktRtDetail.toStream().print(); >> >> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config); >> >> kafkaStreams.start(); >> >> >> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy wrote: >> >> > Hi Jon, >> > >> > A couple of things: Which version are you using? >> > Can you share the code you are using to the build the topology? >> > >> > Thanks, >> > Damian >> > >> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers >> wrote: >> > >> > > Im using .map to convert my (k/v) string/Object to Object/Object but >> > when I >> > > chain this to an aggregation step Im getting this exception: >> > > >> > > Exception in thread "StreamThread-1" java.lang.ClassCastException: >> > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to >> > > java.lang.String >> > > at >> > > >> > > org.apache.kafka.common.serialization.StringSerializer.serialize( >> > StringSerializer.java:24) >> > > at >> > > >> > > org.apache.kafka.streams.processor.internals.RecordCollector.send( >> > RecordCollector.java:73) >> > > at >> > > >> > > org.apache.kafka.streams.processor.internals.SinkNode. >> > process(SinkNode.java:72) >> > > at >> > > >> > > org.apache.kafka.streams.processor.internals. >> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) >> > > at >> > > >> > > org.apache.kafka.streams.kstream.internals.KStreamFilter$ >> > KStreamFilterProcessor.process(KStreamFilter.java:44) >> > > at >> > > >> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( >> > ProcessorNode.java:82) >> > > at >> > > >> > > org.apache.kafka.streams.processor.internals. >> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) >> > > at >> > > >> > > org.apache.kafka.streams.kstream.internals.KStreamMap$ >> > KStreamMapProcessor.process(KStreamMap.java:43) >> > > at >> > > >> > > org.apache.kafka.streams.processor.internals.ProcessorNode.pr
Re: Implementing custom key serializer
Is the error happening at this stage? KStream rtRekey = rtDetailLines.map((key, value) -> new KeyValue<>(new AggKey(value), value)); or here: KTable, BqRtDetailLogLine_aggregate> ktRtDetail = rtRekey.groupByKey().aggregate( BqRtDetailLogLine_aggregate::new, new PRTAggregate(), TimeWindows.of(60 * 60 * 1000L), collectorSerde, "prt_minute_agg_stream"); – Best regards, Radek Gruchalski ra...@gruchalski.com On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote: If I comment out the aggregation step and just .print the .map step I don't hit the error. It's coming from aggregating the non-String key. On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski wrote: > Jon, > > Looking at your code: > > config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > Serdes.String().getClass().getName()); > > and later: > > KStream rtDetailLines = > kStreamBuilder.stream(stringSerde, > prtRecordSerde, TOPIC); > > Is RtDetailLogLine inheriting from String? It is not, as the error > suggests. > You may have to write your own Serializer / Deserializer for > RtDetailLogLine. > > – > Best regards, > Radek Gruchalski > ra...@gruchalski.com > > > On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com) > wrote: > > Using 0.10.1.0 > > This is my topology: > > Properties config = new Properties(); > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP); > config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP); > config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" ); > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName()); > config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > Serdes.String().getClass().getName()); > > JsonSerializer sumRecordsSerializer = new > JsonSerializer<>(); > JsonDeserializer sumRecordsDeserializer = new > JsonDeserializer<>(BqRtDetailLogLine_aggregate.class); > Serde collectorSerde = > Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer); > > StringSerializer stringSerializer = new StringSerializer(); > StringDeserializer stringDeserializer = new StringDeserializer(); > Serde stringSerde = > Serdes.serdeFrom(stringSerializer,stringDeserializer); > > JsonDeserializer prtRecordDeserializer = new > JsonDeserializer<>(RtDetailLogLine.class); > JsonSerializer prtRecordJsonSerializer = new > JsonSerializer<>(); > Serde prtRecordSerde = > Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer); > > KStreamBuilder kStreamBuilder = new KStreamBuilder(); > > KStream rtDetailLines = > kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC); > > // change the keying > KStream rtRekey = rtDetailLines.map((key, value) > -> new KeyValue<>(new AggKey(value), value)); > > KTable, BqRtDetailLogLine_aggregate> ktRtDetail = > rtRekey.groupByKey().aggregate( > BqRtDetailLogLine_aggregate::new, > new PRTAggregate(), > TimeWindows.of(60 * 60 * 1000L), > collectorSerde, "prt_minute_agg_stream"); > > ktRtDetail.toStream().print(); > > KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config); > > kafkaStreams.start(); > > > On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy wrote: > > > Hi Jon, > > > > A couple of things: Which version are you using? > > Can you share the code you are using to the build the topology? > > > > Thanks, > > Damian > > > > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers > wrote: > > > > > Im using .map to convert my (k/v) string/Object to Object/Object but > > when I > > > chain this to an aggregation step Im getting this exception: > > > > > > Exception in thread "StreamThread-1" java.lang.ClassCastException: > > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to > > > java.lang.String > > > at > > > > > > org.apache.kafka.common.serialization.StringSerializer.serialize( > > StringSerializer.java:24) > > > at > > > > > > org.apache.kafka.streams.processor.internals.RecordCollector.send( > > RecordCollector.java:73) > > > at > > > > > > org.apache.kafka.streams.processor.internals.SinkNode. > > process(SinkNode.java:72) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KStreamFilter$ > > KStreamFilterProcessor.process(KStreamFilter.java:44) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > > ProcessorNode.java:82) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KStreamMap$ > > KStreamMapProcessor.process(KStreamMap.java:43) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > > ProcessorNode.java:82) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > SourceNode.process
Re: Implementing custom key serializer
If I comment out the aggregation step and just .print the .map step I don't hit the error. It's coming from aggregating the non-String key. On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski wrote: > Jon, > > Looking at your code: > > config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > Serdes.String().getClass().getName()); > > and later: > > KStream rtDetailLines = > kStreamBuilder.stream(stringSerde, > prtRecordSerde, TOPIC); > > Is RtDetailLogLine inheriting from String? It is not, as the error > suggests. > You may have to write your own Serializer / Deserializer for > RtDetailLogLine. > > – > Best regards, > Radek Gruchalski > ra...@gruchalski.com > > > On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com) > wrote: > > Using 0.10.1.0 > > This is my topology: > > Properties config = new Properties(); > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP); > config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP); > config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" ); > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName()); > config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > Serdes.String().getClass().getName()); > > JsonSerializer sumRecordsSerializer = new > JsonSerializer<>(); > JsonDeserializer sumRecordsDeserializer = > new > JsonDeserializer<>(BqRtDetailLogLine_aggregate.class); > Serde collectorSerde = > Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer); > > StringSerializer stringSerializer = new StringSerializer(); > StringDeserializer stringDeserializer = new StringDeserializer(); > Serde stringSerde = > Serdes.serdeFrom(stringSerializer,stringDeserializer); > > JsonDeserializer prtRecordDeserializer = new > JsonDeserializer<>(RtDetailLogLine.class); > JsonSerializer prtRecordJsonSerializer = new > JsonSerializer<>(); > Serde prtRecordSerde = > Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer); > > KStreamBuilder kStreamBuilder = new KStreamBuilder(); > > KStream rtDetailLines = > kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC); > > // change the keying > KStream rtRekey = rtDetailLines.map((key, value) > -> new KeyValue<>(new AggKey(value), value)); > > KTable, BqRtDetailLogLine_aggregate> ktRtDetail = > rtRekey.groupByKey().aggregate( > BqRtDetailLogLine_aggregate::new, > new PRTAggregate(), > TimeWindows.of(60 * 60 * 1000L), > collectorSerde, "prt_minute_agg_stream"); > > ktRtDetail.toStream().print(); > > KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config); > > kafkaStreams.start(); > > > On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy wrote: > > > Hi Jon, > > > > A couple of things: Which version are you using? > > Can you share the code you are using to the build the topology? > > > > Thanks, > > Damian > > > > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers > wrote: > > > > > Im using .map to convert my (k/v) string/Object to Object/Object but > > when I > > > chain this to an aggregation step Im getting this exception: > > > > > > Exception in thread "StreamThread-1" java.lang.ClassCastException: > > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to > > > java.lang.String > > > at > > > > > > org.apache.kafka.common.serialization.StringSerializer.serialize( > > StringSerializer.java:24) > > > at > > > > > > org.apache.kafka.streams.processor.internals.RecordCollector.send( > > RecordCollector.java:73) > > > at > > > > > > org.apache.kafka.streams.processor.internals.SinkNode. > > process(SinkNode.java:72) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KStreamFilter$ > > KStreamFilterProcessor.process(KStreamFilter.java:44) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > > ProcessorNode.java:82) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KStreamMap$ > > KStreamMapProcessor.process(KStreamMap.java:43) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > > ProcessorNode.java:82) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > SourceNode.process(SourceNode.java:66) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > StreamTask.process(StreamTask.java:181) > > > at > > > > > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > > StreamThread.java:436) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > StreamThread.run(StreamThread.java:242) > > > > > > My key object implements Serde and returns a JsonSerializer for the > > > 'Serializer()' override. > > > > > > In the config
Re: Implementing custom key serializer
Jon, Looking at your code: config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); and later: KStream rtDetailLines = kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC); Is RtDetailLogLine inheriting from String? It is not, as the error suggests. You may have to write your own Serializer / Deserializer for RtDetailLogLine. – Best regards, Radek Gruchalski ra...@gruchalski.com On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote: Using 0.10.1.0 This is my topology: Properties config = new Properties(); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP); config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" ); config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName()); config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); JsonSerializer sumRecordsSerializer = new JsonSerializer<>(); JsonDeserializer sumRecordsDeserializer = new JsonDeserializer<>(BqRtDetailLogLine_aggregate.class); Serde collectorSerde = Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer); StringSerializer stringSerializer = new StringSerializer(); StringDeserializer stringDeserializer = new StringDeserializer(); Serde stringSerde = Serdes.serdeFrom(stringSerializer,stringDeserializer); JsonDeserializer prtRecordDeserializer = new JsonDeserializer<>(RtDetailLogLine.class); JsonSerializer prtRecordJsonSerializer = new JsonSerializer<>(); Serde prtRecordSerde = Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer); KStreamBuilder kStreamBuilder = new KStreamBuilder(); KStream rtDetailLines = kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC); // change the keying KStream rtRekey = rtDetailLines.map((key, value) -> new KeyValue<>(new AggKey(value), value)); KTable, BqRtDetailLogLine_aggregate> ktRtDetail = rtRekey.groupByKey().aggregate( BqRtDetailLogLine_aggregate::new, new PRTAggregate(), TimeWindows.of(60 * 60 * 1000L), collectorSerde, "prt_minute_agg_stream"); ktRtDetail.toStream().print(); KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config); kafkaStreams.start(); On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy wrote: > Hi Jon, > > A couple of things: Which version are you using? > Can you share the code you are using to the build the topology? > > Thanks, > Damian > > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers wrote: > > > Im using .map to convert my (k/v) string/Object to Object/Object but > when I > > chain this to an aggregation step Im getting this exception: > > > > Exception in thread "StreamThread-1" java.lang.ClassCastException: > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to > > java.lang.String > > at > > > > org.apache.kafka.common.serialization.StringSerializer.serialize( > StringSerializer.java:24) > > at > > > > org.apache.kafka.streams.processor.internals.RecordCollector.send( > RecordCollector.java:73) > > at > > > > org.apache.kafka.streams.processor.internals.SinkNode. > process(SinkNode.java:72) > > at > > > > org.apache.kafka.streams.processor.internals. > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > at > > > > org.apache.kafka.streams.kstream.internals.KStreamFilter$ > KStreamFilterProcessor.process(KStreamFilter.java:44) > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > ProcessorNode.java:82) > > at > > > > org.apache.kafka.streams.processor.internals. > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > at > > > > org.apache.kafka.streams.kstream.internals.KStreamMap$ > KStreamMapProcessor.process(KStreamMap.java:43) > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > ProcessorNode.java:82) > > at > > > > org.apache.kafka.streams.processor.internals. > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > at > > > > org.apache.kafka.streams.processor.internals. > SourceNode.process(SourceNode.java:66) > > at > > > > org.apache.kafka.streams.processor.internals. > StreamTask.process(StreamTask.java:181) > > at > > > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > StreamThread.java:436) > > at > > > > org.apache.kafka.streams.processor.internals. > StreamThread.run(StreamThread.java:242) > > > > My key object implements Serde and returns a JsonSerializer for the > > 'Serializer()' override. > > > > In the config for the topology Im > > setting: config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > > AggKey.class.getName()); > > > > Where else do I need to specify the (de)serializer for my key class? > > >
Re: Implementing custom key serializer
Using 0.10.1.0 This is my topology: Properties config = new Properties(); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP); config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" ); config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName()); config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); JsonSerializer sumRecordsSerializer = new JsonSerializer<>(); JsonDeserializer sumRecordsDeserializer = new JsonDeserializer<>(BqRtDetailLogLine_aggregate.class); Serde collectorSerde = Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer); StringSerializer stringSerializer = new StringSerializer(); StringDeserializer stringDeserializer = new StringDeserializer(); Serde stringSerde = Serdes.serdeFrom(stringSerializer,stringDeserializer); JsonDeserializer prtRecordDeserializer = new JsonDeserializer<>(RtDetailLogLine.class); JsonSerializer prtRecordJsonSerializer = new JsonSerializer<>(); Serde prtRecordSerde = Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer); KStreamBuilder kStreamBuilder = new KStreamBuilder(); KStream rtDetailLines = kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC); // change the keying KStream rtRekey = rtDetailLines.map((key, value) -> new KeyValue<>(new AggKey(value), value)); KTable, BqRtDetailLogLine_aggregate> ktRtDetail = rtRekey.groupByKey().aggregate( BqRtDetailLogLine_aggregate::new, new PRTAggregate(), TimeWindows.of(60 * 60 * 1000L), collectorSerde, "prt_minute_agg_stream"); ktRtDetail.toStream().print(); KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config); kafkaStreams.start(); On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy wrote: > Hi Jon, > > A couple of things: Which version are you using? > Can you share the code you are using to the build the topology? > > Thanks, > Damian > > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers wrote: > > > Im using .map to convert my (k/v) string/Object to Object/Object but > when I > > chain this to an aggregation step Im getting this exception: > > > > Exception in thread "StreamThread-1" java.lang.ClassCastException: > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to > > java.lang.String > > at > > > > org.apache.kafka.common.serialization.StringSerializer.serialize( > StringSerializer.java:24) > > at > > > > org.apache.kafka.streams.processor.internals.RecordCollector.send( > RecordCollector.java:73) > > at > > > > org.apache.kafka.streams.processor.internals.SinkNode. > process(SinkNode.java:72) > > at > > > > org.apache.kafka.streams.processor.internals. > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > at > > > > org.apache.kafka.streams.kstream.internals.KStreamFilter$ > KStreamFilterProcessor.process(KStreamFilter.java:44) > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > ProcessorNode.java:82) > > at > > > > org.apache.kafka.streams.processor.internals. > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > at > > > > org.apache.kafka.streams.kstream.internals.KStreamMap$ > KStreamMapProcessor.process(KStreamMap.java:43) > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > ProcessorNode.java:82) > > at > > > > org.apache.kafka.streams.processor.internals. > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > at > > > > org.apache.kafka.streams.processor.internals. > SourceNode.process(SourceNode.java:66) > > at > > > > org.apache.kafka.streams.processor.internals. > StreamTask.process(StreamTask.java:181) > > at > > > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > StreamThread.java:436) > > at > > > > org.apache.kafka.streams.processor.internals. > StreamThread.run(StreamThread.java:242) > > > > My key object implements Serde and returns a JsonSerializer for the > > 'Serializer()' override. > > > > In the config for the topology Im > > setting: config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > > AggKey.class.getName()); > > > > Where else do I need to specify the (de)serializer for my key class? > > >
Re: Implementing custom key serializer
Hi Jon, A couple of things: Which version are you using? Can you share the code you are using to the build the topology? Thanks, Damian On Tue, 6 Dec 2016 at 14:44 Jon Yeargers wrote: > Im using .map to convert my (k/v) string/Object to Object/Object but when I > chain this to an aggregation step Im getting this exception: > > Exception in thread "StreamThread-1" java.lang.ClassCastException: > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to > java.lang.String > at > > org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24) > at > > org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73) > at > > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > at > > org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) > at > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > at > > org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) > at > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > at > > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) > at > > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) > at > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) > at > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) > > My key object implements Serde and returns a JsonSerializer for the > 'Serializer()' override. > > In the config for the topology Im > setting: config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > AggKey.class.getName()); > > Where else do I need to specify the (de)serializer for my key class? >