Re: Implementing custom key serializer

2016-12-07 Thread Guozhang Wang
I'm not sure why you observed that aggregation works ok if String typed key
is used. I think I agree with Radek that the problem comes from the value,
and here is my understanding:

1. The source stream read from the topic named "rtDetailLines" is in type

2. After the map call, the result stream named "rtRekey" is in type


3. Then in aggregateByKey, when repartitioning is auto executed (i.e. the
filter -> sink operators you saw in the stack trace), the default serdes
for type < AggKey, String> is used, and hence value Serializer
failed to serialize an RtDetailLogLine object, which matches the error
message

"Exception in thread "StreamThread-1" java.lang.ClassCastException:
com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
java.lang.String"

as well. Note that although we are only "grouping by key", we will use the
value serde anyways for repartitioning.


Anyways, I double checked the source code but cannot find any obvious bugs
that causes not using the default serdes.

Guozhang


On Tue, Dec 6, 2016 at 12:40 PM, Jon Yeargers 
wrote:

> Here's the solution (props to Damian G)
>
> JsonSerializer keySerializer = new JsonSerializer<>();
> JsonDeserializer keyDeserializer = new
> JsonDeserializer<>(AggKey.class);
> Serde keySerde = Serdes.serdeFrom(keySerializer, keyDeserializer);
>
> then for the aggregator call 'groupByKey(keySerde, prtRecordSerde)'.
>
> In the documentation where it says the 'no param' groupByKey will use the
> default serializers - this doesn't seem to be true.
>
> On Tue, Dec 6, 2016 at 12:28 PM, Jon Yeargers 
> wrote:
>
> > Hmm. That's odd as the aggregation works ok if I use a String value for
> > the key (and the corresponding String serde).
> >
> > This error only started occurring when I tried to substitute my 'custom'
> > key for the original String.
> >
> > On Tue, Dec 6, 2016 at 12:24 PM, Radek Gruchalski 
> > wrote:
> >
> >> Yeah, I knew that already, this part of the error:
> >>
> >> > > >>> > > org.apache.kafka.streams.processor.internals.
> >> > RecordCollector.send(
> >> > > >>> > RecordCollector.java:73)
> >>
> >> points to this line: https://github.com/apache/
> kafka/blob/0.10.1/streams/
> >> src/main/java/org/apache/kafka/streams/processor/
> >> internals/RecordCollector.java#L73
> >>
> >> which means that your error happens on the value, not the key.
> >>
> >> –
> >> Best regards,
> >> Radek Gruchalski
> >> ra...@gruchalski.com
> >>
> >>
> >> On December 6, 2016 at 9:18:53 PM, Jon Yeargers (
> jon.yearg...@cedexis.com)
> >> wrote:
> >>
> >> 0.10.1.0
> >>
> >> On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski  >
> >> wrote:
> >>
> >> > Jon,
> >> >
> >> > Are you using 0.10.1 or 0.10.0.1?
> >> >
> >> > –
> >> > Best regards,
> >> > Radek Gruchalski
> >> > ra...@gruchalski.com
> >> >
> >> >
> >> > On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com)
> >> > wrote:
> >> >
> >> > Hi Jon,
> >> >
> >> > At a glance the code looks ok, i.e, i believe the aggregate() should
> >> have
> >> > picked up the default Serde set in your StreamsConfig. However, you
> >> could
> >> > try adding the Serdes to the groupBy(..)
> >> >
> >> > i.e.,
> >> > rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)
> >> >
> >> > Thanks,
> >> > Damian
> >> >
> >> > On Tue, 6 Dec 2016 at 18:42 Jon Yeargers 
> >> wrote:
> >> >
> >> > > It's just a bunch of public 'int' and 'String' values. There's an
> >> empty
> >> > > constructor and a copy constructor.
> >> > >
> >> > > For functions I override 'equals' and the requirements for 'serde'
> >> > (close,
> >> > > configure, serializer and deserializer).
> >> > >
> >> > > @Override
> >> > > public Serializer serializer() {
> >> > > JsonSerializer jsonSerializer = new JsonSerializer<>();
> >> > > return jsonSerializer;
> >> > > }
> >> > >
> >> > > @Override
> >> > > public Deserializer deserializer() {
> >> > > JsonDeserializer jsonDeserializer = new
> >> > > JsonDeserializer<>();
> >> > > return jsonDeserializer;
> >> > > }
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > Which relates to:
> >> > >
> >> > > public class JsonSerializer implements Serializer {
> >> > >
> >> > > private Gson gson = new Gson();
> >> > >
> >> > > @Override
> >> > > public void configure(Map map, boolean b) {
> >> > >
> >> > > }
> >> > >
> >> > > @Override
> >> > > public byte[] serialize(String topic, T t) {
> >> > > return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
> >> > > }
> >> > >
> >> > > @Override
> >> > > public void close() {
> >> > >
> >> > > }
> >> > > }
> >> > >
> >> > >
> >> > >
> >> > > public class JsonDeserializer implements Deserializer {
> >> > >
> >> > > private Gson gson = new Gson();
> >> > > private Class deserializedClass;
> >> > >
> >> > > public JsonDeserializer(Class deserializedClass) {
> >> > > this.deserializedClass 

Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
Here's the solution (props to Damian G)

JsonSerializer keySerializer = new JsonSerializer<>();
JsonDeserializer keyDeserializer = new
JsonDeserializer<>(AggKey.class);
Serde keySerde = Serdes.serdeFrom(keySerializer, keyDeserializer);

then for the aggregator call 'groupByKey(keySerde, prtRecordSerde)'.

In the documentation where it says the 'no param' groupByKey will use the
default serializers - this doesn't seem to be true.

On Tue, Dec 6, 2016 at 12:28 PM, Jon Yeargers 
wrote:

> Hmm. That's odd as the aggregation works ok if I use a String value for
> the key (and the corresponding String serde).
>
> This error only started occurring when I tried to substitute my 'custom'
> key for the original String.
>
> On Tue, Dec 6, 2016 at 12:24 PM, Radek Gruchalski 
> wrote:
>
>> Yeah, I knew that already, this part of the error:
>>
>> > > >>> > > org.apache.kafka.streams.processor.internals.
>> > RecordCollector.send(
>> > > >>> > RecordCollector.java:73)
>>
>> points to this line: https://github.com/apache/kafka/blob/0.10.1/streams/
>> src/main/java/org/apache/kafka/streams/processor/
>> internals/RecordCollector.java#L73
>>
>> which means that your error happens on the value, not the key.
>>
>> –
>> Best regards,
>> Radek Gruchalski
>> ra...@gruchalski.com
>>
>>
>> On December 6, 2016 at 9:18:53 PM, Jon Yeargers (jon.yearg...@cedexis.com)
>> wrote:
>>
>> 0.10.1.0
>>
>> On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski 
>> wrote:
>>
>> > Jon,
>> >
>> > Are you using 0.10.1 or 0.10.0.1?
>> >
>> > –
>> > Best regards,
>> > Radek Gruchalski
>> > ra...@gruchalski.com
>> >
>> >
>> > On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com)
>> > wrote:
>> >
>> > Hi Jon,
>> >
>> > At a glance the code looks ok, i.e, i believe the aggregate() should
>> have
>> > picked up the default Serde set in your StreamsConfig. However, you
>> could
>> > try adding the Serdes to the groupBy(..)
>> >
>> > i.e.,
>> > rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Tue, 6 Dec 2016 at 18:42 Jon Yeargers 
>> wrote:
>> >
>> > > It's just a bunch of public 'int' and 'String' values. There's an
>> empty
>> > > constructor and a copy constructor.
>> > >
>> > > For functions I override 'equals' and the requirements for 'serde'
>> > (close,
>> > > configure, serializer and deserializer).
>> > >
>> > > @Override
>> > > public Serializer serializer() {
>> > > JsonSerializer jsonSerializer = new JsonSerializer<>();
>> > > return jsonSerializer;
>> > > }
>> > >
>> > > @Override
>> > > public Deserializer deserializer() {
>> > > JsonDeserializer jsonDeserializer = new
>> > > JsonDeserializer<>();
>> > > return jsonDeserializer;
>> > > }
>> > >
>> > >
>> > >
>> > >
>> > > Which relates to:
>> > >
>> > > public class JsonSerializer implements Serializer {
>> > >
>> > > private Gson gson = new Gson();
>> > >
>> > > @Override
>> > > public void configure(Map map, boolean b) {
>> > >
>> > > }
>> > >
>> > > @Override
>> > > public byte[] serialize(String topic, T t) {
>> > > return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
>> > > }
>> > >
>> > > @Override
>> > > public void close() {
>> > >
>> > > }
>> > > }
>> > >
>> > >
>> > >
>> > > public class JsonDeserializer implements Deserializer {
>> > >
>> > > private Gson gson = new Gson();
>> > > private Class deserializedClass;
>> > >
>> > > public JsonDeserializer(Class deserializedClass) {
>> > > this.deserializedClass = deserializedClass;
>> > > }
>> > >
>> > > public JsonDeserializer() {
>> > > }
>> > >
>> > > @Override
>> > > @SuppressWarnings("unchecked")
>> > > public void configure(Map map, boolean b) {
>> > > if(deserializedClass == null) {
>> > > deserializedClass = (Class) map.get("serializedClass");
>> > > }
>> > > }
>> > >
>> > > @Override
>> > > public T deserialize(String s, byte[] bytes) {
>> > > if(bytes == null){
>> > > return null;
>> > > }
>> > >
>> > > return gson.fromJson(new String(bytes),deserializedClass);
>> > >
>> > > }
>> > >
>> > > @Override
>> > > public void close() {
>> > >
>> > > }
>> > > }
>> > >
>> > > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski <
>> ra...@gruchalski.com>
>> > > wrote:
>> > >
>> > > > Do you mind sharing the code of AggKey class?
>> > > >
>> > > > –
>> > > > Best regards,
>> > > > Radek Gruchalski
>> > > > ra...@gruchalski.com
>> > > >
>> > > >
>> > > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (
>> > > jon.yearg...@cedexis.com)
>> > > > wrote:
>> > > >
>> > > > The 2nd.
>> > > >
>> > > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <
>> > ra...@gruchalski.com>
>> > > > wrote:
>> > > >
>> > > >> Is the error happening at this stage?
>> > > >>
>> > > >> KStream rtRekey = rtDetailLines.map((key,
>> > > value)
>> > > >> -> new KeyValue<>(new AggKey(value), value));
>> > > >>
>> > > >> or here:
>> > > >>
>> > > >> 

Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
Hmm. That's odd as the aggregation works ok if I use a String value for the
key (and the corresponding String serde).

This error only started occurring when I tried to substitute my 'custom'
key for the original String.

On Tue, Dec 6, 2016 at 12:24 PM, Radek Gruchalski 
wrote:

> Yeah, I knew that already, this part of the error:
>
> > > >>> > > org.apache.kafka.streams.processor.internals.
> > RecordCollector.send(
> > > >>> > RecordCollector.java:73)
>
> points to this line: https://github.com/apache/kafka/blob/0.10.1/
> streams/src/main/java/org/apache/kafka/streams/processor/internals/
> RecordCollector.java#L73
>
> which means that your error happens on the value, not the key.
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 9:18:53 PM, Jon Yeargers (jon.yearg...@cedexis.com)
> wrote:
>
> 0.10.1.0
>
> On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski 
> wrote:
>
> > Jon,
> >
> > Are you using 0.10.1 or 0.10.0.1?
> >
> > –
> > Best regards,
> > Radek Gruchalski
> > ra...@gruchalski.com
> >
> >
> > On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com)
> > wrote:
> >
> > Hi Jon,
> >
> > At a glance the code looks ok, i.e, i believe the aggregate() should
> have
> > picked up the default Serde set in your StreamsConfig. However, you
> could
> > try adding the Serdes to the groupBy(..)
> >
> > i.e.,
> > rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)
> >
> > Thanks,
> > Damian
> >
> > On Tue, 6 Dec 2016 at 18:42 Jon Yeargers 
> wrote:
> >
> > > It's just a bunch of public 'int' and 'String' values. There's an
> empty
> > > constructor and a copy constructor.
> > >
> > > For functions I override 'equals' and the requirements for 'serde'
> > (close,
> > > configure, serializer and deserializer).
> > >
> > > @Override
> > > public Serializer serializer() {
> > > JsonSerializer jsonSerializer = new JsonSerializer<>();
> > > return jsonSerializer;
> > > }
> > >
> > > @Override
> > > public Deserializer deserializer() {
> > > JsonDeserializer jsonDeserializer = new
> > > JsonDeserializer<>();
> > > return jsonDeserializer;
> > > }
> > >
> > >
> > >
> > >
> > > Which relates to:
> > >
> > > public class JsonSerializer implements Serializer {
> > >
> > > private Gson gson = new Gson();
> > >
> > > @Override
> > > public void configure(Map map, boolean b) {
> > >
> > > }
> > >
> > > @Override
> > > public byte[] serialize(String topic, T t) {
> > > return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
> > > }
> > >
> > > @Override
> > > public void close() {
> > >
> > > }
> > > }
> > >
> > >
> > >
> > > public class JsonDeserializer implements Deserializer {
> > >
> > > private Gson gson = new Gson();
> > > private Class deserializedClass;
> > >
> > > public JsonDeserializer(Class deserializedClass) {
> > > this.deserializedClass = deserializedClass;
> > > }
> > >
> > > public JsonDeserializer() {
> > > }
> > >
> > > @Override
> > > @SuppressWarnings("unchecked")
> > > public void configure(Map map, boolean b) {
> > > if(deserializedClass == null) {
> > > deserializedClass = (Class) map.get("serializedClass");
> > > }
> > > }
> > >
> > > @Override
> > > public T deserialize(String s, byte[] bytes) {
> > > if(bytes == null){
> > > return null;
> > > }
> > >
> > > return gson.fromJson(new String(bytes),deserializedClass);
> > >
> > > }
> > >
> > > @Override
> > > public void close() {
> > >
> > > }
> > > }
> > >
> > > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski <
> ra...@gruchalski.com>
> > > wrote:
> > >
> > > > Do you mind sharing the code of AggKey class?
> > > >
> > > > –
> > > > Best regards,
> > > > Radek Gruchalski
> > > > ra...@gruchalski.com
> > > >
> > > >
> > > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (
> > > jon.yearg...@cedexis.com)
> > > > wrote:
> > > >
> > > > The 2nd.
> > > >
> > > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <
> > ra...@gruchalski.com>
> > > > wrote:
> > > >
> > > >> Is the error happening at this stage?
> > > >>
> > > >> KStream rtRekey = rtDetailLines.map((key,
> > > value)
> > > >> -> new KeyValue<>(new AggKey(value), value));
> > > >>
> > > >> or here:
> > > >>
> > > >> KTable ktRtDetail =
> > > >> rtRekey.groupByKey().aggregate(
> > > >> BqRtDetailLogLine_aggregate::new,
> > > >> new PRTAggregate(),
> > > >> TimeWindows.of(60 * 60 * 1000L),
> > > >> collectorSerde, "prt_minute_agg_stream");
> > > >>
> > > >> –
> > > >>
> > > >> Best regards,
> > > >> Radek Gruchalski
> > > >> ra...@gruchalski.com
> > > >>
> > > >>
> > > >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (
> > > jon.yearg...@cedexis.com)
> > > >> wrote:
> > > >>
> > > >> If I comment out the aggregation step and just .print the .map step
> I
> > > >> don't hit the error. It's coming from aggregating the non-String
> key.
> > > >>
> > > >> On Tue, Dec 

Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Yeah, I knew that already, this part of the error:

