[jira] [Created] (KAFKA-912) [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed
Martin Eigenbrodt created KAFKA-912:
------------------------------------

             Summary: [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed
                 Key: KAFKA-912
                 URL: https://issues.apache.org/jira/browse/KAFKA-912
             Project: Kafka
          Issue Type: Bug
          Components: website
    Affects Versions: 0.8
         Environment: 0.8 Git Revision 731ba90
            Reporter: Martin Eigenbrodt

https://kafka.apache.org/08/quickstart.html says:

    ./sbt update
    ./sbt package
    ./sbt assembly-package-dependency

but assembly-package-dependency fails and is actually not needed to run the rest of the code.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Created] (KAFKA-913) [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed
Martin Eigenbrodt created KAFKA-913:
------------------------------------

             Summary: [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed
                 Key: KAFKA-913
                 URL: https://issues.apache.org/jira/browse/KAFKA-913
             Project: Kafka
          Issue Type: Bug
          Components: website
    Affects Versions: 0.8
         Environment: 0.8 Git Revision 731ba90
            Reporter: Martin Eigenbrodt

https://kafka.apache.org/08/quickstart.html says:

    ./sbt update
    ./sbt package
    ./sbt assembly-package-dependency

but assembly-package-dependency fails and is actually not needed to run the rest of the code.
[jira] [Created] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances
Joel Koshy created KAFKA-914:
-----------------------------

             Summary: Deadlock between initial rebalance and watcher-triggered rebalances
                 Key: KAFKA-914
                 URL: https://issues.apache.org/jira/browse/KAFKA-914
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.8
            Reporter: Joel Koshy
             Fix For: 0.8

The summary doesn't give the full picture, and the fetcher-manager/fetcher-thread code is complex enough that it's a bit hard to articulate the following clearly. I will try to describe the sequence that results in a deadlock when starting up a large number of consumers at around the same time:

- When a consumer's createMessageStream method is called, it initiates an initial inline rebalance.
- However, before the above initial rebalance actually begins, a ZK watch may trigger (due to some other consumers starting up) and initiate a rebalance. This happens successfully, so fetchers start and begin filling up the chunk queues.
- Another watch triggers and initiates yet another rebalance. This rebalance attempt tries to close the fetchers. Before the fetchers are stopped, we shut down the leader-finder-thread to prevent new fetchers from being started.
- The shutdown is accomplished by interrupting the leader-finder-thread and then awaiting its shutdown latch.
- If the leader-finder-thread still has a partition without leader to process and tries to add a fetcher for it, it will get an exception (InterruptedException if acquiring the partitionMapLock, or ClosedByInterruptException if performing an offset request). If we get an InterruptedException, the thread's interrupted flag is cleared.
- However, the leader-finder-thread may have
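One subtlety in the sequence above is that catching InterruptedException clears the thread's interrupt flag, which matters if the leader-finder-thread later needs to notice that it was interrupted. A minimal, self-contained Java illustration of that flag-clearing behavior (this is a made-up demo class, not Kafka code):

```java
// Demonstrates that catching InterruptedException clears the thread's
// interrupt status, so a later interruptible call will NOT throw again
// unless the catcher explicitly restores the flag.
public class InterruptFlagDemo {
    static volatile boolean flagAfterCatch;
    static volatile boolean flagAfterRestore;

    public static void main(String[] args) throws Exception {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(10_000); // interruptible wait, like blocking on a lock
            } catch (InterruptedException e) {
                // The flag was cleared when the exception was thrown.
                flagAfterCatch = Thread.currentThread().isInterrupted(); // false
                // Idiomatic fix: restore the flag so later checks can see it.
                Thread.currentThread().interrupt();
                flagAfterRestore = Thread.currentThread().isInterrupted(); // true
            }
        });
        worker.start();
        worker.interrupt();
        worker.join();
        System.out.println("after catch: " + flagAfterCatch
                + ", after restore: " + flagAfterRestore);
    }
}
```

If the interrupted flag is swallowed this way, a shutdown protocol that relies on the thread observing its own interruption can hang, which is the flavor of wedge described in this ticket.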
[jira] [Updated] (KAFKA-901) Kafka server can become unavailable if clients send several metadata requests
[ https://issues.apache.org/jira/browse/KAFKA-901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-901:
--------------------------------

    Attachment: kafka-901-followup2.patch

I agree. Kept only the 2nd logging message about caching the leader info. Also, both log messages now log only the partition leader info, not the whole request. Fixed another exception in ControllerChannelManager.

Kafka server can become unavailable if clients send several metadata requests
-----------------------------------------------------------------------------

                 Key: KAFKA-901
                 URL: https://issues.apache.org/jira/browse/KAFKA-901
             Project: Kafka
          Issue Type: Bug
          Components: replication
    Affects Versions: 0.8
            Reporter: Neha Narkhede
            Assignee: Neha Narkhede
            Priority: Blocker
         Attachments: kafka-901-followup2.patch, kafka-901-followup.patch, kafka-901.patch, kafka-901-v2.patch, kafka-901-v4.patch, kafka-901-v5.patch, metadata-request-improvement.patch

Currently, if a broker is bounced without controlled shutdown and there are several clients talking to the Kafka cluster, each of the clients realizes the unavailability of leaders for some partitions. This leads to several metadata requests sent to the Kafka brokers. Since metadata requests are pretty slow, all the I/O threads quickly become busy serving them. This leads to a full request queue, which stalls handling of finished responses, since the same network thread handles requests as well as responses. In this situation, clients time out on metadata requests and send more metadata requests. This quickly makes the Kafka cluster unavailable.
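The failure mode described here is a retry storm: clients time out on slow metadata requests and immediately resend them, which keeps the I/O threads saturated. A common client-side mitigation for this class of problem (a general technique, not something this ticket itself proposes) is exponential backoff between retries. A hypothetical sketch:

```java
// Hypothetical helper (not Kafka client code): compute an exponentially
// growing, capped delay between metadata-request retries, so a burst of
// failures does not turn into a self-sustaining request storm.
public class MetadataBackoff {
    static long backoffMs(int attempt, long baseMs, long maxMs) {
        // Delay doubles with each attempt (attempt 0 -> baseMs), capped at maxMs.
        long delay = baseMs * (1L << Math.min(attempt, 20));
        return Math.min(delay, maxMs);
    }
}
```

With a 100 ms base and a 30 s cap, successive retries wait 100 ms, 200 ms, 400 ms, and so on, flattening the load spike on the brokers instead of amplifying it.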
[jira] [Commented] (KAFKA-913) [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed
[ https://issues.apache.org/jira/browse/KAFKA-913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13663093#comment-13663093 ]

Jun Rao commented on KAFKA-913:
-------------------------------

Did you try this on the latest of the 0.8 branch? What OS did you run it on? When I ran it, it works and gives me the following. This is actually a required step; otherwise, the scripts under bin won't work.

./sbt assembly-package-dependency
[info] Loading project definition from /Users/jrao/intellij_workspace/kafka_git/project
[info] Set current project to Kafka (in build file:/Users/jrao/intellij_workspace/kafka_git/)
[info] Compiling 2 Scala sources to /Users/jrao/intellij_workspace/kafka_git/core/target/scala-2.8.0/classes...
[info] Including snappy-java-1.0.4.1.jar
[info] Including zookeeper-3.3.4.jar
[info] Including metrics-core-2.2.0.jar
[info] Including zkclient-0.2.jar
[info] Including metrics-annotation-2.2.0.jar
[info] Including log4j-1.2.15.jar
[info] Including scala-compiler.jar
[info] Including slf4j-api-1.7.2.jar
[info] Including slf4j-simple-1.6.4.jar
[info] Including jopt-simple-3.2.jar
[info] Including scala-library.jar
[warn] Merging 'META-INF/NOTICE' with strategy 'rename'
[warn] Merging 'org/xerial/snappy/native/README' with strategy 'rename'
[warn] Merging 'META-INF/maven/org.xerial.snappy/snappy-java/LICENSE' with strategy 'rename'
[warn] Merging 'LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE' with strategy 'rename'
[warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
[warn] Strategy 'discard' was applied to a file
[warn] Strategy 'rename' was applied to 5 files
[success] Total time: 43 s, completed May 21, 2013 9:32:30 AM
[jira] [Resolved] (KAFKA-912) [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed
[ https://issues.apache.org/jira/browse/KAFKA-912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao resolved KAFKA-912.
---------------------------

    Resolution: Duplicate
produce request wire format question
In the version 0.8 wire format for a produce request, does a value of -1 still indicate "use a random partition" as it did for 0.7?

Thanks,
Dave
[jira] [Commented] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances
[ https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13663146#comment-13663146 ]

Joel Koshy commented on KAFKA-914:
----------------------------------

One more point: [td3] above does not need to originate from a watcher-triggered rebalance. The initial rebalance can also run into the same deadlock. I.e., as long as one or more watcher-triggered rebalances succeed and start fetchers prior to the initial rebalance, we may end up in this wedged state. E.g., on another instance I saw [td3] but on the main thread:

2013-05-21_17:07:14.34308 main prio=10 tid=0x7f5e34008000 nid=0x4e49 waiting on condition [0x7f5e3b41]
2013-05-21_17:07:14.34308    java.lang.Thread.State: WAITING (parking)
2013-05-21_17:07:14.34309    at sun.misc.Unsafe.park(Native Method)
2013-05-21_17:07:14.34309    - parking to wait for 0x7f5d36d99fa0 (a java.util.concurrent.CountDownLatch$Sync)
2013-05-21_17:07:14.34309    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-21_17:07:14.34310    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-21_17:07:14.34310    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
2013-05-21_17:07:14.34310    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
2013-05-21_17:07:14.34311    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
2013-05-21_17:07:14.34312    at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
2013-05-21_17:07:14.34313    at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:125)
2013-05-21_17:07:14.34313    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:486)
2013-05-21_17:07:14.34313    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:523)
2013-05-21_17:07:14.34314    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:420)
2013-05-21_17:07:14.34314    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:373)
2013-05-21_17:07:14.34315    at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
2013-05-21_17:07:14.34315    at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
2013-05-21_17:07:14.34316    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
2013-05-21_17:07:14.34316    - locked 0x7f5d36d4b2e0 (a java.lang.Object)
2013-05-21_17:07:14.34317    at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:678)
2013-05-21_17:07:14.34317    at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.init(ZookeeperConsumerConnector.scala:712)
2013-05-21_17:07:14.34318    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
2013-05-21_17:07:14.34318    at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-21_17:07:14.34318    at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-21_17:07:14.34319    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-21_17:07:14.34319    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-21_17:07:14.34319    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
2013-05-21_17:07:14.34320    at scala.collection.immutable.List.foreach(List.scala:45)
2013-05-21_17:07:14.34320    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
2013-05-21_17:07:14.34320    at scala.collection.immutable.List.map(List.scala:45)
2013-05-21_17:07:14.34321    at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
2013-05-21_17:07:14.34322    at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
Re: produce request wire format question
I'm looking at the document entitled "A Guide to the Kafka Protocol" located here: https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html

It shows a produce request as containing a number of message sets, which are grouped first by topic and second by partition (a 32-bit integer). However, each message in a message set contains a Key field, which is described as follows: "The key is an optional message key that was used for partition assignment. The key can be null."

I notice the use of "was" (past tense) above. That seems to suggest that the Key field was once used to specify a partition (at the granularity of each individual message), but the plan for the future is to instead use the 32-bit partition value preceding each message set. Is this correct? If so, when I am creating a produce request for 0.8, what should I use for the 32-bit partition value, and how does this relate to the Key field of each individual message? Ideally, I would like to just send a produce request and let the broker choose the partition. How do I accomplish this in 0.8, and are there plans to change this after 0.8?

Thanks,
Dave

On Tue, May 21, 2013 at 10:47 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

No. In 0.8, if you don't specify a key for a message, it is sent to any of the available partitions. In other words, the partition id is selected on the client, and the server doesn't get -1 as the partition id.

Thanks,
Neha

On Tue, May 21, 2013 at 9:54 AM, Dave Peterson dspeter...@tagged.com wrote:

In the version 0.8 wire format for a produce request, does a value of -1 still indicate "use a random partition" as it did for 0.7?

Thanks,
Dave
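The client-side behavior Neha describes can be sketched in a few lines: the partition id in the request is always a concrete non-negative integer chosen by the producer, with a null key mapping to an arbitrary partition. The class and method names here are hypothetical illustrations, not the actual Kafka client API:

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of 0.8-style client-side partition selection.
// The broker never sees a "-1 means random" sentinel; the client always
// fills in a concrete partition id.
public class PartitionChooser {
    static int choosePartition(byte[] key, int numPartitions) {
        if (key == null) {
            // No key: spread messages across available partitions.
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // Keyed message: deterministic mapping, so all messages with the
        // same key land in the same partition.
        return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
    }
}
```

For example, `choosePartition(null, 4)` returns some partition in 0..3, while two calls with the same non-null key always return the same partition.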
Re: produce request wire format question
Dave,

Colin correctly described the producer behavior of picking the partition for a message before it is sent to the Kafka broker. However, I'm interested in knowing your use case a little better, to see why you would rather have the broker decide the partition.

Thanks,
Neha

On Tue, May 21, 2013 at 12:05 PM, Colin Blower cblo...@barracuda.com wrote:

The key is used by the client to decide which partition to send the message to. By the time the client is creating the produce request, it should be known which partition each message is being sent to. I believe Neha described the behavior of the Java client, which sends messages with a null key to any partition.

The key is described in past tense because of the use case for persisting keys with messages. The key is persisted through the broker so that a consumer knows what key was used to partition the message on the producer side.

I don't believe that you can have the broker decide which partition a message goes to.

--
Colin B.

On 05/21/2013 11:48 AM, Dave Peterson wrote:

I'm looking at the document entitled "A Guide to the Kafka Protocol" located here: https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html

It shows a produce request as containing a number of message sets, which are grouped first by topic and second by partition (a 32-bit integer). However, each message in a message set contains a Key field, which is described as follows: "The key is an optional message key that was used for partition assignment. The key can be null."

I notice the use of "was" (past tense) above. That seems to suggest that the Key field was once used to specify a partition (at the granularity of each individual message), but the plan for the future is to instead use the 32-bit partition value preceding each message set. Is this correct? If so, when I am creating a produce request for 0.8, what should I use for the 32-bit partition value, and how does this relate to the Key field of each individual message? Ideally, I would like to just send a produce request and let the broker choose the partition. How do I accomplish this in 0.8, and are there plans to change this after 0.8?

Thanks,
Dave

On Tue, May 21, 2013 at 10:47 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

No. In 0.8, if you don't specify a key for a message, it is sent to any of the available partitions. In other words, the partition id is selected on the client, and the server doesn't get -1 as the partition id.

Thanks,
Neha

On Tue, May 21, 2013 at 9:54 AM, Dave Peterson dspeter...@tagged.com wrote:

In the version 0.8 wire format for a produce request, does a value of -1 still indicate "use a random partition" as it did for 0.7?

Thanks,
Dave
Re: produce request wire format question
In my case, there is a load balancer between the producers and the brokers, so I want the behavior described for the Java client (null key specifies any partition).

If the Key field of each individual message specifies the partition to send it to, then I don't understand the purpose of the 32-bit partition identifier that precedes each message set in a produce request: what if a produce request specifies partition N for a given message set, and then each individual message in the set specifies a different partition in its Key field? Also, the above-mentioned partition identifier is a 32-bit integer, while the Key field of each individual message can contain data of arbitrary length, which seems inconsistent. Is a partition identifier a 32-bit integer, or can it be of arbitrary length?

Thanks,
Dave

On Tue, May 21, 2013 at 12:30 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

Dave,

Colin correctly described the producer behavior of picking the partition for a message before it is sent to the Kafka broker. However, I'm interested in knowing your use case a little better, to see why you would rather have the broker decide the partition.

Thanks,
Neha

On Tue, May 21, 2013 at 12:05 PM, Colin Blower cblo...@barracuda.com wrote:

The key is used by the client to decide which partition to send the message to. By the time the client is creating the produce request, it should be known which partition each message is being sent to. I believe Neha described the behavior of the Java client, which sends messages with a null key to any partition.

The key is described in past tense because of the use case for persisting keys with messages. The key is persisted through the broker so that a consumer knows what key was used to partition the message on the producer side.

I don't believe that you can have the broker decide which partition a message goes to.

--
Colin B.

On 05/21/2013 11:48 AM, Dave Peterson wrote:

I'm looking at the document entitled "A Guide to the Kafka Protocol" located here: https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html

It shows a produce request as containing a number of message sets, which are grouped first by topic and second by partition (a 32-bit integer). However, each message in a message set contains a Key field, which is described as follows: "The key is an optional message key that was used for partition assignment. The key can be null."

I notice the use of "was" (past tense) above. That seems to suggest that the Key field was once used to specify a partition (at the granularity of each individual message), but the plan for the future is to instead use the 32-bit partition value preceding each message set. Is this correct? If so, when I am creating a produce request for 0.8, what should I use for the 32-bit partition value, and how does this relate to the Key field of each individual message? Ideally, I would like to just send a produce request and let the broker choose the partition. How do I accomplish this in 0.8, and are there plans to change this after 0.8?

Thanks,
Dave

On Tue, May 21, 2013 at 10:47 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

No. In 0.8, if you don't specify a key for a message, it is sent to any of the available partitions. In other words, the partition id is selected on the client, and the server doesn't get -1 as the partition id.

Thanks,
Neha

On Tue, May 21, 2013 at 9:54 AM, Dave Peterson dspeter...@tagged.com wrote:

In the version 0.8 wire format for a produce request, does a value of -1 still indicate "use a random partition" as it did for 0.7?

Thanks,
Dave
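Dave's question about how the per-message Key relates to the 32-bit partition id can be illustrated with a small grouping sketch: the key is an input the client uses to pick a partition, and the chosen int32 partition id is what the request is grouped by. All names here are hypothetical, not the real wire-format code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of 0.8-style produce-request grouping: messages are
// grouped by topic, then by a 32-bit partition id chosen by the CLIENT.
// The key bytes ride along inside each message (persisted for consumers);
// they never conflict with the partition id because the id is derived
// from the key before grouping.
public class ProduceRequestSketch {
    record Message(byte[] key, byte[] value) {}

    // topic -> partition id (int32) -> message set
    static Map<String, Map<Integer, List<Message>>> group(
            String topic, List<Message> messages, int numPartitions) {
        Map<String, Map<Integer, List<Message>>> req = new HashMap<>();
        for (Message m : messages) {
            int partition = (m.key() == null)
                    ? 0 // unkeyed: any available partition would do; 0 for determinism here
                    : (Arrays.hashCode(m.key()) & 0x7fffffff) % numPartitions;
            req.computeIfAbsent(topic, t -> new HashMap<>())
               .computeIfAbsent(partition, p -> new ArrayList<>())
               .add(m);
        }
        return req;
    }
}
```

Since the partition id is computed from the key before the request is assembled, a message set's int32 partition and its members' keys can never disagree, which is why the fixed-width id and the variable-length key are not inconsistent.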
[jira] [Updated] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances
[ https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-914:
-----------------------------

    Attachment: KAFKA-914-v1.patch

Patch with the mentioned fix.

1. I added comments with some detail, since the manager/fetcher/connector interaction is very tricky.
2. Passing through throwables while shutting down. The isRunning check is probably unnecessary, but safer to keep.
3. Made the following changes to the mirrormaker - I can put that in a separate jira as well.
   a. Currently, if no streams are created, the mirrormaker doesn't quit. Setting streams to empty/nil fixes that issue.
   b. If a consumer-side exception (e.g., iterator timeout) gets thrown, the mirror-maker does not exit. Addressed this by awaiting on the consumer threads at the end of the main method.
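The shutdown pattern at the center of this ticket (interrupt the thread, then await a shutdown latch) can be sketched as follows. This is a hypothetical model of the pattern, not the actual kafka.utils.ShutdownableThread; it shows why point 2 above matters: the run loop must let throwables pass through to a finally block that trips the latch, or shutdown() blocks forever.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical ShutdownableThread-style worker. shutdown() interrupts the
// thread and awaits a latch that the run loop counts down on EVERY exit
// path. If the loop caught an interrupt-related throwable and kept
// running, countDown() would never execute and shutdown() would hang.
public class ShutdownableWorker extends Thread {
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private volatile boolean running = true;

    @Override
    public void run() {
        try {
            while (running) {
                Thread.sleep(10); // stand-in for interruptible work
            }
        } catch (Throwable t) {
            // Pass throwables through while shutting down rather than
            // swallowing them, so we always reach the finally block.
        } finally {
            shutdownLatch.countDown();
        }
    }

    public void shutdown() throws InterruptedException {
        running = false;
        interrupt();
        shutdownLatch.await(); // blocks forever if countDown() never runs
    }
}
```

Calling `shutdown()` on a started worker returns promptly because the interrupted sleep unwinds into the finally block and trips the latch.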