[jira] [Created] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances
Joel Koshy created KAFKA-914:

Summary: Deadlock between initial rebalance and watcher-triggered rebalances
Key: KAFKA-914
URL: https://issues.apache.org/jira/browse/KAFKA-914
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
Fix For: 0.8

The summary doesn't give the full picture, and the fetcher-manager/fetcher-thread code is very complex, so it's a bit hard to articulate the following clearly. I will try to describe the sequence that results in a deadlock when starting up a large number of consumers at around the same time:

- When a consumer's createMessageStream method is called, it initiates an initial inline rebalance.
- However, before the above initial rebalance actually begins, a ZK watch may trigger (due to some other consumers starting up) and initiate a rebalance. This happens successfully, so fetchers start and begin filling up the chunk queues.
- Another watch triggers and initiates yet another rebalance. This rebalance attempt tries to close the fetchers. Before the fetchers are stopped, we shut down the leader-finder-thread to prevent new fetchers from being started.
- The shutdown is accomplished by interrupting the leader-finder-thread and then awaiting its shutdown latch.
- If the leader-finder-thread still has a partition without leader to process and tries to add a fetcher for it, it will get an exception (InterruptedException if acquiring the partitionMapLock, or ClosedByInterruptException if performing an offset request). If we get an InterruptedException, the thread's interrupted flag is cleared.
- However, the leader-finder-thread may have mu
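The interrupt subtlety in the last bullet is easy to miss. A minimal Java sketch (the Kafka code itself is Scala, but the JVM semantics are identical; this is illustrative, not Kafka code) shows that catching InterruptedException clears the thread's interrupt status, so outer shutdown checks no longer observe the request unless the catch block re-interrupts:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch: catching InterruptedException clears the interrupted
// flag, so later isInterrupted() checks miss the shutdown request unless the
// catch block restores it with Thread.currentThread().interrupt().
public class InterruptFlagDemo {
    // Returns true if the interrupted flag was found cleared inside the catch.
    static boolean wasFlagCleared() throws InterruptedException {
        AtomicBoolean clearedInCatch = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(10_000); // stands in for the lock / offset request
            } catch (InterruptedException e) {
                clearedInCatch.set(!Thread.currentThread().isInterrupted());
                Thread.currentThread().interrupt(); // restore for outer checks
            }
        });
        worker.start();
        worker.interrupt();
        worker.join();
        return clearedInCatch.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("interrupt flag cleared in catch: " + wasFlagCleared());
        // prints: interrupt flag cleared in catch: true
    }
}
```

Any code that swallows the exception without re-interrupting can therefore keep running past a shutdown request, which is one ingredient of the wedge described above.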
[jira] [Commented] (KAFKA-901) Kafka server can become unavailable if clients send several metadata requests
[ https://issues.apache.org/jira/browse/KAFKA-901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13663063#comment-13663063 ] Jun Rao commented on KAFKA-901:

Thanks for the followup patch. Some comments:

60. KafkaApis: The following logging logs the whole request for each partition. This will probably pollute the log. Is it enough just to log the whole request once?

    if(stateChangeLogger.isTraceEnabled)
      updateMetadataRequest.partitionStateInfos.foreach(p =>
        stateChangeLogger.trace(("Broker %d handling " +
          "UpdateMetadata request %s correlation id %d received from controller %d epoch %d for partition %s")
          .format(brokerId, p._2, updateMetadataRequest.correlationId,
            updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, p._1)))

Is the following logging necessary? If we know a request, we already know what should be in the cache after processing the request.

    if(stateChangeLogger.isTraceEnabled)
      stateChangeLogger.trace(("Broker %d caching leader info %s for partition %s in response to UpdateMetadata request sent by controller %d" +
        " epoch %d with correlation id %d")
        .format(brokerId, partitionState._2, partitionState._1,
          updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))

> Kafka server can become unavailable if clients send several metadata requests
>
> Key: KAFKA-901
> URL: https://issues.apache.org/jira/browse/KAFKA-901
> Project: Kafka
> Issue Type: Bug
> Components: replication
> Affects Versions: 0.8
> Reporter: Neha Narkhede
> Assignee: Neha Narkhede
> Priority: Blocker
> Attachments: kafka-901-followup.patch, kafka-901.patch, kafka-901-v2.patch, kafka-901-v4.patch, kafka-901-v5.patch, metadata-request-improvement.patch
>
> Currently, if a broker is bounced without controlled shutdown and there are several clients talking to the Kafka cluster, each of the clients realize the unavailability of leaders for some partitions. This leads to several metadata requests sent to the Kafka brokers. Since metadata requests are pretty slow, all the I/O threads quickly become busy serving the metadata requests. This leads to a full request queue, which stalls handling of finished responses since the same network thread handles requests as well as responses. In this situation, clients timeout on metadata requests and send more metadata requests. This quickly makes the Kafka cluster unavailable.

-- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators. For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-901) Kafka server can become unavailable if clients send several metadata requests
[ https://issues.apache.org/jira/browse/KAFKA-901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-901:

Attachment: kafka-901-followup2.patch

I agree. Kept only the 2nd logging message about caching the leader info. Also, both log messages logged only the partition leader info, not the whole request. Fixed another exception in ControllerChannelManager.

> Kafka server can become unavailable if clients send several metadata requests
>
> Key: KAFKA-901
> URL: https://issues.apache.org/jira/browse/KAFKA-901
> Attachments: kafka-901-followup2.patch, kafka-901-followup.patch, kafka-901.patch, kafka-901-v2.patch, kafka-901-v4.patch, kafka-901-v5.patch, metadata-request-improvement.patch
[jira] [Commented] (KAFKA-913) [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed
[ https://issues.apache.org/jira/browse/KAFKA-913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13663093#comment-13663093 ] Jun Rao commented on KAFKA-913:

Did you try this on the latest of the 0.8 branch? What OS did you run it on? When I ran it, it works and gives me the following. This is actually a required step; otherwise, the scripts under bin won't work.

./sbt assembly-package-dependency
[info] Loading project definition from /Users/jrao/intellij_workspace/kafka_git/project
[info] Set current project to Kafka (in build file:/Users/jrao/intellij_workspace/kafka_git/)
[info] Compiling 2 Scala sources to /Users/jrao/intellij_workspace/kafka_git/core/target/scala-2.8.0/classes...
[info] Including snappy-java-1.0.4.1.jar
[info] Including zookeeper-3.3.4.jar
[info] Including metrics-core-2.2.0.jar
[info] Including zkclient-0.2.jar
[info] Including metrics-annotation-2.2.0.jar
[info] Including log4j-1.2.15.jar
[info] Including scala-compiler.jar
[info] Including slf4j-api-1.7.2.jar
[info] Including slf4j-simple-1.6.4.jar
[info] Including jopt-simple-3.2.jar
[info] Including scala-library.jar
[warn] Merging 'META-INF/NOTICE' with strategy 'rename'
[warn] Merging 'org/xerial/snappy/native/README' with strategy 'rename'
[warn] Merging 'META-INF/maven/org.xerial.snappy/snappy-java/LICENSE' with strategy 'rename'
[warn] Merging 'LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE' with strategy 'rename'
[warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
[warn] Strategy 'discard' was applied to a file
[warn] Strategy 'rename' was applied to 5 files
[success] Total time: 43 s, completed May 21, 2013 9:32:30 AM

> [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed
>
> Key: KAFKA-913
> URL: https://issues.apache.org/jira/browse/KAFKA-913
> Project: Kafka
> Issue Type: Bug
> Components: website
> Affects Versions: 0.8
> Environment: 0.8 Git Revision 731ba90
> Reporter: Martin Eigenbrodt
>
> https://kafka.apache.org/08/quickstart.html says:
> > ./sbt update
> > ./sbt package
> > ./sbt assembly-package-dependency
> but assembly-package-dependency fails and is actually not needed to run the rest of the code.
[jira] [Resolved] (KAFKA-912) [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed
[ https://issues.apache.org/jira/browse/KAFKA-912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-912.

Resolution: Duplicate

> [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed
>
> Key: KAFKA-912
> URL: https://issues.apache.org/jira/browse/KAFKA-912
[jira] [Closed] (KAFKA-912) [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed
[ https://issues.apache.org/jira/browse/KAFKA-912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-912.

> [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed
>
> Key: KAFKA-912
> URL: https://issues.apache.org/jira/browse/KAFKA-912
[jira] [Commented] (KAFKA-901) Kafka server can become unavailable if clients send several metadata requests
[ https://issues.apache.org/jira/browse/KAFKA-901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13663118#comment-13663118 ] Jun Rao commented on KAFKA-901:

Thanks for the second followup patch. +1.

> Kafka server can become unavailable if clients send several metadata requests
>
> Key: KAFKA-901
> URL: https://issues.apache.org/jira/browse/KAFKA-901
produce request wire format question
In the version 0.8 wire format for a produce request, does a value of -1 still indicate "use a random partition" as it did for 0.7? Thanks, Dave
[jira] [Commented] (KAFKA-901) Kafka server can become unavailable if clients send several metadata requests
[ https://issues.apache.org/jira/browse/KAFKA-901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13663131#comment-13663131 ] Neha Narkhede commented on KAFKA-901:

Thanks for the quick review, committed it.

> Kafka server can become unavailable if clients send several metadata requests
>
> Key: KAFKA-901
> URL: https://issues.apache.org/jira/browse/KAFKA-901
[jira] [Commented] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances
[ https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13663146#comment-13663146 ] Joel Koshy commented on KAFKA-914:

One more point: [td3] above does not need to originate from a watcher-triggered rebalance. The initial rebalance can also run into the same deadlock, i.e., as long as one or more watcher-triggered rebalances succeed and start fetchers prior to the initial rebalance, we may end up in this wedged state. E.g., on another instance I saw [td3] but on the main thread:

2013-05-21_17:07:14.34308 "main" prio=10 tid=0x7f5e34008000 nid=0x4e49 waiting on condition [0x7f5e3b41]
2013-05-21_17:07:14.34308    java.lang.Thread.State: WAITING (parking)
2013-05-21_17:07:14.34309      at sun.misc.Unsafe.park(Native Method)
2013-05-21_17:07:14.34309      - parking to wait for <0x7f5d36d99fa0> (a java.util.concurrent.CountDownLatch$Sync)
2013-05-21_17:07:14.34309      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-21_17:07:14.34310      at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-21_17:07:14.34310      at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
2013-05-21_17:07:14.34310      at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
2013-05-21_17:07:14.34311      at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
2013-05-21_17:07:14.34312      at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
2013-05-21_17:07:14.34313      at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:125)
2013-05-21_17:07:14.34313      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:486)
2013-05-21_17:07:14.34313      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:523)
2013-05-21_17:07:14.34314      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:420)
2013-05-21_17:07:14.34314      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:373)
2013-05-21_17:07:14.34315      at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
2013-05-21_17:07:14.34315      at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
2013-05-21_17:07:14.34316      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
2013-05-21_17:07:14.34316      - locked <0x7f5d36d4b2e0> (a java.lang.Object)
2013-05-21_17:07:14.34317      at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:678)
2013-05-21_17:07:14.34317      at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:712)
2013-05-21_17:07:14.34318      at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
2013-05-21_17:07:14.34318      at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-21_17:07:14.34318      at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-21_17:07:14.34319      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-21_17:07:14.34319      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-21_17:07:14.34319      at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
2013-05-21_17:07:14.34320      at scala.collection.immutable.List.foreach(List.scala:45)
2013-05-21_17:07:14.34320      at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
2013-05-21_17:07:14.34320      at scala.collection.immutable.List.map(List.scala:45)
2013-05-21_17:07:14.34321      at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
2013-05-21_17:07:14.34322      at kafka.tools.MirrorMaker.main(MirrorMaker.scala)

> Deadlock between initial rebalance and watcher-triggered rebalances
>
> Key: KAFKA-914
> URL: https://issues.apache.org/jira/browse/KAFKA-914
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Joel Koshy
> Fix For: 0.8
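The `ShutdownableThread.shutdown` frame at the top of this dump is the wedge point: shutdown interrupts the worker and then blocks on a latch that only the worker counts down. A minimal Java sketch of that pattern (illustrative, not the actual kafka.utils.ShutdownableThread) shows why the latch must be counted down on every exit path:

```java
import java.util.concurrent.CountDownLatch;

// Illustrative sketch of the shutdown-latch pattern in the stack trace:
// shutdown() interrupts the worker, then awaits a latch that only the worker
// counts down. If the worker could exit without reaching countDown(), every
// caller of shutdown() would hang forever -- the wedged state above.
public class LatchedThreadDemo {
    static abstract class LatchedThread extends Thread {
        volatile boolean running = true;
        private final CountDownLatch shutdownLatch = new CountDownLatch(1);

        abstract void doWork() throws InterruptedException;

        @Override public void run() {
            try {
                while (running) doWork();
            } catch (InterruptedException ignored) {
                // interrupted mid-work: fall through to the latch
            } finally {
                shutdownLatch.countDown(); // must happen on every exit path
            }
        }

        void shutdownAndAwait() throws InterruptedException {
            running = false;
            interrupt();
            shutdownLatch.await(); // hangs if countDown() is ever skipped
        }
    }

    // Returns true if the worker shut down cleanly.
    static boolean demo() throws InterruptedException {
        LatchedThread t = new LatchedThread() {
            void doWork() throws InterruptedException { Thread.sleep(50); }
        };
        t.start();
        t.shutdownAndAwait();
        t.join();
        return !t.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("clean shutdown: " + demo());
        // prints: clean shutdown: true
    }
}
```

The deadlock described in this issue arises when the party expected to count the latch down is itself blocked on something held by the thread doing the awaiting.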
Re: produce request wire format question
No. In 0.8, if you don't specify a key for a message, it is sent to any of the available partitions. In other words, the partition id is selected on the client, and the server doesn't get -1 as the partition id.

Thanks,
Neha

On Tue, May 21, 2013 at 9:54 AM, Dave Peterson wrote:
> In the version 0.8 wire format for a produce request, does a value of -1
> still indicate "use a random partition" as it did for 0.7?
>
> Thanks,
> Dave
[jira] [Resolved] (KAFKA-913) [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed
[ https://issues.apache.org/jira/browse/KAFKA-913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Eigenbrodt resolved KAFKA-913.

Resolution: Invalid

Indeed. I mixed up the git branches and tried it on "trunk". On 0.8 it works as advertised. I apologize for bothering you.

> [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed
>
> Key: KAFKA-913
> URL: https://issues.apache.org/jira/browse/KAFKA-913
Re: produce request wire format question
I'm looking at the document entitled "A Guide to the Kafka Protocol" located here: https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html It shows a produce request as containing a number of message sets, which are grouped first by topic and second by partition (a 32-bit integer). However, each message in a message set contains a Key field, which is described as follows: The key is an optional message key that was used for partition assignment. The key can be null. I notice the use of "was" (past tense) above. That seems to suggest that the Key field was once used to specify a partition (at the granularity of each individual message), but the plan for the future is to instead use the 32-bit partition value preceding each message set. Is this correct? If so, when I am creating a produce request for 0.8, what should I use for the 32-bit partition value, and how does this relate to the Key field of each individual message? Ideally, I would like to just send a produce request and let the broker choose the partition. How do I accomplish this in 0.8, and are there plans to change this after 0.8? Thanks, Dave On Tue, May 21, 2013 at 10:47 AM, Neha Narkhede wrote: > No. In 0.8, if you don't specify a key for a message, it is sent to any of > the available partitions. In other words, the partition id is selected on > the partition and the server doesn't get -1 as the partition id. > > Thanks, > Neha > > > On Tue, May 21, 2013 at 9:54 AM, Dave Peterson wrote: > >> In the version 0.8 wire format for a produce request, does a value of -1 >> still indicate "use a random partition" as it did for 0.7? >> >> Thanks, >> Dave >>
Re: produce request wire format question
The key is used by the client to decide which partition to send the message to. By the time the client is creating the produce request, it should be known which partition each message is being sent to. I believe Neha described the behavior of the Java client, which sends messages with a null key to any partition.

The key is described in past tense because of the use case for persisting keys with messages. The key is persisted through the broker so that a consumer knows what key was used to partition the message on the producer side.

I don't believe that you can have the broker decide which partition a message goes to.

--
Colin B.
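Colin's point — the client derives the partition from the key before the request is ever built — can be sketched as follows. This is a hypothetical Java illustration (the names and hash are mine, not the actual kafka.producer partitioner API):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of 0.8-style client-side partition selection: the
// partition id is computed from the key on the client, so the broker never
// sees -1 or any "random partition" marker on the wire.
public class PartitionChooser {
    static int choosePartition(byte[] key, int numPartitions) {
        if (key == null) {
            // null key: the client itself picks any available partition
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // keyed: deterministic hash, always a valid id in [0, numPartitions)
        return Math.floorMod(Arrays.hashCode(key), numPartitions);
    }

    public static void main(String[] args) {
        byte[] k = "user-42".getBytes(StandardCharsets.UTF_8);
        // keyed messages are sticky: the same key maps to the same partition
        System.out.println(choosePartition(k, 8) == choosePartition(k, 8)); // prints true
        int p = choosePartition(null, 8);
        System.out.println(p >= 0 && p < 8); // prints true
    }
}
```

The real client lets you plug in your own partitioner, but the division of labor is the same: the choice happens before serialization, never on the broker.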
Re: produce request wire format question
Dave,

Colin correctly described the producer behavior of picking the partition for a message before it is sent to the Kafka broker. However, I'm interested in knowing your use case a little better, to see why you would rather have the broker decide the partition.

Thanks,
Neha
Re: produce request wire format question
In my case, there is a load balancer between the producers and the brokers, so I want the behavior described for the Java client (null key specifies "any partition"). If the Key field of each individual message specifies the partition to send it to, then I don't understand the purpose of the 32-bit partition identifier that precedes each message set in a produce request: what if a produce request specifies "partition N" for a given message set, and then each individual message in the set specifies a different partition in its Key field? Also, the above-mentioned partition identifier is a 32-bit integer and the Key field of each individual message can contain data of arbitrary length, which seems inconsistent. Is a partition identifier a 32-bit integer, or can it be of arbitrary length?

Thanks,
Dave
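To make the relationship in this thread concrete: the int32 partition id heading each message set is what the broker honors, and the per-message key is only the input the client hashed to choose that id, so the two cannot disagree when the request is built the normal way. A hedged Java sketch with hypothetical names (not the real wire-format code):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: a client groups messages into per-partition message
// sets using the partition derived from each key, so the int32 partition id
// of a set is consistent by construction with the keys of its messages.
public class MessageSets {
    static final class Msg {
        final byte[] key; final byte[] value;
        Msg(byte[] key, byte[] value) { this.key = key; this.value = value; }
    }

    static int partitionFor(byte[] key, int numPartitions) {
        if (key == null) return 0; // a real client would pick any partition
        return Math.floorMod(Arrays.hashCode(key), numPartitions);
    }

    // partition id -> message set; the map key becomes the int32 on the wire
    static Map<Integer, List<Msg>> buildSets(List<Msg> msgs, int numPartitions) {
        return msgs.stream()
                   .collect(Collectors.groupingBy(m -> partitionFor(m.key, numPartitions)));
    }

    public static void main(String[] args) {
        byte[] a = "a".getBytes(StandardCharsets.UTF_8);
        byte[] b = "b".getBytes(StandardCharsets.UTF_8);
        List<Msg> msgs = List.of(new Msg(a, "v1".getBytes()),
                                 new Msg(a, "v2".getBytes()),
                                 new Msg(b, "v3".getBytes()));
        // messages with the same key land in the same message set
        System.out.println(buildSets(msgs, 4).get(partitionFor(a, 4)).size()); // prints 2
    }
}
```

The key's arbitrary length is therefore not inconsistent with the fixed-width partition id: the key is opaque application data that happens to feed the hash, while the id is the protocol-level routing field.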
[jira] [Updated] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances
[ https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-914:
-----------------------------
    Attachment: KAFKA-914-v1.patch

Patch with the mentioned fix.
1 - I added comments with some detail, since the manager/fetcher/connector interaction is very tricky.
2 - Passing through throwables while shutting down. The isRunning check is probably unnecessary, but safer to keep.
3 - Made the following changes to the mirrormaker - I can put that in a separate jira as well.
  a - Currently, if no streams are created the mirrormaker doesn't quit. Setting streams to empty/nil fixes that issue.
  b - If a consumer-side exception (e.g., an iterator timeout) gets thrown, the mirror-maker does not exit. Addressed this by awaiting on the consumer threads at the end of the main method.

> Deadlock between initial rebalance and watcher-triggered rebalances
> -------------------------------------------------------------------
>
>                 Key: KAFKA-914
>                 URL: https://issues.apache.org/jira/browse/KAFKA-914
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Joel Koshy
>             Fix For: 0.8
>
>         Attachments: KAFKA-914-v1.patch
>
>
> Summary doesn't give the full picture and the fetcher-manager/fetcher-thread
> code is very complex so it's a bit hard to articulate the following very
> clearly. I will try and describe the sequence that results in a deadlock
> when starting up a large number of consumers at around the same time:
>
> - When a consumer's createMessageStream method is called, it initiates an
>   initial inline rebalance.
> - However, before the above initial rebalance actually begins, a ZK watch
>   may trigger (due to some other consumers starting up) and initiate a
>   rebalance. This happens successfully, so fetchers start and begin filling
>   up the chunk queues.
> - Another watch triggers and initiates yet another rebalance. This rebalance
>   attempt tries to close the fetchers. Before the fetchers are stopped, we
>   shut down the leader-finder-thread to prevent new fetchers from being
>   started.
> - The shutdown is accomplished by interrupting the leader-finder-thread and
>   then awaiting its shutdown latch.
> - If the leader-finder-thread still has a partition without leader to
>   process and tries to add a fetcher for it, it will get an exception
>   (InterruptedException if acquiring the partitionMapLock, or
>   ClosedByInterruptException if performing an offset request). If we get an
>   InterruptedException, the thread's interrupted flag is cleared.
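The interrupt-then-await-latch shutdown sequence described in the issue can be modeled with plain java.util.concurrent primitives. This is an illustrative sketch, not the actual fetcher-manager code: the worker restores its interrupt flag after catching InterruptedException (which clears the flag, the subtlety noted above) and counts the shutdown latch down in a finally block, so the caller awaiting the latch cannot hang even when the interrupt lands mid-operation:

```java
import java.util.concurrent.CountDownLatch;

// Sketch of an interrupt-then-await shutdown. Names are illustrative and
// do not correspond to Kafka's actual leader-finder-thread classes.
public class ShutdownLatchDemo {
    static class Worker extends Thread {
        final CountDownLatch shutdownLatch = new CountDownLatch(1);

        @Override
        public void run() {
            try {
                while (!isInterrupted()) {
                    // Stand-in for blocking work (e.g., acquiring a lock or
                    // issuing an offset request); an interrupt surfaces here
                    // as InterruptedException, clearing the interrupt flag.
                    Thread.sleep(10);
                }
            } catch (InterruptedException e) {
                // Restore the flag so later interruption checks still fire.
                Thread.currentThread().interrupt();
            } finally {
                // Count down unconditionally: if this could be skipped on an
                // exception path, shutdown() below would block forever.
                shutdownLatch.countDown();
            }
        }

        void shutdown() throws InterruptedException {
            interrupt();            // stop new work from being started
            shutdownLatch.await();  // hangs iff the latch is never counted down
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Worker w = new Worker();
        w.start();
        w.shutdown();
        System.out.println("clean shutdown");
    }
}
```

The deadlock in the report corresponds to the case where the worker swallows the exception without ever reaching the countdown, leaving the shutdown caller parked on `await()` indefinitely.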
[jira] [Commented] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances
[ https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13663605#comment-13663605 ]

Jun Rao commented on KAFKA-914:
-------------------------------

Thanks for the patch. Looks good. +1. One minor comment: the following statement in the catch clause in MirrorMaker is unnecessary:

    streams = Nil
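The mirror-maker change discussed in this thread — awaiting the consumer threads at the end of the main method so the process exits once those threads die, whether normally or from a consumer-side exception such as an iterator timeout — can be sketched as below. The names are illustrative, not the actual MirrorMaker code:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of joining all consumer threads before main returns, so the
// process terminates when the last consumer thread exits for any reason.
// Names are hypothetical; this is not Kafka's MirrorMaker source.
public class AwaitConsumersDemo {
    static int runAndJoin(int numConsumers) throws InterruptedException {
        List<Thread> consumerThreads = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            Thread t = new Thread(() -> {
                // A real consumer thread would iterate a message stream here;
                // an uncaught exception (e.g., an iterator timeout) simply
                // ends the thread instead of leaving the process wedged.
            });
            consumerThreads.add(t);
            t.start();
        }
        // Await every consumer thread before returning: without these joins,
        // main could return while dead consumer threads go unnoticed and the
        // process never exits.
        for (Thread t : consumerThreads) {
            t.join();
        }
        return consumerThreads.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndJoin(3) + " consumer threads exited");
    }
}
```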