> > >>> > > org.apache.kafka.streams.processor.internals.
> RecordCollector.send(
> > >>> > RecordCollector.java:73)

points to this line:
https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java#L73

which means that your error happens on the value, not the key.

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 6, 2016 at 9:18:53 PM, Jon Yeargers (jon.yearg...@cedexis.com)
wrote:

0.10.1.0

On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski 
wrote:

> Jon,
>
> Are you using 0.10.1 or 0.10.0.1?
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com)
> wrote:
>
> Hi Jon,
>
> At a glance the code looks ok, i.e, i believe the aggregate() should have
> picked up the default Serde set in your StreamsConfig. However, you could
> try adding the Serdes to the groupBy(..)
>
> i.e.,
> rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)
>
> Thanks,
> Damian
>
> On Tue, 6 Dec 2016 at 18:42 Jon Yeargers 
wrote:
>
> > It's just a bunch of public 'int' and 'String' values. There's an empty
> > constructor and a copy constructor.
> >
> > For functions I override 'equals' and the requirements for 'serde'
> (close,
> > configure, serializer and deserializer).
> >
> > @Override
> > public Serializer serializer() {
> > JsonSerializer jsonSerializer = new JsonSerializer<>();
> > return jsonSerializer;
> > }
> >
> > @Override
> > public Deserializer deserializer() {
> > JsonDeserializer jsonDeserializer = new
> > JsonDeserializer<>();
> > return jsonDeserializer;
> > }
> >
> >
> >
> >
> > Which relates to:
> >
> > public class JsonSerializer implements Serializer {
> >
> > private Gson gson = new Gson();
> >
> > @Override
> > public void configure(Map map, boolean b) {
> >
> > }
> >
> > @Override
> > public byte[] serialize(String topic, T t) {
> > return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
> > }
> >
> > @Override
> > public void close() {
> >
> > }
> > }
> >
> >
> >
> > public class JsonDeserializer implements Deserializer {
> >
> > private Gson gson = new Gson();
> > private Class deserializedClass;
> >
> > public JsonDeserializer(Class deserializedClass) {
> > this.deserializedClass = deserializedClass;
> > }
> >
> > public JsonDeserializer() {
> > }
> >
> > @Override
> > @SuppressWarnings("unchecked")
> > public void configure(Map map, boolean b) {
> > if(deserializedClass == null) {
> > deserializedClass = (Class) map.get("serializedClass");
> > }
> > }
> >
> > @Override
> > public T deserialize(String s, byte[] bytes) {
> > if(bytes == null){
> > return null;
> > }
> >
> > return gson.fromJson(new String(bytes),deserializedClass);
> >
> > }
> >
> > @Override
> > public void close() {
> >
> > }
> > }
> >
> > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski 

