Jon,

Are you using 0.10.1 or 0.10.0.1?

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 6, 2016 at 7:55:30 PM, Damian Guy (damian....@gmail.com) wrote:

Hi Jon,

At a glance the code looks OK, i.e., I believe the aggregate() should have
picked up the default Serde set in your StreamsConfig. However, you could
try passing the Serdes explicitly to groupByKey(..)

i.e.,
rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)
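
A minimal sketch of why explicit Serdes can help, assuming the exception
comes from the default value Serde (String) being applied to an
RtDetailLogLine when records are written out ahead of the aggregation. It is
plain Java with no Kafka dependency: Serializer, StringSer, LogLine,
LogLineSer and send() are hypothetical stand-ins, not Kafka classes.

```java
import java.nio.charset.StandardCharsets;

// Stand-in sketch only: these are NOT Kafka classes. All names are hypothetical.
class SerdeOverrideSketch {

    // Reduced version of a serializer interface.
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    // Plays the role of the default String serializer taken from the config.
    static final class StringSer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Plays the role of a non-String value such as RtDetailLogLine.
    static final class LogLine {
        final String host;

        LogLine(String host) {
            this.host = host;
        }
    }

    // Plays the role of a serializer written for that value type.
    static final class LogLineSer implements Serializer<LogLine> {
        @Override
        public byte[] serialize(LogLine data) {
            String json = "{\"host\":\"" + data.host + "\"}";
            return json.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Uses the explicit serializer when one is given, otherwise the default.
    // Generic types are erased at runtime, so a mismatch only shows up as a
    // ClassCastException inside serialize().
    @SuppressWarnings("unchecked")
    static byte[] send(Object value, Serializer<?> explicit, Serializer<?> fallback) {
        Serializer<Object> chosen =
                (Serializer<Object>) (explicit != null ? explicit : fallback);
        return chosen.serialize(value);
    }

    public static void main(String[] args) {
        LogLine line = new LogLine("edge-1");
        try {
            // No explicit serializer: the String default is handed a LogLine.
            send(line, null, new StringSer());
        } catch (ClassCastException e) {
            System.out.println("default String serializer rejected LogLine");
        }
        // Explicit serializer: the default is never consulted.
        byte[] bytes = send(line, new LogLineSer(), new StringSer());
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

The point of the sketch is only that an explicitly supplied serializer takes
precedence over the configured default, which is what handing both Serdes to
groupByKey is meant to achieve.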

Thanks,
Damian

On Tue, 6 Dec 2016 at 18:42 Jon Yeargers <jon.yearg...@cedexis.com> wrote:

> It's just a bunch of public 'int' and 'String' values. There's an empty
> constructor and a copy constructor.
>
> For functions I override 'equals'  and the requirements for 'serde' (close,
> configure, serializer and deserializer).
>
>  @Override
>     public Serializer serializer() {
>         JsonSerializer<AggKey> jsonSerializer = new JsonSerializer<>();
>         return jsonSerializer;
>     }
>
>     @Override
>     public Deserializer deserializer() {
>         JsonDeserializer<AggKey> jsonDeserializer = new JsonDeserializer<>();
>         return jsonDeserializer;
>     }
>
>
>
>
> Which relates to:
>
> public class JsonSerializer<T> implements Serializer<T> {
>
>     private Gson gson = new Gson();
>
>     @Override
>     public void configure(Map<String, ?> map, boolean b) {
>
>     }
>
>     @Override
>     public byte[] serialize(String topic, T t) {
>         return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
>     }
>
>     @Override
>     public void close() {
>
>     }
> }
>
>
>
> public class JsonDeserializer<T> implements Deserializer<T> {
>
>     private Gson gson = new Gson();
>     private Class<T> deserializedClass;
>
>     public JsonDeserializer(Class<T> deserializedClass) {
>         this.deserializedClass = deserializedClass;
>     }
>
>     public JsonDeserializer() {
>     }
>
>     @Override
>     @SuppressWarnings("unchecked")
>     public void configure(Map<String, ?> map, boolean b) {
>         if(deserializedClass == null) {
>             deserializedClass = (Class<T>) map.get("serializedClass");
>         }
>     }
>
>     @Override
>     public T deserialize(String s, byte[] bytes) {
>         if(bytes == null){
>             return null;
>         }
>
>         return gson.fromJson(new String(bytes),deserializedClass);
>
>     }
>
>     @Override
>     public void close() {
>
>     }
> }
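
A side note on the two classes quoted above, separate from the
ClassCastException: serialize() encodes with UTF-8, while deserialize() calls
new String(bytes), which uses the platform default charset. The two only
agree on JVMs whose default charset is UTF-8. A sketch of a symmetric round
trip in plain Java follows; Utf8RoundTrip and its methods are hypothetical
names, not part of the posted code.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the posted classes.
class Utf8RoundTrip {

    // Encode the JSON text with an explicit charset.
    static byte[] encode(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Decode with the same explicit charset; null bytes map to null,
    // mirroring the null check in the quoted deserialize().
    static String decode(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Non-ASCII character written as a Unicode escape to keep the source ASCII.
        String json = "{\"city\":\"Krak\u00f3w\"}";
        System.out.println(json.equals(decode(encode(json))));
    }
}
```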
>
> On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski <ra...@gruchalski.com>
> wrote:
>
> > Do you mind sharing the code of AggKey class?
> >
> > –
> > Best regards,
> > Radek Gruchalski
> > ra...@gruchalski.com
> >
> >
> > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (jon.yearg...@cedexis.com)
> > wrote:
> >
> > The 2nd.
> >
> > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <ra...@gruchalski.com>
> > wrote:
> >
> >> Is the error happening at this stage?
> >>
> >> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value)
> >>  -> new KeyValue<>(new AggKey(value), value));
> >>
> >> or here:
> >>
> >> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
> >> rtRekey.groupByKey().aggregate(
> >> BqRtDetailLogLine_aggregate::new,
> >> new PRTAggregate(),
> >> TimeWindows.of(60 * 60 * 1000L),
> >> collectorSerde, "prt_minute_agg_stream");
> >>
> >> –
> >>
> >> Best regards,
> >> Radek Gruchalski
> >> ra...@gruchalski.com
> >>
> >>
> >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com)
> >> wrote:
> >>
> >> If I comment out the aggregation step and just .print the .map step I
> >> don't hit the error. It's coming from aggregating the non-String key.
> >>
> >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski <ra...@gruchalski.com>
> >> wrote:
> >>
> >>> Jon,
> >>>
> >>> Looking at your code:
> >>>
> >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>> Serdes.String().getClass().getName());
> >>>
> >>> and later:
> >>>
> >>> KStream<String, RtDetailLogLine> rtDetailLines =
> >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
> >>>
> >>> Is RtDetailLogLine inheriting from String? It is not, as the error
> >>> suggests.
> >>> You may have to write your own Serializer / Deserializer for
> >>> RtDetailLogLine.
> >>>
> >>> –
> >>> Best regards,
> >>> Radek Gruchalski
> >>> ra...@gruchalski.com
> >>>
> >>>
> >>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (
> >>> jon.yearg...@cedexis.com) wrote:
> >>>
> >>> Using 0.10.1.0
> >>>
> >>> This is my topology:
> >>>
> >>> Properties config = new Properties();
> >>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> >>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
> >>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
> >>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>> AggKey.class.getName());
> >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>> Serdes.String().getClass().getName());
> >>>
> >>> JsonSerializer<BqRtDetailLogLine_aggregate> sumRecordsSerializer = new
> >>> JsonSerializer<>();
> >>> JsonDeserializer<BqRtDetailLogLine_aggregate> sumRecordsDeserializer =
> >>> new JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
> >>> Serde<BqRtDetailLogLine_aggregate> collectorSerde =
> >>> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);
> >>>
> >>> StringSerializer stringSerializer = new StringSerializer();
> >>> StringDeserializer stringDeserializer = new StringDeserializer();
> >>> Serde<String> stringSerde =
> >>> Serdes.serdeFrom(stringSerializer,stringDeserializer);
> >>>
> >>> JsonDeserializer<RtDetailLogLine> prtRecordDeserializer = new
> >>> JsonDeserializer<>(RtDetailLogLine.class);
> >>> JsonSerializer<RtDetailLogLine> prtRecordJsonSerializer = new
> >>> JsonSerializer<>();
> >>> Serde<RtDetailLogLine> prtRecordSerde =
> >>> Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);
> >>>
> >>> KStreamBuilder kStreamBuilder = new KStreamBuilder();
> >>>
> >>> KStream<String, RtDetailLogLine> rtDetailLines =
> >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
> >>>
> >>> // change the keying
> >>> KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value)
> >>> -> new KeyValue<>(new AggKey(value), value));
> >>>
> >>> KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
> >>> rtRekey.groupByKey().aggregate(
> >>> BqRtDetailLogLine_aggregate::new,
> >>> new PRTAggregate(),
> >>> TimeWindows.of(60 * 60 * 1000L),
> >>> collectorSerde, "prt_minute_agg_stream");
> >>>
> >>> ktRtDetail.toStream().print();
> >>>
> >>> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
> >>>
> >>> kafkaStreams.start();
> >>>
> >>>
> >>> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy <damian....@gmail.com> wrote:
> >>>
> >>> > Hi Jon,
> >>> >
> >>> > A couple of things: Which version are you using?
> >>> > Can you share the code you are using to build the topology?
> >>> >
> >>> > Thanks,
> >>> > Damian
> >>> >
> >>> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> >>> >
> >>> > > I'm using .map to convert my (k/v) String/Object to Object/Object, but
> >>> > > when I chain this to an aggregation step I'm getting this exception:
> >>> > >
> >>> > > Exception in thread "StreamThread-1" java.lang.ClassCastException: com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to java.lang.String
> >>> > >     at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
> >>> > >     at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
> >>> > >     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> >>> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >>> > >     at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
> >>> > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> >>> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >>> > >     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
> >>> > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> >>> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >>> > >     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
> >>> > >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
> >>> > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
> >>> > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> >>> > >
> >>> > > My key object implements Serde and returns a JsonSerializer for the
> >>> > > 'serializer()' override.
> >>> > >
> >>> > > In the config for the topology I'm setting:
> >>> > > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
> >>> > >
> >>> > > Where else do I need to specify the (de)serializer for my key class?
> >>> > >
> >>> >
> >>>
> >>>
> >>
> >
>
