Hi -

I have the following Scala snippet in a Kafka Streams application:


val builder = new KStreamBuilder()
val logRecords: KStream[Array[Byte], LogRecord] =
  builder.stream(Serdes.ByteArray(), logRecordSerde, config.toTopic)
val m: KStream[Array[Byte], String] = logRecords.mapValues(hostExtractor)
val n: KStream[String, String] = m.map(
  new KeyValueMapper[Array[Byte], String, KeyValue[String, String]]() {
    override def apply(key: Array[Byte], value: String): KeyValue[String, String] =
      new KeyValue(value, value)
  })

val counts: KTable[String, java.lang.Long] =
  n.groupByKey().count("HostAggregateCount")


In the above snippet, logRecordSerde is a custom Serde implementation for a
Scala case class.

When I run this application, I get the following exception:

Exception in thread "StreamThread-1"
org.apache.kafka.streams.errors.StreamsException: Exception caught in
process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000,
topic=processed-log, partition=0, offset=0
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: org.apache.kafka.streams.errors.StreamsException: *A
serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer
/ value: org.apache.kafka.common.serialization.ByteArraySerializer) is not
compatible to the actual key or value type (key type: java.lang.String /
value type: java.lang.String).* Change the default Serdes in StreamConfig
or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:81)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)


I suspect the exception is thrown when I do a map on the KStream, which
is supposed to return KStream[String, String]. My question is: how do I
inject the proper Serde here? map does not take a Serde as an argument.
The initial topic that I read from uses ByteArray for the key and a custom
Serde for the value, and that works fine. But then the map blows up.
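
One direction that may be worth trying, sketched here under assumptions
rather than as a confirmed fix: the trace ends in a SinkNode, which suggests
the failure could be in the repartition step that groupByKey() adds after the
key-changing map, where the default (ByteArray) Serdes get applied to String
keys and values. Assuming a Kafka Streams version whose KStream has the
groupByKey(keySerde, valSerde) overload (0.10.1 or later), the Serdes could be
passed there instead of to map. HostCounts and countByHost are hypothetical
names used only for illustration.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStream, KTable}

// Hypothetical helper, not part of the original application.
object HostCounts {

  // Groups the re-keyed stream with explicit String Serdes, so that any
  // repartitioning done for the grouping does not fall back to the default
  // Serdes from the streams configuration, then counts records per key.
  // Assumes the 0.10.1+ overload groupByKey(keySerde, valSerde).
  def countByHost(n: KStream[String, String]): KTable[String, java.lang.Long] =
    n.groupByKey(Serdes.String(), Serdes.String())
      .count("HostAggregateCount")
}
```

With this, the last statement of the snippet would become
val counts = HostCounts.countByHost(n). The other option, as the error
message itself says, is to change the default Serdes in the streams
configuration, which probably only fits if String keys and values are the
common case across the whole topology.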

Any help would be appreciated.

regards.



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
