Do you mind sharing the code of AggKey class?

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 6, 2016 at 7:26:51 PM, Jon Yeargers (jon.yearg...@cedexis.com)
wrote:

The 2nd.

On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <ra...@gruchalski.com>
wrote:

> Is the error happening at this stage?
>
> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value) ->
> new KeyValue<>(new AggKey(value), value));
>
> or here:
>
> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
> rtRekey.groupByKey().aggregate(
> BqRtDetailLogLine_aggregate::new,
> new PRTAggregate(),
> TimeWindows.of(60 * 60 * 1000L),
> collectorSerde, "prt_minute_agg_stream");
>
> –
>
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com)
> wrote:
>
> If I comment out the aggregation step and just .print the .map step I
> don't hit the error. It's coming from aggregating the non-String key.
>
> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski <ra...@gruchalski.com>
> wrote:
>
>> Jon,
>>
>> Looking at your code:
>>
>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> Serdes.String().getClass().getName());
>>
>> and later:
>>
>> KStream<String, RtDetailLogLine> rtDetailLines =
>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>
>> Is RtDetailLogLine inheriting from String? It is not, as the error
>> suggests.
>> You may have to write your own Serializer / Deserializer for
>> RtDetailLogLine.
>>
>> –
>> Best regards,
>> Radek Gruchalski
>> ra...@gruchalski.com
>>
>>
>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com)
>> wrote:
>>
>> Using 0.10.1.0
>>
>> This is my topology:
>>
>> Properties config = new Properties();
>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> Serdes.String().getClass().getName());
>>
>> JsonSerializer<BqRtDetailLogLine_aggregate> sumRecordsSerializer = new
>> JsonSerializer<>();
>> JsonDeserializer<BqRtDetailLogLine_aggregate> sumRecordsDeserializer =
>> new
>> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
>> Serde<BqRtDetailLogLine_aggregate> collectorSerde =
>> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);
>>
>> StringSerializer stringSerializer = new StringSerializer();
>> StringDeserializer stringDeserializer = new StringDeserializer();
>> Serde<String> stringSerde =
>> Serdes.serdeFrom(stringSerializer,stringDeserializer);
>>
>> JsonDeserializer<RtDetailLogLine> prtRecordDeserializer = new
>> JsonDeserializer<>(RtDetailLogLine.class);
>> JsonSerializer<RtDetailLogLine> prtRecordJsonSerializer = new
>> JsonSerializer<>();
>> Serde<RtDetailLogLine> prtRecordSerde =
>> Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);
>>
>> KStreamBuilder kStreamBuilder = new KStreamBuilder();
>>
>> KStream<String, RtDetailLogLine> rtDetailLines =
>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>
>> // change the keying
>> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value)
>> -> new KeyValue<>(new AggKey(value), value));
>>
>> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
>> rtRekey.groupByKey().aggregate(
>> BqRtDetailLogLine_aggregate::new,
>> new PRTAggregate(),
>> TimeWindows.of(60 * 60 * 1000L),
>> collectorSerde, "prt_minute_agg_stream");
>>
>> ktRtDetail.toStream().print();
>>
>> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
>>
>> kafkaStreams.start();
>>
>>
>> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy <damian....@gmail.com> wrote:
>>
>> > Hi Jon,
>> >
>> > A couple of things: Which version are you using?
>> > Can you share the code you are using to the build the topology?
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <jon.yearg...@cedexis.com>
>> wrote:
>> >
>> > > Im using .map to convert my (k/v) string/Object to Object/Object but
>> > when I
>> > > chain this to an aggregation step Im getting this exception:
>> > >
>> > > Exception in thread "StreamThread-1" java.lang.ClassCastException:
>> > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
>> > > java.lang.String
>> > > at
>> > >
>> > > org.apache.kafka.common.serialization.StringSerializer.serialize(
>> > StringSerializer.java:24)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.RecordCollector.send(
>> > RecordCollector.java:73)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.SinkNode.
>> > process(SinkNode.java:72)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.
>> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > at
>> > >
>> > > org.apache.kafka.streams.kstream.internals.KStreamFilter$
>> > KStreamFilterProcessor.process(KStreamFilter.java:44)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
>> > ProcessorNode.java:82)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.
>> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > at
>> > >
>> > > org.apache.kafka.streams.kstream.internals.KStreamMap$
>> > KStreamMapProcessor.process(KStreamMap.java:43)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
>> > ProcessorNode.java:82)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.
>> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.
>> > SourceNode.process(SourceNode.java:66)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.
>> > StreamTask.process(StreamTask.java:181)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
>> > StreamThread.java:436)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.
>> > StreamThread.run(StreamThread.java:242)
>> > >
>> > > My key object implements Serde and returns a JsonSerializer for the
>> > > 'Serializer()' override.
>> > >
>> > > In the config for the topology Im
>> > > setting: config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> > > AggKey.class.getName());
>> > >
>> > > Where else do I need to specify the (de)serializer for my key class?
>> > >
>> >
>>
>>
>

Reply via email to