Hi, I noticed that String Serializer somehow doesn't do well encoding
special characters such as "ü".
I tried to create a ByteBufferEncoder this way:
import java.nio.ByteBuffer;
import kafka.message.Message;
import kafka.serializer.Encoder;
public class ByteBufferEncoder implements Encoder<ByteBuffer> {
public Message toMessage(ByteBuffer buffer) {
return new Message(buffer);
}
}
but I get this exception [1]
Could you guys please advice on how to fix my encoding issue?
Thanks
[1]
Exception in thread "main" java.lang.RuntimeException: Invalid magic byte 34
at kafka.message.Message.compressionCodec(Message.scala:144)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(
ByteBufferMessageSet.scala:112)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(
ByteBufferMessageSet.scala:138)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(
ByteBufferMessageSet.scala:82)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
at kafka.message.MessageSet.foreach(MessageSet.scala:87)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$verifyMessageSize(
SyncProducer.scala:139)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(
ProducerPool.scala:116)
at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
at scala.collection.mutable.ResizableArray$class.foreach(
ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
at kafka.producer.Producer.zkSend(Producer.scala:143)
at kafka.producer.Producer.send(Producer.scala:105)
at kafka.javaapi.producer.Producer.send(Producer.scala:104)
at com.lucid.dao.queue.impl.kafka.KafkaProducer.send(KafkaProducer.java:63)