Thanks Pushkar for your response. I tried to send my own byte array; however, the Kafka Producer class does not take byte[] as an input type. Do you have an example of this? Please share if you do; I would really appreciate it.
Here is my code:

public class TestEventProducer {
    public static void main(String[] args) {
        String topic = "test-topic";
        long eventsNum = 10;

        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder ");
        props.put("request.required.acks", "0");
        ProducerConfig config = new ProducerConfig(props);

        byte [] rawData;
        Producer<String, rawData> producer = new Producer<String, rawData>(config); // compilation error: rawData cannot be resolved to a type

        long start = System.currentTimeMillis();
        for (long nEvents = 0; nEvents < eventsNum; nEvents++) {
            SimulateEvent event = new SimulateEvent();
            try {
                rawData = Serializer.serialize(event);
            } catch (IOException e) {
                e.printStackTrace();
            }
            KeyedMessage<String, rawData> data = new KeyedMessage<String, rawData>(topic, event);
            producer.send(data);
            System.out.println("produced event#:" + nEvents + " " + data);
        }
        System.out.println("Took " + (System.currentTimeMillis() - start) + "to produce " + eventsNum + "messages");
        producer.close();
    }
}

public class Serializer {
    public static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream b = new ByteArrayOutputStream();
        ObjectOutputStream o = new ObjectOutputStream(b);
        o.writeObject(obj);
        return b.toByteArray();
    }

    public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        ByteArrayInputStream b = new ByteArrayInputStream(bytes);
        ObjectInputStream o = new ObjectInputStream(b);
        return o.readObject();
    }
}

>>> pushkar priyadarshi <priyadarshi.push...@gmail.com> 5/20/2014 5:11 PM >>>
You can send the byte[] that you get from your own serializer through Kafka. On the receiving side you can deserialize from the byte[] and read back your object. To use this you will have to supply serializer.class=kafka.serializer.DefaultEncoder in the properties.

On Tue, May 20, 2014 at 4:23 PM, Kumar Pradeep <kprad...@novell.com> wrote:
> I am trying to build a POC with Kafka 0.8.1. I am using my own java class
> as a Kafka message which has a bunch of String data types. For
> serializer.class property in my producer, I cannot use the default
> serializer class or the String serializer class that comes with Kafka
> library. I guess I need to write my own serializer and feed it to the
> producer properties. If you are aware of writing an example custom
> serializer in Kafka (in java), please do share. Appreciate a lot, thanks
> much.
>
> I tried to use something like below, but I get the exception: Exception in
> thread "main" java.lang.NoSuchMethodException:
> test.EventsDataSerializer.<init>(kafka.utils.VerifiableProperties)
>     at java.lang.Class.getConstructor0(Class.java:2971)
>
>
> package test;
>
> import java.io.IOException;
>
> import com.fasterxml.jackson.core.JsonFactory;
> import com.fasterxml.jackson.databind.ObjectMapper;
>
> import kafka.message.Message;
> import kafka.serializer.Decoder;
> import kafka.serializer.Encoder;
>
> public class EventsDataSerializer implements Encoder<SimulateEvent>,
>         Decoder<SimulateEvent> {
>
>     public Message toMessage(SimulateEvent eventDetails) {
>         try {
>             ObjectMapper mapper = new ObjectMapper(new JsonFactory());
>             byte[] serialized = mapper.writeValueAsBytes(eventDetails);
>             return new Message(serialized);
>         } catch (IOException e) {
>             e.printStackTrace();
>             return null; // TODO
>         }
>     }
>
>     public SimulateEvent toEvent(Message message) {
>         SimulateEvent event = new SimulateEvent();
>
>         ObjectMapper mapper = new ObjectMapper(new JsonFactory());
>         try {
>             //TODO handle error
>             return mapper.readValue(message.payload().array(), SimulateEvent.class);
>         } catch (IOException e) {
>             e.printStackTrace();
>             return null;
>         }
>     }
>
>     public byte[] toBytes(SimulateEvent arg0) {
>         // TODO Auto-generated method stub
>         return null;
>     }
>
>     public SimulateEvent fromBytes(byte[] arg0) {
>         // TODO Auto-generated method stub
>         return null;
>     }
> }
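
For illustration, here is a minimal sketch of the byte[] round trip suggested in the reply in this thread: serialize the object yourself, hand the resulting byte[] to the producer as the message value, and deserialize it again on the consuming side. It uses only java.io and makes no Kafka calls. SimulateEvent below is a hypothetical stand-in (the real class is not shown in the thread), and EventSerializer and RoundTripDemo are illustrative names. One assumption worth stating: the event class must implement Serializable, otherwise ObjectOutputStream.writeObject throws NotSerializableException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the SimulateEvent class mentioned in the thread.
// It must be Serializable for Java object serialization to accept it.
class SimulateEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;
    final String detail;

    SimulateEvent(String name, String detail) {
        this.name = name;
        this.detail = detail;
    }
}

// Illustrative helper: converts any Serializable object to byte[] and back.
class EventSerializer {
    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes its buffer into 'bytes'.
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }
}

class RoundTripDemo {
    public static void main(String[] args) throws Exception {
        SimulateEvent event = new SimulateEvent("login", "user=alice");

        // Producer side: this byte[] is what would be sent as the message value.
        byte[] rawData = EventSerializer.serialize(event);

        // Consumer side: rebuild the object from the received bytes.
        SimulateEvent copy = (SimulateEvent) EventSerializer.deserialize(rawData);
        System.out.println(copy.name + " " + copy.detail);
    }
}
```

On the compile error in the producer code: rawData is a variable name, not a type, so it cannot appear as a generic type argument. With serializer.class=kafka.serializer.DefaultEncoder the declarations would presumably read Producer<String, byte[]> and KeyedMessage<String, byte[]>, with rawData (not event) passed as the message value. Separately, the NoSuchMethodException in the quoted message shows the producer looking for a constructor on the custom encoder that takes kafka.utils.VerifiableProperties.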