Thanks Pushkar for your response.

I tried to send my own byte array; however, the Kafka Producer class does not 
take byte[] as an input type. Do you have an example of this? Please share it 
if you do; I would really appreciate it.

Here is my code:

 
public class TestEventProducer {
    public static void main(String[] args) {
            
        String topic = "test-topic";
        long eventsNum = 10;
            
            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.DefaultEncoder");
            props.put("request.required.acks", "0");
            ProducerConfig config = new ProducerConfig(props);
            
            byte [] rawData;
            Producer<String, rawData> producer = new Producer<String, rawData>(config); // compilation error: rawData cannot be resolved to a type
            
            long start = System.currentTimeMillis();
 
            for (long nEvents = 0; nEvents < eventsNum; nEvents++) { 
                         SimulateEvent event = new SimulateEvent(); 
                         try {
                                rawData = Serializer.serialize(event);
                          } catch (IOException e) {
                                e.printStackTrace();
                          }
                    KeyedMessage<String, rawData> data = new KeyedMessage<String, rawData>(topic, event);
                    producer.send(data);
                    System.out.println("produced event#:" + nEvents + " " + data);
            }
            System.out.println("Took " + (System.currentTimeMillis() - start) + " ms to produce " + eventsNum + " messages");
            producer.close();
    }
}
public class Serializer {
    public static byte[] serialize(Object obj) throws IOException {
            ByteArrayOutputStream b = new ByteArrayOutputStream();
            ObjectOutputStream o = new ObjectOutputStream(b);
            o.writeObject(obj);
            return b.toByteArray();
    }

    public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
            ByteArrayInputStream b = new ByteArrayInputStream(bytes);
            ObjectInputStream o = new ObjectInputStream(b);
            return o.readObject();
    }
}
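
A note on the compilation error above: rawData is a variable, not a type, so it cannot be used as a generic type argument. Since DefaultEncoder passes byte arrays through unchanged, the value type would presumably be byte[] itself, i.e. Producer<String, byte[]> and KeyedMessage<String, byte[]>(topic, rawData). The serialization half can be checked without a broker. What follows is only a minimal sketch using the JDK alone; the class name SerializerRoundTrip and the fields of its nested SimulateEvent are hypothetical stand-ins, and the real event class is assumed to implement java.io.Serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializerRoundTrip {

    // Hypothetical stand-in for the real event class (assumption: it is Serializable).
    public static class SimulateEvent implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String name;
        public final long sequence;

        public SimulateEvent(String name, long sequence) {
            this.name = name;
            this.sequence = sequence;
        }
    }

    // Produces the byte[] that would be passed as the message value.
    public static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the object stream flushes any buffered data into 'bytes'.
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    // Consumer side: rebuilds the object from the received byte[].
    public static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] rawData = serialize(new SimulateEvent("login", 7L));
        SimulateEvent copy = (SimulateEvent) deserialize(rawData);
        System.out.println(copy.name + " " + copy.sequence + " (" + rawData.length + " bytes)");
    }
}
```

Note that the original loop passes event, not rawData, to KeyedMessage; with a byte[] value type the serialized array is what would need to be sent.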
 >>> pushkar priyadarshi <priyadarshi.push...@gmail.com> 5/20/2014 5:11 PM >>>
You can send the byte[] that you get from your own serializer through
Kafka. On the receiving side you can deserialize the byte[] and read
back your object. To use this, you will have to supply
serializer.class=kafka.serializer.DefaultEncoder in the properties.



On Tue, May 20, 2014 at 4:23 PM, Kumar Pradeep <kprad...@novell.com> wrote:


> I am trying to build a POC with Kafka 0.8.1. I am using my own java class
> as a Kafka message which has a bunch of String data types. For
> serializer.class property in my producer, I cannot use the default
> serializer class or the String serializer class that comes with Kafka
> library. I guess I need to write my own serializer and feed it to the
> producer properties. If you are aware of writing an example custom
> serializer in Kafka (in java), please do share. Appreciate a lot, thanks
> much.
>
> I tried to use something like below, but I get the exception: Exception in
> thread "main" java.lang.NoSuchMethodException:
> test.EventsDataSerializer.<init>(kafka.utils.VerifiableProperties)
>  at java.lang.Class.getConstructor0(Class.java:2971)
>

> package test;
>
> import java.io.IOException;
>
> import com.fasterxml.jackson.core.JsonFactory;
> import com.fasterxml.jackson.databind.ObjectMapper;
>
> import kafka.message.Message;
> import kafka.serializer.Decoder;
> import kafka.serializer.Encoder;
>
> public class EventsDataSerializer implements Encoder<SimulateEvent>, Decoder<SimulateEvent> {
>
>     public Message toMessage(SimulateEvent eventDetails) {
>         try {
>             ObjectMapper mapper = new ObjectMapper(new JsonFactory());
>             byte[] serialized = mapper.writeValueAsBytes(eventDetails);
>             return new Message(serialized);
>         } catch (IOException e) {
>             e.printStackTrace();
>             return null;   // TODO
>         }
>     }
>
>     public SimulateEvent toEvent(Message message) {
>         SimulateEvent event = new SimulateEvent();
>
>         ObjectMapper mapper = new ObjectMapper(new JsonFactory());
>         try {
>             //TODO handle error
>             return mapper.readValue(message.payload().array(), SimulateEvent.class);
>         } catch (IOException e) {
>             e.printStackTrace();
>             return null;
>         }
>     }
>
>     public byte[] toBytes(SimulateEvent arg0) {
>         // TODO Auto-generated method stub
>         return null;
>     }
>
>     public SimulateEvent fromBytes(byte[] arg0) {
>         // TODO Auto-generated method stub
>         return null;
>     }
> }
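
On the NoSuchMethodException quoted above: the message shows a lookup for a constructor taking kafka.utils.VerifiableProperties, so adding a public EventsDataSerializer(VerifiableProperties props) constructor to the class should satisfy it. The sketch below reproduces the same kind of reflective lookup with the JDK alone. FakeProps is a hypothetical stand-in for VerifiableProperties, and every class name here is invented for illustration; it is not Kafka's actual loading code.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

public class EncoderConstructorDemo {

    // Hypothetical stand-in for kafka.utils.VerifiableProperties.
    public static class FakeProps {
        final Properties props;

        public FakeProps(Properties props) {
            this.props = props;
        }
    }

    // Like the quoted class: only the implicit no-argument constructor exists.
    public static class NoArgEncoder {
    }

    // Declares the one-argument constructor that the reflective lookup expects.
    public static class PropsEncoder {
        public PropsEncoder(FakeProps props) {
            // Properties are ignored in this sketch.
        }
    }

    // Looks up a public constructor taking FakeProps and tries to invoke it.
    public static String lookup(Class<?> encoderClass) {
        try {
            Constructor<?> ctor = encoderClass.getConstructor(FakeProps.class);
            ctor.newInstance(new FakeProps(new Properties()));
            return "ok";
        } catch (NoSuchMethodException e) {
            return "NoSuchMethodException";
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println("NoArgEncoder: " + lookup(NoArgEncoder.class));
        System.out.println("PropsEncoder: " + lookup(PropsEncoder.class));
    }
}
```

Separately, toBytes and fromBytes in the quoted class still return null, so the Jackson logic in toMessage and toEvent would presumably need to move into them before the class can encode anything.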