> > wrote:
> >
> > > Do you mind sharing the code of AggKey class?
> > >
> > > –
> > > Best regards,
> > > Radek Gruchalski
> > > ra...@gruchalski.com
> > >
> > >
> > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (
> > jon.yearg...@cedexis.com)
> > > wrote:
> > >
> > > The 2nd.
> > >
> > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <
> ra...@gruchalski.com>
> > > wrote:
> > >
> > >> Is the error happening at this stage?
> > >>
> > >> KStream rtRekey = rtDetailLines.map((key,
> > value)
> > >> -> new KeyValue<>(new AggKey(value), value));
> > >>
> > >> or here:
> > >>
> > >> KTable ktRtDetail =
> > >> rtRekey.groupByKey().aggregate(
> > >> BqRtDetailLogLine_aggregate::new,
> > >> new PRTAggregate(),
> > >> TimeWindows.of(60 * 60 * 1000L),
> > >> collectorSerde, "prt_minute_agg_stream");
> > >>
> > >> –
> > >>
> > >> Best regards,
> > >> Radek Gruchalski
> > >> ra...@gruchalski.com
> > >>
> > >>
> > >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (
> > jon.yearg...@cedexis.com)
> > >> wrote:
> > >>
> > >> If I comment out the aggregation step and just .print the .map step
I
> > >> don't hit the error. It's coming from aggregating the non-String
key.
> > >>
> > >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski <
> ra...@gruchalski.com>
> > >> wrote:
> > >>
> > >>> Jon,
> > >>>
> > >>> Looking at your code:
> > >>>
> > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > >>> Serdes.String().getClass().getName());
> > >>>
> > >>> and later:
> > >>>
> > >>> KStream rtDetailLines =
> > >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
> > >>>
> > >>> Is RtDetailLogLine inheriting from String? It is not, as the error
> > >>> suggests.
> > >>> You may have to write your own Serializer / Deserializer for
> > >>> RtDetailLogLine.
> > >>>
> > >>> –
> > >>> Best regards,
> > >>> Radek Gruchalski
> > >>> 

Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
0.10.1.0

On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski 
wrote:

