The following pull request was submitted through GitHub.
It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/6539

This e-mail was sent by the LXC bot; direct replies will not reach the author
unless they happen to be subscribed to this list.

=== Description (from pull-request) ===
With this change, connections to other cluster nodes for non-notify operations will now be held until a working event connection is established with the target.

This should avoid race conditions where we forward a request to another node but then fail to forward operation updates due to not being connected to that node's events stream yet.
From 769f4fdfcc484b73b39343505bb030101e7c63a4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com>
Date: Sun, 1 Dec 2019 23:27:16 -0500
Subject: [PATCH 1/6] lxd/cluster: More reliable event delivery
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgra...@ubuntu.com>
---
 lxd/cluster/connect.go | 22 ++++++++++++++++++++++
 lxd/cluster/events.go  | 39 +++++++++++++++++++++++++--------------
 2 files changed, 47 insertions(+), 14 deletions(-)

diff --git a/lxd/cluster/connect.go b/lxd/cluster/connect.go
index 43ec165eb6..bd9c3881cb 100644
--- a/lxd/cluster/connect.go
+++ b/lxd/cluster/connect.go
@@ -4,6 +4,7 @@ import (
        "encoding/base64"
        "encoding/pem"
        "fmt"
+       "time"
 
        lxd "github.com/lxc/lxd/client"
        "github.com/lxc/lxd/lxd/db"
@@ -20,6 +21,27 @@ import (
 // value 'lxd-cluster-notifier', which can be used in some cases to distinguish
 // between a regular client request and an internal cluster request.
 func Connect(address string, cert *shared.CertInfo, notify bool) 
(lxd.InstanceServer, error) {
+       // Wait for a connection to the events API first for non-notify 
connections.
+       if !notify {
+               connected := false
+               for i := 0; i < 20; i++ {
+                       listenersLock.Lock()
+                       _, ok := listeners[address]
+                       listenersLock.Unlock()
+
+                       if ok {
+                               connected = true
+                               break
+                       }
+
+                       time.Sleep(500 * time.Millisecond)
+               }
+
+               if !connected {
+                       return nil, fmt.Errorf("Missing event connection with 
target cluster member")
+               }
+       }
+
        args := &lxd.ConnectionArgs{
                TLSServerCert: string(cert.PublicKey()),
                TLSClientCert: string(cert.PublicKey()),
diff --git a/lxd/cluster/events.go b/lxd/cluster/events.go
index bbcef4c0a7..8e77a0d46a 100644
--- a/lxd/cluster/events.go
+++ b/lxd/cluster/events.go
@@ -2,6 +2,7 @@ package cluster
 
 import (
        "context"
+       "sync"
        "time"
 
        lxd "github.com/lxc/lxd/client"
@@ -13,28 +14,28 @@ import (
        "github.com/lxc/lxd/shared/logger"
 )
 
+var listeners = map[string]*lxd.EventListener{}
+var listenersLock sync.Mutex
+
 // Events starts a task that continuously monitors the list of cluster nodes 
and
 // maintains a pool of websocket connections against all of them, in order to
 // get notified about events.
 //
 // Whenever an event is received the given callback is invoked.
 func Events(endpoints *endpoints.Endpoints, cluster *db.Cluster, f func(int64, 
api.Event)) (task.Func, task.Schedule) {
-       listeners := map[int64]*lxd.EventListener{}
-
        // Update our pool of event listeners. Since database queries are
        // blocking, we spawn the actual logic in a goroutine, to abort
        // immediately when we receive the stop signal.
        update := func(ctx context.Context) {
                ch := make(chan struct{})
                go func() {
-                       eventsUpdateListeners(endpoints, cluster, listeners, f)
+                       eventsUpdateListeners(endpoints, cluster, f)
                        ch <- struct{}{}
                }()
                select {
                case <-ch:
                case <-ctx.Done():
                }
-
        }
 
        schedule := task.Every(time.Second)
@@ -42,7 +43,7 @@ func Events(endpoints *endpoints.Endpoints, cluster 
*db.Cluster, f func(int64, a
        return update, schedule
 }
 
-func eventsUpdateListeners(endpoints *endpoints.Endpoints, cluster 
*db.Cluster, listeners map[int64]*lxd.EventListener, f func(int64, api.Event)) {
+func eventsUpdateListeners(endpoints *endpoints.Endpoints, cluster 
*db.Cluster, f func(int64, api.Event)) {
        // Get the current cluster nodes.
        var nodes []db.NodeInfo
        var offlineThreshold time.Duration
@@ -72,27 +73,31 @@ func eventsUpdateListeners(endpoints *endpoints.Endpoints, 
cluster *db.Cluster,
 
        address := endpoints.NetworkAddress()
 
-       ids := make([]int, len(nodes))
+       addresses := make([]string, len(nodes))
        for i, node := range nodes {
-               ids[i] = int(node.ID)
+               addresses[i] = node.Address
 
                // Don't bother trying to connect to offline nodes, or to 
ourselves.
                if node.IsOffline(offlineThreshold) || node.Address == address {
                        continue
                }
 
-               _, ok := listeners[node.ID]
+               listenersLock.Lock()
+               listener, ok := listeners[node.Address]
 
                // The node has already a listener associated to it.
                if ok {
                        // Double check that the listener is still
                        // connected. If it is, just move on, other
                        // we'll try to connect again.
-                       if listeners[node.ID].IsActive() {
+                       if listeners[node.Address].IsActive() {
+                               listenersLock.Unlock()
                                continue
                        }
-                       delete(listeners, node.ID)
+
+                       delete(listeners, node.Address)
                }
+               listenersLock.Unlock()
 
                listener, err := eventsConnect(node.Address, 
endpoints.NetworkCert())
                if err != nil {
@@ -101,14 +106,20 @@ func eventsUpdateListeners(endpoints 
*endpoints.Endpoints, cluster *db.Cluster,
                }
                logger.Debugf("Listening for events on node %s", node.Address)
                listener.AddHandler(nil, func(event api.Event) { f(node.ID, 
event) })
-               listeners[node.ID] = listener
+
+               listenersLock.Lock()
+               listeners[node.Address] = listener
+               listenersLock.Unlock()
        }
-       for id, listener := range listeners {
-               if !shared.IntInSlice(int(id), ids) {
+
+       listenersLock.Lock()
+       for address, listener := range listeners {
+               if !shared.StringInSlice(address, addresses) {
                        listener.Disconnect()
-                       delete(listeners, id)
+                       delete(listeners, address)
                }
        }
+       listenersLock.Unlock()
 }
 
 // Establish a client connection to get events from the given node.

From d09c28ff48c71d51323741e3771bcbc902625dfc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com>
Date: Mon, 2 Dec 2019 23:54:02 -0500
Subject: [PATCH 2/6] lxd/response: Coding style
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgra...@ubuntu.com>
---
 lxd/response/response.go | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/lxd/response/response.go b/lxd/response/response.go
index 28572de771..128b1015d0 100644
--- a/lxd/response/response.go
+++ b/lxd/response/response.go
@@ -139,6 +139,7 @@ func Conflict(err error) Response {
        if err != nil {
                message = err.Error()
        }
+
        return &errorResponse{http.StatusConflict, message}
 }
 
@@ -148,6 +149,7 @@ func Forbidden(err error) Response {
        if err != nil {
                message = err.Error()
        }
+
        return &errorResponse{http.StatusForbidden, message}
 }
 
@@ -162,6 +164,7 @@ func NotFound(err error) Response {
        if err != nil {
                message = err.Error()
        }
+
        return &errorResponse{http.StatusNotFound, message}
 }
 
@@ -171,6 +174,7 @@ func NotImplemented(err error) Response {
        if err != nil {
                message = err.Error()
        }
+
        return &errorResponse{http.StatusNotImplemented, message}
 }
 
@@ -186,6 +190,7 @@ func Unavailable(err error) Response {
        if err != nil {
                message = err.Error()
        }
+
        return &errorResponse{http.StatusServiceUnavailable, message}
 }
 
@@ -308,6 +313,7 @@ func (r *fileResponse) Render(w http.ResponseWriter) error {
                                return err
                        }
                        defer fd.Close()
+
                        rd = fd
                } else {
                        rd = bytes.NewReader(entry.Buffer)
@@ -361,6 +367,7 @@ func (r *forwardedResponse) Render(w http.ResponseWriter) 
error {
        if err != nil {
                return err
        }
+
        for key := range r.request.Header {
                forwarded.Header.Set(key, r.request.Header.Get(key))
        }
@@ -369,6 +376,7 @@ func (r *forwardedResponse) Render(w http.ResponseWriter) 
error {
        if err != nil {
                return err
        }
+
        response, err := httpClient.Do(forwarded)
        if err != nil {
                return err

From aaa0c0f86c217f2e98358295e313dffd674b4b32 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com>
Date: Mon, 2 Dec 2019 23:54:10 -0500
Subject: [PATCH 3/6] lxd/operations: Use ForwardedResponse
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgra...@ubuntu.com>
---
 lxd/operations.go | 23 +++--------------------
 1 file changed, 3 insertions(+), 20 deletions(-)

diff --git a/lxd/operations.go b/lxd/operations.go
index 780aa18fdc..4bc0ba22b3 100644
--- a/lxd/operations.go
+++ b/lxd/operations.go
@@ -16,7 +16,6 @@ import (
        "github.com/lxc/lxd/lxd/util"
        "github.com/lxc/lxd/shared"
        "github.com/lxc/lxd/shared/api"
-       "github.com/lxc/lxd/shared/logger"
 )
 
 var operationCmd = APIEndpoint{
@@ -82,12 +81,7 @@ func operationGet(d *Daemon, r *http.Request) 
response.Response {
                return response.SmartError(err)
        }
 
-       body, _, err = client.GetOperation(id)
-       if err != nil {
-               return response.SmartError(err)
-       }
-
-       return response.SyncResponse(true, body)
+       return response.ForwardedResponse(client, r)
 }
 
 func operationDelete(d *Daemon, r *http.Request) response.Response {
@@ -136,12 +130,7 @@ func operationDelete(d *Daemon, r *http.Request) 
response.Response {
                return response.SmartError(err)
        }
 
-       err = client.DeleteOperation(id)
-       if err != nil {
-               return response.SmartError(err)
-       }
-
-       return response.EmptySyncResponse
+       return response.ForwardedResponse(client, r)
 }
 
 func operationsGet(d *Daemon, r *http.Request) response.Response {
@@ -361,12 +350,7 @@ func operationWaitGet(d *Daemon, r *http.Request) 
response.Response {
                return response.SmartError(err)
        }
 
-       apiOp, _, err := client.GetOperationWait(id, timeout)
-       if err != nil {
-               return response.SmartError(err)
-       }
-
-       return response.SyncResponse(true, apiOp)
+       return response.ForwardedResponse(client, r)
 }
 
 type operationWebSocket struct {
@@ -447,7 +431,6 @@ func operationWebsocketGet(d *Daemon, r *http.Request) 
response.Response {
                return response.SmartError(err)
        }
 
-       logger.Debugf("Forward operation websocket from node %s", address)
        source, err := client.GetOperationWebsocket(id, secret)
        if err != nil {
                return response.SmartError(err)

From e4bb1480e5c04a49de1c77d97b294e23fee68d76 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com>
Date: Mon, 2 Dec 2019 23:54:34 -0500
Subject: [PATCH 4/6] lxd/images: Coding style
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgra...@ubuntu.com>
---
 lxd/images.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/lxd/images.go b/lxd/images.go
index a4d934b0c9..95dec7e775 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -1925,6 +1925,7 @@ func imageExport(d *Daemon, r *http.Request) 
response.Response {
                if err != nil {
                        return response.SmartError(err)
                }
+
                return response.ForwardedResponse(client, r)
        }
 
@@ -2217,12 +2218,14 @@ func imageSyncBetweenNodes(d *Daemon, project string, 
fingerprint string) error
        if err != nil {
                return errors.Wrap(err, "Failed to fetch the leader node 
address")
        }
+
        var targetNodeAddress string
        if shared.StringInSlice(leader, addresses) {
                targetNodeAddress = leader
        } else {
                targetNodeAddress = addresses[0]
        }
+
        client, err := cluster.Connect(targetNodeAddress, 
d.endpoints.NetworkCert(), true)
        if err != nil {
                return errors.Wrap(err, "Failed to connect node for image 
synchronization")

From 194a3e44fe7ce91a94849ef14e241bc191e330d7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com>
Date: Mon, 2 Dec 2019 23:55:26 -0500
Subject: [PATCH 5/6] lxd/cluster: Coding style
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgra...@ubuntu.com>
---
 lxd/api_cluster.go | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 7ea90a760e..3f02fdd0f4 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -612,6 +612,7 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) 
response.Response {
                if err != nil {
                        return err
                }
+
                err = clusterRebalance(client)
                if err != nil {
                        return err
@@ -922,10 +923,12 @@ func clusterNodeDelete(d *Daemon, r *http.Request) 
response.Response {
                if err != nil {
                        return response.SmartError(err)
                }
+
                networks, err := d.cluster.Networks()
                if err != nil {
                        return response.SmartError(err)
                }
+
                for _, name := range networks {
                        err := client.DeleteNetwork(name)
                        if err != nil {
@@ -938,6 +941,7 @@ func clusterNodeDelete(d *Daemon, r *http.Request) 
response.Response {
                if err != nil && err != db.ErrNoSuchObject {
                        return response.SmartError(err)
                }
+
                for _, name := range pools {
                        err := client.DeleteStoragePool(name)
                        if err != nil {
@@ -965,6 +969,7 @@ func clusterNodeDelete(d *Daemon, r *http.Request) 
response.Response {
                if err != nil {
                        return response.SmartError(err)
                }
+
                put := api.ClusterPut{}
                put.Enabled = false
                _, err = client.UpdateCluster(put, "")
@@ -982,17 +987,19 @@ func tryClusterRebalance(d *Daemon) error {
        leader, err := d.gateway.LeaderAddress()
        if err != nil {
                // This is not a fatal error, so let's just log it.
-               return errors.Wrap(err, "failed to get current leader member")
+               return errors.Wrap(err, "Failed to get current leader member")
        }
        cert := d.endpoints.NetworkCert()
        client, err := cluster.Connect(leader, cert, true)
        if err != nil {
-               return errors.Wrap(err, "failed to connect to leader member")
+               return errors.Wrap(err, "Failed to connect to leader member")
        }
+
        _, _, err = client.RawQuery("POST", "/internal/cluster/rebalance", nil, 
"")
        if err != nil {
-               return errors.Wrap(err, "request to rebalance cluster failed")
+               return errors.Wrap(err, "Request to rebalance cluster failed")
        }
+
        return nil
 }
 
@@ -1159,6 +1166,7 @@ func internalClusterPostRebalance(d *Daemon, r 
*http.Request) response.Response
        if err != nil {
                return response.SmartError(err)
        }
+
        _, _, err = client.RawQuery("POST", "/internal/cluster/promote", post, 
"")
        if err != nil {
                return response.SmartError(err)

From c65b847a0a2b541cd6ffd6af843540db795552e7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com>
Date: Mon, 2 Dec 2019 23:56:49 -0500
Subject: [PATCH 6/6] lxd: Tweak cluster.Connect calls
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgra...@ubuntu.com>
---
 lxd/api_cluster.go    | 6 +++---
 lxd/container_post.go | 2 +-
 lxd/networks.go       | 2 +-
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 3f02fdd0f4..209fb08ed4 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -608,7 +608,7 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) 
response.Response {
                // Add the cluster flag from the agent
                version.UserAgentFeatures([]string{"cluster"})
 
-               client, err = cluster.Connect(req.ClusterAddress, 
d.endpoints.NetworkCert(), false)
+               client, err = cluster.Connect(req.ClusterAddress, 
d.endpoints.NetworkCert(), true)
                if err != nil {
                        return err
                }
@@ -965,7 +965,7 @@ func clusterNodeDelete(d *Daemon, r *http.Request) 
response.Response {
        if force != 1 {
                // Try to gracefully reset the database on the node.
                cert := d.endpoints.NetworkCert()
-               client, err := cluster.Connect(address, cert, false)
+               client, err := cluster.Connect(address, cert, true)
                if err != nil {
                        return response.SmartError(err)
                }
@@ -1162,7 +1162,7 @@ func internalClusterPostRebalance(d *Daemon, r 
*http.Request) response.Response
        }
 
        cert := d.endpoints.NetworkCert()
-       client, err := cluster.Connect(address, cert, false)
+       client, err := cluster.Connect(address, cert, true)
        if err != nil {
                return response.SmartError(err)
        }
diff --git a/lxd/container_post.go b/lxd/container_post.go
index b8b6ba8592..0bcff60ed2 100644
--- a/lxd/container_post.go
+++ b/lxd/container_post.go
@@ -297,7 +297,7 @@ func containerPostClusteringMigrate(d *Daemon, c 
instance.Instance, oldName, new
 
        run := func(*operations.Operation) error {
                // Connect to the source host, i.e. ourselves (the node the 
container is running on).
-               source, err := cluster.Connect(sourceAddress, cert, false)
+               source, err := cluster.Connect(sourceAddress, cert, true)
                if err != nil {
                        return errors.Wrap(err, "Failed to connect to source 
server")
                }
diff --git a/lxd/networks.go b/lxd/networks.go
index a7b0336209..057e3dc53d 100644
--- a/lxd/networks.go
+++ b/lxd/networks.go
@@ -2316,7 +2316,7 @@ func (n *network) 
refreshForkdnsServerAddresses(heartbeatData *cluster.APIHeartb
                        continue
                }
 
-               client, err := cluster.Connect(node.Address, cert, false)
+               client, err := cluster.Connect(node.Address, cert, true)
                if err != nil {
                        return err
                }
_______________________________________________
lxc-devel mailing list
lxc-devel@lists.linuxcontainers.org
http://lists.linuxcontainers.org/listinfo/lxc-devel

Reply via email to