Replacing brokers in a cluster (0.8)

2013-07-22 Thread Jason Rosenberg
I'm planning to upgrade a 0.8 cluster from 2 old nodes to 3 new ones
(better hardware).  I'm using a replication factor of 2.

I'm thinking the plan should be to spin up the 3 new nodes and operate as
a 5-node cluster for a while.  Then first remove 1 of the old nodes, and
wait for the partitions on the removed node to get replicated to the other
nodes.  Then do the same for the other old node.

Does this sound sensible?

How does the cluster decide when to re-replicate partitions that are on a
node that is no longer available?  Does it only happen if/when new messages
arrive for that partition?  Is it on a partition-by-partition basis?

Or is it a cluster-level decision that a broker is no longer valid, in
which case all affected partitions would immediately get replicated to new
brokers as needed?

I'm just wondering how I will know when it will be safe to take down my
second old node, after the first one is removed, etc.

Thanks,

Jason


Re: Replacing brokers in a cluster (0.8)

2013-07-22 Thread Glenn Nethercutt
This seems like the type of behavior I'd ultimately want from the 
controlled shutdown tool 
https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-1.ControlledShutdown.


Currently, I believe the ShutdownBroker tool causes new leaders to be
elected for any partition the dying node was leading, but I don't think
it explicitly forces a rebalance for topics in which the dying node was
just an ISR (in-sync replica set) member. Ostensibly, leader elections
are what we want to avoid, given the ZooKeeper chattiness that would
ensue for ensembles with lots of partitions, but I'd wager we'd benefit
from a reduction in rebalances too.  The preferred replica election tool
also seems to offer a similar level of control (manual selection of the
preferred replicas), but it still doesn't let you add/remove brokers from
the ISR directly.  I know the kafka-reassign-partitions tool lets you
specify a full list of partitions and replica assignments, but I don't
know how easily that will integrate with the lifecycle you described.
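
If memory serves, the invocations on that wiki page look roughly like the
following (broker id and ZooKeeper connect string are placeholders; please
double-check the flags against your 0.8 build before relying on them):

# Controlled shutdown of broker 1 (moves leadership off it before it stops):
./bin/kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper localhost:2181 --broker 1
# Move leadership back to the preferred (first-listed) replicas afterwards:
./bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181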


Anyone know if controlled shutdown is the right tool for this? Our 
devops team will certainly be interested in the canonical answer as well.


--glenn


Re: Replacing brokers in a cluster (0.8)

2013-07-22 Thread Jason Rosenberg
Is the kafka-reassign-partitions tool something I can experiment with now
(this will only be staging data, in the first go-round)?  How does it work?
 Do I manually have to specify each replica I want to move?  This would be
cumbersome, as I have on the order of hundreds of topics... Or does the tool
have the ability to specify all replicas on a particular broker?  How can I
easily check whether a partition has all its replicas in the ISR?
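
(Side note: if I understand the 0.8 tooling right, the list-topic script
will show leader/replicas/ISR per partition, which would answer my last
question.  The script name and flags below are from memory, so treat this
as a sketch rather than gospel:)

# Prints, for each partition of the topic, the leader, the replica list, and the ISR.
# A partition is fully replicated once every assigned replica also appears in the ISR.
./bin/kafka-list-topic.sh --zookeeper localhost:2181 --topic my-topic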

For some reason, I had thought there would be a default behavior, whereby a
replica could automatically be declared dead after a configurable timeout
period.

Re-assigning broker ids would not be ideal, since I currently have a scheme
whereby broker ids are auto-generated from a hostname/IP, etc.  I could
make it work, but it's not my preference to override that!

Jason


On Mon, Jul 22, 2013 at 11:50 AM, Jun Rao jun...@gmail.com wrote:

 A replica's data won't be automatically moved to another broker when there
 are failures. This is because we don't know whether the failure is transient
 or permanent. The right tool to use is the kafka-reassign-partitions tool. It
 hasn't been thoroughly tested yet, though. We hope to harden it for the final
 0.8.0 release.

 You can also replace a broker with a new server by keeping the same broker
 id. When the new server starts up, it will replicate data from the leader.
 You know the data is fully replicated when both replicas are in the ISR.

 Thanks,

 Jun




Re: Replacing brokers in a cluster (0.8)

2013-07-22 Thread Scott Clasen
Here's a Ruby CLI that you can use to replace brokers... it shells out to
the kafka-reassign-partitions.sh tool after figuring out the current replica
assignments from ZK. Hope it's useful.


#!/usr/bin/env ruby

require 'excon'
require 'json'
require 'zookeeper'

# Return a copy of the replica list with broker id `o` swapped for `n`.
def replace(arr, o, n)
  arr.map { |v| v == o ? n : v }
end

if ARGV.length != 4
  puts "Usage: bundle exec bin/replace-instance zkstr topic-name old-broker-id new-broker-id"
else
  zkstr = ARGV[0]
  zk = Zookeeper.new(zkstr)
  topic = ARGV[1]
  old = ARGV[2].to_i
  new = ARGV[3].to_i
  puts "Replacing broker #{old} with #{new} on all partitions of topic #{topic}"

  # Current partition -> replica assignment, as stored by Kafka 0.8 under
  # /brokers/topics/<topic>, e.g. {"version":1,"partitions":{"0":[1,2],...}}
  current = JSON.parse(zk.get(:path => "/brokers/topics/#{topic}")[:data])
  replacements_array = []
  replacements = { "partitions" => replacements_array }
  current["partitions"].each do |partition, brokers|
    replacements_array.push({ "topic"     => topic,
                              "partition" => partition.to_i,
                              "replicas"  => replace(brokers, old, new) })
  end

  replacement_json = JSON.generate(replacements)

  # Write the reassignment JSON to a temp file and hand it to the Kafka tool.
  file = "/tmp/replace-#{topic}-#{old}-#{new}"
  File.delete(file) if File.exist?(file)
  File.open(file, 'w') { |f| f.write(replacement_json) }

  puts "./bin/kafka-reassign-partitions.sh --zookeeper #{zkstr} --path-to-json-file #{file}"
  system "./bin/kafka-reassign-partitions.sh --zookeeper #{zkstr} --path-to-json-file #{file}"
end
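
An example run against a staging topic would look like this (ZK connect
string, topic name, and broker ids are placeholders):

bundle exec bin/replace-instance localhost:2181 my-topic 1 4

That rewrites every partition of my-topic so broker 1 is replaced by broker 4
in the replica list, then kicks off kafka-reassign-partitions.sh with the
generated JSON file.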







Re: Replacing brokers in a cluster (0.8)

2013-07-22 Thread Jun Rao
You can try kafka-reassign-partitions now. You do have to specify the new
replica assignment manually. We are improving that tool to make it more
automatic.
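
In case it helps, the manual input is just a JSON file listing the desired
replicas per partition, in the same shape that Scott's script generates
(topic name, broker ids, and ZooKeeper string below are placeholders):

cat > /tmp/reassign.json <<'EOF'
{"partitions": [
  {"topic": "my-topic", "partition": 0, "replicas": [3, 4]},
  {"topic": "my-topic", "partition": 1, "replicas": [4, 5]}
]}
EOF
./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --path-to-json-file /tmp/reassign.json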

Thanks,

Jun