> Jon,
>
> Are you using 0.10.1 or 0.10.0.1?
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com)
> wrote:
>
> Hi Jon,
>
> At a glance the code looks ok, i.e, i believe the aggregate() should have
> picked up the default Serde set in your StreamsConfig. However, you could
> try adding the Serdes to the groupBy(..)
>
> i.e.,
> rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)
>
> Thanks,
> Damian
>
> On Tue, 6 Dec 2016 at 18:42 Jon Yeargers  wrote:
>
> > It's just a bunch of public 'int' and 'String' values. There's an empty
> > constructor and a copy constructor.
> >
> > For functions I override 'equals'  and the requirements for 'serde'
> (close,
> > configure, serializer and deserializer).
> >
> >  @Override
> > public Serializer serializer() {
> > JsonSerializer jsonSerializer = new JsonSerializer<>();
> > return jsonSerializer;
> > }
> >
> > @Override
> > public Deserializer deserializer() {
> > JsonDeserializer jsonDeserializer = new
> > JsonDeserializer<>();
> > return jsonDeserializer;
> > }
> >
> >
> >
> >
> > Which relates to:
> >
> > public class JsonSerializer implements Serializer {
> >
> > private Gson gson = new Gson();
> >
> > @Override
> > public void configure(Map map, boolean b) {
> >
> > }
> >
> > @Override
> > public byte[] serialize(String topic, T t) {
> > return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
> > }
> >
> > @Override
> > public void close() {
> >
> > }
> > }
> >
> >
> >
> > public class JsonDeserializer implements Deserializer {
> >
> > private Gson gson = new Gson();
> > private Class deserializedClass;
> >
> > public JsonDeserializer(Class deserializedClass) {
> > this.deserializedClass = deserializedClass;
> > }
> >
> > public JsonDeserializer() {
> > }
> >
> > @Override
> > @SuppressWarnings("unchecked")
> > public void configure(Map map, boolean b) {
> > if(deserializedClass == null) {
> > deserializedClass = (Class) map.get("serializedClass");
> > }
> > }
> >
> > @Override
> > public T deserialize(String s, byte[] bytes) {
> > if(bytes == null){
> > return null;
> > }
> >
> > return gson.fromJson(new String(bytes),deserializedClass);
> >
> > }
> >
> > @Override
> > public void close() {
> >
> > }
> > }
> >
> > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski 
> > wrote:
> >
> > > Do you mind sharing the code of AggKey class?
> > >
> > > –
> > > Best regards,
> > > Radek Gruchalski
> > > ra...@gruchalski.com
> > >
> > >
> > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (
> > jon.yearg...@cedexis.com)
> > > wrote:
> > >
> > > The 2nd.
> > >
> > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <
> ra...@gruchalski.com>
> > > wrote:
> > >
> > >> Is the error happening at this stage?
> > >>
> > >> KStream rtRekey = rtDetailLines.map((key,
> > value)
> > >>  -> new KeyValue<>(new AggKey(value), value));
> > >>
> > >> or here:
> > >>
> > >> KTable ktRtDetail =
> > >> rtRekey.groupByKey().aggregate(
> > >> BqRtDetailLogLine_aggregate::new,
> > >> new PRTAggregate(),
> > >> TimeWindows.of(60 * 60 * 1000L),
> > >> collectorSerde, "prt_minute_agg_stream");
> > >>
> > >> –
> > >>
> > >> Best regards,
> > >> Radek Gruchalski
> > >> ra...@gruchalski.com
> > >>
> > >>
> > >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (
> > jon.yearg...@cedexis.com)
> > >> wrote:
> > >>
> > >> If I comment out the aggregation step and just .print the .map step I
> > >> don't hit the error. It's coming from aggregating the non-String key.
> > >>
> > >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski <
> ra...@gruchalski.com>
> > >> wrote:
> > >>
> > >>> Jon,
> > >>>
> > >>> Looking at your code:
> > >>>
> > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > >>> Serdes.String().getClass().getName());
> > >>>
> > >>> and later:
> > >>>
> > >>> KStream rtDetailLines =
> > >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
> > >>>
> > >>> Is RtDetailLogLine inheriting from String? It is not, as the error
> > >>> suggests.
> > >>> You may have to write your own Serializer / Deserializer for
> > >>> RtDetailLogLine.
> > >>>
> > >>> –
> > >>> Best regards,
> > >>> Radek Gruchalski
> > >>> ra...@gruchalski.com
> > >>>
> > >>>
> > >>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (
> > >>> jon.yearg...@cedexis.com) wrote:
> > >>>
> > >>> Using 0.10.1.0
> > >>>
> > >>> This is my topology:
> > >>>
> > >>> Properties config = new Properties();
> > >>> 

Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Jon,

Are you using 0.10.1 or 0.10.0.1?

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com) wrote:

Hi Jon,

At a glance the code looks ok, i.e, i believe the aggregate() should have
picked up the default Serde set in your StreamsConfig. However, you could
try adding the Serdes to the groupBy(..)

i.e.,
rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)

Thanks,
Damian

On Tue, 6 Dec 2016 at 18:42 Jon Yeargers  wrote:

> It's just a bunch of public 'int' and 'String' values. There's an empty
> constructor and a copy constructor.
>
> For functions I override 'equals'  and the requirements for 'serde' (close,
> configure, serializer and deserializer).
>
>  @Override
> public Serializer serializer() {
> JsonSerializer jsonSerializer = new JsonSerializer<>();
> return jsonSerializer;
> }
>
> @Override
> public Deserializer deserializer() {
> JsonDeserializer jsonDeserializer = new
> JsonDeserializer<>();
> return jsonDeserializer;
> }
>
>
>
>
> Which relates to:
>
> public class JsonSerializer implements Serializer {
>
> private Gson gson = new Gson();
>
> @Override
> public void configure(Map map, boolean b) {
>
> }
>
> @Override
> public byte[] serialize(String topic, T t) {
> return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
> }
>
> @Override
> public void close() {
>
> }
> }
>
>
>
> public class JsonDeserializer implements Deserializer {
>
> private Gson gson = new Gson();
> private Class deserializedClass;
>
> public JsonDeserializer(Class deserializedClass) {
> this.deserializedClass = deserializedClass;
> }
>
> public JsonDeserializer() {
> }
>
> @Override
> @SuppressWarnings("unchecked")
> public void configure(Map map, boolean b) {
> if(deserializedClass == null) {
> deserializedClass = (Class) map.get("serializedClass");
> }
> }
>
> @Override
> public T deserialize(String s, byte[] bytes) {
> if(bytes == null){
> return null;
> }
>
> return gson.fromJson(new String(bytes),deserializedClass);
>
> }
>
> @Override
> public void close() {
>
> }
> }
>
> On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski 
> wrote:
>
> > Do you mind sharing the code of AggKey class?
> >
> > –
> > Best regards,
> > Radek Gruchalski
> > ra...@gruchalski.com
> >
> >
> > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (
> jon.yearg...@cedexis.com)
> > wrote:
> >
> > The 2nd.
> >
> > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski 
> > wrote:
> >
> >> Is the error happening at this stage?
> >>
> >> KStream rtRekey = rtDetailLines.map((key,
> value)
> >>  -> new KeyValue<>(new AggKey(value), value));
> >>
> >> or here:
> >>
> >> KTable ktRtDetail =
> >> rtRekey.groupByKey().aggregate(
> >> BqRtDetailLogLine_aggregate::new,
> >> new PRTAggregate(),
> >> TimeWindows.of(60 * 60 * 1000L),
> >> collectorSerde, "prt_minute_agg_stream");
> >>
> >> –
> >>
> >> Best regards,
> >> Radek Gruchalski
> >> ra...@gruchalski.com
> >>
> >>
> >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (
> jon.yearg...@cedexis.com)
> >> wrote:
> >>
> >> If I comment out the aggregation step and just .print the .map step I
> >> don't hit the error. It's coming from aggregating the non-String key.
> >>
> >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski 
> >> wrote:
> >>
> >>> Jon,
> >>>
> >>> Looking at your code:
> >>>
> >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>> Serdes.String().getClass().getName());
> >>>
> >>> and later:
> >>>
> >>> KStream rtDetailLines =
> >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
> >>>
> >>> Is RtDetailLogLine inheriting from String? It is not, as the error
> >>> suggests.
> >>> You may have to write your own Serializer / Deserializer for
> >>> RtDetailLogLine.
> >>>
> >>> –
> >>> Best regards,
> >>> Radek Gruchalski
> >>> ra...@gruchalski.com
> >>>
> >>>
> >>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (
> >>> jon.yearg...@cedexis.com) wrote:
> >>>
> >>> Using 0.10.1.0
> >>>
> >>> This is my topology:
> >>>
> >>> Properties config = new Properties();
> >>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> >>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
> >>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
> >>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>> AggKey.class.getName());
> >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>> Serdes.String().getClass().getName());
> >>>
> >>> JsonSerializer sumRecordsSerializer = new
> >>> JsonSerializer<>();
> >>> JsonDeserializer 

Re: Implementing custom key serializer

