[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13692411#comment-13692411 ]

Alexey Ozeritskiy commented on KAFKA-937:
-----------------------------------------

kafka.tools.ConsumerOffsetChecker uses SimpleConsumer to send an OffsetRequest, and it no longer works.

To reproduce, just do a git pull and run:
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group --zkconnect zk-servers --topic topic

The problem is in the following diff:

diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index bdeee91..1c28328 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -37,6 +37,7 @@ class SimpleConsumer(val host: String,
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
   val brokerInfo = "host_%s-port_%s".format(host, port)
   private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
+  private var isClosed = false

   private def connect(): BlockingChannel = {
     close
@@ -58,7 +59,8 @@ class SimpleConsumer(val host: String,

   def close() {
     lock synchronized {
-        disconnect()
+      disconnect()
+      isClosed = true
     }
   }

@@ -123,7 +125,7 @@ class SimpleConsumer(val host: String,
   def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer)

   private def getOrMakeConnection() {
-    if(!blockingChannel.isConnected) {
+    if(!isClosed && !blockingChannel.isConnected) {
       connect()
     }
   }


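With this change, SimpleConsumer stops working once close() has been called: close() sets isClosed, and after that getOrMakeConnection() never reconnects (see ConsumerOffsetChecker.scala, line 77).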
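The effect of this diff on a consumer that is used again after close() can be sketched with a small, self-contained model. FakeChannel and ConsumerModel are hypothetical stand-ins, not Kafka classes: with patched = true the model follows the "+" lines of the diff, with patched = false the "-" lines, and everything the diff does not show (building, sending and reading the actual request) is left out.

```scala
// Hypothetical stand-in for BlockingChannel: it only records whether it is
// connected and how many times connect() has been called.
class FakeChannel {
  private var connected = false
  var connectCount = 0
  def isConnected: Boolean = connected
  def connect(): Unit = { connected = true; connectCount += 1 }
  def disconnect(): Unit = { connected = false }
}

// Hypothetical, heavily simplified model of SimpleConsumer's connection
// handling. patched = true follows the "+" lines of the diff,
// patched = false follows the "-" lines.
class ConsumerModel(channel: FakeChannel, patched: Boolean) {
  private val lock = new Object
  private var isClosed = false

  private def disconnect(): Unit =
    if (channel.isConnected) channel.disconnect()

  // As in the diff context, connect() begins by calling close; the rest of
  // the real method is not shown, so the model then just opens the channel.
  private def connect(): Unit = {
    close()
    channel.connect()
  }

  def close(): Unit = lock.synchronized {
    disconnect()
    if (patched) isClosed = true // the line added by the diff
  }

  // Patched: reconnect only if not closed. Unpatched: reconnect whenever
  // the channel is down.
  private def getOrMakeConnection(): Unit =
    if ((!patched || !isClosed) && !channel.isConnected) connect()

  // Stands in for sendRequest(): returns true if a connected channel was
  // available to send on.
  def sendRequest(): Boolean = lock.synchronized {
    getOrMakeConnection()
    channel.isConnected
  }
}
```

In this model the patched consumer answers its first request, but after close() the isClosed flag keeps getOrMakeConnection() from reconnecting, so the next sendRequest() returns false; the unpatched model simply reconnects and returns true. Because connect() begins with close in the context shown, the model also sets isClosed on the very first connect; whether the real method behaves the same depends on the part of connect() that the diff does not show.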
                
> ConsumerFetcherThread can deadlock
> ----------------------------------
>
>                 Key: KAFKA-937
>                 URL: https://issues.apache.org/jira/browse/KAFKA-937
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Jun Rao
>             Fix For: 0.8
>
>         Attachments: kafka-937_delta.patch, kafka-937.patch
>
>
> We have the following access pattern, which can introduce a deadlock:
> AbstractFetcherThread.processPartitionsWithError() ->
> ConsumerFetcherThread.processPartitionsWithError() ->
> ConsumerFetcherManager.addPartitionsWithError() waits for the lock ->
> LeaderFinderThread, while holding that lock, calls
> AbstractFetcherManager.shutdownIdleFetcherThreads() ->
> AbstractFetcherManager calls fetcher.shutdown, which has to wait until
> AbstractFetcherThread.processPartitionsWithError() completes.
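The lock cycle in this access pattern can be reproduced in miniature. The sketch below is a hypothetical model, not Kafka code: the calling thread plays LeaderFinderThread (it holds the manager lock and then waits for the fetcher thread to end, as fetcher.shutdown does), and a second thread plays the fetcher inside processPartitionsWithError(), which needs the same lock. A timed join replaces the unbounded wait so the program reports the cycle instead of hanging; DeadlockModel and cycleDetected are illustrative names.

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical model of the lock cycle described in KAFKA-937. The names are
// illustrative stand-ins, not Kafka classes. A timed join replaces the
// unbounded wait so the program reports the cycle instead of hanging.
object DeadlockModel {
  /** Returns true if the fetcher was still blocked when the timed wait ended. */
  def cycleDetected(timeoutMs: Long): Boolean = {
    // Stands in for the lock inside ConsumerFetcherManager.
    val managerLock = new ReentrantLock()

    // Plays the fetcher thread in processPartitionsWithError(): it has to get
    // managerLock (addPartitionsWithError) before it can finish.
    val fetcher = new Thread(() => {
      managerLock.lock() // addPartitionsWithError() waiting for the lock
      managerLock.unlock()
    })

    // The calling thread plays LeaderFinderThread: it holds managerLock and
    // then shuts the fetcher down, i.e. waits for the fetcher thread to end.
    managerLock.lock()
    val blocked =
      try {
        fetcher.start()
        fetcher.join(timeoutMs) // fetcher.shutdown waiting for the thread
        fetcher.isAlive         // still alive: it cannot get managerLock
      } finally managerLock.unlock()

    // Only possible in the model: releasing the lock lets the fetcher finish.
    fetcher.join()
    blocked
  }
}
```

DeadlockModel.cycleDetected(200) returns true: after the timed wait the fetcher is still alive, because it is blocked on the lock held by the very thread waiting for it. The model can clean up by releasing the lock afterwards; in the access pattern described above, the lock holder is itself stuck in the wait, so the lock is never released.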