Hi,

I encountered an issue when using the Kafka 0.8.2.1 Scala consumer. The
program cannot exit normally because it blocks while committing offsets to
the broker (the offset storage is "kafka").

When committing offsets to the offset manager, the method
ZookeeperConsumerConnector.ensureOffsetManagerConnected is called to ensure
that the offset manager is connected. That method relies on
ClientUtils.channelToOffsetManager to establish a channel to the offset
manager. The following is part of channelToOffsetManager:


def channelToOffsetManager(group: String, zkClient: ZkClient,
    socketTimeoutMs: Int = 3000, retryBackOffMs: Int = 1000) = {
  ...
  while (!offsetManagerChannelOpt.isDefined) {

    var coordinatorOpt: Option[Broker] = None

    while (!coordinatorOpt.isDefined) {
      try {
        if (!queryChannel.isConnected)
          queryChannel = channelToAnyBroker(zkClient)
        debug("Querying %s:%d to locate offset manager for %s."
          .format(queryChannel.host, queryChannel.port, group))
        queryChannel.send(ConsumerMetadataRequest(group))
        val response = queryChannel.receive()
        val consumerMetadataResponse =
          ConsumerMetadataResponse.readFrom(response.buffer)
        debug("Consumer metadata response: " +
          consumerMetadataResponse.toString)
        if (consumerMetadataResponse.errorCode == ErrorMapping.NoError)
          coordinatorOpt = consumerMetadataResponse.coordinatorOpt
        else {
          debug("Query to %s:%d to locate offset manager for %s failed - will retry in %d milliseconds."
            .format(queryChannel.host, queryChannel.port, group,
              retryBackOffMs))
          Thread.sleep(retryBackOffMs)
        }
      }
      catch {
        case ioe: IOException =>
          info("Failed to fetch consumer metadata from %s:%d."
            .format(queryChannel.host, queryChannel.port))
          queryChannel.disconnect()
      }
    }

    ...

As shown, the method does not leave the loop until it finds an available
channel to the offset manager. However, if the thread has been interrupted,
queryChannel can never be connected, because socket.connect throws
ClosedByInterruptException when the calling thread's interrupt status is
set. ClosedByInterruptException is a subclass of IOException, so
channelToOffsetManager catches it and simply continues to the next round.
As a result, the method enters an infinite loop and the program cannot exit
normally.
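
To make the failure mode concrete, here is a small self-contained sketch
of it. It does not use Kafka at all: connectStub is a hypothetical stand-in
for the connect step, and the loop is bounded by maxAttempts only so that
the demo terminates (the real loop has no such bound).

```scala
import java.io.IOException
import java.nio.channels.ClosedByInterruptException

object InterruptedRetryDemo {
  // Hypothetical stand-in for the connect step: it fails the way an
  // interruptible channel does when the thread's interrupt status is set.
  def connectStub(): Unit =
    if (Thread.currentThread().isInterrupted)
      throw new ClosedByInterruptException()

  // Runs maxAttempts rounds of "try to connect, swallow IOException" and
  // returns how many of them failed.
  def failedAttempts(maxAttempts: Int): Int = {
    var failures = 0
    var attempt = 0
    while (attempt < maxAttempts) {
      try connectStub()
      catch {
        // ClosedByInterruptException is an IOException, so it lands here,
        // and nothing in this handler clears the interrupt status.
        case _: IOException => failures += 1
      }
      attempt += 1
    }
    failures
  }
}
```

Once the interrupt status is set, every round fails in the same way,
because isInterrupted only reads the flag and nothing in the loop resets it.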

I wonder whether this is the expected behavior, or whether we need a better
way to handle the interruption. How about adding the following check inside
the loop?

if (Thread.currentThread().isInterrupted) {
  throw new InterruptedException()
}
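
For illustration, this is roughly how I imagine the check sitting in a
retry loop. It is only a sketch under my own assumptions:
retryUntilDefined and its parameters are hypothetical names, not Kafka
API, and the attempt function stands in for the body that queries a broker.

```scala
import java.io.IOException

object InterruptAwareRetry {
  // Hypothetical helper, not part of Kafka: calls `attempt` until it
  // returns a value, but gives up with InterruptedException as soon as
  // the current thread is found to be interrupted.
  @throws[InterruptedException]
  def retryUntilDefined[T](retryBackOffMs: Long)(attempt: () => Option[T]): T = {
    var result: Option[T] = None
    while (result.isEmpty) {
      // The proposed check: leave the loop instead of retrying forever.
      // isInterrupted does not clear the flag, so callers still see it.
      if (Thread.currentThread().isInterrupted)
        throw new InterruptedException("interrupted while retrying")
      try {
        result = attempt()
        // Back off before the next round; Thread.sleep itself throws
        // InterruptedException if the thread is interrupted meanwhile.
        if (result.isEmpty) Thread.sleep(retryBackOffMs)
      } catch {
        // As in the original loop, an I/O failure just leads to a retry.
        case _: IOException => ()
      }
    }
    result.get
  }
}
```

With this shape, an interrupted thread gets at most one more failed round
before the check at the top of the loop throws, so shutdown can proceed.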


Thanks,

Meng
