Hi all,
I'm a newbie to Kafka. I'm trying to publish my Java object to a Kafka topic and
then consume it.
I see there are some API changes in the latest version of Kafka. Can
anybody point me to some samples showing how to publish and consume Java objects? I
have written my own data serializer, but I could not publish with it to a topic.
Any guide/samples would be appreciated.


*CustomSerializer*



import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;


import kafka.serializer.Decoder;
import kafka.serializer.Encoder;

public class CustomSerializer implements Encoder<FileObj>,
Decoder< FileObj > {

@Override
public byte[] toBytes(FileObj file) {
try {

ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutput out = null;
byte[] rawFileBytes;
try {
out = new ObjectOutputStream(bos);
out.writeObject(file);
rawFileBytes = bos.toByteArray();

} finally {
try {
if (out != null) {
out.close();
bos.close();
}
} catch (Exception ex) {
ex.getLocalizedMessage();
}

}
return rawFileBytes;
} catch (IOException e) {
e.printStackTrace();
return null;
}

}

@Override
public FileObj fromBytes(byte[] fileContent) {
ByteArrayInputStream bis = new ByteArrayInputStream(fileContent);
ObjectInput in = null;
Object obj = null;
try {
in = new ObjectInputStream(bis);
obj = in.readObject();

} catch (IOException e) {

e.printStackTrace();
} catch (ClassNotFoundException e) {

e.printStackTrace();
} finally {
try {
bis.close();
if (in != null) {
in.close();
}
} catch (IOException ex) {
// ignore
}

}
return (FileObj) obj;
}

}



-Ratha

http://vvratha.blogspot.com/

Reply via email to