Hello,
Specified below is my code base where I am attempting to marshall a complex
type and receiving the follow error.
Am I missing anything here?
Sending Encoded Messages ..
[error] (run-main) java.lang.ClassCastException: java.lang.String cannot be
cast to com.test.groups.MemberRecord
java.lang.ClassCastException: java.lang.String cannot be cast to
com.test.groups.MemberRecord
~~~~~~~~~~~~~~~~~~~~~~~~~
package com.test.groups
import java.util._
import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
import kafka.serializer.Encoder
import kafka.message.Message
import java.io.ByteArrayOutputStream
import java.io.DataOutputStream
import kafka.serializer.Decoder
import java.io.ByteArrayInputStream
import java.io.DataInputStream
import kafka.utils.VerifiableProperties
class MemberRecord(val memberId: Int, val name: String, val location:
String) {
override def toString = {
"(" + memberId + "," + name + "," + location + ")"
}
}
class MemberRecordEncoder(props: VerifiableProperties = null) extends
Encoder[MemberRecord] {
def toBytes(member: MemberRecord): Array[Byte] = {
val outputStream = new ByteArrayOutputStream()
val dos = new DataOutputStream(outputStream)
dos.writeInt(member.memberId)
dos.writeUTF(member.name)
dos.writeUTF(member.location)
outputStream.flush
outputStream.toByteArray
}
}
class MemberRecordDecoder(props: VerifiableProperties = null) extends
Decoder[MemberRecord] {
def fromBytes(messageByte: Array[Byte]):MemberRecord = {
val message = new Message(messageByte)
val inputStream = new ByteArrayInputStream(message.payload.array,
message.payload.arrayOffset, message.payload.limit)
val dataInputStream = new DataInputStream(inputStream)
new MemberRecord(dataInputStream.readInt, dataInputStream.readUTF,
dataInputStream.readUTF)
}
}
object SimpleDataProducer {
def main(args: Array[String]) {
val events = 100
val eprops = new Properties
eprops.put("metadata.broker.list", "localhost:9092")
eprops.put("serializer.class", "com.test.groups.MemberRecordEncoder");
eprops.put("request.required.acks", "1")
val econfg = new ProducerConfig(eprops)
val eproducer = new Producer[String, MemberRecord](econfg)
val dataP = new SimpleDataProducer
println(" Sending Encoded Messages .. ")
dataP.sendEncodedMessage(10,eproducer)
println(" Shutting down Producer ")
eproducer.close
println(" Successfully shut down Producer ")
}
}
class SimpleDataProducer {
val rnd = new Random
def sendEncodedMessage(nEvents: Int, producer: Producer[String,
MemberRecord]) {
for (nEvents <- 0 to nEvents) {
val message = new MemberRecord(rnd.nextInt(255), "John", "US")
val producerData = new KeyedMessage[String,
MemberRecord]("topic-Encoded", "encode", message)
producer.send(producerData)
}
}
}