Is the error happening at this stage?

KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map(
    (key, value) -> new KeyValue<>(new AggKey(value), value));

or here:

KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
    rtRekey.groupByKey().aggregate(
        BqRtDetailLogLine_aggregate::new,
        new PRTAggregate(),
        TimeWindows.of(60 * 60 * 1000L),
        collectorSerde, "prt_minute_agg_stream");

–

Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com)
wrote:

If I comment out the aggregation step and just .print the .map step I don't
hit the error. It's coming from aggregating the non-String key.
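
A note on the mechanism, as a minimal sketch rather than the actual Kafka code: the stack trace ends in StringSerializer.serialize being handed an RtDetailLogLine, which is what one would expect if the record is written to a topic with the default value serde (String, per VALUE_SERDE_CLASS_CONFIG) instead of prtRecordSerde. A likely reason is that grouping after a key-changing map writes to an internal repartition topic, and groupByKey() with no arguments falls back to the configured defaults. The stand-alone example below uses hypothetical stand-ins (its own Serializer interface, LogLine, LogLineSerializer, send and trySend), none of which are Kafka classes, to show how a serializer chosen at runtime throws ClassCastException when it meets the wrong type.

```java
import java.nio.charset.StandardCharsets;

final class DefaultSerdeDemo {

    // Hypothetical stand-in for a serializer interface; NOT Kafka's own.
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    // Plays the role of the default String serializer from the config.
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Hypothetical stand-in for RtDetailLogLine.
    static final class LogLine {
        final String host;

        LogLine(String host) {
            this.host = host;
        }
    }

    // Plays the role of a serializer written for the value type.
    static final class LogLineSerializer implements Serializer<LogLine> {
        @Override
        public byte[] serialize(LogLine data) {
            String json = "{\"host\":\"" + data.host + "\"}";
            return json.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Mimics a sink whose serializer is only known at runtime (for example,
    // loaded from config by class name). Generic types are erased, so the
    // compiler cannot reject a mismatched value here.
    @SuppressWarnings("unchecked")
    static byte[] send(Object value, Serializer<?> configured) {
        return ((Serializer<Object>) configured).serialize(value);
    }

    // Reports whether sending succeeded or hit the cast failure.
    static String trySend(Object value, Serializer<?> configured) {
        try {
            send(value, configured);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        LogLine line = new LogLine("example");
        // Default (String) serializer meets a LogLine value.
        System.out.println(trySend(line, new StringSerializer()));  // ClassCastException
        // Serializer that matches the value type.
        System.out.println(trySend(line, new LogLineSerializer())); // ok
    }
}
```

If that is what is happening here, passing the serdes explicitly at the grouping step, for example rtRekey.groupByKey(aggKeySerde, prtRecordSerde) where aggKeySerde is an assumed Serde<AggKey>, should keep the default String serde out of the picture. I believe 0.10.1.0 has that overload, but please check the KStream javadoc for your version.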

On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski <ra...@gruchalski.com>
wrote:

> Jon,
>
> Looking at your code:
>
> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>
> and later:
>
> KStream<String, RtDetailLogLine> rtDetailLines =
>     kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>
> Is RtDetailLogLine inheriting from String? It is not, as the error
> suggests.
> You may have to write your own Serializer / Deserializer for
> RtDetailLogLine.
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com)
> wrote:
>
> Using 0.10.1.0
>
> This is my topology:
>
> Properties config = new Properties();
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>
> JsonSerializer<BqRtDetailLogLine_aggregate> sumRecordsSerializer = new
> JsonSerializer<>();
> JsonDeserializer<BqRtDetailLogLine_aggregate> sumRecordsDeserializer = new
> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
> Serde<BqRtDetailLogLine_aggregate> collectorSerde =
> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);
>
> StringSerializer stringSerializer = new StringSerializer();
> StringDeserializer stringDeserializer = new StringDeserializer();
> Serde<String> stringSerde =
> Serdes.serdeFrom(stringSerializer,stringDeserializer);
>
> JsonDeserializer<RtDetailLogLine> prtRecordDeserializer = new
> JsonDeserializer<>(RtDetailLogLine.class);
> JsonSerializer<RtDetailLogLine> prtRecordJsonSerializer = new
> JsonSerializer<>();
> Serde<RtDetailLogLine> prtRecordSerde =
> Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);
>
> KStreamBuilder kStreamBuilder = new KStreamBuilder();
>
> KStream<String, RtDetailLogLine> rtDetailLines =
> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>
> // change the keying
> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value)
> -> new KeyValue<>(new AggKey(value), value));
>
> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
> rtRekey.groupByKey().aggregate(
> BqRtDetailLogLine_aggregate::new,
> new PRTAggregate(),
> TimeWindows.of(60 * 60 * 1000L),
> collectorSerde, "prt_minute_agg_stream");
>
> ktRtDetail.toStream().print();
>
> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
>
> kafkaStreams.start();
>
>
> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy <damian....@gmail.com> wrote:
>
> > Hi Jon,
> >
> > A couple of things: Which version are you using?
> > Can you share the code you are using to build the topology?
> >
> > Thanks,
> > Damian
> >
> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> >
> > > I'm using .map to convert my (k/v) String/Object to Object/Object, but
> > > when I chain this to an aggregation step I'm getting this exception:
> > >
> > > Exception in thread "StreamThread-1" java.lang.ClassCastException: com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to java.lang.String
> > >     at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
> > >     at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
> > >     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > >     at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
> > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > >     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
> > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > >     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
> > >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> > >
> > > My key object implements Serde and returns a JsonSerializer from the
> > > 'serializer()' override.
> > >
> > > In the config for the topology I'm setting:
> > > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
> > >
> > > Where else do I need to specify the (de)serializer for my key class?
> > >
> >
>
>
