Hi,

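You should provide the serdes in the `groupByKey()` operation. Because the
`map` changes the key, the `groupByKey` triggers a re-partition, and without
explicit serdes the records are written to the repartition topic with the
default serdes from your config, which is what the exception is reporting.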
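
A minimal sketch of that first option, assuming the same pre-1.0 Kafka
Streams API as in your snippet (`KStreamBuilder`, `count(String)`). The
object and method names here are hypothetical, only there to make the
fragment compile on its own:

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStream, KTable}

object GroupByKeyWithSerdes {
  // `n` is the re-keyed KStream[String, String] produced by the `map`.
  // Passing String serdes here means the repartition topic is written
  // with them instead of the configured defaults.
  def countByKey(n: KStream[String, String]): KTable[String, java.lang.Long] =
    n.groupByKey(Serdes.String(), Serdes.String())
      .count("HostAggregateCount")
}
```
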
In fact you could replace the `map` and `groupByKey` with:

m.groupBy(mapper, Serdes.String(), Serdes.String())
  .count("HostAggregateCount")
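
Spelled out a little more, it could look like the sketch below. This assumes
the same pre-1.0 API as your snippet; `HostCounts`, `hostAsKey` and
`countByHost` are hypothetical names. Note that the mapper given to `groupBy`
returns only the new key, not a `KeyValue`:

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStream, KTable, KeyValueMapper}

object HostCounts {
  // Key selector: use the extracted host (the value) as the new key.
  val hostAsKey: KeyValueMapper[Array[Byte], String, String] =
    new KeyValueMapper[Array[Byte], String, String] {
      override def apply(key: Array[Byte], value: String): String = value
    }

  // `m` is the KStream[Array[Byte], String] returned by `mapValues`.
  // groupBy re-keys and groups in one step, using the given String
  // serdes for the repartition topic.
  def countByHost(m: KStream[Array[Byte], String]): KTable[String, java.lang.Long] =
    m.groupBy(hostAsKey, Serdes.String(), Serdes.String())
      .count("HostAggregateCount")
}
```

With this, `val counts = HostCounts.countByHost(m)` would replace both the
`map` and the `groupByKey` lines.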

Thanks,
Damian

On Tue, 20 Jun 2017 at 12:08 Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> Hi -
>
> I have the following Scala snippet in a Kafka Streams application:
>
>
> val builder = new KStreamBuilder()
> val logRecords: KStream[Array[Byte], LogRecord] =
>   builder.stream(Serdes.ByteArray(), logRecordSerde, config.toTopic)
> val m: KStream[Array[Byte], String] = logRecords.mapValues(hostExtractor)
> val n: KStream[String, String] = m.map(
>   new KeyValueMapper[Array[Byte], String, KeyValue[String, String]]() {
>     override def apply(key: Array[Byte], value: String): KeyValue[String, String] =
>       new KeyValue(value, value)
>   })
>
> val counts: KTable[String, java.lang.Long] =
>   n.groupByKey().count("HostAggregateCount")
>
>
> In the above snippet logRecordSerde is a custom implementation of a Serde
> based on a Scala case class.
>
> When I run this application, I get the following exception:
>
> Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=processed-log, partition=0, offset=0
>   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
>   at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:81)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>
>
> Possibly the exception comes when I try to do a map on the KStream, which
> is supposed to return KStream[String, String]. My question is: how do I
> inject the proper Serde here? map does not take any Serde as an argument.
> The initial topic I read from uses ByteArray and a custom Serde, and it
> works fine. But then the map blows up.
>
> Any help will be appreciated.
>
> regards.
>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
