Producer<String, byte[]> producer = new Producer<String, byte[]>(config);
Try this. On Wed, May 21, 2014 at 12:26 AM, Neha Narkhede <neha.narkh...@gmail.com>wrote: > Pradeep, > > If you are writing a POC, I'd suggest you do that using the new producer > APIs< > http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/producer/Producer.html > >. > These are much easier to use, exposes more functionality and the new > producer is faster than the older one. It is currently in beta, slated for > release in 0.8.2 or 0.9 and we are working on stabilizing it, but it should > work great for your POC. We'd love to hear feedback on the APIs. > > Thanks, > Neha > > > On Tue, May 20, 2014 at 10:51 AM, Kumar Pradeep <kprad...@novell.com> > wrote: > > > Thanks Pushkar for your response. > > > > I tried to send my own byte array; however the Kafka Producer Class does > > not take byte [] as input type. Do you have an example of this? Please > > share if you do; really appreciate. > > > > Here is my code: > > > > > > public class TestEventProducer { > > public static void main(String[] args) { > > > > String topic = "test-topic"; > > long eventsNum = 10; > > > > Properties props = new Properties(); > > props.put("metadata.broker.list", "localhost:9092"); > > props.put("serializer.class", "kafka.serializer.DefaultEncoder > "); > > props.put("request.required.acks", "0"); > > ProducerConfig config = new ProducerConfig(props); > > > > byte [] rawData; > > Producer<String, rawData> producer = new Producer<String, > > rawData>(config); //compillation error rawData cannot be resolved to a > type > > > > long start = System.currentTimeMillis(); > > > > for (long nEvents = 0; nEvents < eventsNum; nEvents++) { > > > > SimulateEvent event = new SimulateEvent(); > > try { > > rawData = Serializer.serialize(event); > > } catch (IOException e) { > > e.printStackTrace(); > > } > > KeyedMessage<String, rawData> data = new KeyedMessage<String, > > rawData>(topic, event); > > producer.send(data); > > System.out.println("produced event#:" + nEvents + " "+ data); > > } > > System.out.println("Took " + (System.currentTimeMillis() - start) > > + "to produce " + eventsNum + "messages"); > > producer.close(); > > } > > } > > > > public class Serializer { > > public static byte[] serialize(Object obj) throws IOException { > > ByteArrayOutputStream b = new ByteArrayOutputStream(); > > ObjectOutputStream o = new ObjectOutputStream(b); > > o.writeObject(obj); > > return b.toByteArray(); > > } > > > > public static Object deserialize(byte[] bytes) throws IOException, > > ClassNotFoundException { > > ByteArrayInputStream b = new ByteArrayInputStream(bytes); > > ObjectInputStream o = new ObjectInputStream(b); > > return o.readObject(); > > } > > } > > > > >>> pushkar priyadarshi <priyadarshi.push...@gmail.com> 5/20/2014 5:11 > PM > > >>> > > you can send byte[] that you get by using your own serializer ; through > > > > kafka ().On the reciving side u can deseraialize from the byte[] and read > > > > back your object.for using this you will have to > > > > supply serializer.class=kafka.serializer.DefaultEncoder in the > properties. > > > > > > > > On Tue, May 20, 2014 at 4:23 PM, Kumar Pradeep <kprad...@novell.com> > > wrote: > > > > > > > I am trying to build a POC with Kafka 0.8.1. I am using my own java > class > > > > > as a Kafka message which has a bunch of String data types. For > > > > > serializer.class property in my producer, I cannot use the default > > > > > serializer class or the String serializer class that comes with Kafka > > > > > library. I guess I need to write my own serializer and feed it to the > > > > > producer properties. If you are aware of writing an example custom > > > > > serializer in Kafka (in java), please do share. Appreciate a lot, > thanks > > > > > much. > > > > > > > > > > I tried to use something like below, but I get the exception: Exception > > in > > > > > thread "main" java.lang.NoSuchMethodException: > > > > > test.EventsDataSerializer.<init>(kafka.utils.VerifiableProperties) > > > > > at java.lang.Class.getConstructor0(Class.java:2971) > > > > > > > > > > > > > > > package test; > > > > > > > > > > import java.io.IOException; > > > > > > > > > > import com.fasterxml.jackson.core.JsonFactory; > > > > > import com.fasterxml.jackson.databind.ObjectMapper; > > > > > > > > > > import kafka.message.Message; > > > > > import kafka.serializer.Decoder; > > > > > import kafka.serializer.Encoder; > > > > > > > > > > public class EventsDataSerializer implements Encoder<SimulateEvent>, > > > > > Decoder<SimulateEvent> { > > > > > > > > > > public Message toMessage(SimulateEvent eventDetails) { > > > > > try { > > > > > ObjectMapper mapper = new ObjectMapper(new JsonFactory()); > > > > > byte[] serialized = mapper.writeValueAsBytes(eventDetails); > > > > > return new Message(serialized); > > > > > } catch (IOException e) { > > > > > e.printStackTrace(); > > > > > return null; // TODO > > > > > } > > > > > } > > > > > public SimulateEvent toEvent(Message message) { > > > > > SimulateEvent event = new SimulateEvent(); > > > > > > > > > > ObjectMapper mapper = new ObjectMapper(new JsonFactory()); > > > > > try { > > > > > //TODO handle error > > > > > return mapper.readValue(message.payload().array(), > > > > > SimulateEvent.class); > > > > > } catch (IOException e) { > > > > > e.printStackTrace(); > > > > > return null; > > > > > } > > > > > > > > > > } > > > > > > > > > > public byte[] toBytes(SimulateEvent arg0) { > > > > > // TODO Auto-generated method stub > > > > > return null; > > > > > } > > > > > public SimulateEvent fromBytes(byte[] arg0) { > > > > > // TODO Auto-generated method stub > > > > > return null; > > > > > } > > > > > } > > > > > > > > > > > > > > > > > > > >