The following pull request was submitted through GitHub. It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/6539
This e-mail was sent by the LXC bot; direct replies will not reach the author unless they happen to be subscribed to this list. === Description (from pull-request) === With this change, connections to other cluster nodes for non-notify operations will now be held until a working event connection is established with the target. This should avoid race conditions where we forward a request to another node but then fail to forward operation updates because we are not yet connected to that node's event stream.
From 769f4fdfcc484b73b39343505bb030101e7c63a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Sun, 1 Dec 2019 23:27:16 -0500 Subject: [PATCH 1/6] lxd/cluster: More reliable event delivery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- lxd/cluster/connect.go | 22 ++++++++++++++++++++++ lxd/cluster/events.go | 39 +++++++++++++++++++++++++-------------- 2 files changed, 47 insertions(+), 14 deletions(-) diff --git a/lxd/cluster/connect.go b/lxd/cluster/connect.go index 43ec165eb6..bd9c3881cb 100644 --- a/lxd/cluster/connect.go +++ b/lxd/cluster/connect.go @@ -4,6 +4,7 @@ import ( "encoding/base64" "encoding/pem" "fmt" + "time" lxd "github.com/lxc/lxd/client" "github.com/lxc/lxd/lxd/db" @@ -20,6 +21,27 @@ import ( // value 'lxd-cluster-notifier', which can be used in some cases to distinguish // between a regular client request and an internal cluster request. func Connect(address string, cert *shared.CertInfo, notify bool) (lxd.InstanceServer, error) { + // Wait for a connection to the events API first for non-notify connections. 
+ if !notify { + connected := false + for i := 0; i < 20; i++ { + listenersLock.Lock() + _, ok := listeners[address] + listenersLock.Unlock() + + if ok { + connected = true + break + } + + time.Sleep(500 * time.Millisecond) + } + + if !connected { + return nil, fmt.Errorf("Missing event connection with target cluster member") + } + } + args := &lxd.ConnectionArgs{ TLSServerCert: string(cert.PublicKey()), TLSClientCert: string(cert.PublicKey()), diff --git a/lxd/cluster/events.go b/lxd/cluster/events.go index bbcef4c0a7..8e77a0d46a 100644 --- a/lxd/cluster/events.go +++ b/lxd/cluster/events.go @@ -2,6 +2,7 @@ package cluster import ( "context" + "sync" "time" lxd "github.com/lxc/lxd/client" @@ -13,28 +14,28 @@ import ( "github.com/lxc/lxd/shared/logger" ) +var listeners = map[string]*lxd.EventListener{} +var listenersLock sync.Mutex + // Events starts a task that continuously monitors the list of cluster nodes and // maintains a pool of websocket connections against all of them, in order to // get notified about events. // // Whenever an event is received the given callback is invoked. func Events(endpoints *endpoints.Endpoints, cluster *db.Cluster, f func(int64, api.Event)) (task.Func, task.Schedule) { - listeners := map[int64]*lxd.EventListener{} - // Update our pool of event listeners. Since database queries are // blocking, we spawn the actual logic in a goroutine, to abort // immediately when we receive the stop signal. 
update := func(ctx context.Context) { ch := make(chan struct{}) go func() { - eventsUpdateListeners(endpoints, cluster, listeners, f) + eventsUpdateListeners(endpoints, cluster, f) ch <- struct{}{} }() select { case <-ch: case <-ctx.Done(): } - } schedule := task.Every(time.Second) @@ -42,7 +43,7 @@ func Events(endpoints *endpoints.Endpoints, cluster *db.Cluster, f func(int64, a return update, schedule } -func eventsUpdateListeners(endpoints *endpoints.Endpoints, cluster *db.Cluster, listeners map[int64]*lxd.EventListener, f func(int64, api.Event)) { +func eventsUpdateListeners(endpoints *endpoints.Endpoints, cluster *db.Cluster, f func(int64, api.Event)) { // Get the current cluster nodes. var nodes []db.NodeInfo var offlineThreshold time.Duration @@ -72,27 +73,31 @@ func eventsUpdateListeners(endpoints *endpoints.Endpoints, cluster *db.Cluster, address := endpoints.NetworkAddress() - ids := make([]int, len(nodes)) + addresses := make([]string, len(nodes)) for i, node := range nodes { - ids[i] = int(node.ID) + addresses[i] = node.Address // Don't bother trying to connect to offline nodes, or to ourselves. if node.IsOffline(offlineThreshold) || node.Address == address { continue } - _, ok := listeners[node.ID] + listenersLock.Lock() + listener, ok := listeners[node.Address] // The node has already a listener associated to it. if ok { // Double check that the listener is still // connected. If it is, just move on, other // we'll try to connect again. 
- if listeners[node.ID].IsActive() { + if listeners[node.Address].IsActive() { + listenersLock.Unlock() continue } - delete(listeners, node.ID) + + delete(listeners, node.Address) } + listenersLock.Unlock() listener, err := eventsConnect(node.Address, endpoints.NetworkCert()) if err != nil { @@ -101,14 +106,20 @@ func eventsUpdateListeners(endpoints *endpoints.Endpoints, cluster *db.Cluster, } logger.Debugf("Listening for events on node %s", node.Address) listener.AddHandler(nil, func(event api.Event) { f(node.ID, event) }) - listeners[node.ID] = listener + + listenersLock.Lock() + listeners[node.Address] = listener + listenersLock.Unlock() } - for id, listener := range listeners { - if !shared.IntInSlice(int(id), ids) { + + listenersLock.Lock() + for address, listener := range listeners { + if !shared.StringInSlice(address, addresses) { listener.Disconnect() - delete(listeners, id) + delete(listeners, address) } } + listenersLock.Unlock() } // Establish a client connection to get events from the given node. 
From d09c28ff48c71d51323741e3771bcbc902625dfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Mon, 2 Dec 2019 23:54:02 -0500 Subject: [PATCH 2/6] lxd/response: Coding style MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- lxd/response/response.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lxd/response/response.go b/lxd/response/response.go index 28572de771..128b1015d0 100644 --- a/lxd/response/response.go +++ b/lxd/response/response.go @@ -139,6 +139,7 @@ func Conflict(err error) Response { if err != nil { message = err.Error() } + return &errorResponse{http.StatusConflict, message} } @@ -148,6 +149,7 @@ func Forbidden(err error) Response { if err != nil { message = err.Error() } + return &errorResponse{http.StatusForbidden, message} } @@ -162,6 +164,7 @@ func NotFound(err error) Response { if err != nil { message = err.Error() } + return &errorResponse{http.StatusNotFound, message} } @@ -171,6 +174,7 @@ func NotImplemented(err error) Response { if err != nil { message = err.Error() } + return &errorResponse{http.StatusNotImplemented, message} } @@ -186,6 +190,7 @@ func Unavailable(err error) Response { if err != nil { message = err.Error() } + return &errorResponse{http.StatusServiceUnavailable, message} } @@ -308,6 +313,7 @@ func (r *fileResponse) Render(w http.ResponseWriter) error { return err } defer fd.Close() + rd = fd } else { rd = bytes.NewReader(entry.Buffer) @@ -361,6 +367,7 @@ func (r *forwardedResponse) Render(w http.ResponseWriter) error { if err != nil { return err } + for key := range r.request.Header { forwarded.Header.Set(key, r.request.Header.Get(key)) } @@ -369,6 +376,7 @@ func (r *forwardedResponse) Render(w http.ResponseWriter) error { if err != nil { return err } + response, err := httpClient.Do(forwarded) if err != nil { return err From aaa0c0f86c217f2e98358295e313dffd674b4b32 Mon Sep 
17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Mon, 2 Dec 2019 23:54:10 -0500 Subject: [PATCH 3/6] lxd/operations: Use ForwardedResponse MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- lxd/operations.go | 23 +++-------------------- 1 file changed, 3 insertions(+), 20 deletions(-) diff --git a/lxd/operations.go b/lxd/operations.go index 780aa18fdc..4bc0ba22b3 100644 --- a/lxd/operations.go +++ b/lxd/operations.go @@ -16,7 +16,6 @@ import ( "github.com/lxc/lxd/lxd/util" "github.com/lxc/lxd/shared" "github.com/lxc/lxd/shared/api" - "github.com/lxc/lxd/shared/logger" ) var operationCmd = APIEndpoint{ @@ -82,12 +81,7 @@ func operationGet(d *Daemon, r *http.Request) response.Response { return response.SmartError(err) } - body, _, err = client.GetOperation(id) - if err != nil { - return response.SmartError(err) - } - - return response.SyncResponse(true, body) + return response.ForwardedResponse(client, r) } func operationDelete(d *Daemon, r *http.Request) response.Response { @@ -136,12 +130,7 @@ func operationDelete(d *Daemon, r *http.Request) response.Response { return response.SmartError(err) } - err = client.DeleteOperation(id) - if err != nil { - return response.SmartError(err) - } - - return response.EmptySyncResponse + return response.ForwardedResponse(client, r) } func operationsGet(d *Daemon, r *http.Request) response.Response { @@ -361,12 +350,7 @@ func operationWaitGet(d *Daemon, r *http.Request) response.Response { return response.SmartError(err) } - apiOp, _, err := client.GetOperationWait(id, timeout) - if err != nil { - return response.SmartError(err) - } - - return response.SyncResponse(true, apiOp) + return response.ForwardedResponse(client, r) } type operationWebSocket struct { @@ -447,7 +431,6 @@ func operationWebsocketGet(d *Daemon, r *http.Request) response.Response { return response.SmartError(err) } - 
logger.Debugf("Forward operation websocket from node %s", address) source, err := client.GetOperationWebsocket(id, secret) if err != nil { return response.SmartError(err) From e4bb1480e5c04a49de1c77d97b294e23fee68d76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Mon, 2 Dec 2019 23:54:34 -0500 Subject: [PATCH 4/6] lxd/images: Coding style MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- lxd/images.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lxd/images.go b/lxd/images.go index a4d934b0c9..95dec7e775 100644 --- a/lxd/images.go +++ b/lxd/images.go @@ -1925,6 +1925,7 @@ func imageExport(d *Daemon, r *http.Request) response.Response { if err != nil { return response.SmartError(err) } + return response.ForwardedResponse(client, r) } @@ -2217,12 +2218,14 @@ func imageSyncBetweenNodes(d *Daemon, project string, fingerprint string) error if err != nil { return errors.Wrap(err, "Failed to fetch the leader node address") } + var targetNodeAddress string if shared.StringInSlice(leader, addresses) { targetNodeAddress = leader } else { targetNodeAddress = addresses[0] } + client, err := cluster.Connect(targetNodeAddress, d.endpoints.NetworkCert(), true) if err != nil { return errors.Wrap(err, "Failed to connect node for image synchronization") From 194a3e44fe7ce91a94849ef14e241bc191e330d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Mon, 2 Dec 2019 23:55:26 -0500 Subject: [PATCH 5/6] lxd/cluster: Coding style MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- lxd/api_cluster.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go index 7ea90a760e..3f02fdd0f4 100644 --- a/lxd/api_cluster.go +++ b/lxd/api_cluster.go 
@@ -612,6 +612,7 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response { if err != nil { return err } + err = clusterRebalance(client) if err != nil { return err @@ -922,10 +923,12 @@ func clusterNodeDelete(d *Daemon, r *http.Request) response.Response { if err != nil { return response.SmartError(err) } + networks, err := d.cluster.Networks() if err != nil { return response.SmartError(err) } + for _, name := range networks { err := client.DeleteNetwork(name) if err != nil { @@ -938,6 +941,7 @@ func clusterNodeDelete(d *Daemon, r *http.Request) response.Response { if err != nil && err != db.ErrNoSuchObject { return response.SmartError(err) } + for _, name := range pools { err := client.DeleteStoragePool(name) if err != nil { @@ -965,6 +969,7 @@ func clusterNodeDelete(d *Daemon, r *http.Request) response.Response { if err != nil { return response.SmartError(err) } + put := api.ClusterPut{} put.Enabled = false _, err = client.UpdateCluster(put, "") @@ -982,17 +987,19 @@ func tryClusterRebalance(d *Daemon) error { leader, err := d.gateway.LeaderAddress() if err != nil { // This is not a fatal error, so let's just log it. 
- return errors.Wrap(err, "failed to get current leader member") + return errors.Wrap(err, "Failed to get current leader member") } cert := d.endpoints.NetworkCert() client, err := cluster.Connect(leader, cert, true) if err != nil { - return errors.Wrap(err, "failed to connect to leader member") + return errors.Wrap(err, "Failed to connect to leader member") } + _, _, err = client.RawQuery("POST", "/internal/cluster/rebalance", nil, "") if err != nil { - return errors.Wrap(err, "request to rebalance cluster failed") + return errors.Wrap(err, "Request to rebalance cluster failed") } + return nil } @@ -1159,6 +1166,7 @@ func internalClusterPostRebalance(d *Daemon, r *http.Request) response.Response if err != nil { return response.SmartError(err) } + _, _, err = client.RawQuery("POST", "/internal/cluster/promote", post, "") if err != nil { return response.SmartError(err) From c65b847a0a2b541cd6ffd6af843540db795552e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Mon, 2 Dec 2019 23:56:49 -0500 Subject: [PATCH 6/6] lxd: Tweak cluster.Connect calls MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- lxd/api_cluster.go | 6 +++--- lxd/container_post.go | 2 +- lxd/networks.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go index 3f02fdd0f4..209fb08ed4 100644 --- a/lxd/api_cluster.go +++ b/lxd/api_cluster.go @@ -608,7 +608,7 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response { // Add the cluster flag from the agent version.UserAgentFeatures([]string{"cluster"}) - client, err = cluster.Connect(req.ClusterAddress, d.endpoints.NetworkCert(), false) + client, err = cluster.Connect(req.ClusterAddress, d.endpoints.NetworkCert(), true) if err != nil { return err } @@ -965,7 +965,7 @@ func clusterNodeDelete(d *Daemon, r *http.Request) response.Response { if force 
!= 1 { // Try to gracefully reset the database on the node. cert := d.endpoints.NetworkCert() - client, err := cluster.Connect(address, cert, false) + client, err := cluster.Connect(address, cert, true) if err != nil { return response.SmartError(err) } @@ -1162,7 +1162,7 @@ func internalClusterPostRebalance(d *Daemon, r *http.Request) response.Response } cert := d.endpoints.NetworkCert() - client, err := cluster.Connect(address, cert, false) + client, err := cluster.Connect(address, cert, true) if err != nil { return response.SmartError(err) } diff --git a/lxd/container_post.go b/lxd/container_post.go index b8b6ba8592..0bcff60ed2 100644 --- a/lxd/container_post.go +++ b/lxd/container_post.go @@ -297,7 +297,7 @@ func containerPostClusteringMigrate(d *Daemon, c instance.Instance, oldName, new run := func(*operations.Operation) error { // Connect to the source host, i.e. ourselves (the node the container is running on). - source, err := cluster.Connect(sourceAddress, cert, false) + source, err := cluster.Connect(sourceAddress, cert, true) if err != nil { return errors.Wrap(err, "Failed to connect to source server") } diff --git a/lxd/networks.go b/lxd/networks.go index a7b0336209..057e3dc53d 100644 --- a/lxd/networks.go +++ b/lxd/networks.go @@ -2316,7 +2316,7 @@ func (n *network) refreshForkdnsServerAddresses(heartbeatData *cluster.APIHeartb continue } - client, err := cluster.Connect(node.Address, cert, false) + client, err := cluster.Connect(node.Address, cert, true) if err != nil { return err }
_______________________________________________ lxc-devel mailing list lxc-devel@lists.linuxcontainers.org http://lists.linuxcontainers.org/listinfo/lxc-devel