The following pull request was submitted through GitHub. It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/5986
This e-mail was sent by the LXC bot; direct replies will not reach the author unless they happen to be subscribed to this list. === Description (from pull-request) === This PR adds an additional field called `Nodes` to the `APIHeartbeat` struct (and a `RaftID` field to the `APIHeartbeatMember` struct). `Nodes` is equivalent to the existing `Members` field, except that it is a map keyed on node address rather than node ID. This is needed because sending a unified set of node information by combining the `nodes` and `raft_nodes` tables is problematic: the `ID` field in each table is not guaranteed to be the same for each of the raft nodes. For backwards compatibility with non-upgraded nodes, the old `Members` field is still sent, although it may be missing some nodes in the case of an ID clash between the two tables. Also includes a fix from @stgraber in https://github.com/lxc/lxd/pull/5985
From 4767b74dfdeddf8aa43d61addcab23df66cb37c5 Mon Sep 17 00:00:00 2001 From: Thomas Parrott <thomas.parr...@canonical.com> Date: Thu, 18 Jul 2019 11:32:10 +0100 Subject: [PATCH 1/3] cluster/heartbeat: Adds new Nodes field to heartbeat struct This new field is a map keyed by node address rather than node ID. This is to avoid issues with raft_nodes and nodes tables not having the same IDs for the same node. For backwards compatibility with non-upgraded nodes, the old Members map (keyed by Node ID) is still sent. Due to the way Go unmarshals JSON, old nodes that do not have the Nodes field in the heartbeat struct will just ignore it. Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com> --- lxd/cluster/gateway.go | 11 +++++-- lxd/cluster/heartbeat.go | 67 +++++++++++++++++++++++++--------------- 2 files changed, 50 insertions(+), 28 deletions(-) diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go index 78a9264a46..2d048c0112 100644 --- a/lxd/cluster/gateway.go +++ b/lxd/cluster/gateway.go @@ -178,10 +178,15 @@ func (g *Gateway) HandlerFuncs(nodeRefreshTask func(*APIHeartbeat)) map[string]h } nodes := make([]db.RaftNode, 0) - for _, node := range heartbeatData.Members { - if node.Raft { + for _, node := range heartbeatData.Nodes { + if node.RaftID > 0 || node.Raft { + // Prefer RaftID over ID (but deal with legacy heartbeats). + nodeID := node.RaftID + if nodeID == 0 { + nodeID = node.ID + } nodes = append(nodes, db.RaftNode{ - ID: node.ID, + ID: nodeID, Address: node.Address, }) } diff --git a/lxd/cluster/heartbeat.go b/lxd/cluster/heartbeat.go index 45bca1d908..3eda81eecd 100644 --- a/lxd/cluster/heartbeat.go +++ b/lxd/cluster/heartbeat.go @@ -19,12 +19,13 @@ import ( // APIHeartbeatMember contains specific cluster node info. type APIHeartbeatMember struct { - ID int64 - Address string - Raft bool - LastHeartbeat time.Time - Online bool // Calculated from offline threshold and LastHeatbeat time. 
- updated bool // Has node been updated during this heartbeat run. Not sent to nodes. + ID int64 // ID field value in nodes table. + Address string // Host and Port of node. + RaftID int64 // ID field value in raft_nodes table, zero if non-raft node. + Raft bool // Deprecated, use non-zero RaftID instead to indicate raft node. + LastHeartbeat time.Time // Last time we received a successful response from node. + Online bool // Calculated from offline threshold and LastHeatbeat time. + updated bool // Has node been updated during this heartbeat run. Not sent to nodes. } // APIHeartbeatVersion contains max versions for all nodes in cluster. @@ -35,8 +36,9 @@ type APIHeartbeatVersion struct { // APIHeartbeat contains data sent to nodes in heartbeat. type APIHeartbeat struct { - sync.Mutex // Used to control access to Members maps. - Members map[int64]APIHeartbeatMember + sync.Mutex // Used to control access to Members maps. + Members map[int64]APIHeartbeatMember // Deprecated, use Nodes instead. + Nodes map[string]APIHeartbeatMember Version APIHeartbeatVersion Time time.Time @@ -52,33 +54,32 @@ func (hbState *APIHeartbeat) Update(fullStateList bool, raftNodes []db.RaftNode, var maxSchemaVersion, maxAPIExtensionsVersion int hbState.Time = time.Now() - if hbState.Members == nil { - hbState.Members = make(map[int64]APIHeartbeatMember) + if hbState.Nodes == nil { + hbState.Nodes = make(map[string]APIHeartbeatMember) } // If we've been supplied a fresh set of node states, this is a full state list. hbState.FullStateList = fullStateList - // Add raft nodes first with the raft flag set to true, but missing LastHeartbeat time. + // Add raft nodes first with the raft ID and flag set to true, but missing LastHeartbeat time. 
for _, node := range raftNodes { - member, exists := hbState.Members[node.ID] + member, exists := hbState.Nodes[node.Address] if !exists { member = APIHeartbeatMember{ - ID: node.ID, Address: node.Address, } } + member.RaftID = node.ID member.Raft = true - hbState.Members[node.ID] = member + hbState.Nodes[node.Address] = member } // Add remaining nodes, and when if existing node is found, update status. for _, node := range allNodes { - member, exists := hbState.Members[node.ID] + member, exists := hbState.Nodes[node.Address] if !exists { member = APIHeartbeatMember{ - ID: node.ID, Address: node.Address, } } @@ -87,8 +88,9 @@ func (hbState *APIHeartbeat) Update(fullStateList bool, raftNodes []db.RaftNode, member.LastHeartbeat = node.Heartbeat } + member.ID = node.ID member.Online = !member.LastHeartbeat.Before(time.Now().Add(-offlineThreshold)) - hbState.Members[node.ID] = member + hbState.Nodes[node.Address] = member // Keep a record of highest APIExtensions and Schema version seen in all nodes. if node.APIExtensions > maxAPIExtensionsVersion { @@ -105,13 +107,28 @@ func (hbState *APIHeartbeat) Update(fullStateList bool, raftNodes []db.RaftNode, APIExtensions: maxAPIExtensionsVersion, } + // Convert the Nodes map to a legacy Members map for non-upgraded nodes. + // The legacy format is keyed by node ID, but this didn't work well as the IDs in the nodes + // and raft_nodes tables are not guaranteed to be the same. + hbState.Members = make(map[int64]APIHeartbeatMember) + + for _, node := range hbState.Nodes { + // Check if node is raft node, if so we send the Raft ID as Node ID for consistency + // with raft_nodes table on receiving side. + if node.RaftID > 0 { + node.ID = node.RaftID + } + + hbState.Members[node.ID] = node + } + return } // Send sends heartbeat requests to the nodes supplied and updates heartbeat state. 
func (hbState *APIHeartbeat) Send(ctx context.Context, cert *shared.CertInfo, localAddress string, nodes []db.NodeInfo, delay bool) { heartbeatsWg := sync.WaitGroup{} - sendHeartbeat := func(nodeID int64, address string, delay bool, heartbeatData *APIHeartbeat) { + sendHeartbeat := func(address string, delay bool, heartbeatData *APIHeartbeat) { defer heartbeatsWg.Done() if delay { @@ -125,7 +142,7 @@ func (hbState *APIHeartbeat) Send(ctx context.Context, cert *shared.CertInfo, lo if err == nil { hbState.Lock() // Ensure only update nodes that exist in Members already. - hbNode, existing := hbState.Members[nodeID] + hbNode, existing := hbState.Nodes[address] if !existing { return } @@ -133,7 +150,7 @@ func (hbState *APIHeartbeat) Send(ctx context.Context, cert *shared.CertInfo, lo hbNode.LastHeartbeat = time.Now() hbNode.Online = true hbNode.updated = true - hbState.Members[nodeID] = hbNode + hbState.Nodes[address] = hbNode hbState.Unlock() logger.Debugf("Successful heartbeat for %s", address) } else { @@ -145,18 +162,18 @@ func (hbState *APIHeartbeat) Send(ctx context.Context, cert *shared.CertInfo, lo // Special case for the local node - just record the time now. if node.Address == localAddress { hbState.Lock() - hbNode := hbState.Members[node.ID] + hbNode := hbState.Nodes[node.Address] hbNode.LastHeartbeat = time.Now() hbNode.Online = true hbNode.updated = true - hbState.Members[node.ID] = hbNode + hbState.Nodes[node.Address] = hbNode hbState.Unlock() continue } // Parallelize the rest. 
heartbeatsWg.Add(1) - go sendHeartbeat(node.ID, node.Address, delay, hbState) + go sendHeartbeat(node.Address, delay, hbState) } heartbeatsWg.Wait() } @@ -281,7 +298,7 @@ func (g *Gateway) heartbeat(ctx context.Context, initialHeartbeat bool) { for _, currentNode := range currentNodes { existing := false for _, node := range allNodes { - if node.Address == currentNode.Address && node.ID == currentNode.ID { + if node.Address == currentNode.Address { existing = true break } @@ -307,7 +324,7 @@ func (g *Gateway) heartbeat(ctx context.Context, initialHeartbeat bool) { } err = g.Cluster.Transaction(func(tx *db.ClusterTx) error { - for _, node := range hbState.Members { + for _, node := range hbState.Nodes { if !node.updated { continue } From c2084c8b3b6a04df2762929eed709f1a2e0b080c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Thu, 18 Jul 2019 04:37:28 -0400 Subject: [PATCH 2/3] lxd/cluster/membership: Fix new DB server id MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This effectively reverts commit 1b52ef853533efb7e0837a1aae731f925043617b. Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- lxd/cluster/membership.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go index 671cd4a886..8a1b186c1f 100644 --- a/lxd/cluster/membership.go +++ b/lxd/cluster/membership.go @@ -468,7 +468,6 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err // Check if we have a spare node that we can turn into a database one. 
address := "" - id := int64(-1) err = state.Cluster.Transaction(func(tx *db.ClusterTx) error { config, err := ConfigLoad(tx) if err != nil { @@ -489,7 +488,6 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err logger.Debugf( "Found spare node %s (%s) to be promoted as database node", node.Name, node.Address) address = node.Address - id = node.ID break } @@ -504,9 +502,20 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err return "", currentRaftNodes, nil } - // Update the local raft_table adding the new member and building a new - // list. - updatedRaftNodes := append(currentRaftNodes, db.RaftNode{ID: id, Address: address}) + // Figure out the next ID in the raft_nodes table + var updatedRaftNodes []db.RaftNode + err = gateway.db.Transaction(func(tx *db.NodeTx) error { + id, err := tx.RaftNodeAdd(address) + if err != nil { + return errors.Wrap(err, "Failed to add new raft node") + } + + updatedRaftNodes = append(currentRaftNodes, db.RaftNode{ID: id, Address: address}) + return nil + }) + if err != nil { + return "", nil, err + } return address, updatedRaftNodes, nil } From db8f8728c4209d2d440eb904faf84c684d2c94f0 Mon Sep 17 00:00:00 2001 From: Thomas Parrott <thomas.parr...@canonical.com> Date: Thu, 18 Jul 2019 11:48:13 +0100 Subject: [PATCH 3/3] daemon: Updates use of heartbeat data Members field to Nodes field Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com> --- lxd/daemon.go | 12 ++++++------ lxd/networks.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/lxd/daemon.go b/lxd/daemon.go index 6f6e1ef9fd..e494ee39dc 100644 --- a/lxd/daemon.go +++ b/lxd/daemon.go @@ -1330,13 +1330,13 @@ func (d *Daemon) hasNodeListChanged(heartbeatData *cluster.APIHeartbeat) bool { } // Member count has changed. - if len(d.lastNodeList.Members) != len(heartbeatData.Members) { + if len(d.lastNodeList.Nodes) != len(heartbeatData.Nodes) { return true } - // Check for node address changes. 
- for lastMemberID, lastMember := range d.lastNodeList.Members { - if heartbeatData.Members[lastMemberID].Address != lastMember.Address { + // Check for node ID changes. + for lastMemberAddress, lastMember := range d.lastNodeList.Nodes { + if heartbeatData.Nodes[lastMemberAddress].ID != lastMember.ID { return true } } @@ -1358,12 +1358,12 @@ func (d *Daemon) NodeRefreshTask(heartbeatData *cluster.APIHeartbeat) { // Only refresh forkdns peers if the full state list has been generated. if heartbeatData.FullStateList { - for i, node := range heartbeatData.Members { + for i, node := range heartbeatData.Nodes { // Exclude nodes that the leader considers offline. // This is to avoid forkdns delaying results by querying an offline node. if !node.Online { logger.Warnf("Excluding offline node from refresh: %+v", node) - delete(heartbeatData.Members, i) + delete(heartbeatData.Nodes, i) } } diff --git a/lxd/networks.go b/lxd/networks.go index 9fa46146c5..7dc1de70b4 100644 --- a/lxd/networks.go +++ b/lxd/networks.go @@ -2147,7 +2147,7 @@ func (n *network) refreshForkdnsServerAddresses(heartbeatData *cluster.APIHeartb logger.Infof("Refreshing forkdns peers for %v", n.name) cert := n.state.Endpoints.NetworkCert() - for _, node := range heartbeatData.Members { + for _, node := range heartbeatData.Nodes { if node.Address == localAddress { // No need to query ourselves. continue
_______________________________________________ lxc-devel mailing list lxc-devel@lists.linuxcontainers.org http://lists.linuxcontainers.org/listinfo/lxc-devel