Replacing brokers in a cluster (0.8)
I'm planning to upgrade a 0.8 cluster from 2 old nodes to 3 new ones (better hardware). I'm using a replication factor of 2.

I'm thinking the plan should be to spin up the 3 new nodes and operate as a 5-node cluster for a while. Then first remove 1 of the old nodes, and wait for the partitions on the removed node to get replicated to the other nodes. Then, do the same for the other old node. Does this sound sensible?

How does the cluster decide when to re-replicate partitions that are on a node that is no longer available? Does it only happen if/when new messages arrive for that partition? Is it on a partition-by-partition basis? Or is it a cluster-level decision that a broker is no longer valid, in which case all affected partitions would immediately get replicated to new brokers as needed?

I'm just wondering how I will know when it will be safe to take down my second old node, after the first one is removed, etc.

Thanks,

Jason
Re: Replacing brokers in a cluster (0.8)
This seems like the type of behavior I'd ultimately want from the controlled shutdown tool: https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-1.ControlledShutdown

Currently, I believe ShutdownBroker causes new leaders to be elected for any partition the dying node was leading, but I don't think it explicitly forces a rebalance for topics in which the dying node was just an ISR (in-sync replica set) member. Ostensibly, leader elections are what we want to avoid, due to the ZooKeeper chattiness that would ensue for clusters with lots of partitions, but I'd wager we'd benefit from a reduction in rebalances too.

The preferred replica election tool also seems to offer a similar level of control (manual selection of the preferred replicas), but still doesn't let you add/remove brokers from the ISR directly. I know the kafka-reassign-partitions tool lets you specify a full list of partitions and replica assignments, but I don't know how easily it will integrate with the lifecycle you described.

Anyone know if controlled shutdown is the right tool for this? Our devops team will certainly be interested in the canonical answer as well.

--glenn

On 07/22/2013 05:14 AM, Jason Rosenberg wrote:
> [...]
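For what it's worth, the controlled shutdown tool discussed above is invoked through kafka-run-class.sh. A minimal sketch of wrapping that invocation the way the scripts in this thread do — the ZooKeeper string and broker id are placeholders, and the flag names are my recollection of the 0.8 tool, so verify them against your distribution before relying on this:

```ruby
#!/usr/bin/env ruby
# Sketch: build the 0.8 controlled-shutdown command line for a broker we
# want to retire. Host and broker id below are made up for illustration;
# --num.retries is the retry flag as I recall it from 0.8 (unverified).
def shutdown_command(zkstr, broker_id, retries = 3)
  ["./bin/kafka-run-class.sh", "kafka.admin.ShutdownBroker",
   "--zookeeper", zkstr,
   "--broker", broker_id.to_s,
   "--num.retries", retries.to_s].join(" ")
end

cmd = shutdown_command("zk1:2181", 1)
puts cmd
# system(cmd) would actually run it; printed only, as a dry run.
```

Note this only moves leadership off the broker; as glenn points out, it does not shrink the replica sets the broker participates in.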
Re: Replacing brokers in a cluster (0.8)
Is the kafka-reassign-partitions tool something I can experiment with now (this will only be staging data, in the first go-round)? How does it work? Do I manually have to specify each replica I want to move? This would be cumbersome, as I have on the order of hundreds of topics. Or does the tool have the ability to specify all replicas on a particular broker?

How can I easily check whether a partition has all its replicas in the ISR?

For some reason, I had thought there would be a default behavior whereby a replica could automatically be declared dead after a configurable timeout period.

Re-assigning broker ids would not be ideal, since I have a scheme currently whereby broker ids are auto-generated from a hostname/ip, etc. I could make it work, but it's not my preference to override that!

Jason

On Mon, Jul 22, 2013 at 11:50 AM, Jun Rao jun...@gmail.com wrote:
> A replica's data won't be automatically moved to another broker when there
> are failures. This is because we don't know if the failure is transient or
> permanent. The right tool to use is the kafka-reassign-partitions tool. It
> hasn't been thoroughly tested, though. We hope to harden it in the final
> 0.8.0 release.
>
> You can also replace a broker with a new server by keeping the same broker
> id. When the new server starts up, it will replicate data from the leader.
> You know the data is fully replicated when both replicas are in the ISR.
>
> Thanks,
>
> Jun
>
> On Mon, Jul 22, 2013 at 2:14 AM, Jason Rosenberg j...@squareup.com wrote:
>> [...]
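On the "how do I check the ISR" question: in 0.8 each partition's state is stored in ZooKeeper as JSON (under the topic's partitions znodes) with leader and isr fields, so "fully replicated" reduces to "every assigned replica appears in the isr list". A sketch of that comparison, run here against a canned state blob rather than a live ZooKeeper; the exact znode layout is an assumption worth verifying against your cluster:

```ruby
require 'json'

# A partition is fully replicated when every assigned replica is in the ISR.
# assigned: the replica list from the topic's assignment; state_json: the
# JSON blob from the partition's state znode (layout assumed from 0.8).
def fully_replicated?(assigned, state_json)
  isr = JSON.parse(state_json)["isr"]
  (assigned - isr).empty?
end

state = '{"leader":1,"leader_epoch":3,"isr":[1,3],"version":1}'
puts fully_replicated?([1, 3], state)  # true: both replicas are in the ISR
puts fully_replicated?([1, 2], state)  # false: broker 2 is still catching up
```

That is the check Jun describes for knowing when it's safe to take the next old broker down.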
Re: Replacing brokers in a cluster (0.8)
Here's a ruby cli that you can use to replace brokers... it shells out to the kafka-reassign-partitions.sh tool after figuring out broker lists from zk. Hope it's useful.

#!/usr/bin/env ruby
require 'excon'
require 'json'
require 'zookeeper'

def replace(arr, o, n)
  arr.map { |v| v == o ? n : v }
end

if ARGV.length != 4
  puts "Usage: bundle exec bin/replace-instance zkstr topic-name old-broker-id new-broker-id"
else
  zkstr = ARGV[0]
  zk = Zookeeper.new(zkstr)
  topic = ARGV[1]
  old = ARGV[2].to_i
  new = ARGV[3].to_i
  puts "Replacing broker #{old} with #{new} on all partitions of topic #{topic}"

  # Current assignment lives in zk as {"partitions" => {"0" => [1, 2], ...}}
  current = JSON.parse(zk.get(:path => "/brokers/topics/#{topic}")[:data])

  replacements_array = []
  replacements = { "partitions" => replacements_array }
  current["partitions"].each do |partition, brokers|
    replacements_array.push({ "topic" => topic,
                              "partition" => partition.to_i,
                              "replicas" => replace(brokers, old, new) })
  end
  replacement_json = JSON.generate(replacements)

  # Write the reassignment plan where the kafka tool can read it.
  file = "/tmp/replace-#{topic}-#{old}-#{new}"
  File.delete(file) if File.exist?(file)
  File.open(file, 'w') { |f| f.write(replacement_json) }

  puts "./bin/kafka-reassign-partitions.sh --zookeeper #{zkstr} --path-to-json-file #{file}"
  system "./bin/kafka-reassign-partitions.sh --zookeeper #{zkstr} --path-to-json-file #{file}"
end

On Mon, Jul 22, 2013 at 10:40 AM, Jason Rosenberg j...@squareup.com wrote:
> [...]
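The core of that script is the replace helper: for each partition it swaps the old broker id for the new one in the replica list, leaving partitions that don't involve the old broker untouched. Its behavior on sample replica lists (broker ids made up):

```ruby
# Same one-liner as in the script above: substitute broker o with broker n
# wherever it appears in a replica list.
def replace(arr, o, n)
  arr.map { |v| v == o ? n : v }
end

puts replace([1, 2], 1, 4).inspect  # => [4, 2]  broker 1's replica moves to 4
puts replace([2, 3], 1, 4).inspect  # => [2, 3]  partition without broker 1 is untouched
```

Because the substitution is positional, the new broker inherits the old one's slot in every replica list, so replication factor is preserved.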
Re: Replacing brokers in a cluster (0.8)
You can try kafka-reassign-partitions now. You do have to specify the new replica assignment manually. We are improving the tool to make it more automatic.

Thanks,

Jun

On Mon, Jul 22, 2013 at 10:40 AM, Jason Rosenberg j...@squareup.com wrote:
> [...]
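Specifying the new replica assignment manually comes down to producing the JSON file that kafka-reassign-partitions.sh consumes via --path-to-json-file: the same {"partitions" => [...]} shape the ruby script earlier in the thread generates. A sketch with a made-up two-partition topic (topic name, partitions, and broker ids are illustrative only):

```ruby
require 'json'

# Build the reassignment JSON for --path-to-json-file. The assignment hash
# maps partition number => desired replica list (all values hypothetical).
def reassignment_json(topic, assignment)
  parts = assignment.map do |partition, replicas|
    { "topic" => topic, "partition" => partition, "replicas" => replicas }
  end
  JSON.generate("partitions" => parts)
end

# Partition 0 moves from the old brokers to [3, 4]; partition 1 to [4, 5].
json = reassignment_json("events", { 0 => [3, 4], 1 => [4, 5] })
puts json
```

Writing this out per topic is what makes the manual path cumbersome with hundreds of topics, which is exactly the gap the ruby cli above scripts around.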