[jira] [Created] (KAFKA-912) [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed

2013-05-21 Thread Martin Eigenbrodt (JIRA)
Martin Eigenbrodt created KAFKA-912:
---

 Summary: [Documentation] 0.8 Quickstart  mentions sbt 
assembly-package-dependency - not needed
 Key: KAFKA-912
 URL: https://issues.apache.org/jira/browse/KAFKA-912
 Project: Kafka
  Issue Type: Bug
  Components: website
Affects Versions: 0.8
 Environment: 0.8 Git Revision 731ba90
Reporter: Martin Eigenbrodt


https://kafka.apache.org/08/quickstart.html says:
 ./sbt update
 ./sbt package
 ./sbt assembly-package-dependency

but  assembly-package-dependency fails and is actually not needed to run the 
rest of the code.



[jira] [Created] (KAFKA-913) [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed

2013-05-21 Thread Martin Eigenbrodt (JIRA)
Martin Eigenbrodt created KAFKA-913:
---

 Summary: [Documentation] 0.8 Quickstart  mentions sbt 
assembly-package-dependency - not needed
 Key: KAFKA-913
 URL: https://issues.apache.org/jira/browse/KAFKA-913
 Project: Kafka
  Issue Type: Bug
  Components: website
Affects Versions: 0.8
 Environment: 0.8 Git Revision 731ba90
Reporter: Martin Eigenbrodt


https://kafka.apache.org/08/quickstart.html says:
 ./sbt update
 ./sbt package
 ./sbt assembly-package-dependency

but  assembly-package-dependency fails and is actually not needed to run the 
rest of the code.



[jira] [Created] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances

2013-05-21 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-914:


 Summary: Deadlock between initial rebalance and watcher-triggered 
rebalances
 Key: KAFKA-914
 URL: https://issues.apache.org/jira/browse/KAFKA-914
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8


Summary doesn't give the full picture and the fetcher-manager/fetcher-thread
code is very complex so it's a bit hard to articulate the following very
clearly. I will try and describe the sequence that results in a deadlock
when starting up a large number of consumers at around the same time:

- When a consumer's createMessageStream method is called, it initiates an
  initial inline rebalance.
- However, before the above initial rebalance actually begins, a ZK watch
  may trigger (due to some other consumers starting up) and initiate a
  rebalance. This happens successfully so fetchers start and start filling
  up the chunk queues.
- Another watch triggers and initiates yet another rebalance. This rebalance
  attempt tries to close the fetchers. Before the fetchers are stopped, we
  shutdown the leader-finder-thread to prevent new fetchers from being
  started.
- The shutdown is accomplished by interrupting the leader-finder-thread and
  then awaiting its shutdown latch.
- If the leader-finder-thread still has a partition without leader to
  process and tries to add a fetcher for it, it will get an exception
  (InterruptedException if acquiring the partitionMapLock or
  ClosedByInterruptException if performing an offset request). If we get an
  InterruptedException the thread's interrupted flag is cleared.
- However, the leader-finder-thread may have
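
The interrupt-then-await-latch interaction above is the crux of the hang. As a
minimal, hypothetical Scala sketch (invented names throughout; this is not the
actual ShutdownableThread or ConsumerFetcherManager code), the following shows
how a consumed interrupt plus a subsequent blocking call can leave the caller of
shutdown() parked on the latch forever while it holds a lock the worker still needs:

import java.util.concurrent.CountDownLatch
import java.util.concurrent.locks.ReentrantLock

// Hypothetical sketch only: invented names, not Kafka's ShutdownableThread or
// ConsumerFetcherManager code.
object InterruptLatchSketch {
  val mapLock = new ReentrantLock()           // stands in for partitionMapLock

  class WorkerLike extends Thread("leader-finder-like") {
    val shutdownLatch = new CountDownLatch(1)
    @volatile var isRunning = true

    override def run(): Unit = {
      try {
        while (isRunning) {
          try {
            Thread.sleep(50)                  // simulate a blocking leader lookup
          } catch {
            // The interrupt sent by shutdown() is consumed here; the flag is now cleared.
            case _: InterruptedException => ()
          }
          // Still in the same iteration: "add a fetcher", which needs a lock that the
          // caller of shutdown() is holding. The interrupt was already consumed, so this
          // blocks forever and the latch below is never counted down.
          mapLock.lockInterruptibly()
          try { () /* add fetcher */ } finally { mapLock.unlock() }
        }
      } finally {
        shutdownLatch.countDown()
      }
    }

    def shutdown(): Unit = {
      isRunning = false
      interrupt()
      shutdownLatch.await()                   // hangs: see run() above
    }
  }

  def main(args: Array[String]): Unit = {
    val worker = new WorkerLike
    worker.start()
    Thread.sleep(20)                          // let the worker enter its sleep
    mapLock.lock()                            // the "rebalance" path holds this lock
    worker.shutdown()                         // wedged: parked on the latch forever
  }
}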

[jira] [Updated] (KAFKA-901) Kafka server can become unavailable if clients send several metadata requests

2013-05-21 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-901:


Attachment: kafka-901-followup2.patch

I agree. Kept only the 2nd logging message about caching the leader info.

Also, both log messages logged only the partition leader info, not the whole 
request. 

Fixed another exception in ControllerChannelManager.

 Kafka server can become unavailable if clients send several metadata requests
 -

 Key: KAFKA-901
 URL: https://issues.apache.org/jira/browse/KAFKA-901
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Neha Narkhede
Priority: Blocker
 Attachments: kafka-901-followup2.patch, kafka-901-followup.patch, 
 kafka-901.patch, kafka-901-v2.patch, kafka-901-v4.patch, kafka-901-v5.patch, 
 metadata-request-improvement.patch


 Currently, if a broker is bounced without controlled shutdown and there are 
 several clients talking to the Kafka cluster, each of the clients realizes the 
 unavailability of leaders for some partitions. This leads to several metadata 
 requests sent to the Kafka brokers. Since metadata requests are pretty slow, 
 all the I/O threads quickly become busy serving the metadata requests. This 
 leads to a full request queue, which stalls handling of finished responses 
 since the same network thread handles requests as well as responses. In this 
 situation, clients time out on metadata requests and send more metadata 
 requests. This quickly makes the Kafka cluster unavailable. 



[jira] [Commented] (KAFKA-913) [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed

2013-05-21 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13663093#comment-13663093
 ] 

Jun Rao commented on KAFKA-913:
---

Did you try this on the latest of the 0.8 branch? What OS did you run it on? 
When I ran it, it worked and gave me the following output. This is actually a required 
step; otherwise, the scripts under bin won't work.

./sbt assembly-package-dependency 
[info] Loading project definition from 
/Users/jrao/intellij_workspace/kafka_git/project
[info] Set current project to Kafka (in build 
file:/Users/jrao/intellij_workspace/kafka_git/)
[info] Compiling 2 Scala sources to 
/Users/jrao/intellij_workspace/kafka_git/core/target/scala-2.8.0/classes...
[info] Including snappy-java-1.0.4.1.jar
[info] Including zookeeper-3.3.4.jar
[info] Including metrics-core-2.2.0.jar
[info] Including zkclient-0.2.jar
[info] Including metrics-annotation-2.2.0.jar
[info] Including log4j-1.2.15.jar
[info] Including scala-compiler.jar
[info] Including slf4j-api-1.7.2.jar
[info] Including slf4j-simple-1.6.4.jar
[info] Including jopt-simple-3.2.jar
[info] Including scala-library.jar
[warn] Merging 'META-INF/NOTICE' with strategy 'rename'
[warn] Merging 'org/xerial/snappy/native/README' with strategy 'rename'
[warn] Merging 'META-INF/maven/org.xerial.snappy/snappy-java/LICENSE' with 
strategy 'rename'
[warn] Merging 'LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE' with strategy 'rename'
[warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
[warn] Strategy 'discard' was applied to a file
[warn] Strategy 'rename' was applied to 5 files
[success] Total time: 43 s, completed May 21, 2013 9:32:30 AM


 [Documentation] 0.8 Quickstart  mentions sbt assembly-package-dependency - 
 not needed
 -

 Key: KAFKA-913
 URL: https://issues.apache.org/jira/browse/KAFKA-913
 Project: Kafka
  Issue Type: Bug
  Components: website
Affects Versions: 0.8
 Environment: 0.8 Git Revision 731ba90
Reporter: Martin Eigenbrodt

 https://kafka.apache.org/08/quickstart.html says:
  ./sbt update
  ./sbt package
  ./sbt assembly-package-dependency
 but  assembly-package-dependency fails and is actually not needed to run the 
 rest of the code.



[jira] [Resolved] (KAFKA-912) [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed

2013-05-21 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-912.
---

Resolution: Duplicate

 [Documentation] 0.8 Quickstart  mentions sbt assembly-package-dependency - 
 not needed
 -

 Key: KAFKA-912
 URL: https://issues.apache.org/jira/browse/KAFKA-912
 Project: Kafka
  Issue Type: Bug
  Components: website
Affects Versions: 0.8
 Environment: 0.8 Git Revision 731ba90
Reporter: Martin Eigenbrodt

 https://kafka.apache.org/08/quickstart.html says:
  ./sbt update
  ./sbt package
  ./sbt assembly-package-dependency
 but  assembly-package-dependency fails and is actually not needed to run the 
 rest of the code.



produce request wire format question

2013-05-21 Thread Dave Peterson
In the version 0.8 wire format for a produce request, does a value of -1
still indicate "use a random partition" as it did for 0.7?

Thanks,
Dave


[jira] [Commented] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances

2013-05-21 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13663146#comment-13663146
 ] 

Joel Koshy commented on KAFKA-914:
--

One more point: [td3] above does not need to originate from a watcher-triggered 
rebalance. The initial rebalance can also run into the same deadlock; i.e., as 
long as one or more watcher-triggered rebalances succeed and start fetchers 
prior to the initial rebalance, we may end up in this wedged state. E.g., on 
another instance I saw [td3] but on the main thread:

2013-05-21_17:07:14.34308 main prio=10 tid=0x7f5e34008000 nid=0x4e49 
waiting on condition [0x7f5e3b41]
2013-05-21_17:07:14.34308   java.lang.Thread.State: WAITING (parking)
2013-05-21_17:07:14.34309   at sun.misc.Unsafe.park(Native Method)
2013-05-21_17:07:14.34309   - parking to wait for  0x7f5d36d99fa0 (a 
java.util.concurrent.CountDownLatch$Sync)
2013-05-21_17:07:14.34309   at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-21_17:07:14.34310   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-21_17:07:14.34310   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
2013-05-21_17:07:14.34310   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
2013-05-21_17:07:14.34311   at 
java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
2013-05-21_17:07:14.34312   at 
kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
2013-05-21_17:07:14.34313   at 
kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:125)
2013-05-21_17:07:14.34313   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:486)
2013-05-21_17:07:14.34313   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:523)
2013-05-21_17:07:14.34314   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:420)
2013-05-21_17:07:14.34314   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:373)
2013-05-21_17:07:14.34315   at 
scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
2013-05-21_17:07:14.34315   at 
scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
2013-05-21_17:07:14.34316   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
2013-05-21_17:07:14.34316   - locked 0x7f5d36d4b2e0 (a 
java.lang.Object)
2013-05-21_17:07:14.34317   at 
kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:678)
2013-05-21_17:07:14.34317   at 
kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.init(ZookeeperConsumerConnector.scala:712)
2013-05-21_17:07:14.34318   at 
kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
2013-05-21_17:07:14.34318   at 
kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-21_17:07:14.34318   at 
kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-21_17:07:14.34319   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-21_17:07:14.34319   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-21_17:07:14.34319   at 
scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
2013-05-21_17:07:14.34320   at 
scala.collection.immutable.List.foreach(List.scala:45)
2013-05-21_17:07:14.34320   at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
2013-05-21_17:07:14.34320   at 
scala.collection.immutable.List.map(List.scala:45)
2013-05-21_17:07:14.34321   at 
kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
2013-05-21_17:07:14.34322   at 
kafka.tools.MirrorMaker.main(MirrorMaker.scala)


 Deadlock between initial rebalance and watcher-triggered rebalances
 ---

 Key: KAFKA-914
 URL: https://issues.apache.org/jira/browse/KAFKA-914
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8


 Summary 

Re: produce request wire format question

2013-05-21 Thread Dave Peterson
I'm looking at the document entitled A Guide to the Kafka Protocol
located here:

https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html

It shows a produce request as containing a number of message sets, which are
grouped first by topic and second by partition (a 32-bit integer).
However, each
message in a message set contains a Key field, which is described as follows:

"The key is an optional message key that was used for partition assignment.
The key can be null."

I notice the use of "was" (past tense) above.  That seems to suggest that the
Key field was once used to specify a partition (at the granularity of each
individual message), but the plan for the future is to instead use the 32-bit
partition value preceding each message set.  Is this correct?  If so, when I am
creating a produce request for 0.8, what should I use for the 32-bit partition
value, and how does this relate to the Key field of each individual message?
Ideally, I would like to just send a produce request and let the broker choose
the partition.  How do I accomplish this in 0.8, and are there plans to change
this after 0.8?

Thanks,
Dave

On Tue, May 21, 2013 at 10:47 AM, Neha Narkhede neha.narkh...@gmail.com wrote:
 No. In 0.8, if you don't specify a key for a message, it is sent to any of
 the available partitions. In other words, the partition id is selected on
 the producer side and the server doesn't get -1 as the partition id.

 Thanks,
 Neha


 On Tue, May 21, 2013 at 9:54 AM, Dave Peterson dspeter...@tagged.comwrote:

 In the version 0.8 wire format for a produce request, does a value of -1
 still indicate use a random partition as it did for 0.7?

 Thanks,
 Dave



Re: produce request wire format question

2013-05-21 Thread Neha Narkhede
Dave,

Colin correctly described the producer behavior of picking the partition for a
message before it is sent to the Kafka broker. However, I'm interested in
knowing your use case a little better, to see why you would rather have the
broker decide the partition.

Thanks,
Neha


On Tue, May 21, 2013 at 12:05 PM, Colin Blower cblo...@barracuda.comwrote:

 The key is used by the client to decide which partition to send the
 message to. By the time the client is creating the produce request, it
 should be known which partition each message is being sent to. I believe
 Neha described the behavior of the Java client which sends messages with
 a null key to any partition.

 The key is described in past tense because of the use case for
 persisting keys with messages. The key is persisted through the broker
 so that a consumer knows what key was used to partition the message on
 the producer side.

 I don't believe that you can have the broker decide which partition a
 message goes to.

 --
 Colin B.

 On 05/21/2013 11:48 AM, Dave Peterson wrote:
  I'm looking at the document entitled A Guide to the Kafka Protocol
  located here:
 
  https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html
 
  It shows a produce request as containing a number of message sets, which
 are
  grouped first by topic and second by partition (a 32-bit integer).
  However, each
  message in a message set contains a Key field, which is described as
 follows:
 
  The key is an optional message key that was used for partition
 assignment.
  The key can be null.
 
  I notice the use of was (past tense) above.  That seems to suggest
 that the
  Key field was once used to specify a partition (at the granularity of
 each
  individual message), but the plan for the future is to instead use the
 32-bit
  partition value preceding each message set.  Is this correct?  If so,
 when I am
  creating a produce request for 0.8, what should I use for the 32-bit
 partition
  value, and how does this relate to the Key field of each individual
 message?
  Ideally, I would like to just send a produce request and let the broker
 choose
  the partition.  How do I accomplish this in 0.8, and are there plans to
 change
  this after 0.8?
 
  Thanks,
  Dave
 
  On Tue, May 21, 2013 at 10:47 AM, Neha Narkhede neha.narkh...@gmail.com
 wrote:
  No. In 0.8, if you don't specify a key for a message, it is sent to any
 of
  the available partitions. In other words, the partition id is selected
 on
  the partition and the server doesn't get -1 as the partition id.
 
  Thanks,
  Neha
 
 
  On Tue, May 21, 2013 at 9:54 AM, Dave Peterson dspeter...@tagged.com
 wrote:
 
  In the version 0.8 wire format for a produce request, does a value of
 -1
  still indicate use a random partition as it did for 0.7?
 
  Thanks,
  Dave
 





Re: produce request wire format question

2013-05-21 Thread Dave Peterson
In my case, there is a load balancer between the producers and the
brokers, so I want the behavior described for the Java client (null key
specifies any partition).  If the Key field of each individual message
specifies the partition to send it to, then I don't understand the purpose
of the 32-bit partition identifier that precedes each message set in a
produce request: what if a produce request specifies partition N for a
given message set, and then each individual message in the set
specifies a different partition in its Key field?  Also, the above-
mentioned partition identifier is a 32-bit integer and the Key field of
each individual message can contain data of arbitrary length, which
seems inconsistent.  Is a partition identifier a 32-bit integer, or can it
be of arbitrary length?

Thanks,
Dave

On Tue, May 21, 2013 at 12:30 PM, Neha Narkhede neha.narkh...@gmail.com wrote:
 Dave,

 Colin described the producer behavior of picking the partition for a
 message before it is sent to Kafka broker correctly. However, I'm
 interested in knowing your use case a little before to see why you would
 rather have the broker decide the partition?

 Thanks,
 Neha


 On Tue, May 21, 2013 at 12:05 PM, Colin Blower cblo...@barracuda.comwrote:

 The key is used by the client to decide which partition to send the
 message to. By the time the client is creating the produce request, it
 should be known which partition each message is being sent to. I believe
 Neha described the behavior of the Java client which sends messages with
 a null key to any partition.

 The key is described in past tense because of the use case for
 persisting keys with messages. The key is persisted through the broker
 so that a consumer knows what key was used to partition the message on
 the producer side.

 I don't believe that you can have the broker decide which partition a
 message goes to.

 --
 Colin B.

 On 05/21/2013 11:48 AM, Dave Peterson wrote:
  I'm looking at the document entitled A Guide to the Kafka Protocol
  located here:
 
  https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html
 
  It shows a produce request as containing a number of message sets, which
 are
  grouped first by topic and second by partition (a 32-bit integer).
  However, each
  message in a message set contains a Key field, which is described as
 follows:
 
  The key is an optional message key that was used for partition
 assignment.
  The key can be null.
 
  I notice the use of was (past tense) above.  That seems to suggest
 that the
  Key field was once used to specify a partition (at the granularity of
 each
  individual message), but the plan for the future is to instead use the
 32-bit
  partition value preceding each message set.  Is this correct?  If so,
 when I am
  creating a produce request for 0.8, what should I use for the 32-bit
 partition
  value, and how does this relate to the Key field of each individual
 message?
  Ideally, I would like to just send a produce request and let the broker
 choose
  the partition.  How do I accomplish this in 0.8, and are there plans to
 change
  this after 0.8?
 
  Thanks,
  Dave
 
  On Tue, May 21, 2013 at 10:47 AM, Neha Narkhede neha.narkh...@gmail.com
 wrote:
  No. In 0.8, if you don't specify a key for a message, it is sent to any
 of
  the available partitions. In other words, the partition id is selected
 on
  the partition and the server doesn't get -1 as the partition id.
 
  Thanks,
  Neha
 
 
  On Tue, May 21, 2013 at 9:54 AM, Dave Peterson dspeter...@tagged.com
 wrote:
 
  In the version 0.8 wire format for a produce request, does a value of
 -1
  still indicate use a random partition as it did for 0.7?
 
  Thanks,
  Dave
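
For what it's worth, here is a hedged sketch of the behavior discussed in this
thread, using the 0.8 Scala producer API as I understand it (broker address,
topic name, and the object name are placeholders, not anything from the thread).
Per Colin's explanation, the partition is chosen on the client, either from the
key via the partitioner or arbitrarily when the key is null, before the produce
request is built, so the int32 partition field in the request carries a concrete
partition id:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object ProduceExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")             // placeholder broker
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val producer = new Producer[String, String](new ProducerConfig(props))

    // Keyed message: the client-side partitioner maps the key to a partition id,
    // and that int32 partition id is what goes into the produce request.
    producer.send(new KeyedMessage[String, String]("test-topic", "some-key", "keyed payload"))

    // Null-key message: the client itself picks one of the available partitions;
    // no "-1 / random partition" marker is sent to the broker in 0.8.
    producer.send(new KeyedMessage[String, String]("test-topic", "unkeyed payload"))

    producer.close()
  }
}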
 





[jira] [Updated] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances

2013-05-21 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-914:
-

Attachment: KAFKA-914-v1.patch

Patch with the mentioned fix.

1 - I added comments with some detail since the manager/fetcher/connector 
interaction is very tricky.
2 - Passing through throwables while shutting down. The isRunning check is 
probably unnecessary, but safer to keep.
3 - Made the following changes to the mirrormaker - I can put that in a 
separate jira as well.
  a - Currently, if no streams are created, the mirrormaker doesn't quit. 
Setting streams to empty/nil fixes that issue.
  b - If a consumer-side exception (e.g., iterator timeout) gets thrown, the 
mirror-maker does not exit. Addressed this by awaiting on the consumer threads 
at the end of the main method.
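
To make note 2 concrete, here is a rough, hypothetical Scala sketch of the
"pass throwables through while shutting down" shape (invented class name; this
is not the contents of KAFKA-914-v1.patch):

import java.util.concurrent.CountDownLatch

// Hypothetical worker-loop shape only.
abstract class PassThroughWorker(name: String) extends Thread(name) {
  val shutdownLatch = new CountDownLatch(1)
  @volatile var isRunning = true

  def doWork(): Unit                        // one unit of work; may block or throw

  override def run(): Unit = {
    try {
      while (isRunning) {
        try {
          doWork()
        } catch {
          case t: Throwable =>
            if (!isRunning) throw t         // shutting down: let it pass through
            else t.printStackTrace()        // still running: log (stand-in for a real logger) and continue
        }
      }
    } finally {
      shutdownLatch.countDown()             // always release whoever awaits shutdown
    }
  }

  def shutdown(): Unit = {
    isRunning = false
    interrupt()
    shutdownLatch.await()
  }
}

With this shape, an InterruptedException (or ClosedByInterruptException) raised
during shutdown is not swallowed inside the loop, so the thread unwinds to the
latch countdown rather than continuing into another blocking call with the
interrupt flag already cleared.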



 Deadlock between initial rebalance and watcher-triggered rebalances
 ---

 Key: KAFKA-914
 URL: https://issues.apache.org/jira/browse/KAFKA-914
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8

 Attachments: KAFKA-914-v1.patch


 Summary doesn't give the full picture and the fetcher-manager/fetcher-thread
 code is very complex so it's a bit hard to articulate the following very
 clearly. I will try and describe the sequence that results in a deadlock
 when starting up a large number of consumers at around the same time:
 - When a consumer's createMessageStream method is called, it initiates an
   initial inline rebalance.
 - However, before the above initial rebalance actually begins, a ZK watch
   may trigger (due to some other consumers starting up) and initiate a
   rebalance. This happens successfully so fetchers start and start filling
   up the chunk queues.
 - Another watch triggers and initiates yet another rebalance. This rebalance
   attempt tries to close the fetchers. Before the fetchers are stopped, we
   shutdown the leader-finder-thread to prevent new fetchers from being
   started.
 - The shutdown is accomplished by interrupting the leader-finder-thread and
   then awaiting its shutdown latch.
 - If the leader-finder-thread still has a partition without leader to