AggKey is just a bunch of public 'int' and 'String' fields, plus an empty
constructor and a copy constructor.

As for methods, I override 'equals' and the methods 'Serde' requires (close,
configure, serializer and deserializer).
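
For anyone following along, the shape described above is roughly the
following. This is only a sketch: the class name AggKeySketch and both field
names are invented for illustration, since the real fields are not shown in
this thread. It also adds 'hashCode' next to 'equals', which is an assumption
about what a key class would normally want, not something the real class is
known to have.

```java
import java.util.Objects;

// Hypothetical stand-in for AggKey; names and fields are made up.
final class AggKeySketch {
    public int customerId;   // hypothetical field
    public String region;    // hypothetical field

    // Empty constructor, as described above.
    AggKeySketch() { }

    // Copy constructor, as described above.
    AggKeySketch(AggKeySketch other) {
        this.customerId = other.customerId;
        this.region = other.region;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof AggKeySketch)) return false;
        AggKeySketch that = (AggKeySketch) o;
        return customerId == that.customerId
                && Objects.equals(region, that.region);
    }

    // Kept consistent with equals so equal keys hash identically.
    @Override
    public int hashCode() {
        return Objects.hash(customerId, region);
    }
}
```

Two instances with the same field values compare equal and share a hash code,
so they behave as the same key in hash-based collections.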

    @Override
    public Serializer<AggKey> serializer() {
        return new JsonSerializer<>();
    }

    @Override
    public Deserializer<AggKey> deserializer() {
        // Pass the class token: the no-arg constructor leaves
        // deserializedClass null unless configure() supplies it.
        return new JsonDeserializer<>(AggKey.class);
    }




Which relates to:

public class JsonSerializer<T> implements Serializer<T> {

    private Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public byte[] serialize(String topic, T t) {
        return gson.toJson(t).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {

    }
}



public class JsonDeserializer<T> implements Deserializer<T> {

    private Gson gson = new Gson();
    private Class<T> deserializedClass;

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    public JsonDeserializer() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean b) {
        if(deserializedClass == null) {
            deserializedClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
        if(bytes == null){
            return null;
        }

        // Decode with an explicit charset rather than the platform default.
        return gson.fromJson(new String(bytes, StandardCharsets.UTF_8), deserializedClass);

    }

    @Override
    public void close() {

    }
}
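
One detail worth keeping in mind with a serializer/deserializer pair like
this: whatever charset one side uses to turn the JSON string into bytes, the
other side has to name the same charset when turning the bytes back into a
string, otherwise the result depends on the platform default. A minimal,
stdlib-only sketch of that round trip follows. Utf8RoundTrip and its two
methods are hypothetical helpers for illustration, not part of the classes
above, and Gson is left out on purpose.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing a byte round trip pinned to UTF-8 on both sides.
final class Utf8RoundTrip {

    // Encode a JSON string to bytes; null stays null.
    static byte[] toBytes(String json) {
        return json == null ? null : json.getBytes(StandardCharsets.UTF_8);
    }

    // Decode bytes back to a string with the same charset; null stays null.
    static String fromBytes(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }
}
```

With both directions pinned to UTF-8, non-ASCII characters in field values
survive the trip regardless of the JVM's default encoding.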

On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski <ra...@gruchalski.com>
wrote:

> Do you mind sharing the code of AggKey class?
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 7:26:51 PM, Jon Yeargers (jon.yearg...@cedexis.com)
> wrote:
>
> The 2nd.
>
> On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <ra...@gruchalski.com>
> wrote:
>
>> Is the error happening at this stage?
>>
>> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value)
>>  -> new KeyValue<>(new AggKey(value), value));
>>
>> or here:
>>
>> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
>> rtRekey.groupByKey().aggregate(
>> BqRtDetailLogLine_aggregate::new,
>> new PRTAggregate(),
>> TimeWindows.of(60 * 60 * 1000L),
>> collectorSerde, "prt_minute_agg_stream");
>>
>> –
>>
>> Best regards,
>> Radek Gruchalski
>> ra...@gruchalski.com
>>
>>
>> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com)
>> wrote:
>>
>> If I comment out the aggregation step and just .print the .map step I
>> don't hit the error. It's coming from aggregating the non-String key.
>>
>> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski <ra...@gruchalski.com>
>> wrote:
>>
>>> Jon,
>>>
>>> Looking at your code:
>>>
>>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>> Serdes.String().getClass().getName());
>>>
>>> and later:
>>>
>>> KStream<String, RtDetailLogLine> rtDetailLines =
>>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>>
>>> Is RtDetailLogLine inheriting from String? It is not, as the error
>>> suggests.
>>> You may have to write your own Serializer / Deserializer for
>>> RtDetailLogLine.
>>>
>>> –
>>> Best regards,
>>> Radek Gruchalski
>>> ra...@gruchalski.com
>>>
>>>
>>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (
>>> jon.yearg...@cedexis.com) wrote:
>>>
>>> Using 0.10.1.0
>>>
>>> This is my topology:
>>>
>>> Properties config = new Properties();
>>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
>>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
>>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
>>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>> AggKey.class.getName());
>>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>> Serdes.String().getClass().getName());
>>>
>>> JsonSerializer<BqRtDetailLogLine_aggregate> sumRecordsSerializer =
>>>     new JsonSerializer<>();
>>> JsonDeserializer<BqRtDetailLogLine_aggregate> sumRecordsDeserializer =
>>>     new JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
>>> Serde<BqRtDetailLogLine_aggregate> collectorSerde =
>>>     Serdes.serdeFrom(sumRecordsSerializer, sumRecordsDeserializer);
>>>
>>> StringSerializer stringSerializer = new StringSerializer();
>>> StringDeserializer stringDeserializer = new StringDeserializer();
>>> Serde<String> stringSerde =
>>>     Serdes.serdeFrom(stringSerializer, stringDeserializer);
>>>
>>> JsonDeserializer<RtDetailLogLine> prtRecordDeserializer =
>>>     new JsonDeserializer<>(RtDetailLogLine.class);
>>> JsonSerializer<RtDetailLogLine> prtRecordJsonSerializer =
>>>     new JsonSerializer<>();
>>> Serde<RtDetailLogLine> prtRecordSerde =
>>>     Serdes.serdeFrom(prtRecordJsonSerializer, prtRecordDeserializer);
>>>
>>> KStreamBuilder kStreamBuilder = new KStreamBuilder();
>>>
>>> KStream<String, RtDetailLogLine> rtDetailLines =
>>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>>
>>> // change the keying
>>> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value)
>>> -> new KeyValue<>(new AggKey(value), value));
>>>
>>> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
>>> rtRekey.groupByKey().aggregate(
>>> BqRtDetailLogLine_aggregate::new,
>>> new PRTAggregate(),
>>> TimeWindows.of(60 * 60 * 1000L),
>>> collectorSerde, "prt_minute_agg_stream");
>>>
>>> ktRtDetail.toStream().print();
>>>
>>> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
>>>
>>> kafkaStreams.start();
>>>
>>>
>>> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy <damian....@gmail.com> wrote:
>>>
>>> > Hi Jon,
>>> >
>>> > A couple of things: Which version are you using?
>>> > Can you share the code you are using to build the topology?
>>> >
>>> > Thanks,
>>> > Damian
>>> >
>>> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>>> >
>>> > > I'm using .map to convert my (k/v) String/Object to Object/Object, but
>>> > > when I chain this to an aggregation step I'm getting this exception:
>>> > >
>>> > > Exception in thread "StreamThread-1" java.lang.ClassCastException:
>>> > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
>>> > > java.lang.String
>>> > > at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
>>> > > at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
>>> > > at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>>> > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>>> > > at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
>>> > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>>> > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>>> > > at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
>>> > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>>> > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>>> > > at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
>>> > > at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
>>> > > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
>>> > > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
>>> > >
>>> > > My key object implements Serde and returns a JsonSerializer from the
>>> > > 'serializer()' override.
>>> > >
>>> > > In the config for the topology I'm setting:
>>> > > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
>>> > >
>>> > > Where else do I need to specify the (de)serializer for my key class?
>>> > >
>>> >
>>>
>>>
>>
>
