Producer<String, byte[]> producer = new Producer<String, byte[]>(config);

Try this.



On Wed, May 21, 2014 at 12:26 AM, Neha Narkhede <neha.narkh...@gmail.com>wrote:

> Pradeep,
>
> If you are writing a POC, I'd suggest you do that using the new producer
> APIs<
> http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/producer/Producer.html
> >.
> These are much easier to use, exposes more functionality and the new
> producer is faster than the older one. It is currently in beta, slated for
> release in 0.8.2 or 0.9 and we are working on stabilizing it, but it should
> work great for your POC. We'd love to hear feedback on the APIs.
>
> Thanks,
> Neha
>
>
> On Tue, May 20, 2014 at 10:51 AM, Kumar Pradeep <kprad...@novell.com>
> wrote:
>
> > Thanks Pushkar for your response.
> >
> > I tried to send my own byte array; however the Kafka Producer Class does
> > not take byte [] as input type. Do you have an example of this? Please
> > share if you do; really appreciate.
> >
> > Here is my code:
> >
> >
> > public class TestEventProducer {
> >     public static void main(String[] args) {
> >
> >      String topic = "test-topic";
> >      long eventsNum = 10;
> >
> >         Properties props = new Properties();
> >         props.put("metadata.broker.list", "localhost:9092");
> >         props.put("serializer.class", "kafka.serializer.DefaultEncoder
> ");
> >         props.put("request.required.acks", "0");
> >         ProducerConfig config = new ProducerConfig(props);
> >
> >         byte [] rawData;
> >         Producer<String, rawData> producer = new Producer<String,
> > rawData>(config); //compillation error rawData cannot be resolved to a
> type
> >
> >         long start = System.currentTimeMillis();
> >
> >         for (long nEvents = 0; nEvents < eventsNum; nEvents++) {
> >
> >              SimulateEvent event = new SimulateEvent();
> >              try {
> >                 rawData = Serializer.serialize(event);
> >               } catch (IOException e) {
> >                 e.printStackTrace();
> >               }
> >             KeyedMessage<String, rawData> data = new KeyedMessage<String,
> > rawData>(topic, event);
> >             producer.send(data);
> >             System.out.println("produced event#:" + nEvents + " "+ data);
> >         }
> >         System.out.println("Took " + (System.currentTimeMillis() - start)
> > + "to produce " + eventsNum + "messages");
> >         producer.close();
> >     }
> > }
> >
> > public class Serializer {
> >     public static byte[] serialize(Object obj) throws IOException {
> >         ByteArrayOutputStream b = new ByteArrayOutputStream();
> >         ObjectOutputStream o = new ObjectOutputStream(b);
> >         o.writeObject(obj);
> >         return b.toByteArray();
> >     }
> >
> >     public static Object deserialize(byte[] bytes) throws IOException,
> > ClassNotFoundException {
> >         ByteArrayInputStream b = new ByteArrayInputStream(bytes);
> >         ObjectInputStream o = new ObjectInputStream(b);
> >         return o.readObject();
> >     }
> > }
> >
> > >>> pushkar priyadarshi <priyadarshi.push...@gmail.com> 5/20/2014 5:11
> PM
> > >>>
> > you can send byte[] that you get by using your own serializer ; through
> >
> > kafka ().On the reciving side u can deseraialize from the byte[] and read
> >
> > back your object.for using this you will have to
> >
> > supply serializer.class=kafka.serializer.DefaultEncoder in the
> properties.
> >
> >
> >
> > On Tue, May 20, 2014 at 4:23 PM, Kumar Pradeep <kprad...@novell.com>
> > wrote:
> >
> >
> > > I am trying to build a POC with Kafka 0.8.1. I am using my own java
> class
> >
> > > as a Kafka message which has a bunch of String data types. For
> >
> > > serializer.class property in my producer, I cannot use the default
> >
> > > serializer class or the String serializer class that comes with Kafka
> >
> > > library. I guess I need to write my own serializer and feed it to the
> >
> > > producer properties. If you are aware of writing an example custom
> >
> > > serializer in Kafka (in java), please do share. Appreciate a lot,
> thanks
> >
> > > much.
> >
> > >
> >
> > > I tried to use something like below, but I get the exception: Exception
> > in
> >
> > > thread "main" java.lang.NoSuchMethodException:
> >
> > > test.EventsDataSerializer.<init>(kafka.utils.VerifiableProperties)
> >
> > >  at java.lang.Class.getConstructor0(Class.java:2971)
> >
> > >
> >
> > >
> >
> > > package test;
> >
> > >
> >
> > > import java.io.IOException;
> >
> > >
> >
> > > import com.fasterxml.jackson.core.JsonFactory;
> >
> > > import com.fasterxml.jackson.databind.ObjectMapper;
> >
> > >
> >
> > > import kafka.message.Message;
> >
> > > import kafka.serializer.Decoder;
> >
> > > import kafka.serializer.Encoder;
> >
> > >
> >
> > > public  class EventsDataSerializer implements Encoder<SimulateEvent>,
> >
> > > Decoder<SimulateEvent> {
> >
> > >
> >
> > >  public Message toMessage(SimulateEvent eventDetails) {
> >
> > >         try {
> >
> > >             ObjectMapper mapper = new ObjectMapper(new JsonFactory());
> >
> > >             byte[] serialized = mapper.writeValueAsBytes(eventDetails);
> >
> > >             return new Message(serialized);
> >
> > >         } catch (IOException e) {
> >
> > >             e.printStackTrace();
> >
> > >             return null;   // TODO
> >
> > >         }
> >
> > > }
> >
> > >     public SimulateEvent toEvent(Message message) {
> >
> > >      SimulateEvent event = new SimulateEvent();
> >
> > >
> >
> > >         ObjectMapper mapper = new ObjectMapper(new JsonFactory());
> >
> > >         try {
> >
> > >             //TODO handle error
> >
> > >             return mapper.readValue(message.payload().array(),
> >
> > > SimulateEvent.class);
> >
> > >         } catch (IOException e) {
> >
> > >             e.printStackTrace();
> >
> > >             return null;
> >
> > >         }
> >
> > >
> >
> > >     }
> >
> > >
> >
> > >  public byte[] toBytes(SimulateEvent arg0) {
> >
> > >   // TODO Auto-generated method stub
> >
> > >   return null;
> >
> > >  }
> >
> > >  public SimulateEvent fromBytes(byte[] arg0) {
> >
> > >   // TODO Auto-generated method stub
> >
> > >   return null;
> >
> > >  }
> >
> > > }
> >
> > >
> >
> > >
> >
> > >
> >
> >
>

Reply via email to