Hi - I have the following Scala snippet in a Kafka streams application ..
val builder = new KStreamBuilder() val logRecords: KStream[Array[Byte], LogRecord] = builder.stream(Serdes.ByteArray(), logRecordSerde, config.toTopic) val m: KStream[Array[Byte], String] = logRecords.mapValues(hostExtractor) val n: KStream[String, String] = m.map( new KeyValueMapper[Array[Byte], String, KeyValue[String, String]]() { override def apply(key: Array[Byte], value: String): KeyValue[String, String] = new KeyValue(value, value) }) val counts: KTable[String, java.lang.Long] = n.groupByKey().count("HostAggregateCount") In the above snippet logRecordSerde is a custom implementation of a Serde based on a Scala case class. When I run this application, I get the following exception .. Exception in thread "StreamThread-1" > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, > topic=processed-log, partition=0, offset=0 > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > Caused by: org.apache.kafka.streams.errors.StreamsException: *A > serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer > / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not > compatible to the actual key or value type (key type: java.lang.String / > value type: java.lang.String).* Change the default Serdes in StreamConfig > or provide correct Serdes via method parameters. > at > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:81) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) Possibly the exception comes when I try to do a map on the KStream, which is supposed to return KStream[String, String]. My question is how do I inject the proper Serde here ? map does not take any Serde as arguments. The initial topic from where I read has ByteArray and a custom Serde and it works fine. But then the map blows up .. Any help will be appreciated .. regards. -- Debasish Ghosh http://manning.com/ghosh2 http://manning.com/ghosh Twttr: @debasishg Blog: http://debasishg.blogspot.com Code: http://github.com/debasishg