[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13692411#comment-13692411 ]
Alexey Ozeritskiy commented on KAFKA-937:
-----------------------------------------

kafka.tools.ConsumerOffsetChecker uses SimpleConsumer for OffsetRequest.

To reproduce, just do a git pull and run:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group --zkconnect zk-servers --topic topic

The problem is in the following diff:

diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index bdeee91..1c28328 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -37,6 +37,7 @@ class SimpleConsumer(val host: String,
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
   val brokerInfo = "host_%s-port_%s".format(host, port)
   private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
+  private var isClosed = false
 
   private def connect(): BlockingChannel = {
     close
@@ -58,7 +59,8 @@ class SimpleConsumer(val host: String,
 
   def close() {
     lock synchronized {
-        disconnect()
+      disconnect()
+      isClosed = true
     }
   }
 
@@ -123,7 +125,7 @@ class SimpleConsumer(val host: String,
   def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer)
 
   private def getOrMakeConnection() {
-    if(!blockingChannel.isConnected) {
+    if(!isClosed && !blockingChannel.isConnected) {
       connect()
     }
   }

With this change, SimpleConsumer stops working after close() is called (see ConsumerOffsetChecker.scala, line 77).

> ConsumerFetcherThread can deadlock
> ----------------------------------
>
>                 Key: KAFKA-937
>                 URL: https://issues.apache.org/jira/browse/KAFKA-937
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Jun Rao
>             Fix For: 0.8
>
>         Attachments: kafka-937_delta.patch, kafka-937.patch
>
>
> We have the following access pattern that can introduce a deadlock.
> AbstractFetcherThread.processPartitionsWithError() ->
> ConsumerFetcherThread.processPartitionsWithError() ->
> ConsumerFetcherManager.addPartitionsWithError() wait for lock ->
> LeaderFinderThread holding lock while calling AbstractFetcherManager.shutdownIdleFetcherThreads() ->
> AbstractFetcherManager calling fetcher.shutdown, which needs to wait until AbstractFetcherThread.processPartitionsWithError() completes.
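
The behaviour reported in the comment (SimpleConsumer no longer reconnecting once close() has been called) can be illustrated with a small, self-contained sketch. This is not the Kafka code: FakeChannel and SketchConsumer are hypothetical stand-ins that only mirror the isClosed logic from the diff, under the assumption that requests go through getOrMakeConnection() while holding the same lock.

```scala
// Hypothetical stand-in for a network channel; not Kafka's BlockingChannel.
class FakeChannel {
  private var connected = false
  def isConnected: Boolean = connected
  def connect(): Unit = { connected = true }
  def disconnect(): Unit = { connected = false }
}

// Simplified model of the isClosed logic shown in the diff (assumed structure).
class SketchConsumer {
  private val lock = new Object
  private val channel = new FakeChannel
  private var isClosed = false

  private def connect(): Unit = {
    close()            // as in the diff context, connect() starts by calling close
    channel.connect()
  }

  def close(): Unit = lock synchronized {
    channel.disconnect()
    isClosed = true    // the flag added by the patch
  }

  private def getOrMakeConnection(): Unit = {
    // Patched condition: never reconnect once isClosed has been set.
    if (!isClosed && !channel.isConnected) connect()
  }

  // Returns true if a request could be sent, i.e. the channel is connected.
  def send(): Boolean = lock synchronized {
    getOrMakeConnection()
    channel.isConnected
  }
}

object Demo extends App {
  val consumer = new SketchConsumer
  println(consumer.send())  // first request connects lazily
  consumer.close()
  println(consumer.send())  // after close() the consumer does not reconnect
}
```

In this sketch the first send() succeeds and the send() after close() fails, which matches the symptom described for a caller that closes a consumer and then reuses it. Whether the right fix is to drop the flag, reset it in connect(), or change the caller is not something the sketch decides.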