The following pull request was submitted through GitHub.
It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/6236

This e-mail was sent by the LXC bot; direct replies will not reach the author
unless they happen to be subscribed to this list.

=== Description (from pull-request) ===

From 5662fd6289f09760d1bc500023ebd9b1cab95c9a Mon Sep 17 00:00:00 2001
From: Thomas Hipp <thomas.h...@canonical.com>
Date: Wed, 25 Sep 2019 09:31:45 +0200
Subject: [PATCH 1/2] lxd: Move events to new events package

This moves part of the event handling into a separate events package.

Signed-off-by: Thomas Hipp <thomas.h...@canonical.com>
---
 lxd/container.go     |   3 +-
 lxd/container_lxc.go |  27 ++---
 lxd/daemon.go        |   3 +-
 lxd/devlxd.go        |  41 +++----
 lxd/events.go        | 191 +------------------------------
 lxd/events/events.go | 259 +++++++++++++++++++++++++++++++++++++++++++
 lxd/main.go          |   7 +-
 lxd/main_forkdns.go  |   3 +-
 lxd/operations.go    |  21 ++--
 9 files changed, 317 insertions(+), 238 deletions(-)
 create mode 100644 lxd/events/events.go

diff --git a/lxd/container.go b/lxd/container.go
index 45c7687dba..722db5fcef 100644
--- a/lxd/container.go
+++ b/lxd/container.go
@@ -20,6 +20,7 @@ import (
        "github.com/lxc/lxd/lxd/db"
        "github.com/lxc/lxd/lxd/device"
        "github.com/lxc/lxd/lxd/device/config"
+       "github.com/lxc/lxd/lxd/events"
        "github.com/lxc/lxd/lxd/instance"
        "github.com/lxc/lxd/lxd/state"
        "github.com/lxc/lxd/lxd/sys"
@@ -681,7 +682,7 @@ func containerCreateAsSnapshot(s *state.State, args 
db.ContainerArgs, sourceInst
                os.RemoveAll(sourceInstance.StatePath())
        }
 
-       eventSendLifecycle(sourceInstance.Project(), 
"container-snapshot-created",
+       events.SendLifecycle(sourceInstance.Project(), 
"container-snapshot-created",
                fmt.Sprintf("/1.0/containers/%s", sourceInstance.Name()),
                map[string]interface{}{
                        "snapshot_name": args.Name,
diff --git a/lxd/container_lxc.go b/lxd/container_lxc.go
index 67e2e5f2d6..c75d4774da 100644
--- a/lxd/container_lxc.go
+++ b/lxd/container_lxc.go
@@ -30,6 +30,7 @@ import (
        "github.com/lxc/lxd/lxd/db/query"
        "github.com/lxc/lxd/lxd/device"
        "github.com/lxc/lxd/lxd/device/config"
+       "github.com/lxc/lxd/lxd/events"
        "github.com/lxc/lxd/lxd/instance"
        "github.com/lxc/lxd/lxd/maas"
        "github.com/lxc/lxd/lxd/project"
@@ -517,7 +518,7 @@ func containerLXCCreate(s *state.State, args 
db.ContainerArgs) (container, error
        }
 
        logger.Info("Created container", ctxMap)
-       eventSendLifecycle(c.project, "container-created",
+       events.SendLifecycle(c.project, "container-created",
                fmt.Sprintf("/1.0/containers/%s", c.name), nil)
 
        return c, nil
@@ -2688,7 +2689,7 @@ func (c *containerLXC) Start(stateful bool) error {
        }
 
        logger.Info("Started container", ctxMap)
-       eventSendLifecycle(c.project, "container-started",
+       events.SendLifecycle(c.project, "container-started",
                fmt.Sprintf("/1.0/containers/%s", c.name), nil)
 
        return nil
@@ -2856,7 +2857,7 @@ func (c *containerLXC) Stop(stateful bool) error {
 
                op.Done(nil)
                logger.Info("Stopped container", ctxMap)
-               eventSendLifecycle(c.project, "container-stopped",
+               events.SendLifecycle(c.project, "container-stopped",
                        fmt.Sprintf("/1.0/containers/%s", c.name), nil)
                return nil
        } else if shared.PathExists(c.StatePath()) {
@@ -2912,7 +2913,7 @@ func (c *containerLXC) Stop(stateful bool) error {
        }
 
        logger.Info("Stopped container", ctxMap)
-       eventSendLifecycle(c.project, "container-stopped",
+       events.SendLifecycle(c.project, "container-stopped",
                fmt.Sprintf("/1.0/containers/%s", c.name), nil)
 
        return nil
@@ -2973,7 +2974,7 @@ func (c *containerLXC) Shutdown(timeout time.Duration) 
error {
        }
 
        logger.Info("Shut down container", ctxMap)
-       eventSendLifecycle(c.project, "container-shutdown",
+       events.SendLifecycle(c.project, "container-shutdown",
                fmt.Sprintf("/1.0/containers/%s", c.name), nil)
 
        return nil
@@ -3166,7 +3167,7 @@ func (c *containerLXC) Freeze() error {
        }
 
        logger.Info("Froze container", ctxMap)
-       eventSendLifecycle(c.project, "container-paused",
+       events.SendLifecycle(c.project, "container-paused",
                fmt.Sprintf("/1.0/containers/%s", c.name), nil)
 
        return err
@@ -3211,7 +3212,7 @@ func (c *containerLXC) Unfreeze() error {
        }
 
        logger.Info("Unfroze container", ctxMap)
-       eventSendLifecycle(c.project, "container-resumed",
+       events.SendLifecycle(c.project, "container-resumed",
                fmt.Sprintf("/1.0/containers/%s", c.name), nil)
 
        return err
@@ -3605,7 +3606,7 @@ func (c *containerLXC) Restore(sourceContainer Instance, 
stateful bool) error {
                return nil
        }
 
-       eventSendLifecycle(c.project, "container-snapshot-restored",
+       events.SendLifecycle(c.project, "container-snapshot-restored",
                fmt.Sprintf("/1.0/containers/%s", c.name), 
map[string]interface{}{
                        "snapshot_name": c.name,
                })
@@ -3762,12 +3763,12 @@ func (c *containerLXC) Delete() error {
        logger.Info("Deleted container", ctxMap)
 
        if c.IsSnapshot() {
-               eventSendLifecycle(c.project, "container-snapshot-deleted",
+               events.SendLifecycle(c.project, "container-snapshot-deleted",
                        fmt.Sprintf("/1.0/containers/%s", c.name), 
map[string]interface{}{
                                "snapshot_name": c.name,
                        })
        } else {
-               eventSendLifecycle(c.project, "container-deleted",
+               events.SendLifecycle(c.project, "container-deleted",
                        fmt.Sprintf("/1.0/containers/%s", c.name), nil)
        }
 
@@ -3928,13 +3929,13 @@ func (c *containerLXC) Rename(newName string) error {
        logger.Info("Renamed container", ctxMap)
 
        if c.IsSnapshot() {
-               eventSendLifecycle(c.project, "container-snapshot-renamed",
+               events.SendLifecycle(c.project, "container-snapshot-renamed",
                        fmt.Sprintf("/1.0/containers/%s", oldName), 
map[string]interface{}{
                                "new_name":      newName,
                                "snapshot_name": oldName,
                        })
        } else {
-               eventSendLifecycle(c.project, "container-renamed",
+               events.SendLifecycle(c.project, "container-renamed",
                        fmt.Sprintf("/1.0/containers/%s", oldName), 
map[string]interface{}{
                                "new_name": newName,
                        })
@@ -4832,7 +4833,7 @@ func (c *containerLXC) Update(args db.ContainerArgs, 
userRequested bool) error {
                endpoint = fmt.Sprintf("/1.0/containers/%s", c.name)
        }
 
-       eventSendLifecycle(c.project, "container-updated", endpoint, nil)
+       events.SendLifecycle(c.project, "container-updated", endpoint, nil)
 
        return nil
 }
diff --git a/lxd/daemon.go b/lxd/daemon.go
index d9a0f1d3a7..2d2d433ee5 100644
--- a/lxd/daemon.go
+++ b/lxd/daemon.go
@@ -32,6 +32,7 @@ import (
        "github.com/lxc/lxd/lxd/db"
        "github.com/lxc/lxd/lxd/device"
        "github.com/lxc/lxd/lxd/endpoints"
+       "github.com/lxc/lxd/lxd/events"
        "github.com/lxc/lxd/lxd/maas"
        "github.com/lxc/lxd/lxd/node"
        "github.com/lxc/lxd/lxd/rbac"
@@ -919,7 +920,7 @@ func (d *Daemon) startClusterTasks() {
        d.clusterTasks.Add(cluster.HeartbeatTask(d.gateway))
 
        // Events
-       d.clusterTasks.Add(cluster.Events(d.endpoints, d.cluster, eventForward))
+       d.clusterTasks.Add(cluster.Events(d.endpoints, d.cluster, 
events.Forward))
 
        // Auto-sync images across the cluster (daily)
        d.clusterTasks.Add(autoSyncImagesTask(d))
diff --git a/lxd/devlxd.go b/lxd/devlxd.go
index 7ceeb96685..60bbdae845 100644
--- a/lxd/devlxd.go
+++ b/lxd/devlxd.go
@@ -17,8 +17,8 @@ import (
 
        "github.com/gorilla/mux"
        "github.com/gorilla/websocket"
-       "github.com/pborman/uuid"
 
+       "github.com/lxc/lxd/lxd/events"
        "github.com/lxc/lxd/lxd/instance"
        "github.com/lxc/lxd/lxd/util"
        "github.com/lxc/lxd/shared"
@@ -104,7 +104,7 @@ var devlxdMetadataGet = devLxdHandler{"/1.0/meta-data", 
func(d *Daemon, c contai
 }}
 
 var devlxdEventsLock sync.Mutex
-var devlxdEventListeners map[int]map[string]*eventListener = 
make(map[int]map[string]*eventListener)
+var devlxdEventListeners map[int]map[string]*events.Listener = 
make(map[int]map[string]*events.Listener)
 
 var devlxdEventsGet = devLxdHandler{"/1.0/events", func(d *Daemon, c 
container, w http.ResponseWriter, r *http.Request) *devLxdResponse {
        typeStr := r.FormValue("type")
@@ -117,26 +117,20 @@ var devlxdEventsGet = devLxdHandler{"/1.0/events", func(d 
*Daemon, c container,
                return &devLxdResponse{"internal server error", 
http.StatusInternalServerError, "raw"}
        }
 
-       listener := eventListener{
-               project:      c.Project(),
-               active:       make(chan bool, 1),
-               connection:   conn,
-               id:           uuid.NewRandom().String(),
-               messageTypes: strings.Split(typeStr, ","),
-       }
+       listener := events.NewEventListener(c.Project(), conn, 
strings.Split(typeStr, ","), "", false)
 
        devlxdEventsLock.Lock()
        cid := c.Id()
        _, ok := devlxdEventListeners[cid]
        if !ok {
-               devlxdEventListeners[cid] = map[string]*eventListener{}
+               devlxdEventListeners[cid] = map[string]*events.Listener{}
        }
-       devlxdEventListeners[cid][listener.id] = &listener
+       devlxdEventListeners[cid][listener.ID()] = listener
        devlxdEventsLock.Unlock()
 
-       logger.Debugf("New container event listener for '%s': %s", c.Name(), 
listener.id)
+       logger.Debugf("New container event listener for '%s': %s", c.Name(), 
listener.ID())
 
-       <-listener.active
+       listener.Wait()
 
        return &devLxdResponse{"websocket", http.StatusOK, "websocket"}
 }}
@@ -161,37 +155,36 @@ func devlxdEventSend(c container, eventType string, 
eventMessage interface{}) er
        }
 
        for _, listener := range listeners {
-               if !shared.StringInSlice(eventType, listener.messageTypes) {
+               if !shared.StringInSlice(eventType, listener.MessageTypes()) {
                        continue
                }
 
-               go func(listener *eventListener, body []byte) {
+               go func(listener *events.Listener, body []byte) {
                        // Check that the listener still exists
                        if listener == nil {
                                return
                        }
 
                        // Ensure there is only a single even going out at the 
time
-                       listener.lock.Lock()
-                       defer listener.lock.Unlock()
+                       listener.Lock()
+                       defer listener.Unlock()
 
                        // Make sure we're not done already
-                       if listener.done {
+                       if listener.IsDone() {
                                return
                        }
 
-                       err = 
listener.connection.WriteMessage(websocket.TextMessage, body)
+                       err = 
listener.Connection().WriteMessage(websocket.TextMessage, body)
                        if err != nil {
                                // Remove the listener from the list
                                devlxdEventsLock.Lock()
-                               delete(devlxdEventListeners[cid], listener.id)
+                               delete(devlxdEventListeners[cid], listener.ID())
                                devlxdEventsLock.Unlock()
 
                                // Disconnect the listener
-                               listener.connection.Close()
-                               listener.active <- false
-                               listener.done = true
-                               logger.Debugf("Disconnected container event 
listener for '%s': %s", c.Name(), listener.id)
+                               listener.Connection().Close()
+                               listener.Deactivate()
+                               logger.Debugf("Disconnected container event 
listener for '%s': %s", c.Name(), listener.ID())
                        }
                }(listener, body)
        }
diff --git a/lxd/events.go b/lxd/events.go
index 8040d20a4a..471e56c1c4 100644
--- a/lxd/events.go
+++ b/lxd/events.go
@@ -1,20 +1,12 @@
 package main
 
 import (
-       "encoding/json"
-       "fmt"
        "net/http"
        "strings"
-       "sync"
-       "time"
-
-       "github.com/gorilla/websocket"
-       log "github.com/lxc/lxd/shared/log15"
-       "github.com/pborman/uuid"
 
        "github.com/lxc/lxd/lxd/db"
+       "github.com/lxc/lxd/lxd/events"
        "github.com/lxc/lxd/shared"
-       "github.com/lxc/lxd/shared/api"
        "github.com/lxc/lxd/shared/logger"
 )
 
@@ -24,61 +16,6 @@ var eventsCmd = APIEndpoint{
        Get: APIEndpointAction{Handler: eventsGet, AccessHandler: 
AllowAuthenticated},
 }
 
-type eventsHandler struct {
-}
-
-func logContextMap(ctx []interface{}) map[string]string {
-       var key string
-       ctxMap := map[string]string{}
-
-       for _, entry := range ctx {
-               if key == "" {
-                       key = entry.(string)
-               } else {
-                       ctxMap[key] = fmt.Sprintf("%v", entry)
-                       key = ""
-               }
-       }
-
-       return ctxMap
-}
-
-func (h eventsHandler) Log(r *log.Record) error {
-       eventSend("", "logging", api.EventLogging{
-               Message: r.Msg,
-               Level:   r.Lvl.String(),
-               Context: logContextMap(r.Ctx)})
-       return nil
-}
-
-func eventSendLifecycle(project, action, source string,
-       context map[string]interface{}) error {
-       eventSend(project, "lifecycle", api.EventLifecycle{
-               Action:  action,
-               Source:  source,
-               Context: context})
-       return nil
-}
-
-var eventsLock sync.Mutex
-var eventListeners map[string]*eventListener = make(map[string]*eventListener)
-
-type eventListener struct {
-       project      string
-       connection   *websocket.Conn
-       messageTypes []string
-       active       chan bool
-       id           string
-       lock         sync.Mutex
-       done         bool
-       location     string
-
-       // If true, this listener won't get events forwarded from other
-       // nodes. It only used by listeners created internally by LXD nodes
-       // connecting to other LXD nodes to get their local events only.
-       noForward bool
-}
-
 type eventsServe struct {
        req *http.Request
        d   *Daemon
@@ -117,27 +54,16 @@ func eventsSocket(d *Daemon, r *http.Request, w 
http.ResponseWriter) error {
                return err
        }
 
-       listener := eventListener{
-               project:      project,
-               active:       make(chan bool, 1),
-               connection:   c,
-               id:           uuid.NewRandom().String(),
-               messageTypes: strings.Split(typeStr, ","),
-               location:     serverName,
-       }
-
        // If this request is an internal one initiated by another node wanting
        // to watch the events on this node, set the listener to broadcast only
        // local events.
-       listener.noForward = isClusterNotification(r)
+       listener := events.NewEventListener(project, c, strings.Split(typeStr, 
","), serverName, isClusterNotification(r))
 
-       eventsLock.Lock()
-       eventListeners[listener.id] = &listener
-       eventsLock.Unlock()
+       events.AddListener(listener)
 
-       logger.Debugf("New event listener: %s", listener.id)
+       logger.Debugf("New event listener: %s", listener.ID())
 
-       <-listener.active
+       listener.Wait()
 
        return nil
 }
@@ -145,110 +71,3 @@ func eventsSocket(d *Daemon, r *http.Request, w 
http.ResponseWriter) error {
 func eventsGet(d *Daemon, r *http.Request) Response {
        return &eventsServe{req: r, d: d}
 }
-
-func eventSend(project, eventType string, eventMessage interface{}) error {
-       encodedMessage, err := json.Marshal(eventMessage)
-       if err != nil {
-               return err
-       }
-       event := api.Event{
-               Type:      eventType,
-               Timestamp: time.Now(),
-               Metadata:  encodedMessage,
-       }
-
-       return eventBroadcast(project, event, false)
-}
-
-func eventBroadcast(project string, event api.Event, isForward bool) error {
-       eventsLock.Lock()
-       listeners := eventListeners
-       for _, listener := range listeners {
-               if project != "" && listener.project != "*" && project != 
listener.project {
-                       continue
-               }
-
-               if isForward && listener.noForward {
-                       continue
-               }
-
-               if !shared.StringInSlice(event.Type, listener.messageTypes) {
-                       continue
-               }
-
-               go func(listener *eventListener, event api.Event) {
-                       // Check that the listener still exists
-                       if listener == nil {
-                               return
-                       }
-
-                       // Ensure there is only a single even going out at the 
time
-                       listener.lock.Lock()
-                       defer listener.lock.Unlock()
-
-                       // Make sure we're not done already
-                       if listener.done {
-                               return
-                       }
-
-                       // Set the Location to the expected serverName
-                       if event.Location == "" {
-                               eventCopy := api.Event{}
-                               err := shared.DeepCopy(&event, &eventCopy)
-                               if err != nil {
-                                       return
-                               }
-                               eventCopy.Location = listener.location
-
-                               event = eventCopy
-                       }
-
-                       body, err := json.Marshal(event)
-                       if err != nil {
-                               return
-                       }
-
-                       err = 
listener.connection.WriteMessage(websocket.TextMessage, body)
-                       if err != nil {
-                               // Remove the listener from the list
-                               eventsLock.Lock()
-                               delete(eventListeners, listener.id)
-                               eventsLock.Unlock()
-
-                               // Disconnect the listener
-                               listener.connection.Close()
-                               listener.active <- false
-                               listener.done = true
-                               logger.Debugf("Disconnected event listener: 
%s", listener.id)
-                       }
-               }(listener, event)
-       }
-       eventsLock.Unlock()
-
-       return nil
-}
-
-// Forward to the local events dispatcher an event received from another node .
-func eventForward(id int64, event api.Event) {
-       if event.Type == "logging" {
-               // Parse the message
-               logEntry := api.EventLogging{}
-               err := json.Unmarshal(event.Metadata, &logEntry)
-               if err != nil {
-                       return
-               }
-
-               if !debug && logEntry.Level == "dbug" {
-                       return
-               }
-
-               if !debug && !verbose && logEntry.Level == "info" {
-                       return
-               }
-       }
-
-       err := eventBroadcast("", event, true)
-       if err != nil {
-               logger.Warnf("Failed to forward event from node %d: %v", id, 
err)
-       }
-}
diff --git a/lxd/events/events.go b/lxd/events/events.go
new file mode 100644
index 0000000000..f2ae3fd5c8
--- /dev/null
+++ b/lxd/events/events.go
@@ -0,0 +1,259 @@
+package events
+
+import (
+       "encoding/json"
+       "fmt"
+       "sync"
+       "time"
+
+       "github.com/pborman/uuid"
+
+       "github.com/gorilla/websocket"
+       "github.com/lxc/lxd/shared"
+       "github.com/lxc/lxd/shared/api"
+       log "github.com/lxc/lxd/shared/log15"
+       "github.com/lxc/lxd/shared/logger"
+)
+
+var debug bool
+var verbose bool
+
+var eventsLock sync.Mutex
+var eventListeners map[string]*Listener = make(map[string]*Listener)
+
+// Listener describes an event listener.
+type Listener struct {
+       project      string
+       connection   *websocket.Conn
+       messageTypes []string
+       active       chan bool
+       id           string
+       lock         sync.Mutex
+       done         bool
+       location     string
+
+       // If true, this listener won't get events forwarded from other
+       // nodes. It is only used by listeners created internally by LXD nodes
+       // connecting to other LXD nodes to get their local events only.
+       noForward bool
+}
+
+// NewEventListener creates and returns a new event listener.
+func NewEventListener(project string, connection *websocket.Conn, messageTypes 
[]string, location string, noForward bool) *Listener {
+       return &Listener{
+               project:      project,
+               connection:   connection,
+               messageTypes: messageTypes,
+               location:     location,
+               noForward:    noForward,
+               active:       make(chan bool, 1),
+               id:           uuid.NewRandom().String(),
+       }
+}
+
+// MessageTypes returns a list of message types the listener will be notified 
of.
+func (e *Listener) MessageTypes() []string {
+       return e.messageTypes
+}
+
+// IsDone returns true if the listener is done.
+func (e *Listener) IsDone() bool {
+       return e.done
+}
+
+// Connection returns the underlying websocket connection.
+func (e *Listener) Connection() *websocket.Conn {
+       return e.connection
+}
+
+// ID returns the listener ID.
+func (e *Listener) ID() string {
+       return e.id
+}
+
+// Wait waits for a message on its active channel, then returns.
+func (e *Listener) Wait() {
+       <-e.active
+}
+
+// Lock locks the internal mutex.
+func (e *Listener) Lock() {
+       e.lock.Lock()
+}
+
+// Unlock unlocks the internal mutex.
+func (e *Listener) Unlock() {
+       e.lock.Unlock()
+}
+
+// Deactivate deactivates the event listener.
+func (e *Listener) Deactivate() {
+       e.active <- false
+       e.done = true
+}
+
+// Handler describes an event handler.
+type Handler struct {
+}
+
+// NewEventHandler creates and returns a new event handler.
+func NewEventHandler() *Handler {
+       return &Handler{}
+}
+
+// Log sends a new logging event.
+func (h Handler) Log(r *log.Record) error {
+       Send("", "logging", api.EventLogging{
+               Message: r.Msg,
+               Level:   r.Lvl.String(),
+               Context: logContextMap(r.Ctx)})
+       return nil
+}
+
+// Init sets the debug and verbose flags.
+func Init(d bool, v bool) {
+       debug = d
+       verbose = v
+}
+
+// AddListener adds the given listener to the internal list of listeners which
+// are notified when events are broadcast.
+func AddListener(listener *Listener) {
+       eventsLock.Lock()
+       eventListeners[listener.id] = listener
+       eventsLock.Unlock()
+}
+
+// SendLifecycle broadcasts a lifecycle event.
+func SendLifecycle(project, action, source string,
+       context map[string]interface{}) error {
+       Send(project, "lifecycle", api.EventLifecycle{
+               Action:  action,
+               Source:  source,
+               Context: context})
+       return nil
+}
+
+// Send broadcasts a custom event.
+func Send(project, eventType string, eventMessage interface{}) error {
+       encodedMessage, err := json.Marshal(eventMessage)
+       if err != nil {
+               return err
+       }
+       event := api.Event{
+               Type:      eventType,
+               Timestamp: time.Now(),
+               Metadata:  encodedMessage,
+       }
+
+       return broadcast(project, event, false)
+}
+
+// Forward forwards an event received from another node to the local events dispatcher.
+func Forward(id int64, event api.Event) {
+       if event.Type == "logging" {
+               // Parse the message
+               logEntry := api.EventLogging{}
+               err := json.Unmarshal(event.Metadata, &logEntry)
+               if err != nil {
+                       return
+               }
+
+               if !debug && logEntry.Level == "dbug" {
+                       return
+               }
+
+               if !debug && !verbose && logEntry.Level == "info" {
+                       return
+               }
+       }
+
+       err := broadcast("", event, true)
+       if err != nil {
+               logger.Warnf("Failed to forward event from node %d: %v", id, 
err)
+       }
+}
+
+func logContextMap(ctx []interface{}) map[string]string {
+       var key string
+       ctxMap := map[string]string{}
+
+       for _, entry := range ctx {
+               if key == "" {
+                       key = entry.(string)
+               } else {
+                       ctxMap[key] = fmt.Sprintf("%v", entry)
+                       key = ""
+               }
+       }
+
+       return ctxMap
+}
+
+func broadcast(project string, event api.Event, isForward bool) error {
+       eventsLock.Lock()
+       listeners := eventListeners
+       for _, listener := range listeners {
+               if project != "" && listener.project != "*" && project != 
listener.project {
+                       continue
+               }
+
+               if isForward && listener.noForward {
+                       continue
+               }
+
+               if !shared.StringInSlice(event.Type, listener.messageTypes) {
+                       continue
+               }
+
+               go func(listener *Listener, event api.Event) {
+                       // Check that the listener still exists
+                       if listener == nil {
+                               return
+                       }
+
+                       // Ensure there is only a single event going out at a 
time
+                       listener.lock.Lock()
+                       defer listener.lock.Unlock()
+
+                       // Make sure we're not done already
+                       if listener.done {
+                               return
+                       }
+
+                       // Set the Location to the expected serverName
+                       if event.Location == "" {
+                               eventCopy := api.Event{}
+                               err := shared.DeepCopy(&event, &eventCopy)
+                               if err != nil {
+                                       return
+                               }
+                               eventCopy.Location = listener.location
+
+                               event = eventCopy
+                       }
+
+                       body, err := json.Marshal(event)
+                       if err != nil {
+                               return
+                       }
+
+                       err = 
listener.connection.WriteMessage(websocket.TextMessage, body)
+                       if err != nil {
+                               // Remove the listener from the list
+                               eventsLock.Lock()
+                               delete(eventListeners, listener.id)
+                               eventsLock.Unlock()
+
+                               // Disconnect the listener
+                               listener.connection.Close()
+                               listener.active <- false
+                               listener.done = true
+                               logger.Debugf("Disconnected event listener: 
%s", listener.id)
+                       }
+               }(listener, event)
+       }
+       eventsLock.Unlock()
+
+       return nil
+}
diff --git a/lxd/main.go b/lxd/main.go
index af52a93423..476ad9ecb4 100644
--- a/lxd/main.go
+++ b/lxd/main.go
@@ -7,6 +7,7 @@ import (
 
        "github.com/spf13/cobra"
 
+       "github.com/lxc/lxd/lxd/events"
        "github.com/lxc/lxd/shared/logger"
        "github.com/lxc/lxd/shared/logging"
        "github.com/lxc/lxd/shared/version"
@@ -39,14 +40,16 @@ func (c *cmdGlobal) Run(cmd *cobra.Command, args []string) 
error {
        debug = c.flagLogDebug
        verbose = c.flagLogVerbose
 
+       // Set debug and verbose for the events package
+       events.Init(debug, verbose)
+
        // Setup logger
        syslog := ""
        if c.flagLogSyslog {
                syslog = "lxd"
        }
 
-       handler := eventsHandler{}
-       log, err := logging.GetLogger(syslog, c.flagLogFile, c.flagLogVerbose, 
c.flagLogDebug, handler)
+       log, err := logging.GetLogger(syslog, c.flagLogFile, c.flagLogVerbose, 
c.flagLogDebug, events.NewEventHandler())
        if err != nil {
                return err
        }
diff --git a/lxd/main_forkdns.go b/lxd/main_forkdns.go
index e339593c49..0ba1d322b9 100644
--- a/lxd/main_forkdns.go
+++ b/lxd/main_forkdns.go
@@ -14,6 +14,7 @@ import (
        "github.com/spf13/cobra"
        "gopkg.in/fsnotify.v0"
 
+       "github.com/lxc/lxd/lxd/events"
        "github.com/lxc/lxd/shared"
        "github.com/lxc/lxd/shared/dnsutil"
        "github.com/lxc/lxd/shared/logger"
@@ -442,7 +443,7 @@ func (c *cmdForkDNS) Run(cmd *cobra.Command, args []string) 
error {
                return fmt.Errorf("Missing required arguments")
        }
 
-       log, err := logging.GetLogger("lxd-forkdns", "", false, false, 
eventsHandler{})
+       log, err := logging.GetLogger("lxd-forkdns", "", false, false, 
events.NewEventHandler())
        if err != nil {
                return err
        }
diff --git a/lxd/operations.go b/lxd/operations.go
index d0908e31e3..c96409b58e 100644
--- a/lxd/operations.go
+++ b/lxd/operations.go
@@ -14,6 +14,7 @@ import (
 
        "github.com/lxc/lxd/lxd/cluster"
        "github.com/lxc/lxd/lxd/db"
+       "github.com/lxc/lxd/lxd/events"
        "github.com/lxc/lxd/lxd/node"
        "github.com/lxc/lxd/lxd/util"
        "github.com/lxc/lxd/shared"
@@ -154,7 +155,7 @@ func (op *operation) Run() (chan error, error) {
                                logger.Debugf("Failure for %s operation: %s: 
%s", op.class.String(), op.id, err)
 
                                _, md, _ := op.Render()
-                               eventSend(op.project, "operation", md)
+                               events.Send(op.project, "operation", md)
                                return
                        }
 
@@ -167,7 +168,7 @@ func (op *operation) Run() (chan error, error) {
                        op.lock.Lock()
                        logger.Debugf("Success for %s operation: %s", 
op.class.String(), op.id)
                        _, md, _ := op.Render()
-                       eventSend(op.project, "operation", md)
+                       events.Send(op.project, "operation", md)
                        op.lock.Unlock()
                }(op, chanRun)
        }
@@ -175,7 +176,7 @@ func (op *operation) Run() (chan error, error) {
 
        logger.Debugf("Started %s operation: %s", op.class.String(), op.id)
        _, md, _ := op.Render()
-       eventSend(op.project, "operation", md)
+       events.Send(op.project, "operation", md)
 
        return chanRun, nil
 }
@@ -207,7 +208,7 @@ func (op *operation) Cancel() (chan error, error) {
 
                                logger.Debugf("Failed to cancel %s operation: 
%s: %s", op.class.String(), op.id, err)
                                _, md, _ := op.Render()
-                               eventSend(op.project, "operation", md)
+                               events.Send(op.project, "operation", md)
                                return
                        }
 
@@ -219,13 +220,13 @@ func (op *operation) Cancel() (chan error, error) {
 
                        logger.Debugf("Cancelled %s operation: %s", 
op.class.String(), op.id)
                        _, md, _ := op.Render()
-                       eventSend(op.project, "operation", md)
+                       events.Send(op.project, "operation", md)
                }(op, oldStatus, chanCancel)
        }
 
        logger.Debugf("Cancelling %s operation: %s", op.class.String(), op.id)
        _, md, _ := op.Render()
-       eventSend(op.project, "operation", md)
+       events.Send(op.project, "operation", md)
 
        if op.canceler != nil {
                err := op.canceler.Cancel()
@@ -244,7 +245,7 @@ func (op *operation) Cancel() (chan error, error) {
 
        logger.Debugf("Cancelled %s operation: %s", op.class.String(), op.id)
        _, md, _ = op.Render()
-       eventSend(op.project, "operation", md)
+       events.Send(op.project, "operation", md)
 
        return chanCancel, nil
 }
@@ -383,7 +384,7 @@ func (op *operation) UpdateResources(opResources 
map[string][]string) error {
 
        logger.Debugf("Updated resources for %s operation: %s", 
op.class.String(), op.id)
        _, md, _ := op.Render()
-       eventSend(op.project, "operation", md)
+       events.Send(op.project, "operation", md)
 
        return nil
 }
@@ -409,7 +410,7 @@ func (op *operation) UpdateMetadata(opMetadata interface{}) 
error {
 
        logger.Debugf("Updated metadata for %s operation: %s", 
op.class.String(), op.id)
        _, md, _ := op.Render()
-       eventSend(op.project, "operation", md)
+       events.Send(op.project, "operation", md)
 
        return nil
 }
@@ -472,7 +473,7 @@ func operationCreate(cluster *db.Cluster, project string, 
opClass operationClass
 
        logger.Debugf("New %s operation: %s", op.class.String(), op.id)
        _, md, _ := op.Render()
-       eventSend(op.project, "operation", md)
+       events.Send(op.project, "operation", md)
 
        return &op, nil
 }

From 841007fabda56bb6b4755a0606a137bddadd711b Mon Sep 17 00:00:00 2001
From: Thomas Hipp <thomas.h...@canonical.com>
Date: Wed, 25 Sep 2019 09:39:50 +0200
Subject: [PATCH 2/2] test: Add events package to static analysis test

Signed-off-by: Thomas Hipp <thomas.h...@canonical.com>
---
 test/suites/static_analysis.sh | 1 +
 1 file changed, 1 insertion(+)

diff --git a/test/suites/static_analysis.sh b/test/suites/static_analysis.sh
index 8f40270444..21291beeb6 100644
--- a/test/suites/static_analysis.sh
+++ b/test/suites/static_analysis.sh
@@ -78,6 +78,7 @@ test_static_analysis() {
       golint -set_exit_status lxd/db/query
       golint -set_exit_status lxd/db/schema
       golint -set_exit_status lxd/endpoints
+      golint -set_exit_status lxd/events
       golint -set_exit_status lxd/maas
       #golint -set_exit_status lxd/migration
       golint -set_exit_status lxd/node
_______________________________________________
lxc-devel mailing list
lxc-devel@lists.linuxcontainers.org
http://lists.linuxcontainers.org/listinfo/lxc-devel

Reply via email to