Anyone ?

On Tue, Sep 6, 2016 at 4:21 PM, Shamik Bandopadhyay <sham...@gmail.com>
wrote:

> Hi,
>
>   I'm trying to send java object using kryo object serializer . The
> producer is able to send the payload to the queue, but I'm having issues
> reading the data in consumer. I'm using consumer group using KafkaStream.
> The consumer code is based out of the example in kafka documentation.
> Here's the consumer code and the corresponding error:
>
> public void run(int a_numThreads) {
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> topicCountMap.put(topic, new Integer(a_numThreads));
> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
> List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
> executor = Executors.newFixedThreadPool(a_numThreads);
> int threadNumber = 0;
> for (final KafkaStream stream : streams) {
> executor.submit(new ConsumerGroupSerializerObject(stream, threadNumber));
> threadNumber++;
> }
> }
>
> Inside ConsumerGroupSerializerObject's run method,
>
> private KafkaStream m_stream;
>
> public void run() {
> ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
> ByteArrayInputStream in = null;
> ObjectInputStream is = null;
> while (it.hasNext()){
> try{
> in = new ByteArrayInputStream(it.next().message());
> is = new ObjectInputStream(in);
> TextAnalysisRequest req = (TextAnalysisRequest)is.readObject();
> }catch(ClassNotFoundException ex){
> ex.printStackTrace();
> }catch(IOException ex){
> ex.printStackTrace();
> }finally{
> try{
> in.close();
> is.close();
> }catch(IOException ex){
> ex.printStackTrace();
> }
> }
> }
> }
>
> I'm getting exception at the following line:
>
> is = new ObjectInputStream(in);
>
> java.io.StreamCorruptedException: invalid stream header: 01746573
> at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
> at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
> at com.test.kafka.consumer.ConsumerGroupSerializerObject.run(
> ConsumerGroupSerializerObject.java:43)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Here's the property:
>
> Properties props = new Properties();
> props.put("zookeeper.connect", a_zookeeper);
> props.put("group.id", a_groupId);
> props.put("zookeeper.session.timeout.ms", "400");
> props.put("zookeeper.sync.time.ms", "200");
> props.put("auto.commit.interval.ms", "1000");
> props.put("auto.offset.reset", "smallest");
> props.put("key.deserializer", "org.apache.kafka.common.serialization.
> StringDeserializer");
> props.put("value.deserializer", KryoReadingSerializer.class.getName());
>
> I'm new to kafka, so not entirely sure if this is right approach of
> consuming message using custom serializer. Moreover, I'm using KafkaStream
> , can it be an issue as well ?
>
> Any pointers will be appreciated.
>
> Thanks,
> Shamik
>

Reply via email to