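Hi,

You should provide the serdes in the `groupByKey()` operation. Because the `map` changes the key, the `groupByKey` triggers a re-partition, and without explicit serdes the re-partitioned records are written with the default serdes (ByteArray, as the exception shows) rather than String serdes. In fact you could replace the `map` and `groupByKey` with: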
m.groupBy(mapper, Serdes.String(), Serdes.String()).count("HostAggregateCount")

Thanks,
Damian

On Tue, 20 Jun 2017 at 12:08 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> Hi -
>
> I have the following Scala snippet in a Kafka streams application ..
>
> val builder = new KStreamBuilder()
> val logRecords: KStream[Array[Byte], LogRecord] =
>   builder.stream(Serdes.ByteArray(), logRecordSerde, config.toTopic)
> val m: KStream[Array[Byte], String] = logRecords.mapValues(hostExtractor)
> val n: KStream[String, String] = m.map(
>   new KeyValueMapper[Array[Byte], String, KeyValue[String, String]]() {
>     override def apply(key: Array[Byte], value: String): KeyValue[String, String] =
>       new KeyValue(value, value)
>   })
>
> val counts: KTable[String, java.lang.Long] =
>   n.groupByKey().count("HostAggregateCount")
>
> In the above snippet logRecordSerde is a custom implementation of a Serde
> based on a Scala case class.
>
> When I run this application, I get the following exception ..
>
> > Exception in thread "StreamThread-1"
> > org.apache.kafka.streams.errors.StreamsException: Exception caught in
> > process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000,
> > topic=processed-log, partition=0, offset=0
> >   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > Caused by: org.apache.kafka.streams.errors.StreamsException: *A
> > serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer
> > / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not
> > compatible to the actual key or value type (key type: java.lang.String /
> > value type: java.lang.String).* Change the default Serdes in StreamConfig
> > or provide correct Serdes via method parameters.
> >   at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:81)
> >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>
> Possibly the exception comes when I try to do a map on the KStream, which
> is supposed to return KStream[String, String]. My question is how do I
> inject the proper Serde here ? map does not take any Serde as arguments.
> The initial topic from where I read has ByteArray and a custom Serde and it
> works fine. But then the map blows up ..
>
> Any help will be appreciated ..
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
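
For reference, the two options from the reply at the top of this message could look roughly like the sketch below. It assumes the Kafka Streams API of the release used in this thread (the one with `KStreamBuilder`), where `groupByKey` and `groupBy` have overloads taking a key serde and a value serde; later releases changed these overloads. The object name `HostCounts`, the method names and the `hostAsKey` mapper are made up for illustration and are not from the original code. Note that the mapper passed to `groupBy` returns only the new key, not a `KeyValue`.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStream, KTable, KeyValueMapper}

// Hypothetical wrapper object; only the groupByKey/groupBy/count calls
// mirror what is discussed in the thread.
object HostCounts {

  // Option 1: keep the existing map, but pass String serdes to groupByKey
  // so the re-partitioned records are not written with the default
  // ByteArray serdes.
  def countAfterMap(n: KStream[String, String]): KTable[String, java.lang.Long] =
    n.groupByKey(Serdes.String(), Serdes.String())
      .count("HostAggregateCount")

  // Option 2: drop the map and select the new key directly in groupBy.
  def countWithGroupBy(m: KStream[Array[Byte], String]): KTable[String, java.lang.Long] = {
    // Illustrative mapper: use the extracted host (the value) as the new key.
    val hostAsKey = new KeyValueMapper[Array[Byte], String, String] {
      override def apply(key: Array[Byte], value: String): String = value
    }
    m.groupBy(hostAsKey, Serdes.String(), Serdes.String())
      .count("HostAggregateCount")
  }
}
```

Only one of the two would be used in a given topology, since both register a state store named "HostAggregateCount".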