2016-12-06 Thread Damian Guy
Hi Jon,

At a glance the code looks ok, i.e, i believe the aggregate() should have
picked up the default Serde set in your StreamsConfig. However, you could
try adding the Serdes to the groupBy(..)

i.e.,
rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)

Thanks,
Damian

On Tue, 6 Dec 2016 at 18:42 Jon Yeargers  wrote:

> It's just a bunch of public 'int' and 'String' values. There's an empty
> constructor and a copy constructor.
>
> For functions I override 'equals'  and the requirements for 'serde' (close,
> configure, serializer and deserializer).
>
>  @Override
> public Serializer serializer() {
> JsonSerializer jsonSerializer = new JsonSerializer<>();
> return jsonSerializer;
> }
>
> @Override
> public Deserializer deserializer() {
> JsonDeserializer jsonDeserializer = new
> JsonDeserializer<>();
> return jsonDeserializer;
> }
>
>
>
>
> Which relates to:
>
> public class JsonSerializer implements Serializer {
>
> private Gson gson = new Gson();
>
> @Override
> public void configure(Map map, boolean b) {
>
> }
>
> @Override
> public byte[] serialize(String topic, T t) {
> return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
> }
>
> @Override
> public void close() {
>
> }
> }
>
>
>
> public class JsonDeserializer implements Deserializer {
>
> private Gson gson = new Gson();
> private Class deserializedClass;
>
> public JsonDeserializer(Class deserializedClass) {
> this.deserializedClass = deserializedClass;
> }
>
> public JsonDeserializer() {
> }
>
> @Override
> @SuppressWarnings("unchecked")
> public void configure(Map map, boolean b) {
> if(deserializedClass == null) {
> deserializedClass = (Class) map.get("serializedClass");
> }
> }
>
> @Override
> public T deserialize(String s, byte[] bytes) {
> if(bytes == null){
> return null;
> }
>
> return gson.fromJson(new String(bytes),deserializedClass);
>
> }
>
> @Override
> public void close() {
>
> }
> }
>
> On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski 
> wrote:
>
> > Do you mind sharing the code of AggKey class?
> >
> > –
> > Best regards,
> > Radek Gruchalski
> > ra...@gruchalski.com
> >
> >
> > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (
> jon.yearg...@cedexis.com)
> > wrote:
> >
> > The 2nd.
> >
> > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski 
> > wrote:
> >
> >> Is the error happening at this stage?
> >>
> >> KStream rtRekey = rtDetailLines.map((key,
> value)
> >>  -> new KeyValue<>(new AggKey(value), value));
> >>
> >> or here:
> >>
> >> KTable ktRtDetail =
> >> rtRekey.groupByKey().aggregate(
> >> BqRtDetailLogLine_aggregate::new,
> >> new PRTAggregate(),
> >> TimeWindows.of(60 * 60 * 1000L),
> >> collectorSerde, "prt_minute_agg_stream");
> >>
> >> –
> >>
> >> Best regards,
> >> Radek Gruchalski
> >> ra...@gruchalski.com
> >>
> >>
> >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (
> jon.yearg...@cedexis.com)
> >> wrote:
> >>
> >> If I comment out the aggregation step and just .print the .map step I
> >> don't hit the error. It's coming from aggregating the non-String key.
> >>
> >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski 
> >> wrote:
> >>
> >>> Jon,
> >>>
> >>> Looking at your code:
> >>>
> >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>> Serdes.String().getClass().getName());
> >>>
> >>> and later:
> >>>
> >>> KStream rtDetailLines =
> >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
> >>>
> >>> Is RtDetailLogLine inheriting from String? It is not, as the error
> >>> suggests.
> >>> You may have to write your own Serializer / Deserializer for
> >>> RtDetailLogLine.
> >>>
> >>> –
> >>> Best regards,
> >>> Radek Gruchalski
> >>> ra...@gruchalski.com
> >>>
> >>>
> >>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (
> >>> jon.yearg...@cedexis.com) wrote:
> >>>
> >>> Using 0.10.1.0
> >>>
> >>> This is my topology:
> >>>
> >>> Properties config = new Properties();
> >>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> >>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
> >>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
> >>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>> AggKey.class.getName());
> >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>> Serdes.String().getClass().getName());
> >>>
> >>> JsonSerializer sumRecordsSerializer = new
> >>> JsonSerializer<>();
> >>> JsonDeserializer sumRecordsDeserializer =
> >>> new
> >>> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
> >>> Serde collectorSerde =
> >>> 

Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
It's just a bunch of public 'int' and 'String' values. There's an empty
constructor and a copy constructor.

For functions I override 'equals'  and the requirements for 'serde' (close,
configure, serializer and deserializer).

 @Override
public Serializer serializer() {
JsonSerializer jsonSerializer = new JsonSerializer<>();
return jsonSerializer;
}

@Override
public Deserializer deserializer() {
JsonDeserializer jsonDeserializer = new
JsonDeserializer<>();
return jsonDeserializer;
}




Which relates to:

public class JsonSerializer implements Serializer {

private Gson gson = new Gson();

@Override
public void configure(Map map, boolean b) {

}

@Override
public byte[] serialize(String topic, T t) {
return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
}

@Override
public void close() {

}
}



public class JsonDeserializer implements Deserializer {

private Gson gson = new Gson();
private Class deserializedClass;

public JsonDeserializer(Class deserializedClass) {
this.deserializedClass = deserializedClass;
}

public JsonDeserializer() {
}

@Override
@SuppressWarnings("unchecked")
public void configure(Map map, boolean b) {
if(deserializedClass == null) {
deserializedClass = (Class) map.get("serializedClass");
}
}

@Override
public T deserialize(String s, byte[] bytes) {
if(bytes == null){
return null;
}

return gson.fromJson(new String(bytes),deserializedClass);

}

@Override
public void close() {

}
}

On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski 
wrote:

> Do you mind sharing the code of AggKey class?
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 7:26:51 PM, Jon Yeargers (jon.yearg...@cedexis.com)
> wrote:
>
> The 2nd.
>
> On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski 
> wrote:
>
>> Is the error happening at this stage?
>>
>> KStream rtRekey = rtDetailLines.map((key, value)
>>  -> new KeyValue<>(new AggKey(value), value));
>>
>> or here:
>>
>> KTable ktRtDetail =
>> rtRekey.groupByKey().aggregate(
>> BqRtDetailLogLine_aggregate::new,
>> new PRTAggregate(),
>> TimeWindows.of(60 * 60 * 1000L),
>> collectorSerde, "prt_minute_agg_stream");
>>
>> –
>>
>> Best regards,
>> Radek Gruchalski
>> ra...@gruchalski.com
>>
>>
>> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com)
>> wrote:
>>
>> If I comment out the aggregation step and just .print the .map step I
>> don't hit the error. It's coming from aggregating the non-String key.
>>
>> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski 
>> wrote:
>>
>>> Jon,
>>>
>>> Looking at your code:
>>>
>>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>> Serdes.String().getClass().getName());
>>>
>>> and later:
>>>
>>> KStream rtDetailLines =
>>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>>
>>> Is RtDetailLogLine inheriting from String? It is not, as the error
>>> suggests.
>>> You may have to write your own Serializer / Deserializer for
>>> RtDetailLogLine.
>>>
>>> –
>>> Best regards,
>>> Radek Gruchalski
>>> ra...@gruchalski.com
>>>
>>>
>>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (
>>> jon.yearg...@cedexis.com) wrote:
>>>
>>> Using 0.10.1.0
>>>
>>> This is my topology:
>>>
>>> Properties config = new Properties();
>>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
>>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
>>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
>>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>> AggKey.class.getName());
>>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>> Serdes.String().getClass().getName());
>>>
>>> JsonSerializer sumRecordsSerializer = new
>>> JsonSerializer<>();
>>> JsonDeserializer sumRecordsDeserializer =
>>> new
>>> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
>>> Serde collectorSerde =
>>> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);
>>>
>>> StringSerializer stringSerializer = new StringSerializer();
>>> StringDeserializer stringDeserializer = new StringDeserializer();
>>> Serde stringSerde =
>>> Serdes.serdeFrom(stringSerializer,stringDeserializer);
>>>
>>> JsonDeserializer prtRecordDeserializer = new
>>> JsonDeserializer<>(RtDetailLogLine.class);
>>> JsonSerializer prtRecordJsonSerializer = new
>>> JsonSerializer<>();
>>> Serde prtRecordSerde =
>>> Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);
>>>
>>> KStreamBuilder kStreamBuilder = new KStreamBuilder();
>>>
>>> KStream rtDetailLines =
>>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>>
>>> // change the 

Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Do you mind sharing the code of AggKey class?

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 6, 2016 at 7:26:51 PM, Jon Yeargers (jon.yearg...@cedexis.com)
wrote:

