[ https://issues.apache.org/jira/browse/KAFKA-888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Manikumar resolved KAFKA-888. ----------------------------- Resolution: Cannot Reproduce Pl reopen if you think the issue still exists > problems when shutting down the java consumer . > ----------------------------------------------- > > Key: KAFKA-888 > URL: https://issues.apache.org/jira/browse/KAFKA-888 > Project: Kafka > Issue Type: Bug > Components: consumer > Affects Versions: 0.8.0 > Environment: Linux kacper-pc 3.2.0-40-generic #64-Ubuntu SMP 2013 > x86_64 x86_64 x86_64 GNU/Linux, scala 2.9.2 > Reporter: kacper chwialkowski > Assignee: Neha Narkhede > Priority: Minor > Labels: bug, consumer, exception > > I got the following error when shutting down the consumer : > ConsumerFetcherThread-test-consumer-group_kacper-pc-1367268338957-12cb5a0b-0-0] > INFO kafka.consumer.SimpleConsumer - Reconnect due to socket error: > java.nio.channels.ClosedByInterruptException: null > at > java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) > ~[na:1.7.0_21] > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:386) > ~[na:1.7.0_21] > at > sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:220) > ~[na:1.7.0_21] > at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) > ~[na:1.7.0_21] > at > java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) > ~[na:1.7.0_21] > at kafka.utils.Utils$.read(Utils.scala:394) > ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT] > at > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT] > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT] > at > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT] > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) > ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT] > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:75) > [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT] > at > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73) > [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT] > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) > [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT] > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) > [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT] > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) > [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT] > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT] > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) > [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT] > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) > [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT] > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) > [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT] > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT] > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) > [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT] > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) > [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT] > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) > [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT] > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) > [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT] > and this is how I create my Consumer > public Boolean call() throws Exception { > Map<String, Integer> topicCountMap = new HashMap<>(); > topicCountMap.put(topic, new Integer(1)); > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = > consumer.createMessageStreams(topicCountMap); > KafkaStream<byte[], byte[]> stream = > consumerMap.get(topic).get(0); > ConsumerIterator<byte[], byte[]> it = stream.iterator(); > it.next(); > LOGGER.info("Received the message. Shutting down"); > consumer.commitOffsets(); > consumer.shutdown(); > return true; > } -- This message was sent by Atlassian JIRA (v6.4.14#64029)