Re: How to produce and consume events in 2 DCs?
Hi Steven,

That doesn't work. In your proposal, the MirrorMaker in one DC would copy messages from topic A to topic A in the other DC. However, the other DC runs a MirrorMaker that does the same, creating a loop: messages would be duplicated, triplicated, and so on, in a never-ending cycle.

Mirroring to another topic would work (MirrorMaker doesn't support that), and so would mirroring to another cluster. Neha's proposal would also work, but I assume it's a lot more work for the Kafka internals and therefore, IMHO, wouldn't meet the KISS principle.

Kind regards,
Erik.

Steven Wu wrote on 22-10-14 at 01:48:
I think it doesn't have to be two more clusters; it can be just two more topics. MirrorMaker can copy from the source topics in both regions into one aggregate topic.

On Tue, Oct 21, 2014 at 1:54 AM, Erik van oosten e.vanoos...@grons.nl.invalid wrote:
Thanks Neha,
Unfortunately, the maintenance overhead of 2 more clusters is not acceptable to us. Would you accept a pull request on mirror maker that would rename topics on the fly? For example by accepting the parameter rename:
—rename src1/dest1,src2/dest2
or, extended with regex support:
—rename old_(.*)/new_\1
Kind regards, Erik.

On 20 Oct 2014, at 16:43, Neha Narkhede neha.narkh...@gmail.com wrote:
Another way to set up this kind of mirroring is by deploying 2 clusters in each DC - a local Kafka cluster and an aggregate Kafka cluster. The mirror maker copies data from both DCs' local clusters into the aggregate clusters. So if you want access to a topic with data from both DCs, you subscribe to the aggregate cluster.
Thanks, Neha

On Mon, Oct 20, 2014 at 7:07 AM, Erik van oosten e.vanoos...@grons.nl.invalid wrote:
Hi,
We have 2 data centers that produce events. Each DC has to process events from both DCs.
I had the following in mind:

[ASCII diagram, garbled in this archive: in each DC, events flow into a local "Receiver topic"; the receiver topics are mirrored into the "Consumer topic" in each DC; consumers in each DC read from their local consumer topic.]

As each DC has a single Kafka cluster, on each DC the receiver topic and consumer topic need to be on the same cluster. Unfortunately, mirror maker does not seem to support mirroring to a topic with another name.

Is there another tool we could use? Or, is there another approach for producing and consuming from 2 DCs?

Kind regards,
Erik.

—
Erik van Oosten
http://www.day-to-day-stuff.blogspot.nl/

--
Erik van Oosten
http://www.day-to-day-stuff.blogspot.com/
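For what it's worth, the rename mapping Erik proposes could behave roughly like this (an illustrative Python sketch of the proposed --rename semantics, not actual MirrorMaker code; the rule set below is hypothetical):

```python
import re

def rename_topic(topic, rules):
    """Apply the first matching rename rule; rules are (src_regex, dest)
    pairs, mirroring the proposed --rename src1/dest1,src2/dest2 syntax."""
    for src, dest in rules:
        if re.fullmatch(src, topic):
            return re.sub(src, dest, topic, count=1)
    return topic  # no rule matched: keep the original name

# hypothetical rule set, regex-extended form: --rename old_(.*)/new_\1
rules = [(r"old_(.*)", r"new_\1")]
print(rename_topic("old_orders", rules))  # new_orders
print(rename_topic("metrics", rules))     # metrics (unchanged)
```

With such a mapping, each DC could mirror the remote receiver topic into a locally named consumer topic, breaking the replication loop described above.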
Re: [Kafka-users] Producer not distributing across all partitions
Hi,

First of all, thank you for replying. I am using 0.8.1.1. I expect the new producer will solve this kind of problem.

Thanks,
Mungeol

On Wed, Oct 22, 2014 at 9:51 AM, Jun Rao jun...@gmail.com wrote:
Yes, what you did is correct. See details in https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyisdatanotevenlydistributedamongpartitionswhenapartitioningkeyisnotspecified
It seems that it doesn't work all the time. What version of Kafka are you using?
Thanks, Jun

On Mon, Oct 20, 2014 at 9:00 PM, Mungeol Heo mungeol@gmail.com wrote:
Hi,
I have a question about the 'topic.metadata.refresh.interval.ms' configuration. As I know, its default value is 10 minutes. Does that mean the producer will switch to another partition every 10 minutes? What I am experiencing is that the producer does not change to another partition every 10 minutes. Sometimes it never changed during a run that takes about 25 minutes. I also changed the value to 1 minute for testing. It looked like it worked the first time, but the same problem happens starting from the second test. Sometimes it takes more than 10 minutes to change the partition even though I set the value to 1 minute. Am I missing something? Any help will be great.
Thanks.
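For context, the FAQ Jun links describes the 0.8 producer's behavior for keyless messages: it picks a random partition and sticks to it until the metadata refresh interval elapses. A toy sketch of that behavior (illustrative only, not the actual producer code):

```python
import random
import time

class StickyPartitioner:
    """Toy model of the 0.8 producer's keyless partitioning: pick a random
    partition and keep using it until topic.metadata.refresh.interval.ms
    (default 10 minutes) has elapsed, then pick again. Illustrative only."""

    def __init__(self, num_partitions, refresh_interval_s=600.0):
        self.num_partitions = num_partitions
        self.refresh_interval_s = refresh_interval_s
        self.current = None
        self.chosen_at = None

    def partition(self, now=None):
        now = time.monotonic() if now is None else now
        if self.current is None or now - self.chosen_at >= self.refresh_interval_s:
            self.current = random.randrange(self.num_partitions)  # re-pick
            self.chosen_at = now
        return self.current

p = StickyPartitioner(num_partitions=8)
# every keyless send within the interval lands on the same partition
assert p.partition() == p.partition()
```

Note the re-pick is random, so after the interval the producer may land on the same partition again, which would match the "sometimes it takes more than 10 minutes to change" observation.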
Re: 0.8.1.2
Does 0.8.2 include the new producer mentioned in the Kafka documentation? If not, which version will include it?

Thanks,
Mungeol

On Wed, Oct 22, 2014 at 11:21 AM, Neha Narkhede neha.narkh...@gmail.com wrote:
Shlomi,
As Jun mentioned, we are voting on a 0.8.2 beta release now. Are you suggesting there be an 0.8.1.2 release in addition to that? We can take a quick vote from the community to see how many people prefer to have this and why.
Thanks, Neha

On Tue, Oct 21, 2014 at 6:03 PM, Jun Rao jun...@gmail.com wrote:
We are voting on an 0.8.2 beta release right now.
Thanks, Jun

On Tue, Oct 21, 2014 at 11:17 AM, Shlomi Hazan shl...@viber.com wrote:
Hi All,
Will version 0.8.1.2 happen?
Shlomi
Explicit topic creation and topic metadata availability
Hello Apache Kafka users,

Using Kafka 0.8.1.1 (a single instance with a single ZK 3.4.6 running locally), with auto topic creation disabled: in a test I have a topic created with AdminUtils.createTopic (AdminUtils.topicExists returns true), but KafkaProducer keeps throwing UnknownTopicOrPartitionException on send requests even after 100 retries, both when topic.metadata.refresh.interval.ms and retry.backoff.ms are left at their defaults and when customized.

Am I doing something wrong, or is this a known bug? How long does it typically take for metadata to be refreshed? How long does it take for a leader to be elected?

The documentation for retry.backoff.ms states: "Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata."

Do I understand these docs correctly: on failure to send a message, such as an unknown topic, if retries are configured the producer will wait for the configured retry.backoff.ms, then initiate and wait for a metadata refresh to complete, and only then retry sending?

Kind regards,
Stevo Slavic.
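The retry flow described in that last paragraph could be sketched as follows (an illustrative sketch of the documented wait/refresh/retry loop, not the actual producer internals; the callback names are hypothetical):

```python
import time

def send_with_retries(try_send, refresh_metadata, retries=3, backoff_s=0.1):
    """Sketch of the documented flow: on failure, wait retry.backoff.ms,
    refresh metadata (a new leader may have been elected), then retry."""
    for attempt in range(retries + 1):
        try:
            return try_send()
        except Exception:
            if attempt == retries:
                raise               # retries exhausted: surface the last error
            time.sleep(backoff_s)   # the retry.backoff.ms wait
            refresh_metadata()      # pick up newly elected leaders
```

Under this reading, 100 retries at the default backoff should comfortably outlast a normal leader election, which is why the persistent UnknownTopicOrPartitionException looks suspicious.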
Re: Explicit topic creation and topic metadata availability
Hello Stevo,

Your understanding of the configs is correct, and it is indeed weird that the producer gets the exception after the topic is created. Could you use the kafka-topics command to check if the leaders exist?

kafka-topics.sh --zookeeper XXX --topic [topic-name] --describe

Guozhang

On Wed, Oct 22, 2014 at 5:57 AM, Stevo Slavić ssla...@gmail.com wrote:
[original message quoted in full; see above]

--
-- Guozhang
Re: 0.8.1.2
Yes, 0.8.2 includes the new producer. 0.8.2 will have a lot of new features which will take time to stabilize. If people want 0.8.1.2 for some critical bug fixes, we can discuss the feasibility of doing that release.

On Wed, Oct 22, 2014 at 1:39 AM, Shlomi Hazan shl...@viber.com wrote:
At the time I thought it was a good idea, but if I understand correctly what Jun is saying, 0.8.1.2 will not happen. I assume Jun sees 0.8.2 coming soon enough to remove any added value from 0.8.1.2.
Shlomi

On Wed, Oct 22, 2014 at 5:21 AM, Neha Narkhede neha.narkh...@gmail.com wrote:
[earlier messages quoted in full; see the thread above]
Re: frequent periods of ~1500 replicas not in sync
Neil,

We fixed a bug related to the BadVersion problem in 0.8.1.1. Would you mind repeating your test on 0.8.1.1, and if you can still reproduce this issue, send around the thread dump and attach the logs to KAFKA-1407?

Thanks,
Neha

On Tue, Oct 21, 2014 at 11:56 AM, Neil Harkins nhark...@gmail.com wrote:
Hi. I've got a 5 node cluster running Kafka 0.8.1, with 4697 partitions (2 replicas each) across 564 topics. I'm sending it about 1% of our total messaging load now, and several times a day there is a period where 1~1500 partitions have one replica not in sync. Is this normal? If a consumer is reading from a replica that gets deemed not in sync, does it get redirected to the good replica? Is there a number of partitions over which maintenance tasks become infeasible?

Relevant config bits:
auto.leader.rebalance.enable=true
leader.imbalance.per.broker.percentage=20
leader.imbalance.check.interval.seconds=30
replica.lag.time.max.ms=1
replica.lag.max.messages=4000
num.replica.fetchers=4
replica.fetch.max.bytes=10485760

Not necessarily correlated to those periods, I see a lot of these errors in the logs:

[2014-10-20 21:23:26,999] 21963614 [ReplicaFetcherThread-3-1] ERROR kafka.server.ReplicaFetcherThread - [ReplicaFetcherThread-3-1], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 77423; ClientId: ReplicaFetcherThread-3-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: ...

And a few of these:

[2014-10-20 21:23:39,555] 3467527 [kafka-scheduler-2] ERROR kafka.utils.ZkUtils$ - Conditional update of path /brokers/topics/foo.bar/partitions/3/state with data {controller_epoch:11,leader:3,version:1,leader_epoch:109,isr:[3]} and expected version 197 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/foo.bar/partitions/3/state

And this one I assume is a client closing the connection non-gracefully, and thus should probably be a warning, not an error?:

[2014-10-20 21:54:15,599] 23812214 [kafka-processor-9092-3] ERROR kafka.network.Processor - Closing socket for /10.31.0.224 because of error

-neil
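For readers unfamiliar with the BadVersionException in those logs: ZooKeeper's conditional update is a compare-and-set keyed on the node's version, so the error means another writer updated the partition-state node in between. A minimal sketch of the semantics (illustrative only; the store here is a plain dict, not ZooKeeper's API):

```python
class BadVersionError(Exception):
    """Stands in for ZooKeeper's KeeperException$BadVersionException."""

def conditional_update(store, path, new_data, expected_version):
    """ZooKeeper-style compare-and-set: the write only succeeds if the
    node's version still matches what the caller last read."""
    node = store[path]
    if node["version"] != expected_version:
        # someone else updated the node in between -> BadVersion
        raise BadVersionError(f"expected {expected_version}, found {node['version']}")
    node["data"] = new_data
    node["version"] += 1

state_path = "/brokers/topics/foo.bar/partitions/3/state"
store = {state_path: {"data": "old-isr", "version": 197}}
conditional_update(store, state_path, "new-isr", 197)  # succeeds, version -> 198
```

In the log above, the broker read version 197 but the controller (or another broker) had already bumped it, so the broker's write was rejected and it had to re-read and retry.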
Re: How many partition can one single machine handle in Kafka?
The number of brokers doesn't really matter here, as far as I can tell, because the question is about what a single broker can handle. The number of partitions in the cluster is governed by the ability of the controller to manage the list of partitions for the cluster, and the ability of each broker to keep that list (to serve metadata requests). The number of partitions on a single broker is governed by that broker's ability to handle the messages and files on disk. That's a much more limiting factor than what the controller can do.

-Todd

On Tue, Oct 21, 2014 at 2:52 PM, Neil Harkins nhark...@gmail.com wrote:
On Tue, Oct 21, 2014 at 2:10 PM, Todd Palino tpal...@gmail.com wrote:
As far as the number of partitions a single broker can handle, we've set our cap at 4000 partitions (including replicas). Above that we've seen some performance and stability issues.

How many brokers? I'm curious: what kinds of problems would affect a single broker with a large number of partitions, but not affect the entire cluster with even more partitions?
Errors after reboot on single node setup
Hi,

First of all, I am new to Kafka and more of a user than a developer. I will try to clarify things as much as possible though.

We are using Kafka as a message system for our apps, and it works nicely in our SaaS cluster. I am trying to make the apps also work on a single node for demo purposes. I set up Zookeeper, Kafka and our apps on a node and things were ok until rebooting the node. After that I see the following messages in the Kafka log:

[2014-10-22 16:37:22,206] INFO [Controller 0]: Controller starting up (kafka.controller.KafkaController)
[2014-10-22 16:37:22,419] INFO [Controller 0]: Controller startup complete (kafka.controller.KafkaController)
[2014-10-22 16:37:22,554] INFO conflict in /brokers/ids/0 data: {jmx_port:-1,timestamp:1413995842465,host:ip-10-91-142-54.eu-west-1.compute.internal,version:1,port:9092} stored data: {jmx_port:-1,timestamp:1413994171579,host:ip-10-91-142-54.eu-west-1.compute.internal,version:1,port:9092} (kafka.utils.ZkUtils$)
[2014-10-22 16:37:22,736] INFO I wrote this conflicted ephemeral node [{jmx_port:-1,timestamp:1413995842465,host:ip-10-91-142-54.eu-west-1.compute.internal,version:1,port:9092}] at /brokers/ids/0 a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
[2014-10-22 16:37:25,010] ERROR Error handling event ZkEvent[Data of /controller changed sent to kafka.server.ZookeeperLeaderElector$LeaderChangeListener@a6af882] (org.I0Itec.zkclient.ZkEventThread)
java.lang.IllegalStateException: Kafka scheduler has not been started
    at kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:114)
    at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86)
    at kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350)
    at kafka.controller.KafkaController$$anonfun$2.apply$mcV$sp(KafkaController.scala:162)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:138)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:134)
    at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
[2014-10-22 16:37:28,757] INFO Registered broker 0 at path /brokers/ids/0 with address ip-10-91-142-54.eu-west-1.compute.internal:9092. (kafka.utils.ZkUtils$)
[2014-10-22 16:37:28,849] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2014-10-22 16:38:56,718] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2014-10-22 16:38:56,850] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2014-10-22 16:38:56,985] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

The last log line repeats forever and is correlated with errors on the app side. Restarting Kafka fixes the errors. I am using Kafka 0.8.2 from github to avoid https://issues.apache.org/jira/browse/KAFKA-1451.

Does anyone have any idea why this happens and how it can be fixed?

Thanks,
Ciprian

--
Performance Monitoring * Log Analytics * Search Analytics
Solr Elasticsearch Support * http://sematext.com/
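The "conflicted ephemeral node ... backoff and retry" message reflects the broker's re-registration logic after losing its ZooKeeper session on reboot: its own stale ephemeral node may still exist, so it waits for ZooKeeper to expire it before re-registering. A toy sketch of that dance (illustrative only; `zk` here is a plain dict standing in for the ZooKeeper namespace):

```python
import time

def register_broker(zk, path, data, session_timeout_s=6.0, poll_s=0.5):
    """Toy model of the re-registration seen in the log above: if our own
    stale ephemeral node from a previous session still exists, back off
    until ZooKeeper expires it, then retry. Illustrative only."""
    deadline = time.monotonic() + 2 * session_timeout_s
    while time.monotonic() < deadline:
        if path not in zk:
            zk[path] = data       # old node gone: register ourselves
            return True
        if zk[path] != data:
            # a different broker really owns this id
            raise RuntimeError("broker id conflict at " + path)
        time.sleep(poll_s)        # our stale node: wait for session expiry
    return False                  # gave up waiting
```

The bug here seems to be in what happens around that window (the controller resigning before its scheduler started), not in the backoff itself.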
Re: How many partition can one single machine handle in Kafka?
In fact there are many more than 4000 open files. Many of our brokers run with 28,000+ open files (regular file handles, not network connections). In our case, we're beefing up the disk performance as much as we can by running in a RAID-10 configuration with 14 disks.

-Todd

On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She xiaobin...@gmail.com wrote:
Todd,
Actually I'm wondering how Kafka handles so many partitions. With one partition there is at least one file on disk, and with 4000 partitions there will be at least 4000 files. When all these partitions have write requests, how does Kafka make the write operations on the disk sequential (which is emphasized in the design document of Kafka) and make sure the disk access is effective?
Thank you for your reply.
xiaobinshe

2014-10-22 5:10 GMT+08:00 Todd Palino tpal...@gmail.com:
As far as the number of partitions a single broker can handle, we've set our cap at 4000 partitions (including replicas). Above that we've seen some performance and stability issues.
-Todd

On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She xiaobin...@gmail.com wrote:
hello, everyone
I'm new to Kafka. I'm wondering what's the max number of partitions one single machine can handle in Kafka? Is there a suggested number?
Thanks.
xiaobinshe
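On Xiaobin's sequential-write question: each partition is its own append-only log (segment files on disk), so writes are sequential within a partition's file even though the broker hosts many partitions. A toy model of that layout (illustrative only; real Kafka uses rolling segment files and relies heavily on the OS page cache):

```python
import os
import tempfile

class PartitionLog:
    """Toy model of a Kafka partition: one append-only log file per
    partition. Writes within a partition are strictly sequential appends,
    which is what keeps disk access efficient (illustrative only)."""

    def __init__(self, path):
        self.path = path
        self.next_offset = 0

    def append(self, record: bytes) -> int:
        with open(self.path, "ab") as f:  # 'ab' = append-only
            f.write(record + b"\n")
        offset = self.next_offset
        self.next_offset += 1
        return offset

log = PartitionLog(os.path.join(tempfile.mkdtemp(), "demo-topic-0.log"))
assert log.append(b"msg-a") == 0
assert log.append(b"msg-b") == 1
```

With thousands of such files receiving writes concurrently, the per-file appends are sequential but the disk head still seeks between files, which is presumably why Todd's RAID-10 with many spindles helps.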
Re: Errors after reboot on single node setup
Can you provide steps to reproduce this? I'm not sure I understand how you run into this. It does look like a bug.

On Wed, Oct 22, 2014 at 9:55 AM, Ciprian Hacman ciprian.hac...@sematext.com wrote:
[original message with full log quoted; see above]
Re: Errors after reboot on single node setup
This can be reproduced with trunk:

1. start zookeeper
2. start kafka-broker
3. create a topic, or start a producer writing to a topic
4. stop zookeeper
5. stop kafka-broker (the broker shutdown goes into: WARN Session 0x14938d9dc010001 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) java.net.ConnectException: Connection refused)
6. kill -9 kafka-broker
7. restart zookeeper and then kafka-broker

This leads to the error posted by Ciprian.

Ciprian, can you open a jira for this?

Thanks,
Harsha

On Wed, Oct 22, 2014, at 10:03 AM, Neha Narkhede wrote:
Can you provide steps to reproduce this? I'm not sure I understand how you run into this. It does look like a bug.

On Wed, Oct 22, 2014 at 9:55 AM, Ciprian Hacman ciprian.hac...@sematext.com wrote:
[original message with full log quoted; see above]
Erratic behavior when quickly re-balancing
For the purpose of a unit/integration test for Spring XD, I am creating several consumers (in the same group) in quick succession. With just 3 consumers, this triggers 2 rebalances that (on some machines) can't be dealt with in the default 4*2000ms and fails. I have created a simple use case that reproduces this outside the context of Spring XD. It can be found in the main() method of [1]. If it does not fail on your machine, I believe bumping the number of creations to 4 or 5 may do it.

So, I have a couple of questions:
- On my machine, when using various combinations of rebalance.backoff.ms and rebalance.max.retries, this failure always seems to happen after 30s, whatever the combination. On some other (more powerful) machines, it seems to never fail. Is this actually CPU bound? 30s sounds a lot like twice the default tick, so is this at all related to an ephemeral node timing out?
- Given that this is for the purposes of an integration test, is there any other parameter that I could tweak to have the system settle down faster?
- Is this something that is likely to change with the 0.9 rewrite (I saw that the current de-centralized rebalancing mechanics are the cause of other issues)?

As a workaround, I tried waiting for nodes to re-appear in ZooKeeper (code at [2]), but this is still very slow (not to mention that this would be very intrusive to the tests I'm trying to write).

Lastly, I should mention that I do need to create 3 consumers sequentially (as opposed to, say, using a 3-thread consumer). The test in question simply happens to mimic the creation of consumers that may well be on 3 separate machines.

Best,

[1] https://gist.github.com/ericbottard/91aa9ee114c6091e5b7b
[2] https://gist.github.com/ericbottard/de87f1edc8eee2b9bee5

--
Eric Bottard
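The workaround Eric mentions (waiting for ZooKeeper nodes to re-appear between consumer creations) boils down to a poll-until helper along these lines (an illustrative sketch; the actual code is in the gist linked above):

```python
import time

def wait_until(condition, timeout_s=30.0, poll_s=0.1):
    """Poll condition() until it returns true or the timeout expires.
    Returns the final truth value, so a test can fail cleanly on timeout."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(poll_s)
    return bool(condition())  # one last check at the deadline
```

In a test, each consumer creation would be followed by something like wait_until(lambda: consumer_registered_in_zk(...)) before creating the next consumer, trading test speed for determinism.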
Re: How to produce and consume events in 2 DCs?
Erik,

I didn't know that MirrorMaker can't write to a different topic, but it might be a useful feature request for MirrorMaker.

On Wed, Oct 22, 2014 at 12:21 AM, Erik van Oosten e.vanoos...@grons.nl.invalid wrote:
[original message quoted in full; see above]
Re: How many partition can one single machine handle in Kafka?
RAID-10? Interesting choice for a system where the data is already replicated between nodes. Is it to avoid the cost of large replication over the network? How large are these disks?

On Wed, Oct 22, 2014 at 10:00 AM, Todd Palino tpal...@gmail.com wrote:
[original message quoted in full; see above]
Re: Errors after reboot on single node setup
Thank you for the *very* quick replies Neha, Harsha. I opened a Jira for this issue: https://issues.apache.org/jira/browse/KAFKA-1724

Ciprian

--
Performance Monitoring * Log Analytics * Search Analytics
Solr Elasticsearch Support * http://sematext.com/

On Wed, Oct 22, 2014 at 8:27 PM, Harsha ka...@harsha.io wrote:
[earlier messages with full log quoted; see the thread above]
Re: Sizing Cluster
On 10/21/14 21:13, István wrote: Hi Pete, Yes, you are right, both nodes have all of the data. I was just wondering what the scenario is for losing one node; in production it might not fly. If this is for testing only, you are good. Answering your question, I think the retention policy (log.retention.hours) is what controls disk utilization. For disk IO (the log.flush.* settings) and network IO (num.network.threads, etc.), you might want to measure saturation during tests and spec the hardware based on that. Here is a link with examples for the full list of relevant settings, with more description: https://kafka.apache.org/08/ops.html. I guess the most important question is how many clients you want to support. You could work out how much space you need based on that, assuming a few things. For more complete documentation refer to: https://kafka.apache.org/08/configuration.html Thanks Istvan - this is helpful. Cheers, -pete -- Pete Wright Systems Architect Rubicon Project pwri...@rubiconproject.com 310.309.9298
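The settings István mentions all live in the broker's server.properties file. A sketch of the relevant section, with purely illustrative values (these are not recommendations from the thread; tune against your own measurements):

```properties
# Retention controls disk utilization: delete log segments older than 7 days.
log.retention.hours=168

# Flush behavior (disk IO): by default Kafka relies on the OS page cache;
# uncomment to force explicit flush bounds if your durability needs require it.
#log.flush.interval.messages=10000
#log.flush.interval.ms=1000

# Network and disk IO threads handling client requests.
num.network.threads=3
num.io.threads=8
```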
Re: How many partition can one single machine handle in Kafka?
There are various costs when a broker fails: leader election for each of its partitions, possible issues for in-flight messages, client rebalancing, and so on. So even though replication provides partition redundancy, RAID 10 on each broker is usually a good tradeoff, as it prevents the most common cause of broker server failure (disk failure) and makes for overall smoother operation. Best Regards, -Jonathan On Oct 22, 2014, at 11:01 AM, Gwen Shapira gshap...@cloudera.com wrote: RAID-10? Interesting choice for a system where the data is already replicated between nodes. Is it to avoid the cost of large replication over the network? How large are these disks? On Wed, Oct 22, 2014 at 10:00 AM, Todd Palino tpal...@gmail.com wrote: In fact there are many more than 4000 open files. Many of our brokers run with 28,000+ open files (regular file handles, not network connections). In our case, we're beefing up the disk performance as much as we can by running in a RAID-10 configuration with 14 disks. -Todd On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She xiaobin...@gmail.com wrote: Todd, Actually I'm wondering how Kafka handles so many partitions; with one partition there is at least one file on disk, so with 4000 partitions there will be at least 4000 files. When all these partitions have write requests, how does Kafka keep the disk writes sequential (which is emphasized in Kafka's design document) and make sure the disk access is efficient? Thank you for your reply. xiaobinshe 2014-10-22 5:10 GMT+08:00 Todd Palino tpal...@gmail.com: As far as the number of partitions a single broker can handle, we've set our cap at 4000 partitions (including replicas). Above that we've seen some performance and stability issues.
Is there a suggested number? Thanks. xiaobinshe
Re: How many partition can one single machine handle in Kafka?
Makes sense. Thanks :) On Wed, Oct 22, 2014 at 11:10 AM, Jonathan Weeks jonathanbwe...@gmail.com wrote: There are various costs when a broker fails: leader election for each of its partitions, possible issues for in-flight messages, client rebalancing, and so on. So even though replication provides partition redundancy, RAID 10 on each broker is usually a good tradeoff, as it prevents the most common cause of broker server failure (disk failure) and makes for overall smoother operation. Best Regards, -Jonathan On Oct 22, 2014, at 11:01 AM, Gwen Shapira gshap...@cloudera.com wrote: RAID-10? Interesting choice for a system where the data is already replicated between nodes. Is it to avoid the cost of large replication over the network? How large are these disks? On Wed, Oct 22, 2014 at 10:00 AM, Todd Palino tpal...@gmail.com wrote: In fact there are many more than 4000 open files. Many of our brokers run with 28,000+ open files (regular file handles, not network connections). In our case, we're beefing up the disk performance as much as we can by running in a RAID-10 configuration with 14 disks. -Todd On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She xiaobin...@gmail.com wrote: Todd, Actually I'm wondering how Kafka handles so many partitions; with one partition there is at least one file on disk, so with 4000 partitions there will be at least 4000 files. When all these partitions have write requests, how does Kafka keep the disk writes sequential (which is emphasized in Kafka's design document) and make sure the disk access is efficient? Thank you for your reply. xiaobinshe 2014-10-22 5:10 GMT+08:00 Todd Palino tpal...@gmail.com: As far as the number of partitions a single broker can handle, we've set our cap at 4000 partitions (including replicas). Above that we've seen some performance and stability issues.
-Todd On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She xiaobin...@gmail.com wrote: Hello, everyone. I'm new to Kafka, and I'm wondering what is the max number of partitions one single machine can handle in Kafka? Is there a suggested number? Thanks. xiaobinshe
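Todd's cap is per broker and counts replicas, which is worth spelling out when sizing a cluster. A back-of-envelope sketch in Java, where the broker count and replication factor are hypothetical; only the ~4000 figure comes from the thread:

```java
public class PartitionBudget {
    // Back-of-envelope: how a per-broker cap on partition replicas translates
    // into unique partitions for a whole cluster. Each unique partition costs
    // 'replicationFactor' replicas spread across the cluster.
    public static long uniquePartitions(int brokers, int perBrokerCap, int replicationFactor) {
        long totalReplicas = (long) brokers * perBrokerCap;
        return totalReplicas / replicationFactor;
    }

    public static void main(String[] args) {
        // Illustrative numbers: 12 brokers, Todd's ~4000 replicas-per-broker cap,
        // replication factor 3.
        System.out.println(uniquePartitions(12, 4000, 3) + " unique partitions");
    }
}
```

So under these assumed numbers the cluster tops out around 16,000 unique partitions, not 12 × 4000.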
Re: Performance issues
I can't find this property in the server.properties file. Is that the right place to set this parameter? On Tue, Oct 21, 2014 at 6:27 PM, Jun Rao jun...@gmail.com wrote: Could you also set replica.fetch.wait.max.ms in the broker to something much smaller? Thanks, Jun On Tue, Oct 21, 2014 at 2:15 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I set the property to 1 in the consumer code that is passed to createJavaConsumerConnector, but it didn't seem to help: props.put("fetch.wait.max.ms", fetchMaxWait); On Tue, Oct 21, 2014 at 1:21 PM, Guozhang Wang wangg...@gmail.com wrote: This is a consumer config: fetch.wait.max.ms On Tue, Oct 21, 2014 at 11:39 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Is this a parameter I need to set on the Kafka server or on the client side? Also, can you help point out which one exactly is the consumer max wait time from this list? https://kafka.apache.org/08/configuration.html On Tue, Oct 21, 2014 at 11:35 AM, Jay Kreps jay.kr...@gmail.com wrote: There was a bug that could lead to the fetch request from the consumer hitting its timeout instead of being immediately triggered by the produce request. To see if you are affected by that, set your consumer max wait time to 1 ms and see if the latency drops to 1 ms (or, alternately, try with trunk and see if that fixes the problem). The reason I suspect this problem is that the default timeout in the Java consumer is 100ms. -Jay On Tue, Oct 21, 2014 at 11:06 AM, Mohit Anchlia mohitanch...@gmail.com wrote: This is the version I am using: kafka_2.10-0.8.1.1 I think this is a fairly recent version On Tue, Oct 21, 2014 at 10:57 AM, Jay Kreps jay.kr...@gmail.com wrote: What version of Kafka is this? Can you try the same test against trunk? We fixed a couple of latency-related bugs which may be the cause.
-Jay On Tue, Oct 21, 2014 at 10:50 AM, Mohit Anchlia mohitanch...@gmail.com wrote: It's consistently close to 100ms, which makes me believe that there are some settings I might have to tweak; however, I am not sure how to confirm that assumption :) On Tue, Oct 21, 2014 at 8:53 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I have a Java test that produces messages and a consumer that consumes them. Consumers are active all the time. There is 1 consumer for 1 producer. I am measuring the time between when the message is successfully written to the queue and when the consumer picks it up. On Tue, Oct 21, 2014 at 8:32 AM, Neha Narkhede neha.narkh...@gmail.com wrote: Can you give more information about the performance test? Which test? Which queue? How did you measure the dequeue latency? On Mon, Oct 20, 2014 at 5:09 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am running a performance test, and what I am seeing is that messages are taking about 100ms to pop from the queue itself, which makes the test slow. I am looking for pointers on how I can troubleshoot this issue. There seems to be plenty of CPU and IO available. I am running 22 producers and 22 consumers in the same group. -- Guozhang
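To untangle the two similarly named knobs in this thread: fetch.wait.max.ms is a consumer property, set in the Properties object handed to the high-level consumer, while replica.fetch.wait.max.ms is a broker property that belongs in server.properties (broker properties not present in the stock file can simply be added as new lines). A minimal sketch of the consumer side, where the ZooKeeper address and group id are placeholders:

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Builds the consumer properties discussed in the thread. These are the
    // Properties passed to Consumer.createJavaConsumerConnector (0.8.x
    // high-level consumer), NOT broker-side server.properties settings.
    public static Properties consumerProps(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect); // placeholder address supplied by caller
        props.put("group.id", groupId);
        // Lower the max fetch wait, as suggested by Jay, to check whether the
        // ~100 ms latency is just the default consumer timeout.
        props.put("fetch.wait.max.ms", "1");
        props.put("fetch.min.bytes", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties p = consumerProps("localhost:2181", "perf-test");
        System.out.println("fetch.wait.max.ms = " + p.getProperty("fetch.wait.max.ms"));
    }
}
```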
Re: Explicit topic creation and topic metadata availability
kafka-topics.sh execution, from latest trunk: ~/git/oss/kafka [trunk|✔] 21:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic 059915e6-56ef-4b8e-8e95-9f676313a01c --describe SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] Error while executing topic command next on empty iterator java.util.NoSuchElementException: next on empty iterator at scala.collection.Iterator$$anon$2.next(Iterator.scala:39) at scala.collection.Iterator$$anon$2.next(Iterator.scala:37) at scala.collection.IterableLike$class.head(IterableLike.scala:91) at scala.collection.AbstractIterable.head(Iterable.scala:54) at kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:170) at kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:160) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:160) at kafka.admin.TopicCommand$.main(TopicCommand.scala:60) at kafka.admin.TopicCommand.main(TopicCommand.scala) Output from same command on 0.8.1 branch is better, but still same exception: ~/git/oss/kafka [0.8.1|✔] 21:12 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic 059915e6-56ef-4b8e-8e95-9f676313a01c --describe Error while executing topic command null java.util.NoSuchElementException at scala.collection.IterableLike$class.head(IterableLike.scala:101) at scala.collection.immutable.Map$EmptyMap$.head(Map.scala:73) at 
kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:137) at kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:127) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:127) at kafka.admin.TopicCommand$.main(TopicCommand.scala:56) at kafka.admin.TopicCommand.main(TopicCommand.scala) On Wed, Oct 22, 2014 at 5:30 PM, Guozhang Wang wangg...@gmail.com wrote: Hello Stevo, Your understanding of the configs is correct, and it is indeed weird that the producer gets the exception after the topic is created. Could you use the kafka-topics command to check if the leaders exist? kafka-topics.sh --zookeeper XXX --topic [topic-name] --describe Guozhang On Wed, Oct 22, 2014 at 5:57 AM, Stevo Slavić ssla...@gmail.com wrote: Hello Apache Kafka users, Using Kafka 0.8.1.1 (single instance with a single ZK 3.4.6 running locally), with auto topic creation disabled, in a test I have a topic created with AdminUtils.createTopic (AdminUtils.topicExists returns true), but KafkaProducer on a send request keeps throwing UnknownTopicOrPartitionException even after 100 retries, both when topic.metadata.refresh.interval.ms and retry.backoff.ms are left at defaults, and when customized. Am I doing something wrong or is this a known bug? How long does it typically take for metadata to be refreshed? How long does it take for a leader to be elected? Documentation for retry.backoff.ms states: Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.
Do I understand these docs correctly: on failure to send a message (such as an unknown topic), if retries are configured, the producer will wait for the configured retry.backoff.ms, then initiate and wait for a metadata refresh to complete, and only then retry sending? Kind regards, Stevo Slavic. -- Guozhang
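Stevo's reading of the docs can be sketched as a generic retry loop. This is only an illustration of the documented wait-then-refresh-then-retry behavior, not Kafka's actual producer code:

```java
import java.util.concurrent.Callable;

public class RetryWithBackoff {
    // Sketch of the behavior described for retry.backoff.ms: on each failure,
    // wait the backoff, refresh topic metadata (stubbed as a Runnable here),
    // then retry the send.
    public static <T> T call(Callable<T> send, Runnable refreshMetadata,
                             int maxRetries, long backoffMs) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return send.call();
            } catch (Exception e) {
                last = e;
                Thread.sleep(backoffMs);   // retry.backoff.ms
                refreshMetadata.run();     // producer re-fetches topic metadata here
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated send that fails twice (e.g. UnknownTopicOrPartitionException
        // while the topic is still propagating), then succeeds.
        String r = call(() -> {
            if (++calls[0] < 3) throw new RuntimeException("unknown topic");
            return "sent";
        }, () -> System.out.println("refreshing metadata"), 5, 10L);
        System.out.println(r); // prints "sent" after two retries
    }
}
```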
Re: Explicit topic creation and topic metadata availability
Output on trunk is clean too, after clean build: ~/git/oss/kafka [trunk|✔] 22:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic 059915e6-56ef-4b8e-8e95-9f676313a01c --describe Error while executing topic command next on empty iterator java.util.NoSuchElementException: next on empty iterator at scala.collection.Iterator$$anon$2.next(Iterator.scala:39) at scala.collection.Iterator$$anon$2.next(Iterator.scala:37) at scala.collection.IterableLike$class.head(IterableLike.scala:91) at scala.collection.AbstractIterable.head(Iterable.scala:54) at kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:137) at kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:127) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:127) at kafka.admin.TopicCommand$.main(TopicCommand.scala:56) at kafka.admin.TopicCommand.main(TopicCommand.scala) On Wed, Oct 22, 2014 at 9:45 PM, Stevo Slavić ssla...@gmail.com wrote: kafka-topics.sh execution, from latest trunk: ~/git/oss/kafka [trunk|✔] 21:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic 059915e6-56ef-4b8e-8e95-9f676313a01c --describe SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] Error while executing topic command next on empty iterator java.util.NoSuchElementException: next on empty iterator at scala.collection.Iterator$$anon$2.next(Iterator.scala:39) at scala.collection.Iterator$$anon$2.next(Iterator.scala:37) at scala.collection.IterableLike$class.head(IterableLike.scala:91) at scala.collection.AbstractIterable.head(Iterable.scala:54) at kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:170) at kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:160) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:160) at kafka.admin.TopicCommand$.main(TopicCommand.scala:60) at kafka.admin.TopicCommand.main(TopicCommand.scala) Output from same command on 0.8.1 branch is better, but still same exception: ~/git/oss/kafka [0.8.1|✔] 21:12 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic 059915e6-56ef-4b8e-8e95-9f676313a01c --describe Error while executing topic command null java.util.NoSuchElementException at scala.collection.IterableLike$class.head(IterableLike.scala:101) at scala.collection.immutable.Map$EmptyMap$.head(Map.scala:73) at kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:137) at kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:127) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:127) at kafka.admin.TopicCommand$.main(TopicCommand.scala:56) at kafka.admin.TopicCommand.main(TopicCommand.scala) On Wed, Oct 22, 2014 at 5:30 PM, Guozhang Wang wangg...@gmail.com wrote: Hello Stevo, Your understanding about the configs are correct, and it is indeed 
weird that the producer gets the exception after the topic is created. Could you use the kafka-topics command to check if the leaders exist? kafka-topics.sh --zookeeper XXX --topic [topic-name] --describe Guozhang On Wed, Oct 22, 2014 at 5:57 AM, Stevo Slavić ssla...@gmail.com wrote: Hello Apache Kafka users, Using Kafka 0.8.1.1 (single instance with a single ZK 3.4.6 running locally), with auto topic creation disabled, in a test I have a topic created with AdminUtils.createTopic (AdminUtils.topicExists returns true), but KafkaProducer on a send request keeps throwing UnknownTopicOrPartitionException even after 100 retries, both when topic.metadata.refresh.interval.ms and retry.backoff.ms are left at defaults, and when customized. Am I doing something wrong or is this a known bug? How long does it typically take for metadata to be refreshed? How long does it take for a leader to be elected? Documentation for retry.backoff.ms states: Before each retry, the producer refreshes the metadata of relevant topics to
Re: How many partition can one single machine handle in Kafka?
In my experience, RAID 10 doesn't really provide value in the presence of replication. When a disk fails, the RAID resync process is so I/O intensive that it renders the broker useless until it completes. When this happens, you actually have to take the broker out of rotation and move the leaders off of it to prevent it from serving requests in a degraded state. You might as well shut down the broker, delete the broker's data and let it catch up from the leader. On Wed, Oct 22, 2014 at 11:20 AM, Gwen Shapira gshap...@cloudera.com wrote: Makes sense. Thanks :) On Wed, Oct 22, 2014 at 11:10 AM, Jonathan Weeks jonathanbwe...@gmail.com wrote: There are various costs when a broker fails: leader election for each of its partitions, possible issues for in-flight messages, client rebalancing, and so on. So even though replication provides partition redundancy, RAID 10 on each broker is usually a good tradeoff, as it prevents the most common cause of broker server failure (disk failure) and makes for overall smoother operation. Best Regards, -Jonathan On Oct 22, 2014, at 11:01 AM, Gwen Shapira gshap...@cloudera.com wrote: RAID-10? Interesting choice for a system where the data is already replicated between nodes. Is it to avoid the cost of large replication over the network? How large are these disks? On Wed, Oct 22, 2014 at 10:00 AM, Todd Palino tpal...@gmail.com wrote: In fact there are many more than 4000 open files. Many of our brokers run with 28,000+ open files (regular file handles, not network connections). In our case, we're beefing up the disk performance as much as we can by running in a RAID-10 configuration with 14 disks. -Todd On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She xiaobin...@gmail.com wrote: Todd, Actually I'm wondering how Kafka handles so many partitions; with one partition there is at least one file on disk, so with 4000 partitions there will be at least 4000 files.
When all these partitions have write requests, how does Kafka keep the disk writes sequential (which is emphasized in Kafka's design document) and make sure the disk access is efficient? Thank you for your reply. xiaobinshe 2014-10-22 5:10 GMT+08:00 Todd Palino tpal...@gmail.com: As far as the number of partitions a single broker can handle, we've set our cap at 4000 partitions (including replicas). Above that we've seen some performance and stability issues. -Todd On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She xiaobin...@gmail.com wrote: Hello, everyone. I'm new to Kafka, and I'm wondering what is the max number of partitions one single machine can handle in Kafka? Is there a suggested number? Thanks. xiaobinshe
Re: How many partition can one single machine handle in Kafka?
Neha, Do you mean RAID 10 or RAID 5 or 6? With RAID 5 or 6, recovery is definitely very painful, but less so with RAID 10. We have been using the guidance here: http://www.youtube.com/watch?v=19DvtEC0EbQ#t=190 (LinkedIn Site Reliability Engineers state they run RAID 10 on all Kafka clusters @34:40 or so) Plus: https://cwiki.apache.org/confluence/display/KAFKA/Operations LinkedIn Hardware We are using dual quad-core Intel Xeon machines with 24GB of memory. In general this should not matter too much, we only see pretty low CPU usage at peak even with GZIP compression enabled and a number of clients that don't batch requests. The memory is probably more than is needed for caching the active segments of the log. The disk throughput is important. We have 8x7200 rpm SATA drives in a RAID 10 array. In general this is the performance bottleneck, and more disks is better. Depending on how you configure flush behavior you may or may not benefit from more expensive disks (if you flush often then higher RPM SAS drives may be better). OS Settings We use Linux. Ext4 is the filesystem and we run using software RAID 10. We haven't benchmarked filesystems so other filesystems may be superior. We have added two tuning changes: (1) we upped the number of file descriptors since we have lots of topics and lots of connections, and (2) we upped the max socket buffer size to enable high-performance data transfer between data centers (described here). Best Regards, -Jonathan On Oct 22, 2014, at 3:44 PM, Neha Narkhede neha.narkh...@gmail.com wrote: In my experience, RAID 10 doesn't really provide value in the presence of replication. When a disk fails, the RAID resync process is so I/O intensive that it renders the broker useless until it completes. When this happens, you actually have to take the broker out of rotation and move the leaders off of it to prevent it from serving requests in a degraded state.
You might as well shut down the broker, delete the broker's data and let it catch up from the leader. On Wed, Oct 22, 2014 at 11:20 AM, Gwen Shapira gshap...@cloudera.com wrote: Makes sense. Thanks :) On Wed, Oct 22, 2014 at 11:10 AM, Jonathan Weeks jonathanbwe...@gmail.com wrote: There are various costs when a broker fails: leader election for each of its partitions, possible issues for in-flight messages, client rebalancing, and so on. So even though replication provides partition redundancy, RAID 10 on each broker is usually a good tradeoff, as it prevents the most common cause of broker server failure (disk failure) and makes for overall smoother operation. Best Regards, -Jonathan On Oct 22, 2014, at 11:01 AM, Gwen Shapira gshap...@cloudera.com wrote: RAID-10? Interesting choice for a system where the data is already replicated between nodes. Is it to avoid the cost of large replication over the network? How large are these disks? On Wed, Oct 22, 2014 at 10:00 AM, Todd Palino tpal...@gmail.com wrote: In fact there are many more than 4000 open files. Many of our brokers run with 28,000+ open files (regular file handles, not network connections). In our case, we're beefing up the disk performance as much as we can by running in a RAID-10 configuration with 14 disks. -Todd On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She xiaobin...@gmail.com wrote: Todd, Actually I'm wondering how Kafka handles so many partitions; with one partition there is at least one file on disk, so with 4000 partitions there will be at least 4000 files. When all these partitions have write requests, how does Kafka keep the disk writes sequential (which is emphasized in Kafka's design document) and make sure the disk access is efficient? Thank you for your reply.
xiaobinshe 2014-10-22 5:10 GMT+08:00 Todd Palino tpal...@gmail.com: As far as the number of partitions a single broker can handle, we've set our cap at 4000 partitions (including replicas). Above that we've seen some performance and stability issues. -Todd On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She xiaobin...@gmail.com wrote: Hello, everyone. I'm new to Kafka, and I'm wondering what is the max number of partitions one single machine can handle in Kafka? Is there a suggested number? Thanks. xiaobinshe
Re: How many partition can one single machine handle in Kafka?
Yeah, Jonathan, I'm the LinkedIn SRE who said that :) And Neha, up until recently, sat 8 feet from my desk. The data from the wiki page is off a little bit as well (we're running 14 disks now, and 64 GB systems). So to hit the first questions: RAID 10 gives higher read performance, and also allows you to suffer a disk failure without having to drop the entire cluster. As Neha noted, you're going to take a hit on the rebuild, and because of ongoing traffic in the cluster it will be for a long time (we can easily take half a day to rebuild a disk). But you still get some benefit out of the RAID over just killing the data and letting it rebuild from the replica, because during that time the cluster is not under-replicated, so you can suffer another failure. The more servers and disks you have, the more often disks are going to fail, not to mention other components, both hardware and software. I like running on the safer side. That said, I'm not sure RAID 10 is the answer either. We're going to be doing some experimenting with other disk layouts shortly. We've inherited a lot of our architecture, and many things have changed in that time. We're probably going to test out RAID 5 and 6 to start with and see how much we lose from the parity calculations. -Todd On Wed, Oct 22, 2014 at 3:59 PM, Jonathan Weeks jonathanbwe...@gmail.com wrote: Neha, Do you mean RAID 10 or RAID 5 or 6? With RAID 5 or 6, recovery is definitely very painful, but less so with RAID 10. We have been using the guidance here: http://www.youtube.com/watch?v=19DvtEC0EbQ#t=190 (LinkedIn Site Reliability Engineers state they run RAID 10 on all Kafka clusters @34:40 or so) Plus: https://cwiki.apache.org/confluence/display/KAFKA/Operations LinkedIn Hardware We are using dual quad-core Intel Xeon machines with 24GB of memory. In general this should not matter too much, we only see pretty low CPU usage at peak even with GZIP compression enabled and a number of clients that don't batch requests.
The memory is probably more than is needed for caching the active segments of the log. The disk throughput is important. We have 8x7200 rpm SATA drives in a RAID 10 array. In general this is the performance bottleneck, and more disks is better. Depending on how you configure flush behavior you may or may not benefit from more expensive disks (if you flush often then higher RPM SAS drives may be better). OS Settings We use Linux. Ext4 is the filesystem and we run using software RAID 10. We haven't benchmarked filesystems so other filesystems may be superior. We have added two tuning changes: (1) we upped the number of file descriptors since we have lots of topics and lots of connections, and (2) we upped the max socket buffer size to enable high-performance data transfer between data centers (described here). Best Regards, -Jonathan On Oct 22, 2014, at 3:44 PM, Neha Narkhede neha.narkh...@gmail.com wrote: In my experience, RAID 10 doesn't really provide value in the presence of replication. When a disk fails, the RAID resync process is so I/O intensive that it renders the broker useless until it completes. When this happens, you actually have to take the broker out of rotation and move the leaders off of it to prevent it from serving requests in a degraded state. You might as well shut down the broker, delete the broker's data and let it catch up from the leader. On Wed, Oct 22, 2014 at 11:20 AM, Gwen Shapira gshap...@cloudera.com wrote: Makes sense. Thanks :) On Wed, Oct 22, 2014 at 11:10 AM, Jonathan Weeks jonathanbwe...@gmail.com wrote: There are various costs when a broker fails: leader election for each of its partitions, possible issues for in-flight messages, client rebalancing, and so on. So even though replication provides partition redundancy, RAID 10 on each broker is usually a good tradeoff, as it prevents the most common cause of broker server failure (disk failure) and makes for overall smoother operation.
Best Regards, -Jonathan On Oct 22, 2014, at 11:01 AM, Gwen Shapira gshap...@cloudera.com wrote: RAID-10? Interesting choice for a system where the data is already replicated between nodes. Is it to avoid the cost of large replication over the network? How large are these disks? On Wed, Oct 22, 2014 at 10:00 AM, Todd Palino tpal...@gmail.com wrote: In fact there are many more than 4000 open files. Many of our brokers run with 28,000+ open files (regular file handles, not network connections). In our case, we're beefing up the disk performance as much as we can by running in a RAID-10 configuration with 14 disks. -Todd On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She xiaobin...@gmail.com wrote: Todd, Actually I'm wondering how Kafka handles so many partitions; with one partition there is at least one file on disk, so with 4000 partitions there will
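One way to frame the RAID 10 vs JBOD trade-off being debated is raw capacity: RAID 10 halves usable disk space on top of whatever Kafka's replication factor already costs. A toy calculation in Java, where the disk count matches Todd's 14-disk setup but the disk size and replication factor are assumptions for illustration:

```java
public class RaidVsJbodCapacity {
    // Usable capacity per broker under the two layouts discussed.
    // RAID 10 mirrors every disk (halving raw capacity); Kafka's replication
    // factor then divides what is left, since rf copies are kept cluster-wide.
    public static long usableTb(int disks, int diskTb, boolean raid10, int replicationFactor) {
        long raw = (long) disks * diskTb;
        long afterRaid = raid10 ? raw / 2 : raw;
        return afterRaid / replicationFactor;
    }

    public static void main(String[] args) {
        // Illustrative: 14 disks of 2 TB each, replication factor 2.
        System.out.println("RAID 10 + rf=2: " + usableTb(14, 2, true, 2) + " TB usable");
        System.out.println("JBOD    + rf=2: " + usableTb(14, 2, false, 2) + " TB usable");
    }
}
```

The point of the thread is that this capacity cost buys tolerance of a single-disk failure without the broker dropping out of the ISR, at the price of the long resync Neha and Todd describe.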
Re: Explicit topic creation and topic metadata availability
Still have to understand what is going on, but when I set kafka.utils.ZKStringSerializer as the ZkSerializer for the ZkClient used in the AdminUtils calls, KafkaProducer could see the created topic... The default ZkSerializer is org.I0Itec.zkclient.serialize.SerializableSerializer.

Kind regards,
Stevo Slavic.

On Wed, Oct 22, 2014 at 10:03 PM, Stevo Slavić ssla...@gmail.com wrote:

Output on trunk is clean too, after a clean build:

~/git/oss/kafka [trunk|✔]
22:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic 059915e6-56ef-4b8e-8e95-9f676313a01c --describe
Error while executing topic command next on empty iterator
java.util.NoSuchElementException: next on empty iterator
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
    at scala.collection.IterableLike$class.head(IterableLike.scala:91)
    at scala.collection.AbstractIterable.head(Iterable.scala:54)
    at kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:137)
    at kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:127)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:127)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:56)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)

On Wed, Oct 22, 2014 at 9:45 PM, Stevo Slavić ssla...@gmail.com wrote:

kafka-topics.sh execution, from latest trunk:

~/git/oss/kafka [trunk|✔]
21:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic 059915e6-56ef-4b8e-8e95-9f676313a01c --describe
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Error while executing topic command next on empty iterator
java.util.NoSuchElementException: next on empty iterator
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
    at scala.collection.IterableLike$class.head(IterableLike.scala:91)
    at scala.collection.AbstractIterable.head(Iterable.scala:54)
    at kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:170)
    at kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:160)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:160)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)

Output from the same command on the 0.8.1 branch is better, but still the same exception:

~/git/oss/kafka [0.8.1|✔]
21:12 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic 059915e6-56ef-4b8e-8e95-9f676313a01c --describe
Error while executing topic command null
java.util.NoSuchElementException
    at scala.collection.IterableLike$class.head(IterableLike.scala:101)
    at scala.collection.immutable.Map$EmptyMap$.head(Map.scala:73)
    at kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:137)
    at kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:127)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
    at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:127)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:56)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)

On Wed, Oct 22, 2014 at 5:30 PM, Guozhang Wang wangg...@gmail.com wrote:

Hello Stevo,

Your understanding of the configs is correct, and it is indeed weird that the producer gets the exception after the topic is created. Could you use the kafka-topics command to check whether the leaders exist?

kafka-topics.sh --zookeeper XXX --topic [topic-name] --describe

Guozhang

On Wed, Oct 22, 2014 at 5:57 AM, Stevo Slavić ssla...@gmail.com wrote:

Hello Apache Kafka users,

Using Kafka 0.8.1.1 (a single instance with a single ZK 3.4.6 running locally), with auto topic creation disabled, in a test I have a topic created with AdminUtils.createTopic (AdminUtils.topicExists returns true) but KafkaProducer on send request keeps throwing UnknownTopicOrPartitionException even after 100 retries, both when
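A generic version of the pattern Stevo is wrestling with (retrying until topic metadata becomes visible after creation) can be sketched as a small polling helper. This is not Kafka API code; `fake_metadata_check` is a hypothetical stand-in for a real check such as a topic-exists or metadata request.

```python
import time

def wait_until(predicate, timeout_s=30.0, interval_s=0.5):
    """Poll `predicate` until it returns True or the timeout expires.
    Returns True on success, False on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)

# Hypothetical stand-in for a real topic-metadata check: here it only
# succeeds on the third poll, mimicking metadata that takes a moment
# to propagate after topic creation.
calls = {"n": 0}
def fake_metadata_check():
    calls["n"] += 1
    return calls["n"] >= 3

print(wait_until(fake_metadata_check, timeout_s=5, interval_s=0.01))  # True
```

The point of bounding the wait is that if the topic never becomes visible (as with the serializer mismatch described above), the test fails with a clear timeout instead of retrying forever.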
Re: How many partition can one single machine handle in Kafka?
I suppose it also is going to depend on:

a) How much spare I/O bandwidth the brokers have to support a rebuild while serving ongoing requests. Our brokers have spare I/O capacity.

b) How many brokers are in the cluster and what the replication factor is — e.g. if you have a larger cluster, it is easier to tolerate the loss of a single broker. We started with 3 brokers, so the loss of a single broker is quite significant — we would prefer possibly degraded performance to having a “down” broker.

I do understand that y’all both work at LinkedIn; my point is that all of the guidance to date (as recently as this summer) is that in production LinkedIn runs on RAID 10, so it is just a bit odd to hear a contrary recommendation, although I do understand that best practices are a moving, evolving target.

Best Regards,
-Jonathan

On Oct 22, 2014, at 4:05 PM, Todd Palino tpal...@gmail.com wrote:

Yeah, Jonathan, I'm the LinkedIn SRE who said that :) And Neha, up until recently, sat 8 feet from my desk. The data on the wiki page is off a little bit as well (we're running 14 disks now, and 64 GB systems).

So to hit the first questions: RAID 10 gives higher read performance, and also allows you to suffer a disk failure without having to drop the entire cluster. As Neha noted, you're going to take a hit on the rebuild, and because of ongoing traffic in the cluster it will be for a long time (we can easily take half a day to rebuild a disk). But you still get some benefit out of the RAID over just killing the data and letting it rebuild from the replica, because during that time the cluster is not under-replicated, so you can suffer another failure.

The more servers and disks you have, the more often disks are going to fail, not to mention other components, both hardware and software. I like running on the safer side. That said, I'm not sure RAID 10 is the answer either. We're going to be doing some experimenting with other disk layouts shortly. We've inherited a lot of our architecture, and many things have changed in that time. We're probably going to test out RAID 5 and 6 to start with and see how much we lose to the parity calculations.

-Todd

On Wed, Oct 22, 2014 at 3:59 PM, Jonathan Weeks jonathanbwe...@gmail.com wrote:

Neha,

Do you mean RAID 10 or RAID 5 or 6? With RAID 5 or 6, recovery is definitely very painful, but less so with RAID 10.

We have been using the guidance here: http://www.youtube.com/watch?v=19DvtEC0EbQ#t=190 (LinkedIn Site Reliability Engineers state they run RAID 10 on all Kafka clusters @34:40 or so)

Plus: https://cwiki.apache.org/confluence/display/KAFKA/Operations

LinkedIn Hardware: We are using dual quad-core Intel Xeon machines with 24GB of memory. In general this should not matter too much; we only see pretty low CPU usage at peak, even with GZIP compression enabled and a number of clients that don't batch requests. The memory is probably more than is needed for caching the active segments of the log. The disk throughput is important. We have 8x7200 rpm SATA drives in a RAID 10 array. In general this is the performance bottleneck, and more disks is better. Depending on how you configure flush behavior you may or may not benefit from more expensive disks (if you flush often then higher RPM SAS drives may be better).

OS Settings: We use Linux. Ext4 is the filesystem and we run using software RAID 10. We haven't benchmarked filesystems, so other filesystems may be superior. We have added two tuning changes: (1) we upped the number of file descriptors, since we have lots of topics and lots of connections, and (2) we upped the max socket buffer size to enable high-performance data transfer between data centers (described here).

Best Regards,
-Jonathan

On Oct 22, 2014, at 3:44 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

In my experience, RAID 10 doesn't really provide value in the presence of replication. When a disk fails, the RAID resync process is so I/O intensive that it renders the broker useless until it completes. When this happens, you actually have to take the broker out of rotation and move the leaders off of it to prevent it from serving requests in a degraded state. You might as well shut down the broker, delete the broker's data and let it catch up from the leader.

On Wed, Oct 22, 2014 at 11:20 AM, Gwen Shapira gshap...@cloudera.com wrote:

Makes sense. Thanks :)

On Wed, Oct 22, 2014 at 11:10 AM, Jonathan Weeks jonathanbwe...@gmail.com wrote:

There are various costs when a broker fails, including leader election for each partition, possible issues for in-flight messages, client rebalancing, etc. So even though replication provides partition redundancy, RAID 10 on each broker is usually a good tradeoff to prevent the most common cause of broker failure (disk failure), and overall smoother operation.
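Todd's "half a day to rebuild a disk" is easy to sanity-check with a simple model: rebuild time is roughly disk size divided by the I/O bandwidth the array can spare for the resync. The disk size and spare bandwidth below are assumed numbers for illustration, not figures from the thread.

```python
def rebuild_hours(disk_tb, spare_mb_per_s):
    """Hours to resync one failed mirror when the rebuild is throttled
    to the broker's spare I/O bandwidth (illustrative model only)."""
    return (disk_tb * 1e12) / (spare_mb_per_s * 1e6) / 3600.0

# A 1 TB disk rebuilt at only 25 MB/s of spare bandwidth takes about
# 11 hours, consistent with "half a day to rebuild a disk" above.
print(round(rebuild_hours(1, 25), 1))  # 11.1
```

The same model explains Neha's objection: if the resync is allowed to run at full disk speed instead, it finishes much faster but starves the broker's normal traffic in the meantime.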
Re: Erratic behavior when quickly re-balancing
Hello Eric,

1) The rebalance failures are mainly due to ZK session timeouts; you could try to increase your ZK session timeout value and see if that helps.

2) The new consumer in the 0.9 rewrite will resolve this problem by getting rid of the ZK dependency and using a centralized coordinator for the rebalance logic.

Guozhang

On Wed, Oct 22, 2014 at 9:32 AM, Eric Bottard ebott...@pivotal.io wrote:

For the purpose of a unit/integration test for Spring XD, I am creating several consumers (in the same group) in quick succession. With just 3 consumers, this triggers 2 rebalances that (on some machines) can't be dealt with in the default 4*2000ms and fail.

I have created a simple use case that reproduces this outside the context of Spring XD. It can be found in the main() method of [1]. If it does not fail on your machine, I believe bumping the number of creations to 4 or 5 may do it.

So, I have a couple of questions:
- On my machine, when using various combinations of rebalance.backoff.ms and rebalance.max.retries, this failure always seems to happen after 30s, whatever the combination. On some other (more powerful) machines, it seems to never fail. Is this actually CPU bound? 30s sounds a lot like twice the default tick, so is this at all related to an ephemeral node timing out?
- Given that this is for the purposes of an integration test, is there any other parameter that I could tweak to have the system settle down faster?
- Is this something that is likely to change with the 0.9 rewrite (I saw that the current decentralized rebalancing mechanics are the cause of other issues)?

As a workaround, I tried waiting for nodes to re-appear in ZooKeeper (code at [2]), but this is still very slow (not to mention that it would be very intrusive to the tests I'm trying to write).

Lastly, I should mention that I do need to create 3 consumers sequentially (as opposed to, say, using a 3-thread consumer). The test in question simply happens to mimic the creation of consumers that may well be on 3 separate machines.

Best,

[1] https://gist.github.com/ericbottard/91aa9ee114c6091e5b7b
[2] https://gist.github.com/ericbottard/de87f1edc8eee2b9bee5

--
Eric Bottard

--
Guozhang
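The "default 4*2000ms" window Eric cites is just backoff times retries. A one-line model with the defaults he mentions (rebalance.backoff.ms=2000, rebalance.max.retries=4):

```python
def rebalance_window_ms(backoff_ms=2000, max_retries=4):
    """Approximate upper bound on how long the high-level consumer
    keeps retrying a failed rebalance: retries separated by a fixed
    backoff. Defaults match the values cited in the thread."""
    return backoff_ms * max_retries

print(rebalance_window_ms())  # 8000 ms with the defaults cited above
```

Note that 8 s is well short of the ~30 s Eric observes regardless of the combination, which fits Guozhang's diagnosis that the ZK session/ephemeral-node timeout, not the retry loop itself, is what dominates.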
Re: How many partition can one single machine handle in Kafka?
Todd,

Thank you for the information.

With 28,000+ files and 14 disks, that means there are on average about 4,000 open files per pair of disks (which, in RAID 10, is treated as one single disk), am I right?

How do you manage to make all the write operations to these 4,000 open files sequential on the disk? As far as I know, write operations to different files on the same disk cause random writes, which is not good for performance.

xiaobinshe

2014-10-23 1:00 GMT+08:00 Todd Palino tpal...@gmail.com:

In fact there are many more than 4000 open files. Many of our brokers run with 28,000+ open files (regular file handles, not network connections). In our case, we're beefing up the disk performance as much as we can by running in a RAID-10 configuration with 14 disks.

-Todd

On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She xiaobin...@gmail.com wrote:

Todd,

Actually I'm wondering how Kafka handles so many partitions. With one partition there is at least one file on disk, and with 4000 partitions there will be at least 4000 files. When all these partitions have write requests, how does Kafka make the write operations on the disk sequential (which is emphasized in the design document of Kafka) and make sure the disk access is effective?

Thank you for your reply.

xiaobinshe

2014-10-22 5:10 GMT+08:00 Todd Palino tpal...@gmail.com:

As far as the number of partitions a single broker can handle, we've set our cap at 4000 partitions (including replicas). Above that we've seen some performance and stability issues.

-Todd

On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She xiaobin...@gmail.com wrote:

hello, everyone

I'm new to Kafka, and I'm wondering what's the max number of partitions one single machine can handle in Kafka? Is there a suggested number?

Thanks.

xiaobinshe
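Xiaobin's arithmetic above can be checked directly: RAID 10 across 14 disks yields 7 mirrored pairs, each behaving as one logical spindle. A small sketch with the thread's numbers:

```python
def files_per_spindle(open_files, disks, mirrored=True):
    """Average open files per logical spindle. With RAID 10, each
    mirrored pair behaves as one logical disk, so 14 disks -> 7
    spindles; without mirroring, each disk counts separately."""
    spindles = disks // 2 if mirrored else disks
    return open_files / spindles

print(files_per_spindle(28000, 14))  # 4000.0 per mirrored pair
```

This is only an average, since RAID striping actually spreads each file's blocks across all pairs. As for the sequential-write question: each partition's log is append-only, and the OS page cache absorbs those appends and flushes them in large batches, which is the behavior the Kafka design document leans on to keep disk access mostly sequential despite thousands of open files.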