The following pull request was submitted through Github.
It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/5986

This e-mail was sent by the LXC bot, direct replies will not reach the author
unless they happen to be subscribed to this list.

=== Description (from pull-request) ===
This PR adds an additional field to the `APIHeartbeatMember` struct called `Nodes`.

It is equivalent to the existing `Members` field, except that it is a map keyed on node address rather than node ID.

This is needed because sending a unified set of node information by combining the `nodes` and `raft_nodes` tables is problematic: the `ID` field in each table is not guaranteed to be the same for each of the raft nodes.

For backwards compatibility with non-upgraded nodes, the old `Members` field is still sent, although it will potentially be missing some nodes in the case of an ID clash between the two tables.

Also includes fix from @stgraber in https://github.com/lxc/lxd/pull/5985
From 4767b74dfdeddf8aa43d61addcab23df66cb37c5 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parr...@canonical.com>
Date: Thu, 18 Jul 2019 11:32:10 +0100
Subject: [PATCH 1/3] cluster/heartbeat: Adds new Nodes field to heartbeat
 struct

This new field is a map keyed by node address rather than node ID.

This is to avoid issues with raft_nodes and nodes tables not having the same 
IDs for the same node.

For backwards compatibility with non-upgraded nodes, the old Members map (keyed
by node ID) is still sent.

Due to the way Go unmarshals JSON, old nodes that do not have the Nodes
field in the heartbeat struct will just ignore it.

Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com>
---
 lxd/cluster/gateway.go   | 11 +++++--
 lxd/cluster/heartbeat.go | 67 +++++++++++++++++++++++++---------------
 2 files changed, 50 insertions(+), 28 deletions(-)

diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 78a9264a46..2d048c0112 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -178,10 +178,15 @@ func (g *Gateway) HandlerFuncs(nodeRefreshTask 
func(*APIHeartbeat)) map[string]h
                        }
 
                        nodes := make([]db.RaftNode, 0)
