Anyone ? On Tue, Sep 6, 2016 at 4:21 PM, Shamik Bandopadhyay <sham...@gmail.com> wrote:
> Hi, > > I'm trying to send java object using kryo object serializer . The > producer is able to send the payload to the queue, but I'm having issues > reading the data in consumer. I'm using consumer group using KafkaStream. > The consumer code is based out of the example in kafka documentation. > Here's the consumer code and the corresponding error: > > public void run(int a_numThreads) { > Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); > topicCountMap.put(topic, new Integer(a_numThreads)); > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = > consumer.createMessageStreams(topicCountMap); > List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); > executor = Executors.newFixedThreadPool(a_numThreads); > int threadNumber = 0; > for (final KafkaStream stream : streams) { > executor.submit(new ConsumerGroupSerializerObject(stream, threadNumber)); > threadNumber++; > } > } > > Inside ConsumerGroupSerializerObject's run method, > > private KafkaStream m_stream; > > public void run() { > ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); > ByteArrayInputStream in = null; > ObjectInputStream is = null; > while (it.hasNext()){ > try{ > in = new ByteArrayInputStream(it.next().message()); > is = new ObjectInputStream(in); > TextAnalysisRequest req = (TextAnalysisRequest)is.readObject(); > }catch(ClassNotFoundException ex){ > ex.printStackTrace(); > }catch(IOException ex){ > ex.printStackTrace(); > }finally{ > try{ > in.close(); > is.close(); > }catch(IOException ex){ > ex.printStackTrace(); > } > } > } > } > > I'm getting exception at the following line: > > is = new ObjectInputStream(in); > > java.io.StreamCorruptedException: invalid stream header: 01746573 > at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806) > at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299) > at com.test.kafka.consumer.ConsumerGroupSerializerObject.run( > ConsumerGroupSerializerObject.java:43) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.util.concurrent.ThreadPoolExecutor.runWorker( > ThreadPoolExecutor.java:1142) > at java.util.concurrent.ThreadPoolExecutor$Worker.run( > ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > > Here's the property: > > Properties props = new Properties(); > props.put("zookeeper.connect", a_zookeeper); > props.put("group.id", a_groupId); > props.put("zookeeper.session.timeout.ms", "400"); > props.put("zookeeper.sync.time.ms", "200"); > props.put("auto.commit.interval.ms", "1000"); > props.put("auto.offset.reset", "smallest"); > props.put("key.deserializer", "org.apache.kafka.common.serialization. > StringDeserializer"); > props.put("value.deserializer", KryoReadingSerializer.class.getName()); > > I'm new to kafka, so not entirely sure if this is right approach of > consuming message using custom serializer. Moreover, I'm using KafkaStream > , can it be an issue as well ? > > Any pointers will be appreciated. > > Thanks, > Shamik >