Hi all; Im a newbie to kafka. Im trying to publish my java object to kafka topic an try to consume. I see there are some API changes in the latest version of the kafka. can anybody point some samples for how to publish and consume java objects? I have written my own data serializer, but could not publish that to a topic. Any guide/samples would be appreciate..
*Customserilaizer* import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectInputStream; import java.io.ObjectOutput; import java.io.ObjectOutputStream; import kafka.serializer.Decoder; import kafka.serializer.Encoder; public class CustomSerializer implements Encoder<FileObj>, Decoder< FileObj > { @Override public byte[] toBytes(FileObj file) { try { ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutput out = null; byte[] rawFileBytes; try { out = new ObjectOutputStream(bos); out.writeObject(file); rawFileBytes = bos.toByteArray(); } finally { try { if (out != null) { out.close(); bos.close(); } } catch (Exception ex) { ex.getLocalizedMessage(); } } return rawFileBytes; } catch (IOException e) { e.printStackTrace(); return null; } } @Override public FileObj fromBytes(byte[] fileContent) { ByteArrayInputStream bis = new ByteArrayInputStream(fileContent); ObjectInput in = null; Object obj = null; try { in = new ObjectInputStream(bis); obj = in.readObject(); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } finally { try { bis.close(); if (in != null) { in.close(); } } catch (IOException ex) { // ignore } } return (FileObj) obj; } } -Ratha http://vvratha.blogspot.com/