-                       for _, node := range heartbeatData.Members {
-                               if node.Raft {
+                       for _, node := range heartbeatData.Nodes {
+                               if node.RaftID > 0 || node.Raft {
+                                       // Prefer RaftID over ID (but deal with 
legacy heartbeats).
+                                       nodeID := node.RaftID
+                                       if nodeID == 0 {
+                                               nodeID = node.ID
+                                       }
                                        nodes = append(nodes, db.RaftNode{
-                                               ID:      node.ID,
+                                               ID:      nodeID,
                                                Address: node.Address,
                                        })
                                }
diff --git a/lxd/cluster/heartbeat.go b/lxd/cluster/heartbeat.go
index 45bca1d908..3eda81eecd 100644
--- a/lxd/cluster/heartbeat.go
+++ b/lxd/cluster/heartbeat.go
@@ -19,12 +19,13 @@ import (
 
 // APIHeartbeatMember contains specific cluster node info.
 type APIHeartbeatMember struct {
-       ID            int64
-       Address       string
-       Raft          bool
-       LastHeartbeat time.Time
-       Online        bool // Calculated from offline threshold and 
LastHeatbeat time.
-       updated       bool // Has node been updated during this heartbeat run. 
Not sent to nodes.
+       ID            int64     // ID field value in nodes table.
+       Address       string    // Host and Port of node.
+       RaftID        int64     // ID field value in raft_nodes table, zero if 
non-raft node.
+       Raft          bool      // Deprecated, use non-zero RaftID instead to 
indicate raft node.
+       LastHeartbeat time.Time // Last time we received a successful response 
from node.
+       Online        bool      // Calculated from offline threshold and 
LastHeatbeat time.
+       updated       bool      // Has node been updated during this heartbeat 
run. Not sent to nodes.
 }
 
 // APIHeartbeatVersion contains max versions for all nodes in cluster.
@@ -35,8 +36,9 @@ type APIHeartbeatVersion struct {
 
 // APIHeartbeat contains data sent to nodes in heartbeat.
 type APIHeartbeat struct {
-       sync.Mutex // Used to control access to Members maps.
-       Members    map[int64]APIHeartbeatMember
+       sync.Mutex                              // Used to control access to 
Members maps.
+       Members    map[int64]APIHeartbeatMember // Deprecated, use Nodes 
instead.
+       Nodes      map[string]APIHeartbeatMember
        Version    APIHeartbeatVersion
        Time       time.Time
 
@@ -52,33 +54,32 @@ func (hbState *APIHeartbeat) Update(fullStateList bool, 
raftNodes []db.RaftNode,
        var maxSchemaVersion, maxAPIExtensionsVersion int
        hbState.Time = time.Now()
 
-       if hbState.Members == nil {
-               hbState.Members = make(map[int64]APIHeartbeatMember)
+       if hbState.Nodes == nil {
+               hbState.Nodes = make(map[string]APIHeartbeatMember)
        }
 
        // If we've been supplied a fresh set of node states, this is a full 
state list.
        hbState.FullStateList = fullStateList
 
-       // Add raft nodes first with the raft flag set to true, but missing 
LastHeartbeat time.
+       // Add raft nodes first with the raft ID and flag set to true, but 
missing LastHeartbeat time.
        for _, node := range raftNodes {
-               member, exists := hbState.Members[node.ID]
+               member, exists := hbState.Nodes[node.Address]
                if !exists {
                        member = APIHeartbeatMember{
-                               ID:      node.ID,
                                Address: node.Address,
                        }
                }
 
+               member.RaftID = node.ID
                member.Raft = true
-               hbState.Members[node.ID] = member
+               hbState.Nodes[node.Address] = member
        }
 
        // Add remaining nodes, and when if existing node is found, update 
status.
        for _, node := range allNodes {
-               member, exists := hbState.Members[node.ID]
+               member, exists := hbState.Nodes[node.Address]
                if !exists {
                        member = APIHeartbeatMember{
-                               ID:      node.ID,
                                Address: node.Address,
                        }
                }
@@ -87,8 +88,9 @@ func (hbState *APIHeartbeat) Update(fullStateList bool, 
raftNodes []db.RaftNode,
                        member.LastHeartbeat = node.Heartbeat
                }
 
+               member.ID = node.ID
                member.Online = 
!member.LastHeartbeat.Before(time.Now().Add(-offlineThreshold))
-               hbState.Members[node.ID] = member
+               hbState.Nodes[node.Address] = member
 
                // Keep a record of highest APIExtensions and Schema version 
seen in all nodes.
                if node.APIExtensions > maxAPIExtensionsVersion {
@@ -105,13 +107,28 @@ func (hbState *APIHeartbeat) Update(fullStateList bool, 
raftNodes []db.RaftNode,
                APIExtensions: maxAPIExtensionsVersion,
        }
 
+       // Convert the Nodes map to a legacy Members map for non-upgraded nodes.
+       // The legacy format is keyed by node ID, but this didn't work well as 
the IDs in the nodes
+       // and raft_nodes tables are not guaranteed to be the same.
+       hbState.Members = make(map[int64]APIHeartbeatMember)
+
+       for _, node := range hbState.Nodes {
+               // Check if node is raft node, if so we send the Raft ID as 
Node ID for consistency
+               // with raft_nodes table on receiving side.
+               if node.RaftID > 0 {
+                       node.ID = node.RaftID
+               }
+
+               hbState.Members[node.ID] = node
+       }
+
        return
 }
 
 // Send sends heartbeat requests to the nodes supplied and updates heartbeat 
state.
 func (hbState *APIHeartbeat) Send(ctx context.Context, cert *shared.CertInfo, 
localAddress string, nodes []db.NodeInfo, delay bool) {
        heartbeatsWg := sync.WaitGroup{}
-       sendHeartbeat := func(nodeID int64, address string, delay bool, 
heartbeatData *APIHeartbeat) {
+       sendHeartbeat := func(address string, delay bool, heartbeatData 
*APIHeartbeat) {
                defer heartbeatsWg.Done()
 
                if delay {
@@ -125,7 +142,7 @@ func (hbState *APIHeartbeat) Send(ctx context.Context, cert 
*shared.CertInfo, lo
                if err == nil {
                        hbState.Lock()
                        // Ensure only update nodes that exist in Members 
already.
-                       hbNode, existing := hbState.Members[nodeID]
+                       hbNode, existing := hbState.Nodes[address]
                        if !existing {
                                return
                        }
@@ -133,7 +150,7 @@ func (hbState *APIHeartbeat) Send(ctx context.Context, cert 
*shared.CertInfo, lo
                        hbNode.LastHeartbeat = time.Now()
                        hbNode.Online = true
                        hbNode.updated = true
-                       hbState.Members[nodeID] = hbNode
+                       hbState.Nodes[address] = hbNode
                        hbState.Unlock()
                        logger.Debugf("Successful heartbeat for %s", address)
                } else {
@@ -145,18 +162,18 @@ func (hbState *APIHeartbeat) Send(ctx context.Context, 
cert *shared.CertInfo, lo
                // Special case for the local node - just record the time now.
                if node.Address == localAddress {
                        hbState.Lock()
-                       hbNode := hbState.Members[node.ID]
+                       hbNode := hbState.Nodes[node.Address]
                        hbNode.LastHeartbeat = time.Now()
                        hbNode.Online = true
                        hbNode.updated = true
-                       hbState.Members[node.ID] = hbNode
+                       hbState.Nodes[node.Address] = hbNode
                        hbState.Unlock()
                        continue
                }
 
                // Parallelize the rest.
                heartbeatsWg.Add(1)
-               go sendHeartbeat(node.ID, node.Address, delay, hbState)
+               go sendHeartbeat(node.Address, delay, hbState)
        }
        heartbeatsWg.Wait()
 }
@@ -281,7 +298,7 @@ func (g *Gateway) heartbeat(ctx context.Context, 
initialHeartbeat bool) {
        for _, currentNode := range currentNodes {
                existing := false
                for _, node := range allNodes {
-                       if node.Address == currentNode.Address && node.ID == 
currentNode.ID {
+                       if node.Address == currentNode.Address {
                                existing = true
                                break
                        }
@@ -307,7 +324,7 @@ func (g *Gateway) heartbeat(ctx context.Context, 
initialHeartbeat bool) {
        }
 
        err = g.Cluster.Transaction(func(tx *db.ClusterTx) error {
-               for _, node := range hbState.Members {
+               for _, node := range hbState.Nodes {
                        if !node.updated {
                                continue
                        }

From c2084c8b3b6a04df2762929eed709f1a2e0b080c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com>
Date: Thu, 18 Jul 2019 04:37:28 -0400
Subject: [PATCH 2/3] lxd/cluster/membership: Fix new DB server id
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This effectively reverts commit 1b52ef853533efb7e0837a1aae731f925043617b.

Signed-off-by: Stéphane Graber <stgra...@ubuntu.com>
---
 lxd/cluster/membership.go | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 671cd4a886..8a1b186c1f 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -468,7 +468,6 @@ func Rebalance(state *state.State, gateway *Gateway) 
(string, []db.RaftNode, err
 
        // Check if we have a spare node that we can turn into a database one.
        address := ""
-       id := int64(-1)
        err = state.Cluster.Transaction(func(tx *db.ClusterTx) error {
                config, err := ConfigLoad(tx)
                if err != nil {
@@ -489,7 +488,6 @@ func Rebalance(state *state.State, gateway *Gateway) 
(string, []db.RaftNode, err
                        logger.Debugf(
                                "Found spare node %s (%s) to be promoted as 
database node", node.Name, node.Address)
                        address = node.Address
-                       id = node.ID
                        break
                }
 
@@ -504,9 +502,20 @@ func Rebalance(state *state.State, gateway *Gateway) 
(string, []db.RaftNode, err
                return "", currentRaftNodes, nil
        }
 
-       // Update the local raft_table adding the new member and building a new
-       // list.
-       updatedRaftNodes := append(currentRaftNodes, db.RaftNode{ID: id, 
Address: address})
+       // Figure out the next ID in the raft_nodes table
+       var updatedRaftNodes []db.RaftNode
+       err = gateway.db.Transaction(func(tx *db.NodeTx) error {
+               id, err := tx.RaftNodeAdd(address)
+               if err != nil {
+                       return errors.Wrap(err, "Failed to add new raft node")
+               }
+
+               updatedRaftNodes = append(currentRaftNodes, db.RaftNode{ID: id, 
Address: address})
+               return nil
+       })
+       if err != nil {
+               return "", nil, err
+       }
 
        return address, updatedRaftNodes, nil
 }

From db8f8728c4209d2d440eb904faf84c684d2c94f0 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parr...@canonical.com>
Date: Thu, 18 Jul 2019 11:48:13 +0100
Subject: [PATCH 3/3] daemon: Updates use of heartbeat data Members field to
 Nodes field

Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com>
---
 lxd/daemon.go   | 12 ++++++------
 lxd/networks.go |  2 +-
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/lxd/daemon.go b/lxd/daemon.go
index 6f6e1ef9fd..e494ee39dc 100644
--- a/lxd/daemon.go
+++ b/lxd/daemon.go
@@ -1330,13 +1330,13 @@ func (d *Daemon) hasNodeListChanged(heartbeatData 
*cluster.APIHeartbeat) bool {
        }
 
        // Member count has changed.
-       if len(d.lastNodeList.Members) != len(heartbeatData.Members) {
+       if len(d.lastNodeList.Nodes) != len(heartbeatData.Nodes) {
                return true
        }
 
-       // Check for node address changes.
-       for lastMemberID, lastMember := range d.lastNodeList.Members {
-               if heartbeatData.Members[lastMemberID].Address != 
lastMember.Address {
+       // Check for node ID changes.
+       for lastMemberAddress, lastMember := range d.lastNodeList.Nodes {
+               if heartbeatData.Nodes[lastMemberAddress].ID != lastMember.ID {
                        return true
                }
        }
@@ -1358,12 +1358,12 @@ func (d *Daemon) NodeRefreshTask(heartbeatData 
*cluster.APIHeartbeat) {
 
        // Only refresh forkdns peers if the full state list has been generated.
        if heartbeatData.FullStateList {
-               for i, node := range heartbeatData.Members {
+               for i, node := range heartbeatData.Nodes {
                        // Exclude nodes that the leader considers offline.
                        // This is to avoid forkdns delaying results by 
querying an offline node.
                        if !node.Online {
                                logger.Warnf("Excluding offline node from 
refresh: %+v", node)
-                               delete(heartbeatData.Members, i)
+                               delete(heartbeatData.Nodes, i)
                        }
                }
 
diff --git a/lxd/networks.go b/lxd/networks.go
index 9fa46146c5..7dc1de70b4 100644
--- a/lxd/networks.go
+++ b/lxd/networks.go
@@ -2147,7 +2147,7 @@ func (n *network) 
refreshForkdnsServerAddresses(heartbeatData *cluster.APIHeartb
        logger.Infof("Refreshing forkdns peers for %v", n.name)
 
        cert := n.state.Endpoints.NetworkCert()
-       for _, node := range heartbeatData.Members {
+       for _, node := range heartbeatData.Nodes {
                if node.Address == localAddress {
                        // No need to query ourselves.
                        continue
_______________________________________________
lxc-devel mailing list
lxc-devel@lists.linuxcontainers.org
http://lists.linuxcontainers.org/listinfo/lxc-devel

Reply via email to