Hi, I encountered an issue when using the Kafka 0.8.2.1 Scala consumer. The program cannot exit normally because it blocks while committing offsets to the broker (the offset storage is "kafka").
When committing offsets to the offset manager, the method ZookeeperConsumerConnector.ensureOffsetManagerConnected is called to ensure that the offset manager is connected. This method relies on ClientUtils.channelToOffsetManager to establish a channel to the offset manager. The following is part of channelToOffsetManager:

    def channelToOffsetManager(group: String, zkClient: ZkClient, socketTimeoutMs: Int = 3000, retryBackOffMs: Int = 1000) = {
      ...
      while (!offsetManagerChannelOpt.isDefined) {
        var coordinatorOpt: Option[Broker] = None
        while (!coordinatorOpt.isDefined) {
          try {
            if (!queryChannel.isConnected)
              queryChannel = channelToAnyBroker(zkClient)
            debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group))
            queryChannel.send(ConsumerMetadataRequest(group))
            val response = queryChannel.receive()
            val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer)
            debug("Consumer metadata response: " + consumerMetadataResponse.toString)
            if (consumerMetadataResponse.errorCode == ErrorMapping.NoError)
              coordinatorOpt = consumerMetadataResponse.coordinatorOpt
            else {
              debug("Query to %s:%d to locate offset manager for %s failed - will retry in %d milliseconds."
                .format(queryChannel.host, queryChannel.port, group, retryBackOffMs))
              Thread.sleep(retryBackOffMs)
            }
          }
          catch {
            case ioe: IOException =>
              info("Failed to fetch consumer metadata from %s:%d.".format(queryChannel.host, queryChannel.port))
              queryChannel.disconnect()
          }
        }
        ...

We can see that the method does not leave the loop until it finds an available channel to the offset manager. However, if the thread has been interrupted, the queryChannel can never be connected, because socket.connect throws ClosedByInterruptException when the calling thread has been interrupted. In this case channelToOffsetManager catches the exception as an IOException and continues to the next round. The interrupt status stays set, so every retry fails the same way. As a result the method enters an infinite loop, and the program cannot exit normally.
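To illustrate the failure mode outside of Kafka, here is a minimal sketch. It is not Kafka code: the names InterruptedConnectDemo, attemptConnect and runDemo are made up for this example, it connects to a throwaway local ServerSocket, and it calls SocketChannel.connect directly instead of going through Kafka's channel classes. It only shows that, once the thread's interrupt status is set, each blocking connect on an interruptible channel fails with an IOException and the status remains set for the next retry.

```scala
import java.io.IOException
import java.net.{InetAddress, InetSocketAddress, ServerSocket}
import java.nio.channels.SocketChannel

object InterruptedConnectDemo {

  // One connection attempt; returns the IOException if the attempt failed.
  def attemptConnect(address: InetSocketAddress): Option[IOException] = {
    val channel = SocketChannel.open()
    try {
      channel.connect(address) // blocking connect on an interruptible channel
      None
    } catch {
      case ioe: IOException => Some(ioe) // same catch-and-retry shape as the loop above
    } finally {
      channel.close()
    }
  }

  // Interrupts the current thread, then makes `attempts` connection attempts.
  // Returns the failures and whether the thread was still interrupted afterwards.
  def runDemo(attempts: Int): (Seq[IOException], Boolean) = {
    val loopback = InetAddress.getLoopbackAddress
    val server = new ServerSocket(0, 50, loopback) // throwaway local listener
    try {
      val address = new InetSocketAddress(loopback, server.getLocalPort)
      Thread.currentThread().interrupt()
      val failures = (1 to attempts).flatMap(_ => attemptConnect(address))
      (failures, Thread.currentThread().isInterrupted)
    } finally {
      Thread.interrupted() // clear the flag so the demo itself can finish cleanly
      server.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val (failures, stillInterrupted) = runDemo(3)
    failures.foreach(e => println("attempt failed with " + e.getClass.getName))
    println("interrupt status still set after retries: " + stillInterrupted)
  }
}
```

The demo is bounded to three attempts only so that it terminates; a loop that retries until success, like the one quoted above, would keep failing for as long as the interrupt status is set.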
I wonder whether this is the expected behavior, or whether we need to figure out a better way to handle the interruption. How about adding the following check inside the loop?

    if (Thread.currentThread().isInterrupted) {
      throw new InterruptedException()
    }

Thanks,
Meng