Jon,

Looking at your code:

config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
    Serdes.String().getClass().getName());

and later:

KStream<String, RtDetailLogLine> rtDetailLines =
    kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);

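Does RtDetailLogLine inherit from String? It does not, as the error suggests.
Your default value serde is the String serde, so any step that is not given
a serde explicitly will try to serialize RtDetailLogLine values with
StringSerializer, which is where the ClassCastException comes from. You may
have to write your own Serializer / Deserializer for RtDetailLogLine, and
make sure it is the one used wherever those values are written out.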
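
To illustrate the mechanism, here is a rough sketch in plain Java. It does
not use the Kafka API: SimpleSerializer, LogLine, StringOnlySerializer,
LogLineSerializer and DefaultSerdeMismatch are all made-up stand-ins, and the
hand-built JSON is only there to keep the example short. It assumes the sink
only knows the serializer picked from the config, so the type mismatch
surfaces at runtime rather than at compile time.

```java
import java.nio.charset.StandardCharsets;

// Stand-in for a serializer interface; hypothetical, NOT the Kafka API.
interface SimpleSerializer<T> {
    byte[] serialize(T data);
}

// Hypothetical value type standing in for RtDetailLogLine.
class LogLine {
    final String host;
    final long bytes;

    LogLine(String host, long bytes) {
        this.host = host;
        this.bytes = bytes;
    }
}

// Plays the role of the String serializer configured as the default.
class StringOnlySerializer implements SimpleSerializer<String> {
    @Override
    public byte[] serialize(String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }
}

// Plays the role of a serializer written for the value type itself.
class LogLineSerializer implements SimpleSerializer<LogLine> {
    @Override
    public byte[] serialize(LogLine data) {
        // Hand-built JSON, assuming field values need no escaping.
        String json = "{\"host\":\"" + data.host + "\",\"bytes\":" + data.bytes + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }
}

class DefaultSerdeMismatch {
    // Mimics a sink that receives a serializer chosen from configuration.
    // Generic types are erased here, so nothing stops a mismatch at compile time.
    @SuppressWarnings("unchecked")
    static byte[] send(SimpleSerializer<?> configured, Object value) {
        return ((SimpleSerializer<Object>) configured).serialize(value);
    }

    // A String serializer applied to a LogLine fails with ClassCastException.
    static boolean failsWithStringDefault() {
        try {
            send(new StringOnlySerializer(), new LogLine("edge-1", 42L));
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // The matching serializer handles the same value without error.
    static String serializedWithMatchingSerializer() {
        byte[] out = send(new LogLineSerializer(), new LogLine("edge-1", 42L));
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(failsWithStringDefault());
        System.out.println(serializedWithMatchingSerializer());
    }
}
```

In the sketch the first call fails only when the value reaches the
serializer, which matches the shape of your stack trace: the cast happens
inside StringSerializer.serialize, not where the topology is built.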

--
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com)
wrote:

Using 0.10.1.0

This is my topology:

Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg");
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
    Serdes.String().getClass().getName());

JsonSerializer<BqRtDetailLogLine_aggregate> sumRecordsSerializer =
    new JsonSerializer<>();
JsonDeserializer<BqRtDetailLogLine_aggregate> sumRecordsDeserializer =
    new JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
Serde<BqRtDetailLogLine_aggregate> collectorSerde =
    Serdes.serdeFrom(sumRecordsSerializer, sumRecordsDeserializer);

StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();
Serde<String> stringSerde =
    Serdes.serdeFrom(stringSerializer, stringDeserializer);

JsonDeserializer<RtDetailLogLine> prtRecordDeserializer =
    new JsonDeserializer<>(RtDetailLogLine.class);
JsonSerializer<RtDetailLogLine> prtRecordJsonSerializer =
    new JsonSerializer<>();
Serde<RtDetailLogLine> prtRecordSerde =
    Serdes.serdeFrom(prtRecordJsonSerializer, prtRecordDeserializer);

KStreamBuilder kStreamBuilder = new KStreamBuilder();

KStream<String, RtDetailLogLine> rtDetailLines =
    kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);

// change the keying
KStream<AggKey, RtDetailLogLine> rtRekey =
    rtDetailLines.map((key, value) -> new KeyValue<>(new AggKey(value), value));

KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
    rtRekey.groupByKey().aggregate(
        BqRtDetailLogLine_aggregate::new,
        new PRTAggregate(),
        TimeWindows.of(60 * 60 * 1000L),
        collectorSerde, "prt_minute_agg_stream");

ktRtDetail.toStream().print();

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);

kafkaStreams.start();


On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy <damian....@gmail.com> wrote:

> Hi Jon,
>
> A couple of things: which version are you using?
> Can you share the code you are using to build the topology?
>
> Thanks,
> Damian
>
> On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>
> > I'm using .map to convert my (k/v) String/Object to Object/Object, but
> > when I chain this to an aggregation step I'm getting this exception:
> >
> > Exception in thread "StreamThread-1" java.lang.ClassCastException:
> > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
> > java.lang.String
> >   at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
> >   at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
> >   at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >   at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
> >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >   at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
> >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >   at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
> >   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> >
> > My key object implements Serde and returns a JsonSerializer from the
> > 'serializer()' override.
> >
> > In the config for the topology I'm setting:
> > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
> >
> > Where else do I need to specify the (de)serializer for my key class?
> >
>
