Re: Implementing custom key serializer

2016-12-07 Thread Guozhang Wang
I'm not sure why you observed that the aggregation works OK when a String-typed key is used. I think I agree with Radek that the problem comes from the value, and here is my understanding: 1. The source stream read from the topic named "rtDetailLines" is in type 2. After the map

Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
Here's the solution (props to Damian G):
JsonSerializer keySerializer = new JsonSerializer<>();
JsonDeserializer keyDeserializer = new JsonDeserializer<>(AggKey.class);
Serde keySerde = Serdes.serdeFrom(keySerializer, keyDeserializer);
then for the aggregator call 'groupByKey(keySerde,
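The idea in this reply is to pair a serializer and a deserializer for the key type into one serde and pass it to the grouping step explicitly, so the configured default is not consulted for the key. The following is a rough, stdlib-only sketch of that idea. Everything in it is a hypothetical stand-in written for illustration: the `Serde` class, the `serdeFrom` helper, the `AggKey` fields, and the pipe-delimited encoding are assumptions, not Kafka's classes and not Jon's actual code.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class ExplicitSerdeSketch {
    // Hypothetical key type; the fields are invented for this sketch.
    static class AggKey {
        final int id;
        final String region;

        AggKey(int id, String region) {
            this.id = id;
            this.region = region;
        }
    }

    // Stand-in serde: one serializer and one deserializer for the same type.
    static class Serde<T> {
        final Function<T, byte[]> serializer;
        final Function<byte[], T> deserializer;

        Serde(Function<T, byte[]> serializer, Function<byte[], T> deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }
    }

    // Stand-in factory that pairs the two halves (assumption, not a library call).
    static <T> Serde<T> serdeFrom(Function<T, byte[]> serializer,
                                  Function<byte[], T> deserializer) {
        return new Serde<>(serializer, deserializer);
    }

    // Builds a serde for AggKey using a simple "id|region" text encoding.
    static Serde<AggKey> aggKeySerde() {
        return serdeFrom(
            key -> (key.id + "|" + key.region).getBytes(StandardCharsets.UTF_8),
            bytes -> {
                String[] parts = new String(bytes, StandardCharsets.UTF_8).split("\\|", 2);
                return new AggKey(Integer.parseInt(parts[0]), parts[1]);
            });
    }

    // The caller supplies the serde, so the compiler checks that it matches the key type.
    static <K> K roundTrip(Serde<K> keySerde, K key) {
        return keySerde.deserializer.apply(keySerde.serializer.apply(key));
    }

    public static void main(String[] args) {
        AggKey copy = roundTrip(aggKeySerde(), new AggKey(7, "us-west"));
        System.out.println(copy.id + " " + copy.region); // prints "7 us-west"
    }
}
```

Because the serde is an explicit, typed argument here, a mismatch between key type and serde becomes a compile-time error rather than a runtime cast failure.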

Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
Hmm. That's odd as the aggregation works ok if I use a String value for the key (and the corresponding String serde). This error only started occurring when I tried to substitute my 'custom' key for the original String. On Tue, Dec 6, 2016 at 12:24 PM, Radek Gruchalski

Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Yeah, I knew that already. This part of the error: org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73) points to this line:

Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
0.10.1.0
On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski wrote:
> Jon,
>
> Are you using 0.10.1 or 0.10.0.1?
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
> On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com) wrote:
>
> Hi Jon,
>

Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Jon, Are you using 0.10.1 or 0.10.0.1? – Best regards, Radek Gruchalski ra...@gruchalski.com On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com) wrote: Hi Jon, At a glance the code looks OK, i.e., I believe the aggregate() should have picked up the default Serde set in your

Re: Implementing custom key serializer

2016-12-06 Thread Damian Guy
Hi Jon, At a glance the code looks OK, i.e., I believe the aggregate() should have picked up the default Serde set in your StreamsConfig. However, you could try adding the Serdes to the groupBy(..), i.e., rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...) Thanks, Damian On Tue,

Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
It's just a bunch of public 'int' and 'String' values. There's an empty constructor and a copy constructor. For functions I override 'equals' and the requirements for 'serde' (close, configure, serializer and deserializer). @Override public Serializer serializer() { JsonSerializer

Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Do you mind sharing the code of AggKey class? – Best regards, Radek Gruchalski ra...@gruchalski.com On December 6, 2016 at 7:26:51 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote: The 2nd. On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski wrote: > Is the error

Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
The 2nd.
On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski wrote:
> Is the error happening at this stage?
>
> KStream rtRekey = rtDetailLines.map((key, value) ->
>     new KeyValue<>(new AggKey(value), value));
>
> or here:
>
> KTable

Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Is the error happening at this stage?
KStream rtRekey = rtDetailLines.map((key, value) -> new KeyValue<>(new AggKey(value), value));
or here:
KTable ktRtDetail = rtRekey.groupByKey().aggregate(
    BqRtDetailLogLine_aggregate::new,
    new

Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
If I comment out the aggregation step and just .print the .map step I don't hit the error. It's coming from aggregating the non-String key.
On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski wrote:
> Jon,
>
> Looking at your code:
>

Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Jon,
Looking at your code:
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
and later:
KStream rtDetailLines = kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
Is RtDetailLogLine inheriting from String? It is not, as

Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
Using 0.10.1.0. This is my topology:
Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg");

Re: Implementing custom key serializer

2016-12-06 Thread Damian Guy
Hi Jon, A couple of things: Which version are you using? Can you share the code you are using to build the topology? Thanks, Damian On Tue, 6 Dec 2016 at 14:44 Jon Yeargers wrote: > I'm using .map to convert my (k/v) string/Object to Object/Object but when I >

Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
I'm using .map to convert my (k/v) string/Object to Object/Object, but when I chain this to an aggregation step I'm getting this exception:
Exception in thread "StreamThread-1" java.lang.ClassCastException: com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to java.lang.String at
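A "cannot be cast to java.lang.String" failure of this kind is what Java produces when a serializer typed for String is handed some other object through a reference whose generic type has been erased. The sketch below reproduces that mechanism with the standard library only. The `Serializer` interface, `StringSerializer`, `LogLine`, and `send` are hypothetical stand-ins for illustration; they are not Kafka's classes, and this is not a claim about how Kafka Streams is implemented internally.

```java
import java.nio.charset.StandardCharsets;

public class SerdeMismatchDemo {
    // Hypothetical stand-in for a generic serializer interface.
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    // Stand-in for a String serializer that was configured as the default.
    static class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Stand-in for a custom record type that is not a String.
    static class LogLine {
        final String text;

        LogLine(String text) {
            this.text = text;
        }
    }

    // Mimics code that only knows "the configured serializer" and applies it
    // to whatever object flows through. The cast is unchecked, so nothing
    // fails until serialize() tries to treat the value as a String.
    @SuppressWarnings("unchecked")
    static byte[] send(Serializer<?> configuredDefault, Object value) {
        Serializer<Object> erased = (Serializer<Object>) configuredDefault;
        return erased.serialize(value);
    }

    // Returns true when sending the value fails with a ClassCastException.
    static boolean failsWithClassCast(Serializer<?> serializer, Object value) {
        try {
            send(serializer, value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Serializer<String> defaultSerializer = new StringSerializer();
        System.out.println(failsWithClassCast(defaultSerializer, "plain text"));        // prints "false"
        System.out.println(failsWithClassCast(defaultSerializer, new LogLine("line"))); // prints "true"
    }
}
```

The failure only appears at the point where the serializer is invoked, which is consistent with an error that stays hidden until a step actually has to write the record out.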