[jira] [Updated] (KAFKA-693) Consumer rebalance fails if no leader available for a partition and stops all fetchers
[ https://issues.apache.org/jira/browse/KAFKA-693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maxime Brugidou updated KAFKA-693: -- Attachment: KAFKA-693-v2.patch Consumer rebalance fails if no leader available for a partition and stops all fetchers -- Key: KAFKA-693 URL: https://issues.apache.org/jira/browse/KAFKA-693 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Maxime Brugidou Assignee: Maxime Brugidou Attachments: KAFKA-693.patch, KAFKA-693-v2.patch, mirror_debug.log, mirror.log
I am currently experiencing this with the MirrorMaker, but I assume it happens for any rebalance. The symptoms are (with a replication factor of 1):
1. If I start the MirrorMaker (bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config mirror-consumer.properties --producer.config mirror-producer.properties --blacklist 'xdummyx' --num.streams=1 --num.producers=1) with a broker down:
1.1 I set refresh.leader.backoff.ms to 60 (10min) so that the ConsumerFetcherManager doesn't retry too often to get the unavailable partitions.
1.2 The rebalance starts at the init step and fails: Exception in thread main kafka.common.ConsumerRebalanceFailedException: KafkaMirror_mirror-01-1357893495345-fac86b15 can't rebalance after 4 retries
1.3 After the exception, everything stops (fetchers and queues).
1.4 I attached the full logs (info and debug) for this case.
2. If I start the MirrorMaker with all the brokers up and then kill a broker:
2.1 The first rebalance is successful.
2.2 The consumer correctly handles the dead broker and stops the associated ConsumerFetcherThread.
2.3 The refresh.leader.backoff.ms of 60 works correctly.
2.4 If something triggers a rebalance (new topic, partition reassignment...), then we go back to 1.: the rebalance fails and stops everything.
I think the desired behavior is to consume whatever is available, and retry later at some interval. I would be glad to help on this issue, although the Consumer code seems a little tough to get into. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
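The refresh.leader.backoff.ms setting mentioned above lives in the consumer properties file that the MirrorMaker is pointed at. A minimal Scala sketch of such a config, assuming the 0.8 high-level consumer API and made-up ZooKeeper/group values (10 minutes = 600000 ms):

    import java.util.Properties
    import kafka.consumer.ConsumerConfig

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")   // assumed ZooKeeper address
    props.put("group.id", "KafkaMirror")               // assumed group name
    // Back off before re-querying leaders for partitions that currently have
    // none, so the ConsumerFetcherManager does not retry too aggressively.
    props.put("refresh.leader.backoff.ms", "600000")
    val config = new ConsumerConfig(props)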
[jira] [Commented] (KAFKA-693) Consumer rebalance fails if no leader available for a partition and stops all fetchers
[ https://issues.apache.org/jira/browse/KAFKA-693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13554852#comment-13554852 ] Maxime Brugidou commented on KAFKA-693: --- 10. Created PartitionTopicInfo.InvalidOffset 11. In ConsumerFetcherManager.doWork(), I believe that addFetcher() is called before the partition is removed from noLeaderPartitionSet, if an exception is caught the partition will still be in the noLeaderPartitionSet, so I didn't change anything 12. done 13. done -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Created] (KAFKA-706) broker appears to be encoding ProduceResponse, but never sending it
ben fleis created KAFKA-706: --- Summary: broker appears to be encoding ProduceResponse, but never sending it Key: KAFKA-706 URL: https://issues.apache.org/jira/browse/KAFKA-706 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Environment: reproduced on both Mac OS and RH linux, via private node.js client Reporter: ben fleis By all appearances, I seem to be able to convince a broker to periodically encode, but never transmit, a ProduceResponse. Unfortunately my client is proprietary, but I will share it with Neha via LI channels. But I will describe what's going on in the hopes that there's another trivial way to reproduce it. (I did search through JIRA, and haven't found anything that looks like this.) I am running a single instance zookeeper and single broker. I have a client that generates configurable amounts of data, tracking what is produced (both sent and ACK'd), and what is consumed. I was noticing that when using high transfer rates via high frequency single messages, my unack'd queue appeared to be getting continuously larger. So, I outfitted my client to log more information about correlation ids at various stages, and modified the kafka ProducerRequest/ProducerResponse to log (de)serialization of the same. I then used tcpdump to intercept all communications between my client and the broker. Finally, I configured my client to generate 1 message per ~10ms, each payload being approximately 33 bytes; requestAckTimeout was set to 2000ms, and requestAcksRequired was set to 1. I used 10ms as I found that 5ms or less caused my unacked queue to build up due to system speed -- it simply couldn't keep up. 10ms keeps the load high, but just manageable. YMMV with that param. All of this is done on a single host, over loopback. I ran it on both my airbook, and a well setup RH linux box, and found the same problem. At startup, my system logged expired requests - meaning reqs that were sent, but for which no ACK, positive or negative, was seen from the broker, within 1.25x the requestAckTimeout (ie, 2500ms). I would let it settle until the unacked queue was stable at or around 0. What I found is this: ACKs are normally generated within milliseconds. This was demonstrated by my logging added to the scala ProducerRe* classes, and they are normally seen quickly by my client. But when the actual error occurs, namely that a request is ignored, the ProducerResponse class *does* encode the correct correlationId; however, a response containing that ID is never sent over the network, as evidenced by my tcpdump traces. In my experience this would take anywhere from 3-15 seconds to occur after the system was warm, meaning that it's 1 out of several hundred on average that shows the condition. While I can't attach my client code, I could attach logs; but since my intention is to share the code with LI people, I will wait to see if that's useful here. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
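A rough Scala sketch of the kind of client-side bookkeeping described above (purely illustrative; the actual client is a proprietary node.js program): record the send time per correlation id and flag any id that has not been acknowledged within 1.25x the request timeout.

    import scala.collection.mutable

    // Hypothetical ack tracker, not the reporter's client: correlation ids are
    // recorded on send and removed on ack; anything older than 1.25x the
    // requestAckTimeout (2000ms -> 2500ms) is reported as expired.
    class AckTracker(requestAckTimeoutMs: Long = 2000) {
      private val pending = mutable.Map[Int, Long]()   // correlationId -> send time in ms
      def sent(correlationId: Int): Unit  = pending(correlationId) = System.currentTimeMillis()
      def acked(correlationId: Int): Unit = { pending.remove(correlationId); () }
      def expired(): Seq[Int] = {
        val cutoff = System.currentTimeMillis() - (requestAckTimeoutMs * 1.25).toLong
        pending.collect { case (id, sentAt) if sentAt < cutoff => id }.toSeq
      }
    }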
[jira] [Commented] (KAFKA-649) Cleanup log4j logging
[ https://issues.apache.org/jira/browse/KAFKA-649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13555150#comment-13555150 ] Jun Rao commented on KAFKA-649: --- I can take a look at this. Cleanup log4j logging - Key: KAFKA-649 URL: https://issues.apache.org/jira/browse/KAFKA-649 Project: Kafka Issue Type: Improvement Affects Versions: 0.8 Reporter: Jay Kreps Assignee: Jun Rao Priority: Blocker Review the logs and do the following: 1. Fix confusing or duplicative messages 2. Assess that messages are at the right level (TRACE/DEBUG/INFO/WARN/ERROR) It would also be nice to add a log4j logger for the request logging (i.e. the access log) and another for the controller state change log, since these really have their own use cases. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Assigned] (KAFKA-649) Cleanup log4j logging
[ https://issues.apache.org/jira/browse/KAFKA-649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao reassigned KAFKA-649: - Assignee: Jun Rao -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-631) Implement log compaction
[ https://issues.apache.org/jira/browse/KAFKA-631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13555154#comment-13555154 ] Jun Rao commented on KAFKA-631: --- Thanks for the patch. Do you know the revision of trunk on which this patch will apply? I can take a look before you rebase. Implement log compaction Key: KAFKA-631 URL: https://issues.apache.org/jira/browse/KAFKA-631 Project: Kafka Issue Type: New Feature Components: core Affects Versions: 0.8.1 Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-631-v1.patch Currently Kafka has only one way to bound the space of the log, namely by deleting old segments. The policy that controls which segments are deleted can be configured based either on the number of bytes to retain or the age of the messages. This makes sense for event or log data which has no notion of primary key. However lots of data has a primary key and consists of updates by primary key. For this data it would be nice to be able to ensure that the log contained at least the last version of every key. As an example, say that the Kafka topic contains a sequence of User Account messages, each capturing the current state of a given user account. Rather than simply discarding old segments, since the set of user accounts is finite, it might make more sense to delete individual records that have been made obsolete by a more recent update for the same key. This would ensure that the topic contained at least the current state of each record. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
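As a rough illustration of the retention idea described above (a toy model, not the patch itself): compaction keeps at least the latest value for each key rather than deleting whole segments.

    // Toy model only: retain the newest record per key, preserving log order.
    case class Record(key: String, offset: Long, value: String)

    def compact(log: Seq[Record]): Seq[Record] = {
      val latestOffsetByKey = log.groupBy(_.key).map { case (k, records) => k -> records.map(_.offset).max }
      log.filter(r => latestOffsetByKey(r.key) == r.offset)
    }

    // compact(Seq(Record("user-1", 0, "a"), Record("user-2", 1, "b"), Record("user-1", 2, "c")))
    // keeps offsets 1 and 2: the obsolete update for user-1 at offset 0 is dropped.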
[jira] [Commented] (KAFKA-702) Deadlock between request handler/processor threads
[ https://issues.apache.org/jira/browse/KAFKA-702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13555219#comment-13555219 ] Sriram Subramanian commented on KAFKA-702: -- I would like to add my thoughts to this.
1. Load shedding arbitrary clients will bound the memory but would essentially cause the system to fail most of the requests and not recover until the load goes down. We have quite a few inter-dependencies between requests (produce requests depend on replica requests, replica requests depend on produce requests, and consumer requests depend on produce requests), and dropping requests would essentially cause the requests depending on them to stay longer in the purgatory and fail.
2. Having client quotas may not work because we do not have one faulty client. Each client can have at most one request.
A few improvements might reduce the failure scenarios:
1. Currently replica requests wait on a hard limit (min bytes). Instead they could be made to return earlier to free the purgatory and accept more requests during high-load scenarios.
2. Direct consumers to read from other replicas in the ISR that have less load. This is going to be harder.
Deadlock between request handler/processor threads -- Key: KAFKA-702 URL: https://issues.apache.org/jira/browse/KAFKA-702 Project: Kafka Issue Type: Bug Components: network Affects Versions: 0.8 Reporter: Joel Koshy Assignee: Jay Kreps Priority: Blocker Labels: bugs Fix For: 0.8 Attachments: KAFKA-702-v1.patch We have seen this a couple of times in the past few days in a test cluster. The request handler and processor threads deadlock on the request/response queues, bringing the server to a halt.
kafka-processor-10251-7 prio=10 tid=0x7f4a0c3c9800 nid=0x4c39 waiting on condition [0x7f46f698e000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x7f48c9dd2698 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987) at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252) at kafka.network.RequestChannel.sendRequest(RequestChannel.scala:107) at kafka.network.Processor.read(SocketServer.scala:321) at kafka.network.Processor.run(SocketServer.scala:231) at java.lang.Thread.run(Thread.java:619)
kafka-request-handler-7 daemon prio=10 tid=0x7f4a0c57f000 nid=0x4c47 waiting on condition [0x7f46f5b8] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x7f48c9dd6348 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987) at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252) at kafka.network.RequestChannel.sendResponse(RequestChannel.scala:112) at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:198) at kafka.server.KafkaApis.handle(KafkaApis.scala:58) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41) at java.lang.Thread.run(Thread.java:619)
This is because there is a cycle in the wait-for graph of processor threads and request handler threads. If the request handling slows down on a busy server, the request queue fills up. All processor threads quickly block on adding incoming requests to the request queue. Due to this, those threads do not process responses, filling up their response queues. At this moment, the request handler threads start blocking on adding responses to the respective response queues. This can lead to a deadlock where every thread is holding a lock on one queue and asking for a lock on the other queue. This brings the server to a halt where it accepts connections but every request gets timed out. One way to resolve this is by breaking the cycle in the wait-for graph of the request handler and processor threads. Instead of having the processor threads dispatch the responses, we can have one or more dedicated response handler threads that dequeue responses from the queue and write them to the socket. One downside of this approach is that now access to the selector will have to be synchronized. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
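A small Scala sketch of the direction suggested at the end of the description (dedicated threads that only drain the response queue); the class and method names here are made up, this is not the actual patch.

    import java.util.concurrent.BlockingQueue

    // Hypothetical sketch: a sender thread that only ever takes from the response
    // queue, so a full request queue can no longer stop responses from draining.
    // (As noted above, the real thing would also have to synchronize selector access.)
    class ResponseSender(responseQueue: BlockingQueue[String]) extends Runnable {
      override def run(): Unit = {
        while (!Thread.currentThread().isInterrupted) {
          val response = responseQueue.take()  // blocks only when there is nothing to send
          writeToSocket(response)              // stand-in for the selector/socket write
        }
      }
      private def writeToSocket(response: String): Unit = println(s"sent: $response")
    }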
[jira] [Commented] (KAFKA-693) Consumer rebalance fails if no leader available for a partition and stops all fetchers
[ https://issues.apache.org/jira/browse/KAFKA-693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13555251#comment-13555251 ] Jun Rao commented on KAFKA-693: --- Thanks for patch v2. Looks good. Some minor comments:
11. I think we still need to change ConsumerFetcherManager.doWork(): Currently, if we hit an exception when calling addFetcher(), we won't remove any partition from noLeaderPartitionSet, including those that have been processed successfully. We can change it so that we remove each partition from noLeaderPartitionSet after calling addFetcher() successfully.
20. AbstractFetcherThread: Instead of doing initialOffset < 0, could we define an isOffsetInvalid() method? -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
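For illustration, a Scala toy of what comments 11 and 20 suggest; the names below only loosely mirror the 0.8 consumer code and this is not the actual patch.

    import scala.collection.mutable

    // Comment 11 (toy model): remove a partition from the no-leader set only after
    // its fetcher has been added successfully, so one failure does not keep
    // already-handled partitions in the set.
    case class TopicAndPartition(topic: String, partition: Int)

    val noLeaderPartitionSet = mutable.Set(TopicAndPartition("t", 0), TopicAndPartition("t", 1))
    val leaderForPartitions  = Map(TopicAndPartition("t", 0) -> "broker-1", TopicAndPartition("t", 1) -> "broker-2")

    def addFetcher(tp: TopicAndPartition, leader: String): Unit = ()  // stand-in for the real call

    leaderForPartitions.foreach { case (tp, leader) =>
      try {
        addFetcher(tp, leader)
        noLeaderPartitionSet -= tp   // removed only once addFetcher succeeded
      } catch {
        case e: Throwable => println(s"failed to add fetcher for $tp, will retry: $e")
      }
    }

    // Comment 20: name the sentinel check instead of comparing with a literal,
    // assuming invalid offsets are encoded as a negative sentinel.
    def isOffsetInvalid(offset: Long): Boolean = offset < 0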
[jira] [Resolved] (KAFKA-702) Deadlock between request handler/processor threads
[ https://issues.apache.org/jira/browse/KAFKA-702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede resolved KAFKA-702. - Resolution: Fixed Checked this in to proceed with deployment -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-702) Deadlock between request handler/processor threads
[ https://issues.apache.org/jira/browse/KAFKA-702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13555350#comment-13555350 ] Neha Narkhede commented on KAFKA-702: - 2. Having client quotas may not work because we do not have one faulty client. Each client can at most have only one request. We understand that. Client quotas are probably better done in terms of expirations per second. Basically, if you set up your partitions with a large replication factor (let's say 6), set the num.acks in your producer to -1, and at the same time set your timeout too low, all requests will time out and expire. This will allow your client to send many requests that all time out. Load shedding needs more thought. It is not as straightforward, and when we scope it out we will obviously need to keep in mind the consequences of load shedding. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster
[ https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13555352#comment-13555352 ] Joel Koshy commented on KAFKA-705: -- I set up a local cluster of three brokers and created a bunch of topics, replication factor = 2. I was able to do multiple iterations of rolling bounces without issue. Since this was local, I did not use your py script as it kills pid's returned by ps. Would you by any chance be able to provide a scenario to reproduce this locally? That said, I believe John Fung also tried to reproduce this in a distributed environment but was unable to do so; so I'll probably need to take a look at logs in your environment. Controlled shutdown doesn't seem to work on more than one broker in a cluster - Key: KAFKA-705 URL: https://issues.apache.org/jira/browse/KAFKA-705 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Neha Narkhede Assignee: Joel Koshy Priority: Critical Labels: bugs Attachments: shutdown_brokers_eat.py, shutdown-command I wrote a script (attached here) to basically round robin through the brokers in a cluster doing the following 2 operations on each of them - 1. Send the controlled shutdown admin command. If it succeeds 2. Restart the broker What I've observed is that only one broker is able to finish the above successfully the first time around. For the rest of the iterations, no broker is able to shutdown using the admin command and every single time it fails with the error message stating the same number of leaders on every broker. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-631) Implement log compaction
[ https://issues.apache.org/jira/browse/KAFKA-631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13555470#comment-13555470 ] Jay Kreps commented on KAFKA-631: - 362eba981de40a69ae509a291649531ead6f6aee -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-702) Deadlock between request handler/processor threads
[ https://issues.apache.org/jira/browse/KAFKA-702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede closed KAFKA-702. --- -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-691) Fault tolerance broken with replication factor 1
[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-691: -- Attachment: kafka-691_extra.patch The last patch introduced a bug. DefaultEventHandler.getPartition() is expected to return an index into the partitionList, instead of the actual partition id. Attaching a patch that fixes the issue. Fault tolerance broken with replication factor 1 Key: KAFKA-691 URL: https://issues.apache.org/jira/browse/KAFKA-691 Project: Kafka Issue Type: Bug Affects Versions: 0.8 Reporter: Jay Kreps Assignee: Maxime Brugidou Fix For: 0.8 Attachments: kafka-691_extra.patch, KAFKA-691-v1.patch, KAFKA-691-v2.patch In 0.7 if a partition was down we would just send the message elsewhere. This meant that the partitioning was really more of a stickiness than a hard guarantee. This made it impossible to depend on it for partitioned, stateful processing. In 0.8 when running with replication this should not be a problem generally, as the partitions are now highly available and fail over to other replicas. However the case of replication factor = 1 no longer really works for most cases, as a dead broker will now give errors for that broker. I am not sure of the best fix. Intuitively I think this is something that should be handled by the Partitioner interface. However currently the partitioner has no knowledge of which nodes are available. So you could use a random partitioner, but that would keep going back to the down node. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
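To illustrate the distinction the fix relies on (purely illustrative, with made-up partition ids): the value returned by getPartition() is used as an index into the list of currently available partitions, not as the partition id itself.

    // Partition ids need not be contiguous once some brokers are down, so an
    // index into the available-partition list and a partition id are different things.
    val availablePartitionIds = Seq(1, 3, 4)                        // e.g. partitions 0 and 2 are unavailable (made up)
    val key = "some-key"
    val index = math.abs(key.hashCode) % availablePartitionIds.size // what getPartition() should return
    val partitionId = availablePartitionIds(index)                  // the id that actually goes into the request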
[jira] [Updated] (KAFKA-691) Fault tolerance broken with replication factor 1
[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-691: -- Attachment: kafka-691_extra.patch Attach the right patch (kafka-691_extra.patch). -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Created] (KAFKA-708) ISR becomes empty while marking a partition offline
Swapnil Ghike created KAFKA-708: --- Summary: ISR becomes empty while marking a partition offline Key: KAFKA-708 URL: https://issues.apache.org/jira/browse/KAFKA-708 Project: Kafka Issue Type: Bug Affects Versions: 0.8, 0.8.1 Reporter: Swapnil Ghike Priority: Blocker Fix For: 0.8 Attached state change log shows that ISR becomes empty when a partition is being marked as offline. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Assigned] (KAFKA-708) ISR becomes empty while marking a partition offline
[ https://issues.apache.org/jira/browse/KAFKA-708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Swapnil Ghike reassigned KAFKA-708: --- Assignee: Neha Narkhede ISR becomes empty while marking a partition offline --- Key: KAFKA-708 URL: https://issues.apache.org/jira/browse/KAFKA-708 Project: Kafka Issue Type: Bug Affects Versions: 0.8, 0.8.1 Reporter: Swapnil Ghike Assignee: Neha Narkhede Priority: Blocker Labels: bugs Fix For: 0.8 Attachments: kafka-request.log.2013-01-16-15 Attached state change log shows that ISR becomes empty when a partition is being marked as offline. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Created] (KAFKA-709) Default queue.enqueue.timeout.ms to -1
Chris Riccomini created KAFKA-709: - Summary: Default queue.enqueue.timeout.ms to -1 Key: KAFKA-709 URL: https://issues.apache.org/jira/browse/KAFKA-709 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8 Reporter: Chris Riccomini Assignee: Jun Rao Hey Guys, It seems that, by default, producers in 0.8 are async, and have a default queue.enqueue.timeout.ms of 0. This means that anyone who reads messages faster than they're producing them will likely end up eventually hitting this exception: Exception in thread Thread-3 kafka.common.QueueFullException: Event queue is full of unsent messages, could not send event: KeyedMessage(PageViewEventByGroupJson,Missing Page Group,java.nio.HeapByteBuffer[pos=0 lim=125 cap=125]) at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:111) at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:89) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32) at kafka.producer.Producer.asyncSend(Producer.scala:89) at kafka.producer.Producer.send(Producer.scala:77) As it says in https://cwiki.apache.org/KAFKA/kafka-mirroring.html, this can result in losing messages, and nasty exceptions in the logs. I think the better default is setting queue.enqueue.timeout.ms to -1, which will just block until the queue frees up. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
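A minimal Scala sketch of the proposed setting against the 0.8 producer API; the broker address and serializer below are assumptions for illustration, not taken from the report.

    import java.util.Properties
    import kafka.producer.ProducerConfig

    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")             // assumed broker address
    props.put("serializer.class", "kafka.serializer.StringEncoder") // assumed serializer
    props.put("producer.type", "async")
    // -1 makes the async producer block when its queue is full instead of
    // throwing QueueFullException and dropping the message.
    props.put("queue.enqueue.timeout.ms", "-1")
    val config = new ProducerConfig(props)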
[jira] [Commented] (KAFKA-709) Default queue.enqueue.timeout.ms to -1
[ https://issues.apache.org/jira/browse/KAFKA-709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13555782#comment-13555782 ] Jay Kreps commented on KAFKA-709: - Yeah I agree. Everyone who tries to use it hits this and then basically just thinks kafka is broken because the error message isn't very clear. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
dynamic topic config
I added a first pass at a proposal for moving topic configuration into zookeeper so that you would specify the config when you created the topic and config could be altered without bouncing brokers. I think other people know more about the zookeeper interaction so I probably need help working out some of the details for the best way to do that: https://cwiki.apache.org/confluence/display/KAFKA/Dynamic+Topic+Config -Jay
About Kafka 0.8 producer throughput test
Hi, I have run the producer (Kafka 0.8) throughput test many times, but the average value is 3MB/s. Below is my test environment:
CPU cores: 16
Vendor_id: GenuineIntel
CPU family: 6
CPU MHz: 2899.999
Cache size: 20480 KB
CPU level: 13
MEM: 16330832KB = 15.57GB
Disk: RAID5
I don't know the detailed information about the disk, such as rotation speed, but I did some tests of the disk's I/O performance. The write rate is 500MB~600MB/s and the read rate is 180MB/s. I adjusted the broker configuration file as the official document suggests, and set the JVM heap to 5120MB. I run the producer performance test with the script kafka-producer-perf-test.sh; the test command is bin/kafka-producer-perf-test.sh --broker-list 10.75.167.46:49092 --topics topic_perf_46_1,topic_perf_46_2,topic_perf_46_3,topic_perf_46_4, topic_perf_46_5,topic_perf_46_6, topic_perf_46_7,topic_perf_46_8,topic_perf_46_9,topic_perf_46_10 --initial-message-id 0 --threads 200 --messages 100 --message-size 200 --compression-codec 1
But the test result is not as good as the official document says (50MB/s, and that value in your paper is 100MB/s). The test result is as below:
2013-01-17 04:15:24:768, 2013-01-17 04:25:01:637, 0, 200, 200, 1907.35, 3.3064, 1000, 17334.9582
On the other hand, I did a consumer throughput test; the result is about 60MB/s while that value in the official document is 100MB/s. I really don't know why. You know, high throughput is one of the most important features of Kafka, so I am really concerned about it. Thanks and best regards!
From: Jay Kreps [mailto:jkr...@linkedin.com]
Sent: January 16, 2013 2:22
To: Jun Guo -X (jungu - CIIC at Cisco)
Subject: RE: About acknowledge from broker to producer in your paper.
Not sure which version you are using... In 0.7 this would happen only if there was a socket level error (i.e. can't connect to the host). This covers a lot of cases since in the event of I/O errors (disk full, etc) we just have that node shut itself down to let others take over. In 0.8 we send all errors back to the client. So the difference is that, for example, in the event of a disk error, in 0.7 the client would send a message, the broker would get an error and shoot itself in the head and disconnect its clients, and the client would get the error the next time it tried to send a message. So in 0.7 the error might not get passed back to the client until the second message send. In 0.8 this would happen with the first send, which is an improvement. -Jay
From: Jun Guo -X (jungu - CIIC at Cisco) [ju...@cisco.com]
Sent: Monday, January 14, 2013 9:45 PM
To: Jay Kreps
Subject: About acknowledge from broker to producer in your paper.
Hi, I have read your paper Kafka: a Distributed Messaging System for Log Processing. In the experimental results part, there are some words as below: There are a few reasons why Kafka performed much better. First, the Kafka producer currently doesn't wait for acknowledgements from the broker and sends messages as fast as the broker can handle. This significantly increased the throughput of the publisher. With a batch size of 50, a single Kafka producer almost saturated the 1Gb link between the producer and the broker. This is a valid optimization for the log aggregation case, as data must be sent asynchronously to avoid introducing any latency into the live serving of traffic. We note that without acknowledging the producer, there is no guarantee that every published message is actually received by the broker. For many types of log data, it is desirable to trade durability for throughput, as long as the number of dropped messages is relatively small. However, we do plan to address the durability issue for more critical data in the future.
But I have done a series of tests. I found that, if I shut down all the brokers, when I send a message from the producer to the broker, the producer will report kafka.common.FailedToSendMessageException. It says, Failed to send messages after 3 tries. If there is no acknowledgement from the broker, how does the producer find that the sending failed? And how does it try 3 times? Maybe the acknowledgement in your paper refers to another thing; if so, please tell me what the meaning of acknowledge is here. Many thanks and best regards! Guo Jun
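As a sanity check on the perf numbers quoted earlier in this thread, the reported 3.3064 MB/s is consistent with the run's own timestamps, assuming the 1907.35 field is the total MB sent (a guess about the perf-test output format):

    // Rough arithmetic on the reported result line; field meanings are assumed.
    val durationSeconds = 9 * 60 + 36.869   // 04:15:24.768 -> 04:25:01.637 is about 576.87 s
    val totalMegabytes  = 1907.35           // assumed: total data sent in MB
    val mbPerSecond     = totalMegabytes / durationSeconds
    println(f"$mbPerSecond%.4f MB/s")       // ~3.3064, matching the reported average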
[jira] [Assigned] (KAFKA-709) Default queue.enqueue.timeout.ms to -1
[ https://issues.apache.org/jira/browse/KAFKA-709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede reassigned KAFKA-709: --- Assignee: Neha Narkhede (was: Jun Rao) Default queue.enqueue.timeout.ms to -1 -- Key: KAFKA-709 URL: https://issues.apache.org/jira/browse/KAFKA-709 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8 Reporter: Chris Riccomini Assignee: Neha Narkhede Attachments: kafka-709.patch -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-707) Improve error message in the producer when sending data to a partition without an active leader
[ https://issues.apache.org/jira/browse/KAFKA-707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-707: Attachment: kafka-707.patch Minor patch to fix the error message in the producer Improve error message in the producer when sending data to a partition without an active leader --- Key: KAFKA-707 URL: https://issues.apache.org/jira/browse/KAFKA-707 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8 Reporter: Neha Narkhede Assignee: Neha Narkhede Attachments: kafka-707.patch Original Estimate: 24h Remaining Estimate: 24h We log a very cryptic message when the producer tries to send data to a partition that doesn't have a leader at that moment - Failed to send to broker -1 with data Map([PageViewEventByGroupJson,8] Let's improve this to log a better error message -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
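Presumably the "broker -1" in the cryptic message above is the placeholder used when no leader is known; a Scala sketch of a more descriptive message along those lines (illustrative only, not the actual patch):

    // Illustrative only: spell out the no-leader case instead of "broker -1".
    def describeSendFailure(brokerId: Int, partitions: Seq[String]): String =
      if (brokerId < 0)
        "Failed to send data for partitions " + partitions.mkString(", ") + " since no leader is currently available for them"
      else
        "Failed to send data to broker " + brokerId + " for partitions " + partitions.mkString(", ")

    // describeSendFailure(-1, Seq("PageViewEventByGroupJson,8"))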