The 2nd.

On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski 
wrote:

> Is the error happening at this stage?
>
> KStream rtRekey = rtDetailLines.map((key, value) ->
> new KeyValue<>(new AggKey(value), value));
>
> or here:
>
> KTable ktRtDetail =
> rtRekey.groupByKey().aggregate(
> BqRtDetailLogLine_aggregate::new,
> new PRTAggregate(),
> TimeWindows.of(60 * 60 * 1000L),
> collectorSerde, "prt_minute_agg_stream");
>
> –
>
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com)
> wrote:
>
> If I comment out the aggregation step and just .print the .map step I
> don't hit the error. It's coming from aggregating the non-String key.
>
> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski 
> wrote:
>
>> Jon,
>>
>> Looking at your code:
>>
>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> Serdes.String().getClass().getName());
>>
>> and later:
>>
>> KStream rtDetailLines =
>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>
>> Is RtDetailLogLine inheriting from String? It is not, as the error
>> suggests.
>> You may have to write your own Serializer / Deserializer for
>> RtDetailLogLine.
>>
>> –
>> Best regards,
>> Radek Gruchalski
>> ra...@gruchalski.com
>>
>>
>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com)
>> wrote:
>>
>> Using 0.10.1.0
>>
>> This is my topology:
>>
>> Properties config = new Properties();
>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> Serdes.String().getClass().getName());
>>
>> JsonSerializer sumRecordsSerializer = new
>> JsonSerializer<>();
>> JsonDeserializer sumRecordsDeserializer =
>> new
>> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
>> Serde collectorSerde =
>> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);
>>
>> StringSerializer stringSerializer = new StringSerializer();
>> StringDeserializer stringDeserializer = new StringDeserializer();
>> Serde stringSerde =
>> Serdes.serdeFrom(stringSerializer,stringDeserializer);
>>
>> JsonDeserializer prtRecordDeserializer = new
>> JsonDeserializer<>(RtDetailLogLine.class);
>> JsonSerializer prtRecordJsonSerializer = new
>> JsonSerializer<>();
>> Serde prtRecordSerde =
>> Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);
>>
>> KStreamBuilder kStreamBuilder = new KStreamBuilder();
>>
>> KStream rtDetailLines =
>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>
>> // change the keying
>> KStream rtRekey = rtDetailLines.map((key, value)
>> -> new KeyValue<>(new AggKey(value), value));
>>
>> KTable ktRtDetail =
>> rtRekey.groupByKey().aggregate(
>> BqRtDetailLogLine_aggregate::new,
>> new PRTAggregate(),
>> TimeWindows.of(60 * 60 * 1000L),
>> collectorSerde, "prt_minute_agg_stream");
>>
>> ktRtDetail.toStream().print();
>>
>> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
>>
>> kafkaStreams.start();
>>
>>
>> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy  wrote:
>>
>> > Hi Jon,
>> >
>> > A couple of things: Which version are you using?
>> > Can you share the code you are using to the build the topology?
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers 
>> wrote:
>> >
>> > > Im using .map to convert my (k/v) string/Object to Object/Object but
>> > when I
>> > > chain this to an aggregation step Im getting this exception:
>> > >
>> > > Exception in thread "StreamThread-1" java.lang.ClassCastException:
>> > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
>> > > java.lang.String
>> > > at
>> > >
>> > > org.apache.kafka.common.serialization.StringSerializer.serialize(
>> > StringSerializer.java:24)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.RecordCollector.send(
>> > RecordCollector.java:73)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.SinkNode.
>> > process(SinkNode.java:72)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.
>> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > at
>> > >
>> > > org.apache.kafka.streams.kstream.internals.KStreamFilter$
>> > KStreamFilterProcessor.process(KStreamFilter.java:44)
>> > > at
>> > >
>> > > 

Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
The 2nd.

On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski 
wrote:

> Is the error happening at this stage?
>
> KStream rtRekey = rtDetailLines.map((key, value) ->
> new KeyValue<>(new AggKey(value), value));
>
> or here:
>
> KTable ktRtDetail =
> rtRekey.groupByKey().aggregate(
> BqRtDetailLogLine_aggregate::new,
> new PRTAggregate(),
> TimeWindows.of(60 * 60 * 1000L),
> collectorSerde, "prt_minute_agg_stream");
>
> –
>
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com)
> wrote:
>
> If I comment out the aggregation step and just .print the .map step I
> don't hit the error. It's coming from aggregating the non-String key.
>
> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski 
> wrote:
>
>> Jon,
>>
>> Looking at your code:
>>
>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> Serdes.String().getClass().getName());
>>
>> and later:
>>
>> KStream rtDetailLines =
>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>
>> Is RtDetailLogLine inheriting from String? It is not, as the error
>> suggests.
>> You may have to write your own Serializer / Deserializer for
>> RtDetailLogLine.
>>
>> –
>> Best regards,
>> Radek Gruchalski
>> ra...@gruchalski.com
>>
>>
>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com)
>> wrote:
>>
>> Using 0.10.1.0
>>
>> This is my topology:
>>
>> Properties config = new Properties();
>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> Serdes.String().getClass().getName());
>>
>> JsonSerializer sumRecordsSerializer = new
>> JsonSerializer<>();
>> JsonDeserializer sumRecordsDeserializer =
>> new
>> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
>> Serde collectorSerde =
>> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);
>>
>> StringSerializer stringSerializer = new StringSerializer();
>> StringDeserializer stringDeserializer = new StringDeserializer();
>> Serde stringSerde =
>> Serdes.serdeFrom(stringSerializer,stringDeserializer);
>>
>> JsonDeserializer prtRecordDeserializer = new
>> JsonDeserializer<>(RtDetailLogLine.class);
>> JsonSerializer prtRecordJsonSerializer = new
>> JsonSerializer<>();
>> Serde prtRecordSerde =
>> Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);
>>
>> KStreamBuilder kStreamBuilder = new KStreamBuilder();
>>
>> KStream rtDetailLines =
>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>
>> // change the keying
>> KStream rtRekey = rtDetailLines.map((key, value)
>> -> new KeyValue<>(new AggKey(value), value));
>>
>> KTable ktRtDetail =
>> rtRekey.groupByKey().aggregate(
>> BqRtDetailLogLine_aggregate::new,
>> new PRTAggregate(),
>> TimeWindows.of(60 * 60 * 1000L),
>> collectorSerde, "prt_minute_agg_stream");
>>
>> ktRtDetail.toStream().print();
>>
>> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
>>
>> kafkaStreams.start();
>>
>>
>> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy  wrote:
>>
>> > Hi Jon,
>> >
>> > A couple of things: Which version are you using?
>> > Can you share the code you are using to the build the topology?
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers 
>> wrote:
>> >
>> > > Im using .map to convert my (k/v) string/Object to Object/Object but
>> > when I
>> > > chain this to an aggregation step Im getting this exception:
>> > >
>> > > Exception in thread "StreamThread-1" java.lang.ClassCastException:
>> > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
>> > > java.lang.String
>> > > at
>> > >
>> > > org.apache.kafka.common.serialization.StringSerializer.serialize(
>> > StringSerializer.java:24)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.RecordCollector.send(
>> > RecordCollector.java:73)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.SinkNode.
>> > process(SinkNode.java:72)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.
>> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > at
>> > >
>> > > org.apache.kafka.streams.kstream.internals.KStreamFilter$
>> > KStreamFilterProcessor.process(KStreamFilter.java:44)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
>> > ProcessorNode.java:82)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.
>> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > 

Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Is the error happening at this stage?

