Yeah, I knew that already. This part of the error:

> org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)

points to this line:
https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java#L73

which means that your error happens on the value, not the key.
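
The sink in that stack trace looks like the internal repartition topic that groupByKey().aggregate() creates after your .map, and that sink uses the default serdes unless you pass them in. So Damian's suggestion further down should take care of it. Roughly, just a sketch, reusing the serdes from your topology and assuming AggKey itself is usable as a Serde<AggKey> as you describe (otherwise substitute whatever Serde implementation you have for the key):

// Pass the key and value serdes to groupByKey() explicitly so the internal
// repartition topic does not fall back to the default String value serde.
KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
        rtRekey.groupByKey(new AggKey(), prtRecordSerde)
               .aggregate(
                       BqRtDetailLogLine_aggregate::new,
                       new PRTAggregate(),
                       TimeWindows.of(60 * 60 * 1000L),
                       collectorSerde, "prt_minute_agg_stream");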

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com
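
PS: if you would rather not have AggKey implement Serde itself, you could also wrap the JsonSerializer / JsonDeserializer you pasted below into a standalone serde and hand that to groupByKey(). Again only a sketch; note the target class is given to the deserializer directly, so it does not rely on a "serializedClass" entry being present in the configure() map:

// Build an explicit Serde<AggKey> from the JSON (de)serializers quoted below.
JsonSerializer<AggKey> aggKeySerializer = new JsonSerializer<>();
JsonDeserializer<AggKey> aggKeyDeserializer = new JsonDeserializer<>(AggKey.class);
Serde<AggKey> aggKeySerde = Serdes.serdeFrom(aggKeySerializer, aggKeyDeserializer);

// ...then: rtRekey.groupByKey(aggKeySerde, prtRecordSerde).aggregate(...)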


On December 6, 2016 at 9:18:53 PM, Jon Yeargers (jon.yearg...@cedexis.com)
wrote:

0.10.1.0

On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski <ra...@gruchalski.com>
wrote:

> Jon,
>
> Are you using 0.10.1 or 0.10.0.1?
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 7:55:30 PM, Damian Guy (damian....@gmail.com)
> wrote:
>
> Hi Jon,
>
> At a glance the code looks ok, i.e., I believe the aggregate() should have
> picked up the default Serde set in your StreamsConfig. However, you could
> try adding the Serdes to the groupByKey(...) call
>
> i.e.,
> rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)
>
> Thanks,
> Damian
>
> On Tue, 6 Dec 2016 at 18:42 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>
> > It's just a bunch of public 'int' and 'String' values. There's an empty
> > constructor and a copy constructor.
> >
> > For functions I override 'equals' and the requirements for 'serde'
> > (close, configure, serializer and deserializer).
> >
> > @Override
> > public Serializer serializer() {
> >     JsonSerializer<AggKey> jsonSerializer = new JsonSerializer<>();
> >     return jsonSerializer;
> > }
> >
> > @Override
> > public Deserializer deserializer() {
> >     JsonDeserializer<AggKey> jsonDeserializer = new JsonDeserializer<>();
> >     return jsonDeserializer;
> > }
> >
> >
> >
> >
> > Which relates to:
> >
> > public class JsonSerializer<T> implements Serializer<T> {
> >
> >     private Gson gson = new Gson();
> >
> >     @Override
> >     public void configure(Map<String, ?> map, boolean b) {
> >     }
> >
> >     @Override
> >     public byte[] serialize(String topic, T t) {
> >         return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
> >     }
> >
> >     @Override
> >     public void close() {
> >     }
> > }
> >
> >
> >
> > public class JsonDeserializer<T> implements Deserializer<T> {
> >
> >     private Gson gson = new Gson();
> >     private Class<T> deserializedClass;
> >
> >     public JsonDeserializer(Class<T> deserializedClass) {
> >         this.deserializedClass = deserializedClass;
> >     }
> >
> >     public JsonDeserializer() {
> >     }
> >
> >     @Override
> >     @SuppressWarnings("unchecked")
> >     public void configure(Map<String, ?> map, boolean b) {
> >         if (deserializedClass == null) {
> >             deserializedClass = (Class<T>) map.get("serializedClass");
> >         }
> >     }
> >
> >     @Override
> >     public T deserialize(String s, byte[] bytes) {
> >         if (bytes == null) {
> >             return null;
> >         }
> >         return gson.fromJson(new String(bytes), deserializedClass);
> >     }
> >
> >     @Override
> >     public void close() {
> >     }
> > }
> >
> > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski <ra...@gruchalski.com> wrote:
> >
> > > Do you mind sharing the code of AggKey class?
> > >
> > > –
> > > Best regards,
> > > Radek Gruchalski
> > > ra...@gruchalski.com
> > >
> > >
> > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote:
> > >
> > > The 2nd.
> > >
> > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <ra...@gruchalski.com> wrote:
> > >
> > >> Is the error happening at this stage?
> > >>
> > >> KStream<AggKey, RtDetailLogLine> rtRekey =
> > >>         rtDetailLines.map((key, value) -> new KeyValue<>(new AggKey(value), value));
> > >>
> > >> or here:
> > >>
> > >> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
> > >>         rtRekey.groupByKey().aggregate(
> > >>                 BqRtDetailLogLine_aggregate::new,
> > >>                 new PRTAggregate(),
> > >>                 TimeWindows.of(60 * 60 * 1000L),
> > >>                 collectorSerde, "prt_minute_agg_stream");
> > >>
> > >> –
> > >>
> > >> Best regards,
> > >> Radek Gruchalski
> > >> ra...@gruchalski.com
> > >>
> > >>
> > >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote:
> > >>
> > >> If I comment out the aggregation step and just .print the .map step I
> > >> don't hit the error. It's coming from aggregating the non-String key.
> > >>
> > >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski <ra...@gruchalski.com> wrote:
> > >>
> > >>> Jon,
> > >>>
> > >>> Looking at your code:
> > >>>
> > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > >>> Serdes.String().getClass().getName());
> > >>>
> > >>> and later:
> > >>>
> > >>> KStream<String, RtDetailLogLine> rtDetailLines =
> > >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
> > >>>
> > >>> Is RtDetailLogLine inheriting from String? It is not, as the error
> > >>> suggests.
> > >>> You may have to write your own Serializer / Deserializer for
> > >>> RtDetailLogLine.
> > >>>
> > >>> –
> > >>> Best regards,
> > >>> Radek Gruchalski
> > >>> ra...@gruchalski.com
> > >>>
> > >>>
> > >>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com) wrote:
> > >>>
> > >>> Using 0.10.1.0
> > >>>
> > >>> This is my topology:
> > >>>
> > >>> Properties config = new Properties();
> > >>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> > >>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
> > >>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg");
> > >>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
> > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> > >>>
> > >>> JsonSerializer<BqRtDetailLogLine_aggregate> sumRecordsSerializer = new JsonSerializer<>();
> > >>> JsonDeserializer<BqRtDetailLogLine_aggregate> sumRecordsDeserializer =
> > >>>         new JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
> > >>> Serde<BqRtDetailLogLine_aggregate> collectorSerde =
> > >>>         Serdes.serdeFrom(sumRecordsSerializer, sumRecordsDeserializer);
> > >>>
> > >>> StringSerializer stringSerializer = new StringSerializer();
> > >>> StringDeserializer stringDeserializer = new StringDeserializer();
> > >>> Serde<String> stringSerde = Serdes.serdeFrom(stringSerializer, stringDeserializer);
> > >>>
> > >>> JsonDeserializer<RtDetailLogLine> prtRecordDeserializer = new JsonDeserializer<>(RtDetailLogLine.class);
> > >>> JsonSerializer<RtDetailLogLine> prtRecordJsonSerializer = new JsonSerializer<>();
> > >>> Serde<RtDetailLogLine> prtRecordSerde =
> > >>>         Serdes.serdeFrom(prtRecordJsonSerializer, prtRecordDeserializer);
> > >>>
> > >>> KStreamBuilder kStreamBuilder = new KStreamBuilder();
> > >>>
> > >>> KStream<String, RtDetailLogLine> rtDetailLines =
> > >>>         kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
> > >>>
> > >>> // change the keying
> > >>> KStream<AggKey, RtDetailLogLine> rtRekey =
> > >>>         rtDetailLines.map((key, value) -> new KeyValue<>(new AggKey(value), value));
> > >>>
> > >>> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
> > >>>         rtRekey.groupByKey().aggregate(
> > >>>                 BqRtDetailLogLine_aggregate::new,
> > >>>                 new PRTAggregate(),
> > >>>                 TimeWindows.of(60 * 60 * 1000L),
> > >>>                 collectorSerde, "prt_minute_agg_stream");
> > >>>
> > >>> ktRtDetail.toStream().print();
> > >>>
> > >>> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
> > >>>
> > >>> kafkaStreams.start();
> > >>>
> > >>>
> > >>> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy <damian....@gmail.com> wrote:
> > >>>
> > >>> > Hi Jon,
> > >>> >
> > >>> > A couple of things: Which version are you using?
> > >>> > Can you share the code you are using to the build the topology?
> > >>> >
> > >>> > Thanks,
> > >>> > Damian
> > >>> >
> > >>> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> > >>> >
> > >>> > > I'm using .map to convert my (k/v) String/Object to Object/Object, but
> > >>> > > when I chain this to an aggregation step I'm getting this exception:
> > >>> > >
> > >>> > > Exception in thread "StreamThread-1" java.lang.ClassCastException:
> > >>> > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to java.lang.String
> > >>> > >   at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
> > >>> > >   at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
> > >>> > >   at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> > >>> > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > >>> > >   at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
> > >>> > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> > >>> > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > >>> > >   at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
> > >>> > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> > >>> > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > >>> > >   at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
> > >>> > >   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
> > >>> > >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
> > >>> > >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> > >>> > >
> > >>> > > My key object implements Serde and returns a JsonSerializer for the
> > >>> > > 'serializer()' override.
> > >>> > >
> > >>> > > In the config for the topology I'm setting:
> > >>> > > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
> > >>> > >
> > >>> > > Where else do I need to specify the (de)serializer for my key class?
> > >>> > >
> > >>> >
> > >>>
> > >>>
> > >>
> > >
> >
>
