Hi,

  I'm trying to send a Java object using the Kryo object serializer. The
producer is able to send the payload to the queue, but I'm having issues
reading the data in the consumer. I'm using a consumer group via KafkaStream.
The consumer code is based on the example in the Kafka documentation.
Here's the consumer code and the corresponding error:

public void run(int a_numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(a_numThreads));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    executor = Executors.newFixedThreadPool(a_numThreads);

    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new ConsumerGroupSerializerObject(stream, threadNumber));
        threadNumber++;
    }
}
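
While reading the old consumer API I noticed that createMessageStreams also
seems to have an overload that takes Decoder instances, so the streams would
already carry deserialized objects instead of byte[]. I haven't tried it, but
from what I can tell it would look roughly like this (just a sketch using
kafka.serializer.StringDecoder and kafka.utils.VerifiableProperties, where
KryoDecoder is a hypothetical Kryo-backed decoder sketched after the
properties below, not my actual class):

// Sketch only: StringDecoder/VerifiableProperties are from the old consumer API,
// KryoDecoder is a hypothetical Decoder<TextAnalysisRequest> (see sketch further down).
Decoder<String> keyDecoder = new StringDecoder(new VerifiableProperties());
Decoder<TextAnalysisRequest> valueDecoder = new KryoDecoder(new VerifiableProperties());

Map<String, List<KafkaStream<String, TextAnalysisRequest>>> consumerMap =
        consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
List<KafkaStream<String, TextAnalysisRequest>> streams = consumerMap.get(topic);

For now I'm sticking with the byte[] streams and deserializing inside the worker.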

Inside ConsumerGroupSerializerObject's run method:

private KafkaStream<byte[], byte[]> m_stream;

public void run() {
    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
    while (it.hasNext()) {
        ByteArrayInputStream in = null;
        ObjectInputStream is = null;
        try {
            in = new ByteArrayInputStream(it.next().message());
            is = new ObjectInputStream(in);
            TextAnalysisRequest req = (TextAnalysisRequest) is.readObject();
        } catch (ClassNotFoundException ex) {
            ex.printStackTrace();
        } catch (IOException ex) {
            ex.printStackTrace();
        } finally {
            try {
                if (is != null) is.close();
                if (in != null) in.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

I'm getting an exception at the following line:

is = new ObjectInputStream(in);

java.io.StreamCorruptedException: invalid stream header: 01746573
    at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
    at com.test.kafka.consumer.ConsumerGroupSerializerObject.run(ConsumerGroupSerializerObject.java:43)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
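
Since the producer wrote the payload with Kryo, I'm starting to suspect that
ObjectInputStream (which expects Java serialization's stream header) simply
can't read these bytes, and that the message() payload would have to be handed
back to Kryo instead. Something along these lines is what I have in mind (an
untested sketch; it assumes the producer used kryo.writeObject rather than
writeClassAndObject, and that both sides agree on any class registrations):

// Untested sketch using com.esotericsoftware.kryo.Kryo and com.esotericsoftware.kryo.io.Input;
// assumes the producer wrote the payload with kryo.writeObject(output, req).
Kryo kryo = new Kryo();
byte[] payload = it.next().message();
Input input = new Input(payload);
try {
    TextAnalysisRequest req = kryo.readObject(input, TextAnalysisRequest.class);
} finally {
    input.close();
}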


Here are the consumer properties:

Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", KryoReadingSerializer.class.getName());

I'm new to Kafka, so I'm not entirely sure whether this is the right approach
to consuming messages with a custom serializer. Moreover, I'm using
KafkaStream; could that be an issue as well?

Any pointers will be appreciated.

Thanks,
Shamik