KStream rtRekey = rtDetailLines.map((key, value) ->
new KeyValue<>(new AggKey(value), value));

or here:

KTable ktRtDetail =
rtRekey.groupByKey().aggregate(
BqRtDetailLogLine_aggregate::new,
new PRTAggregate(),
TimeWindows.of(60 * 60 * 1000L),
collectorSerde, "prt_minute_agg_stream");

–

Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com)
wrote:

If I comment out the aggregation step and just .print the .map step I don't
hit the error. It's coming from aggregating the non-String key.

On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski 
wrote:

> Jon,
>
> Looking at your code:
>
> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>
> and later:
>
> KStream rtDetailLines = 
> kStreamBuilder.stream(stringSerde,
> prtRecordSerde, TOPIC);
>
> Is RtDetailLogLine inheriting from String? It is not, as the error
> suggests.
> You may have to write your own Serializer / Deserializer for
> RtDetailLogLine.
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com)
> wrote:
>
> Using 0.10.1.0
>
> This is my topology:
>
> Properties config = new Properties();
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>
> JsonSerializer sumRecordsSerializer = new
> JsonSerializer<>();
> JsonDeserializer sumRecordsDeserializer = new
> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
> Serde collectorSerde =
> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);
>
> StringSerializer stringSerializer = new StringSerializer();
> StringDeserializer stringDeserializer = new StringDeserializer();
> Serde stringSerde =
> Serdes.serdeFrom(stringSerializer,stringDeserializer);
>
> JsonDeserializer prtRecordDeserializer = new
> JsonDeserializer<>(RtDetailLogLine.class);
> JsonSerializer prtRecordJsonSerializer = new
> JsonSerializer<>();
> Serde prtRecordSerde =
> Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);
>
> KStreamBuilder kStreamBuilder = new KStreamBuilder();
>
> KStream rtDetailLines =
> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>
> // change the keying
> KStream rtRekey = rtDetailLines.map((key, value)
> -> new KeyValue<>(new AggKey(value), value));
>
> KTable ktRtDetail =
> rtRekey.groupByKey().aggregate(
> BqRtDetailLogLine_aggregate::new,
> new PRTAggregate(),
> TimeWindows.of(60 * 60 * 1000L),
> collectorSerde, "prt_minute_agg_stream");
>
> ktRtDetail.toStream().print();
>
> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
>
> kafkaStreams.start();
>
>
> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy  wrote:
>
> > Hi Jon,
> >
> > A couple of things: Which version are you using?
> > Can you share the code you are using to the build the topology?
> >
> > Thanks,
> > Damian
> >
> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers 
> wrote:
> >
> > > Im using .map to convert my (k/v) string/Object to Object/Object but
> > when I
> > > chain this to an aggregation step Im getting this exception:
> > >
> > > Exception in thread "StreamThread-1" java.lang.ClassCastException:
> > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
> > > java.lang.String
> > > at
> > >
> > > org.apache.kafka.common.serialization.StringSerializer.serialize(
> > StringSerializer.java:24)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.RecordCollector.send(
> > RecordCollector.java:73)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.SinkNode.
> > process(SinkNode.java:72)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KStreamFilter$
> > KStreamFilterProcessor.process(KStreamFilter.java:44)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > ProcessorNode.java:82)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KStreamMap$
> > KStreamMapProcessor.process(KStreamMap.java:43)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > ProcessorNode.java:82)
> > > at
> > >
> > > 

Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
If I comment out the aggregation step and just .print the .map step I don't
hit the error. It's coming from aggregating the non-String key.

On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski 
wrote:

> Jon,
>
> Looking at your code:
>
> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>
> and later:
>
> KStream rtDetailLines = 
> kStreamBuilder.stream(stringSerde,
> prtRecordSerde, TOPIC);
>
> Is RtDetailLogLine inheriting from String? It is not, as the error
> suggests.
> You may have to write your own Serializer / Deserializer for
> RtDetailLogLine.
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com)
> wrote:
>
> Using 0.10.1.0
>
> This is my topology:
>
> Properties config = new Properties();
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>
> JsonSerializer sumRecordsSerializer = new
> JsonSerializer<>();
> JsonDeserializer sumRecordsDeserializer =
> new
> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
> Serde collectorSerde =
> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);
>
> StringSerializer stringSerializer = new StringSerializer();
> StringDeserializer stringDeserializer = new StringDeserializer();
> Serde stringSerde =
> Serdes.serdeFrom(stringSerializer,stringDeserializer);
>
> JsonDeserializer prtRecordDeserializer = new
> JsonDeserializer<>(RtDetailLogLine.class);
> JsonSerializer prtRecordJsonSerializer = new
> JsonSerializer<>();
> Serde prtRecordSerde =
> Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);
>
> KStreamBuilder kStreamBuilder = new KStreamBuilder();
>
> KStream rtDetailLines =
> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>
> // change the keying
> KStream rtRekey = rtDetailLines.map((key, value)
> -> new KeyValue<>(new AggKey(value), value));
>
> KTable ktRtDetail =
> rtRekey.groupByKey().aggregate(
> BqRtDetailLogLine_aggregate::new,
> new PRTAggregate(),
> TimeWindows.of(60 * 60 * 1000L),
> collectorSerde, "prt_minute_agg_stream");
>
> ktRtDetail.toStream().print();
>
> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
>
> kafkaStreams.start();
>
>
> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy  wrote:
>
> > Hi Jon,
> >
> > A couple of things: Which version are you using?
> > Can you share the code you are using to the build the topology?
> >
> > Thanks,
> > Damian
> >
> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers 
> wrote:
> >
> > > Im using .map to convert my (k/v) string/Object to Object/Object but
> > when I
> > > chain this to an aggregation step Im getting this exception:
> > >
> > > Exception in thread "StreamThread-1" java.lang.ClassCastException:
> > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
> > > java.lang.String
> > > at
> > >
> > > org.apache.kafka.common.serialization.StringSerializer.serialize(
> > StringSerializer.java:24)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.RecordCollector.send(
> > RecordCollector.java:73)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.SinkNode.
> > process(SinkNode.java:72)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KStreamFilter$
> > KStreamFilterProcessor.process(KStreamFilter.java:44)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > ProcessorNode.java:82)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KStreamMap$
> > KStreamMapProcessor.process(KStreamMap.java:43)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > ProcessorNode.java:82)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > SourceNode.process(SourceNode.java:66)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > StreamTask.process(StreamTask.java:181)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:436)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > 

Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Jon,

