If I'm reading right, your question is more about how to successfully de(serialise) java object? You might want to take a look at the confluent avro schema registry. Using avro schema's you can easily store messages in a java object created by the schema. This way the messages will also be a lot smaller, witch helps performance. And you don't have to maintain you own de(serialiser).
On Tue, Mar 22, 2016 at 3:38 AM Ratha v <vijayara...@gmail.com> wrote: > Hi all; > Im a newbie to kafka. Im trying to publish my java object to kafka topic an > try to consume. > I see there are some API changes in the latest version of the kafka. can > anybody point some samples for how to publish and consume java objects? I > have written my own data serializer, but could not publish that to a topic. > Any guide/samples would be appreciate.. > > > *Customserilaizer* > > > > import java.io.ByteArrayInputStream; > import java.io.ByteArrayOutputStream; > import java.io.IOException; > import java.io.ObjectInput; > import java.io.ObjectInputStream; > import java.io.ObjectOutput; > import java.io.ObjectOutputStream; > > > import kafka.serializer.Decoder; > import kafka.serializer.Encoder; > > public class CustomSerializer implements Encoder<FileObj>, > Decoder< FileObj > { > > @Override > public byte[] toBytes(FileObj file) { > try { > > ByteArrayOutputStream bos = new ByteArrayOutputStream(); > ObjectOutput out = null; > byte[] rawFileBytes; > try { > out = new ObjectOutputStream(bos); > out.writeObject(file); > rawFileBytes = bos.toByteArray(); > > } finally { > try { > if (out != null) { > out.close(); > bos.close(); > } > } catch (Exception ex) { > ex.getLocalizedMessage(); > } > > } > return rawFileBytes; > } catch (IOException e) { > e.printStackTrace(); > return null; > } > > } > > @Override > public FileObj fromBytes(byte[] fileContent) { > ByteArrayInputStream bis = new ByteArrayInputStream(fileContent); > ObjectInput in = null; > Object obj = null; > try { > in = new ObjectInputStream(bis); > obj = in.readObject(); > > } catch (IOException e) { > > e.printStackTrace(); > } catch (ClassNotFoundException e) { > > e.printStackTrace(); > } finally { > try { > bis.close(); > if (in != null) { > in.close(); > } > } catch (IOException ex) { > // ignore > } > > } > return (FileObj) obj; > } > > } > > > > -Ratha > > http://vvratha.blogspot.com/ >