Looking at your code:

config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());

and later:

KStream rtDetailLines =
kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);

Is RtDetailLogLine inheriting from String? It is not, as the error suggests.
You may have to write your own Serializer / Deserializer for
RtDetailLogLine.

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com)
wrote:

Using 0.10.1.0

This is my topology:

Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());

JsonSerializer sumRecordsSerializer = new
JsonSerializer<>();
JsonDeserializer sumRecordsDeserializer = new
JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
Serde collectorSerde =
Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);

StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();
Serde stringSerde =
Serdes.serdeFrom(stringSerializer,stringDeserializer);

JsonDeserializer prtRecordDeserializer = new
JsonDeserializer<>(RtDetailLogLine.class);
JsonSerializer prtRecordJsonSerializer = new
JsonSerializer<>();
Serde prtRecordSerde =
Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);

KStreamBuilder kStreamBuilder = new KStreamBuilder();

KStream rtDetailLines =
kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);

// change the keying
KStream rtRekey = rtDetailLines.map((key, value)
-> new KeyValue<>(new AggKey(value), value));

KTable ktRtDetail =
rtRekey.groupByKey().aggregate(
BqRtDetailLogLine_aggregate::new,
new PRTAggregate(),
TimeWindows.of(60 * 60 * 1000L),
collectorSerde, "prt_minute_agg_stream");

ktRtDetail.toStream().print();

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);

kafkaStreams.start();


On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy  wrote:

> Hi Jon,
>
> A couple of things: Which version are you using?
> Can you share the code you are using to the build the topology?
>
> Thanks,
> Damian
>
> On Tue, 6 Dec 2016 at 14:44 Jon Yeargers 
wrote:
>
> > Im using .map to convert my (k/v) string/Object to Object/Object but
> when I
> > chain this to an aggregation step Im getting this exception:
> >
> > Exception in thread "StreamThread-1" java.lang.ClassCastException:
> > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
> > java.lang.String
> > at
> >
> > org.apache.kafka.common.serialization.StringSerializer.serialize(
> StringSerializer.java:24)
> > at
> >
> > org.apache.kafka.streams.processor.internals.RecordCollector.send(
> RecordCollector.java:73)
> > at
> >
> > org.apache.kafka.streams.processor.internals.SinkNode.
> process(SinkNode.java:72)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KStreamFilter$
> KStreamFilterProcessor.process(KStreamFilter.java:44)
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:82)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KStreamMap$
> KStreamMapProcessor.process(KStreamMap.java:43)
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:82)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> SourceNode.process(SourceNode.java:66)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamTask.process(StreamTask.java:181)
> > at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:436)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:242)
> >
> > My key object implements Serde and returns a JsonSerializer for the
> > 'Serializer()' override.
> >
> > In the config for the topology Im
> > setting: config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > AggKey.class.getName());
> >
> > Where else do I need to specify the (de)serializer for my key class?
> >
>


Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
Using 0.10.1.0

This is my topology:

Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());

JsonSerializer sumRecordsSerializer = new
JsonSerializer<>();
JsonDeserializer sumRecordsDeserializer = new
JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
Serde collectorSerde =
Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);

StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();
Serde stringSerde =
Serdes.serdeFrom(stringSerializer,stringDeserializer);

JsonDeserializer prtRecordDeserializer = new
JsonDeserializer<>(RtDetailLogLine.class);
JsonSerializer prtRecordJsonSerializer = new
JsonSerializer<>();
Serde prtRecordSerde =
Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);

KStreamBuilder kStreamBuilder = new KStreamBuilder();

KStream rtDetailLines =
kStreamBuilder.stream(stringSerde,  prtRecordSerde, TOPIC);

// change the keying
KStream rtRekey = rtDetailLines.map((key, value)
-> new KeyValue<>(new AggKey(value), value));

KTable ktRtDetail =
rtRekey.groupByKey().aggregate(
BqRtDetailLogLine_aggregate::new,
new PRTAggregate(),
TimeWindows.of(60 * 60 * 1000L),
collectorSerde, "prt_minute_agg_stream");

ktRtDetail.toStream().print();

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);

kafkaStreams.start();


On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy  wrote:

> Hi Jon,
>
> A couple of things: Which version are you using?
> Can you share the code you are using to the build the topology?
>
> Thanks,
> Damian
>
> On Tue, 6 Dec 2016 at 14:44 Jon Yeargers  wrote:
>
> > Im using .map to convert my (k/v) string/Object to Object/Object but
> when I
> > chain this to an aggregation step Im getting this exception:
> >
> > Exception in thread "StreamThread-1" java.lang.ClassCastException:
> > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
> > java.lang.String
> > at
> >
> > org.apache.kafka.common.serialization.StringSerializer.serialize(
> StringSerializer.java:24)
> > at
> >
> > org.apache.kafka.streams.processor.internals.RecordCollector.send(
> RecordCollector.java:73)
> > at
> >
> > org.apache.kafka.streams.processor.internals.SinkNode.
> process(SinkNode.java:72)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KStreamFilter$
> KStreamFilterProcessor.process(KStreamFilter.java:44)
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:82)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KStreamMap$
> KStreamMapProcessor.process(KStreamMap.java:43)
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:82)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> SourceNode.process(SourceNode.java:66)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamTask.process(StreamTask.java:181)
> > at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:436)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:242)
> >
> > My key object implements Serde and returns a JsonSerializer for the
> > 'Serializer()' override.
> >
> > In the config for the topology Im
> > setting: config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > AggKey.class.getName());
> >
> > Where else do I need to specify the (de)serializer for my key class?
> >
>


Re: Implementing custom key serializer

2016-12-06 Thread Damian Guy
Hi Jon,

A couple of things: Which version are you using?
Can you share the code you are using to the build the topology?

Thanks,
Damian

On Tue, 6 Dec 2016 at 14:44 Jon Yeargers  wrote:

> Im using .map to convert my (k/v) string/Object to Object/Object but when I
> chain this to an aggregation step Im getting this exception:
>
> Exception in thread "StreamThread-1" java.lang.ClassCastException:
> com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
> java.lang.String
> at
>
> org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
> at
>
> org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
> at
>
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at
>
> org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at
>
> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at
>
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
>
> My key object implements Serde and returns a JsonSerializer for the
> 'Serializer()' override.
>
> In the config for the topology Im
> setting: config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> AggKey.class.getName());
>
> Where else do I need to specify the (de)serializer for my key class?
>


Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
Im using .map to convert my (k/v) string/Object to Object/Object but when I
chain this to an aggregation step Im getting this exception:

Exception in thread "StreamThread-1" java.lang.ClassCastException:
com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
java.lang.String
at
org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
at
org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
at
org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
at
org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
at
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
at
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

My key object implements Serde and returns a JsonSerializer for the
'Serializer()' override.

In the config for the topology Im
setting: config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
AggKey.class.getName());

Where else do I need to specify the (de)serializer for my key class?