The following pull request was submitted through Github.
It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/6265

This e-mail was sent by the LXC bot, direct replies will not reach the author
unless they happen to be subscribed to this list.

=== Description (from pull-request) ===
This allows multiple users of the events package with their own list of listeners and configuration.
From 7770da74f3d9199d7985e783bdbee215dfb95824 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com>
Date: Mon, 30 Sep 2019 15:05:13 -0400
Subject: [PATCH 1/5] lxd/main_forkdns: Don't setup event logger
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgra...@ubuntu.com>
---
 lxd/main_forkdns.go | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/lxd/main_forkdns.go b/lxd/main_forkdns.go
index 0ba1d322b9..7a6833a174 100644
--- a/lxd/main_forkdns.go
+++ b/lxd/main_forkdns.go
@@ -14,7 +14,6 @@ import (
        "github.com/spf13/cobra"
        "gopkg.in/fsnotify.v0"
 
-       "github.com/lxc/lxd/lxd/events"
        "github.com/lxc/lxd/shared"
        "github.com/lxc/lxd/shared/dnsutil"
        "github.com/lxc/lxd/shared/logger"
@@ -443,7 +442,7 @@ func (c *cmdForkDNS) Run(cmd *cobra.Command, args []string) 
error {
                return fmt.Errorf("Missing required arguments")
        }
 
-       log, err := logging.GetLogger("lxd-forkdns", "", false, false, 
events.NewEventHandler())
+       log, err := logging.GetLogger("lxd-forkdns", "", false, false, nil)
        if err != nil {
                return err
        }

From 66495c83fbfab65a95068f6b00dcf8a60316d7ab Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com>
Date: Mon, 30 Sep 2019 15:08:58 -0400
Subject: [PATCH 2/5] lxd/events: redesign
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgra...@ubuntu.com>
---
 lxd/events/events.go  | 208 ++++++++++++++++++------------------------
 lxd/events/logging.go |  50 ++++++++++
 2 files changed, 137 insertions(+), 121 deletions(-)
 create mode 100644 lxd/events/logging.go

diff --git a/lxd/events/events.go b/lxd/events/events.go
index f2ae3fd5c8..2d1b46e081 100644
--- a/lxd/events/events.go
+++ b/lxd/events/events.go
@@ -11,36 +11,21 @@ import (
        "github.com/gorilla/websocket"
        "github.com/lxc/lxd/shared"
        "github.com/lxc/lxd/shared/api"
-       log "github.com/lxc/lxd/shared/log15"
        "github.com/lxc/lxd/shared/logger"
 )
 
-var debug bool
-var verbose bool
+// Server represents an instance of an event server.
+type Server struct {
+       debug bool
+       verbose bool
 
-var eventsLock sync.Mutex
-var eventListeners map[string]*Listener = make(map[string]*Listener)
-
-// Listener describes an event listener.
-type Listener struct {
-       project      string
-       connection   *websocket.Conn
-       messageTypes []string
-       active       chan bool
-       id           string
-       lock         sync.Mutex
-       done         bool
-       location     string
-
-       // If true, this listener won't get events forwarded from other
-       // nodes. It only used by listeners created internally by LXD nodes
-       // connecting to other LXD nodes to get their local events only.
-       noForward bool
+       listeners map[string]*Listener
+       lock sync.Mutex
 }
 
 // NewEventListener creates and returns a new event listener.
-func NewEventListener(project string, connection *websocket.Conn, messageTypes 
[]string, location string, noForward bool) *Listener {
-       return &Listener{
+func (s *Server) AddListener(project string, connection *websocket.Conn, 
messageTypes []string, location string, noForward bool) error {
+       listener := &Listener{
                project:      project,
                connection:   connection,
                messageTypes: messageTypes,
@@ -49,85 +34,23 @@ func NewEventListener(project string, connection 
*websocket.Conn, messageTypes [
                active:       make(chan bool, 1),
                id:           uuid.NewRandom().String(),
        }
-}
-
-// MessageTypes returns a list of message types the listener will be notified 
of.
-func (e *Listener) MessageTypes() []string {
-       return e.messageTypes
-}
-
-// IsDone returns true if the listener is done.
-func (e *Listener) IsDone() bool {
-       return e.done
-}
-
-// Connection returns the underlying websocket connection.
-func (e *Listener) Connection() *websocket.Conn {
-       return e.connection
-}
 
-// ID returns the listener ID.
-func (e *Listener) ID() string {
-       return e.id
-}
-
-// Wait waits for a message on its active channel, then returns.
-func (e *Listener) Wait() {
-       <-e.active
-}
-
-// Lock locks the internal mutex.
-func (e *Listener) Lock() {
-       e.lock.Lock()
-}
-
-// Unlock unlocks the internal mutex.
-func (e *Listener) Unlock() {
-       e.lock.Unlock()
-}
-
-// Deactivate deactivates the event listener.
-func (e *Listener) Deactivate() {
-       e.active <- false
-       e.done = true
-}
+       s.lock.Lock()
+       defer s.lock.Unlock()
 
-// Handler describes an event handler.
-type Handler struct {
-}
+       if s.listeners[listener.id] != nil {
+               return fmt.Errorf("A listener with id '%s' already exists", 
listener.id)
+       }
 
-// NewEventHandler creates and returns a new event handler.
-func NewEventHandler() *Handler {
-       return &Handler{}
-}
+       s.listeners[listener.id] = listener
 
-// Log sends a new logging event.
-func (h Handler) Log(r *log.Record) error {
-       Send("", "logging", api.EventLogging{
-               Message: r.Msg,
-               Level:   r.Lvl.String(),
-               Context: logContextMap(r.Ctx)})
        return nil
 }
 
-// Init sets the debug and verbose flags.
-func Init(d bool, v bool) {
-       debug = d
-       verbose = v
-}
-
-// AddListener adds the given listener to the internal list of listeners which
-// are notified when events are broadcasted.
-func AddListener(listener *Listener) {
-       eventsLock.Lock()
-       eventListeners[listener.id] = listener
-       eventsLock.Unlock()
-}
-
 // SendLifecycle broadcasts a lifecycle event.
-func SendLifecycle(project, action, source string,
+func (s *Server) SendLifecycle(project, action, source string,
        context map[string]interface{}) error {
-       Send(project, "lifecycle", api.EventLifecycle{
+       s.Send(project, "lifecycle", api.EventLifecycle{
                Action:  action,
                Source:  source,
                Context: context})
@@ -135,7 +58,7 @@ func SendLifecycle(project, action, source string,
 }
 
 // Send broadcasts a custom event.
-func Send(project, eventType string, eventMessage interface{}) error {
+func (s *Server) Send(project, eventType string, eventMessage interface{}) 
error {
        encodedMessage, err := json.Marshal(eventMessage)
        if err != nil {
                return err
@@ -146,11 +69,11 @@ func Send(project, eventType string, eventMessage 
interface{}) error {
                Metadata:  encodedMessage,
        }
 
-       return broadcast(project, event, false)
+       return s.broadcast(project, event, false)
 }
 
 // Forward to the local events dispatcher an event received from another node.
-func Forward(id int64, event api.Event) {
+func (s *Server) Forward(id int64, event api.Event) {
        if event.Type == "logging" {
                // Parse the message
                logEntry := api.EventLogging{}
@@ -159,40 +82,25 @@ func Forward(id int64, event api.Event) {
                        return
                }
 
-               if !debug && logEntry.Level == "dbug" {
+               if !s.debug && logEntry.Level == "dbug" {
                        return
                }
 
-               if !debug && !verbose && logEntry.Level == "info" {
+               if !s.debug && !s.verbose && logEntry.Level == "info" {
                        return
                }
        }
 
-       err := broadcast("", event, true)
+       err := s.broadcast("", event, true)
        if err != nil {
                logger.Warnf("Failed to forward event from node %d: %v", id, 
err)
        }
 }
 
-func logContextMap(ctx []interface{}) map[string]string {
-       var key string
-       ctxMap := map[string]string{}
 
-       for _, entry := range ctx {
-               if key == "" {
-                       key = entry.(string)
-               } else {
-                       ctxMap[key] = fmt.Sprintf("%v", entry)
-                       key = ""
-               }
-       }
-
-       return ctxMap
-}
-
-func broadcast(project string, event api.Event, isForward bool) error {
-       eventsLock.Lock()
-       listeners := eventListeners
+func (s *Server) broadcast(project string, event api.Event, isForward bool) 
error {
+       s.lock.Lock()
+       listeners := s.listeners
        for _, listener := range listeners {
                if project != "" && listener.project != "*" && project != 
listener.project {
                        continue
@@ -241,9 +149,9 @@ func broadcast(project string, event api.Event, isForward 
bool) error {
                        err = 
listener.connection.WriteMessage(websocket.TextMessage, body)
                        if err != nil {
                                // Remove the listener from the list
-                               eventsLock.Lock()
-                               delete(eventListeners, listener.id)
-                               eventsLock.Unlock()
+                               s.lock.Lock()
+                               delete(s.listeners, listener.id)
+                               s.lock.Unlock()
 
                                // Disconnect the listener
                                listener.connection.Close()
@@ -253,7 +161,65 @@ func broadcast(project string, event api.Event, isForward 
bool) error {
                        }
                }(listener, event)
        }
-       eventsLock.Unlock()
+       s.lock.Unlock()
 
        return nil
 }
+
+// Listener describes an event listener.
+type Listener struct {
+       project      string
+       connection   *websocket.Conn
+       messageTypes []string
+       active       chan bool
+       id           string
+       lock         sync.Mutex
+       done         bool
+       location     string
+
+       // If true, this listener won't get events forwarded from other
+       // nodes. It only used by listeners created internally by LXD nodes
+       // connecting to other LXD nodes to get their local events only.
+       noForward bool
+}
+
+// MessageTypes returns a list of message types the listener will be notified 
of.
+func (e *Listener) MessageTypes() []string {
+       return e.messageTypes
+}
+
+// IsDone returns true if the listener is done.
+func (e *Listener) IsDone() bool {
+       return e.done
+}
+
+// Connection returns the underlying websocket connection.
+func (e *Listener) Connection() *websocket.Conn {
+       return e.connection
+}
+
+// ID returns the listener ID.
+func (e *Listener) ID() string {
+       return e.id
+}
+
+// Wait waits for a message on its active channel, then returns.
+func (e *Listener) Wait() {
+       <-e.active
+}
+
+// Lock locks the internal mutex.
+func (e *Listener) Lock() {
+       e.lock.Lock()
+}
+
+// Unlock unlocks the internal mutex.
+func (e *Listener) Unlock() {
+       e.lock.Unlock()
+}
+
+// Deactivate deactivates the event listener.
+func (e *Listener) Deactivate() {
+       e.active <- false
+       e.done = true
+}
diff --git a/lxd/events/logging.go b/lxd/events/logging.go
new file mode 100644
index 0000000000..fed38c4a8a
--- /dev/null
+++ b/lxd/events/logging.go
@@ -0,0 +1,50 @@
+package events
+
+import (
+       "fmt"
+
+       log "github.com/lxc/lxd/shared/log15"
+
+       "github.com/lxc/lxd/shared/api"
+)
+
+// LoggingServer controls what server to use for messages coming from the 
logger.
+var LoggingServer *Server
+
+// Handler describes an event handler.
+type Handler struct {
+}
+
+// NewEventHandler creates and returns a new event handler.
+func NewEventHandler() *Handler {
+       return &Handler{}
+}
+
+// Log sends a new logging event.
+func (h Handler) Log(r *log.Record) error {
+       if LoggingServer == nil {
+               return fmt.Errorf("No configured event server for logging 
messages")
+       }
+
+       LoggingServer.Send("", "logging", api.EventLogging{
+               Message: r.Msg,
+               Level:   r.Lvl.String(),
+               Context: logContextMap(r.Ctx)})
+       return nil
+}
+
+func logContextMap(ctx []interface{}) map[string]string {
+       var key string
+       ctxMap := map[string]string{}
+
+       for _, entry := range ctx {
+               if key == "" {
+                       key = entry.(string)
+               } else {
+                       ctxMap[key] = fmt.Sprintf("%v", entry)
+                       key = ""
+               }
+       }
+
+       return ctxMap
+}

From c0e64888ed89a8931a42016fb97928e76a2d87d6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com>
Date: Mon, 30 Sep 2019 15:56:49 -0400
Subject: [PATCH 3/5] lxd/events: Add event server
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgra...@ubuntu.com>
---
 lxd/events/events.go | 38 ++++++++++++++++++++++++--------------
 1 file changed, 24 insertions(+), 14 deletions(-)

diff --git a/lxd/events/events.go b/lxd/events/events.go
index 2d1b46e081..02396ddf4c 100644
--- a/lxd/events/events.go
+++ b/lxd/events/events.go
@@ -16,17 +16,28 @@ import (
 
 // Server represents an instance of an event server.
 type Server struct {
-       debug bool
+       debug   bool
        verbose bool
 
        listeners map[string]*Listener
-       lock sync.Mutex
+       lock      sync.Mutex
+}
+
+// NewServer returns a new event server.
+func NewServer(debug bool, verbose bool) *Server {
+       server := &Server{
+               debug:     debug,
+               verbose:   verbose,
+               listeners: map[string]*Listener{},
+       }
+
+       return server
 }
 
 // NewEventListener creates and returns a new event listener.
-func (s *Server) AddListener(project string, connection *websocket.Conn, 
messageTypes []string, location string, noForward bool) error {
+func (s *Server) AddListener(group string, connection *websocket.Conn, 
messageTypes []string, location string, noForward bool) (*Listener, error) {
        listener := &Listener{
-               project:      project,
+               group:        group,
                connection:   connection,
                messageTypes: messageTypes,
                location:     location,
@@ -39,18 +50,18 @@ func (s *Server) AddListener(project string, connection 
*websocket.Conn, message
        defer s.lock.Unlock()
 
        if s.listeners[listener.id] != nil {
-               return fmt.Errorf("A listener with id '%s' already exists", 
listener.id)
+               return nil, fmt.Errorf("A listener with id '%s' already 
exists", listener.id)
        }
 
        s.listeners[listener.id] = listener
 
-       return nil
+       return listener, nil
 }
 
 // SendLifecycle broadcasts a lifecycle event.
-func (s *Server) SendLifecycle(project, action, source string,
+func (s *Server) SendLifecycle(group, action, source string,
        context map[string]interface{}) error {
-       s.Send(project, "lifecycle", api.EventLifecycle{
+       s.Send(group, "lifecycle", api.EventLifecycle{
                Action:  action,
                Source:  source,
                Context: context})
@@ -58,7 +69,7 @@ func (s *Server) SendLifecycle(project, action, source string,
 }
 
 // Send broadcasts a custom event.
-func (s *Server) Send(project, eventType string, eventMessage interface{}) 
error {
+func (s *Server) Send(group, eventType string, eventMessage interface{}) error 
{
        encodedMessage, err := json.Marshal(eventMessage)
        if err != nil {
                return err
@@ -69,7 +80,7 @@ func (s *Server) Send(project, eventType string, eventMessage 
interface{}) error
                Metadata:  encodedMessage,
        }
 
-       return s.broadcast(project, event, false)
+       return s.broadcast(group, event, false)
 }
 
 // Forward to the local events dispatcher an event received from another node.
@@ -97,12 +108,11 @@ func (s *Server) Forward(id int64, event api.Event) {
        }
 }
 
-
-func (s *Server) broadcast(project string, event api.Event, isForward bool) 
error {
+func (s *Server) broadcast(group string, event api.Event, isForward bool) 
error {
        s.lock.Lock()
        listeners := s.listeners
        for _, listener := range listeners {
-               if project != "" && listener.project != "*" && project != 
listener.project {
+               if group != "" && listener.group != "*" && group != 
listener.group {
                        continue
                }
 
@@ -168,7 +178,7 @@ func (s *Server) broadcast(project string, event api.Event, 
isForward bool) erro
 
 // Listener describes an event listener.
 type Listener struct {
-       project      string
+       group        string
        connection   *websocket.Conn
        messageTypes []string
        active       chan bool

From 9993ebf503b805281ce79106d20131e6c53e02c8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com>
Date: Mon, 30 Sep 2019 15:57:16 -0400
Subject: [PATCH 4/5] lxd/state: Carry event server instances
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgra...@ubuntu.com>
---
 lxd/state/state.go   | 34 ++++++++++++++++++++++++----------
 lxd/state/testing.go |  2 +-
 2 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/lxd/state/state.go b/lxd/state/state.go
index 48ae76e10e..6ff4c17656 100644
--- a/lxd/state/state.go
+++ b/lxd/state/state.go
@@ -3,6 +3,7 @@ package state
 import (
        "github.com/lxc/lxd/lxd/db"
        "github.com/lxc/lxd/lxd/endpoints"
+       "github.com/lxc/lxd/lxd/events"
        "github.com/lxc/lxd/lxd/maas"
        "github.com/lxc/lxd/lxd/sys"
 )
@@ -11,21 +12,34 @@ import (
 // and the operating system. It's typically used by model entities such as
 // containers, volumes, etc. in order to perform changes.
 type State struct {
-       Node      *db.Node
-       Cluster   *db.Cluster
-       MAAS      *maas.Controller
-       OS        *sys.OS
+       // Databases
+       Node    *db.Node
+       Cluster *db.Cluster
+
+       // MAAS server
+       MAAS *maas.Controller
+
+       // OS access
+       OS *sys.OS
+
+       // LXD server
        Endpoints *endpoints.Endpoints
+
+       // Event server
+       DevlxdEvents *events.Server
+       Events       *events.Server
 }
 
 // NewState returns a new State object with the given database and operating
 // system components.
-func NewState(node *db.Node, cluster *db.Cluster, maas *maas.Controller, os 
*sys.OS, endpoints *endpoints.Endpoints) *State {
+func NewState(node *db.Node, cluster *db.Cluster, maas *maas.Controller, os 
*sys.OS, endpoints *endpoints.Endpoints, events *events.Server, devlxdEvents 
*events.Server) *State {
        return &State{
-               Node:      node,
-               Cluster:   cluster,
-               MAAS:      maas,
-               OS:        os,
-               Endpoints: endpoints,
+               Node:         node,
+               Cluster:      cluster,
+               MAAS:         maas,
+               OS:           os,
+               Endpoints:    endpoints,
+               DevlxdEvents: devlxdEvents,
+               Events:       events,
        }
 }
diff --git a/lxd/state/testing.go b/lxd/state/testing.go
index f4733458de..c53111c02f 100644
--- a/lxd/state/testing.go
+++ b/lxd/state/testing.go
@@ -23,7 +23,7 @@ func NewTestState(t *testing.T) (*State, func()) {
                osCleanup()
        }
 
-       state := NewState(node, cluster, nil, os, nil)
+       state := NewState(node, cluster, nil, os, nil, nil, nil)
 
        return state, cleanup
 }

From b2e1ddaf2907114479c826b0f1ba712f07582091 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com>
Date: Mon, 30 Sep 2019 15:57:35 -0400
Subject: [PATCH 5/5] lxd: Switch to new event structure
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgra...@ubuntu.com>
---
 lxd/container.go             |  3 +-
 lxd/container_lxc.go         | 27 +++++++-------
 lxd/daemon.go                | 13 ++++++-
 lxd/devlxd.go                | 72 +++---------------------------------
 lxd/events.go                |  8 ++--
 lxd/main.go                  |  3 --
 lxd/operations/operations.go | 21 +++++------
 7 files changed, 45 insertions(+), 102 deletions(-)

diff --git a/lxd/container.go b/lxd/container.go
index 832cc368bb..dce1e8f191 100644
--- a/lxd/container.go
+++ b/lxd/container.go
@@ -20,7 +20,6 @@ import (
        "github.com/lxc/lxd/lxd/db"
        "github.com/lxc/lxd/lxd/device"
        deviceConfig "github.com/lxc/lxd/lxd/device/config"
-       "github.com/lxc/lxd/lxd/events"
        "github.com/lxc/lxd/lxd/instance/instancetype"
        "github.com/lxc/lxd/lxd/operations"
        "github.com/lxc/lxd/lxd/state"
@@ -683,7 +682,7 @@ func containerCreateAsSnapshot(s *state.State, args 
db.InstanceArgs, sourceInsta
                os.RemoveAll(sourceInstance.StatePath())
        }
 
-       events.SendLifecycle(sourceInstance.Project(), 
"container-snapshot-created",
+       s.Events.SendLifecycle(sourceInstance.Project(), 
"container-snapshot-created",
                fmt.Sprintf("/1.0/containers/%s", sourceInstance.Name()),
                map[string]interface{}{
                        "snapshot_name": args.Name,
diff --git a/lxd/container_lxc.go b/lxd/container_lxc.go
index 2dbb6ebd72..f2c7195b69 100644
--- a/lxd/container_lxc.go
+++ b/lxd/container_lxc.go
@@ -31,7 +31,6 @@ import (
        "github.com/lxc/lxd/lxd/db/query"
        "github.com/lxc/lxd/lxd/device"
        deviceConfig "github.com/lxc/lxd/lxd/device/config"
-       "github.com/lxc/lxd/lxd/events"
        "github.com/lxc/lxd/lxd/instance/instancetype"
        "github.com/lxc/lxd/lxd/instance/operationlock"
        "github.com/lxc/lxd/lxd/maas"
@@ -433,7 +432,7 @@ func containerLXCCreate(s *state.State, args 
db.InstanceArgs) (container, error)
        }
 
        logger.Info("Created container", ctxMap)
-       events.SendLifecycle(c.project, "container-created",
+       c.state.Events.SendLifecycle(c.project, "container-created",
                fmt.Sprintf("/1.0/containers/%s", c.name), nil)
 
        return c, nil
@@ -2554,7 +2553,7 @@ func (c *containerLXC) Start(stateful bool) error {
        }
 
        logger.Info("Started container", ctxMap)
-       events.SendLifecycle(c.project, "container-started",
+       c.state.Events.SendLifecycle(c.project, "container-started",
                fmt.Sprintf("/1.0/containers/%s", c.name), nil)
 
        return nil
@@ -2722,7 +2721,7 @@ func (c *containerLXC) Stop(stateful bool) error {
 
                op.Done(nil)
                logger.Info("Stopped container", ctxMap)
-               events.SendLifecycle(c.project, "container-stopped",
+               c.state.Events.SendLifecycle(c.project, "container-stopped",
                        fmt.Sprintf("/1.0/containers/%s", c.name), nil)
                return nil
        } else if shared.PathExists(c.StatePath()) {
@@ -2778,7 +2777,7 @@ func (c *containerLXC) Stop(stateful bool) error {
        }
 
        logger.Info("Stopped container", ctxMap)
-       events.SendLifecycle(c.project, "container-stopped",
+       c.state.Events.SendLifecycle(c.project, "container-stopped",
                fmt.Sprintf("/1.0/containers/%s", c.name), nil)
 
        return nil
@@ -2839,7 +2838,7 @@ func (c *containerLXC) Shutdown(timeout time.Duration) 
error {
        }
 
        logger.Info("Shut down container", ctxMap)
-       events.SendLifecycle(c.project, "container-shutdown",
+       c.state.Events.SendLifecycle(c.project, "container-shutdown",
                fmt.Sprintf("/1.0/containers/%s", c.name), nil)
 
        return nil
@@ -3032,7 +3031,7 @@ func (c *containerLXC) Freeze() error {
        }
 
        logger.Info("Froze container", ctxMap)
-       events.SendLifecycle(c.project, "container-paused",
+       c.state.Events.SendLifecycle(c.project, "container-paused",
                fmt.Sprintf("/1.0/containers/%s", c.name), nil)
 
        return err
@@ -3077,7 +3076,7 @@ func (c *containerLXC) Unfreeze() error {
        }
 
        logger.Info("Unfroze container", ctxMap)
-       events.SendLifecycle(c.project, "container-resumed",
+       c.state.Events.SendLifecycle(c.project, "container-resumed",
                fmt.Sprintf("/1.0/containers/%s", c.name), nil)
 
        return err
@@ -3471,7 +3470,7 @@ func (c *containerLXC) Restore(sourceContainer Instance, 
stateful bool) error {
                return nil
        }
 
-       events.SendLifecycle(c.project, "container-snapshot-restored",
+       c.state.Events.SendLifecycle(c.project, "container-snapshot-restored",
                fmt.Sprintf("/1.0/containers/%s", c.name), 
map[string]interface{}{
                        "snapshot_name": c.name,
                })
@@ -3628,12 +3627,12 @@ func (c *containerLXC) Delete() error {
        logger.Info("Deleted container", ctxMap)
 
        if c.IsSnapshot() {
-               events.SendLifecycle(c.project, "container-snapshot-deleted",
+               c.state.Events.SendLifecycle(c.project, 
"container-snapshot-deleted",
                        fmt.Sprintf("/1.0/containers/%s", c.name), 
map[string]interface{}{
                                "snapshot_name": c.name,
                        })
        } else {
-               events.SendLifecycle(c.project, "container-deleted",
+               c.state.Events.SendLifecycle(c.project, "container-deleted",
                        fmt.Sprintf("/1.0/containers/%s", c.name), nil)
        }
 
@@ -3794,13 +3793,13 @@ func (c *containerLXC) Rename(newName string) error {
        logger.Info("Renamed container", ctxMap)
 
        if c.IsSnapshot() {
-               events.SendLifecycle(c.project, "container-snapshot-renamed",
+               c.state.Events.SendLifecycle(c.project, 
"container-snapshot-renamed",
                        fmt.Sprintf("/1.0/containers/%s", oldName), 
map[string]interface{}{
                                "new_name":      newName,
                                "snapshot_name": oldName,
                        })
        } else {
-               events.SendLifecycle(c.project, "container-renamed",
+               c.state.Events.SendLifecycle(c.project, "container-renamed",
                        fmt.Sprintf("/1.0/containers/%s", oldName), 
map[string]interface{}{
                                "new_name": newName,
                        })
@@ -4698,7 +4697,7 @@ func (c *containerLXC) Update(args db.InstanceArgs, 
userRequested bool) error {
                endpoint = fmt.Sprintf("/1.0/containers/%s", c.name)
        }
 
-       events.SendLifecycle(c.project, "container-updated", endpoint, nil)
+       c.state.Events.SendLifecycle(c.project, "container-updated", endpoint, 
nil)
 
        return nil
 }
diff --git a/lxd/daemon.go b/lxd/daemon.go
index dc50d1adaf..319a0a8839 100644
--- a/lxd/daemon.go
+++ b/lxd/daemon.go
@@ -62,6 +62,10 @@ type Daemon struct {
        readyChan    chan struct{} // Closed when LXD is fully ready
        shutdownChan chan struct{}
 
+       // Event servers
+       devlxdEvents *events.Server
+       events       *events.Server
+
        // Tasks registry for long-running background tasks
        // Keep clustering tasks separate as they cause a lot of CPU wakeups
        tasks        task.Group
@@ -132,8 +136,13 @@ func (m *IdentityClientWrapper) DeclaredIdentity(ctx 
context.Context, declared m
 
 // NewDaemon returns a new Daemon object with the given configuration.
 func NewDaemon(config *DaemonConfig, os *sys.OS) *Daemon {
+       lxdEvents := events.NewServer(debug, verbose)
+       devlxdEvents := events.NewServer(debug, verbose)
+
        return &Daemon{
                config:       config,
+               devlxdEvents: devlxdEvents,
+               events:       lxdEvents,
                os:           os,
                setupChan:    make(chan struct{}),
                readyChan:    make(chan struct{}),
@@ -333,7 +342,7 @@ func isJSONRequest(r *http.Request) bool {
 
 // State creates a new State instance linked to our internal db and os.
 func (d *Daemon) State() *state.State {
-       return state.NewState(d.db, d.cluster, d.maas, d.os, d.endpoints)
+       return state.NewState(d.db, d.cluster, d.maas, d.os, d.endpoints, 
d.events, d.devlxdEvents)
 }
 
 // UnixSocket returns the full path to the unix.socket file that this daemon is
@@ -924,7 +933,7 @@ func (d *Daemon) startClusterTasks() {
        d.clusterTasks.Add(cluster.HeartbeatTask(d.gateway))
 
        // Events
-       d.clusterTasks.Add(cluster.Events(d.endpoints, d.cluster, 
events.Forward))
+       d.clusterTasks.Add(cluster.Events(d.endpoints, d.cluster, 
d.events.Forward))
 
        // Auto-sync images across the cluster (daily)
        d.clusterTasks.Add(autoSyncImagesTask(d))
diff --git a/lxd/devlxd.go b/lxd/devlxd.go
index f5f2baab2d..7862f06aa9 100644
--- a/lxd/devlxd.go
+++ b/lxd/devlxd.go
@@ -1,7 +1,6 @@
 package main
 
 import (
-       "encoding/json"
        "fmt"
        "io/ioutil"
        "net"
@@ -16,10 +15,9 @@ import (
        "unsafe"
 
        "github.com/gorilla/mux"
-       "github.com/gorilla/websocket"
 
-       "github.com/lxc/lxd/lxd/events"
        "github.com/lxc/lxd/lxd/instance/instancetype"
+       "github.com/lxc/lxd/lxd/project"
        "github.com/lxc/lxd/lxd/state"
        "github.com/lxc/lxd/lxd/ucred"
        "github.com/lxc/lxd/lxd/util"
@@ -105,9 +103,6 @@ var devlxdMetadataGet = devLxdHandler{"/1.0/meta-data", 
func(d *Daemon, c contai
        return okResponse(fmt.Sprintf("#cloud-config\ninstance-id: 
%s\nlocal-hostname: %s\n%s", c.Name(), c.Name(), value), "raw")
 }}
 
-var devlxdEventsLock sync.Mutex
-var devlxdEventListeners map[int]map[string]*events.Listener = 
make(map[int]map[string]*events.Listener)
-
 var devlxdEventsGet = devLxdHandler{"/1.0/events", func(d *Daemon, c 
container, w http.ResponseWriter, r *http.Request) *devLxdResponse {
        typeStr := r.FormValue("type")
        if typeStr == "" {
@@ -119,18 +114,12 @@ var devlxdEventsGet = devLxdHandler{"/1.0/events", func(d 
*Daemon, c container,
                return &devLxdResponse{"internal server error", 
http.StatusInternalServerError, "raw"}
        }
 
-       listener := events.NewEventListener(c.Project(), conn, 
strings.Split(typeStr, ","), "", false)
-
-       devlxdEventsLock.Lock()
-       cid := c.Id()
-       _, ok := devlxdEventListeners[cid]
-       if !ok {
-               devlxdEventListeners[cid] = map[string]*events.Listener{}
+       listener, err := d.devlxdEvents.AddListener(strconv.Itoa(c.Id()), conn, 
strings.Split(typeStr, ","), "", false)
+       if err != nil {
+               return &devLxdResponse{"internal server error", 
http.StatusInternalServerError, "raw"}
        }
-       devlxdEventListeners[cid][listener.ID()] = listener
-       devlxdEventsLock.Unlock()
 
-       logger.Debugf("New container event listener for '%s': %s", c.Name(), 
listener.ID())
+       logger.Debugf("New container event listener for '%s': %s", 
project.Prefix(c.Project(), c.Name()), listener.ID())
 
        listener.Wait()
 
@@ -143,56 +132,7 @@ func devlxdEventSend(c container, eventType string, 
eventMessage interface{}) er
        event["timestamp"] = time.Now()
        event["metadata"] = eventMessage
 
-       body, err := json.Marshal(event)
-       if err != nil {
-               return err
-       }
-
-       devlxdEventsLock.Lock()
-       cid := c.Id()
-       listeners, ok := devlxdEventListeners[cid]
-       if !ok {
-               devlxdEventsLock.Unlock()
-               return nil
-       }
-
-       for _, listener := range listeners {
-               if !shared.StringInSlice(eventType, listener.MessageTypes()) {
-                       continue
-               }
-
-               go func(listener *events.Listener, body []byte) {
-                       // Check that the listener still exists
-                       if listener == nil {
-                               return
-                       }
-
-                       // Ensure there is only a single even going out at the 
time
-                       listener.Lock()
-                       defer listener.Unlock()
-
-                       // Make sure we're not done already
-                       if listener.IsDone() {
-                               return
-                       }
-
-                       err = 
listener.Connection().WriteMessage(websocket.TextMessage, body)
-                       if err != nil {
-                               // Remove the listener from the list
-                               devlxdEventsLock.Lock()
-                               delete(devlxdEventListeners[cid], listener.ID())
-                               devlxdEventsLock.Unlock()
-
-                               // Disconnect the listener
-                               listener.Connection().Close()
-                               listener.Deactivate()
-                               logger.Debugf("Disconnected container event 
listener for '%s': %s", c.Name(), listener.ID())
-                       }
-               }(listener, body)
-       }
-       devlxdEventsLock.Unlock()
-
-       return nil
+       return c.DaemonState().DevlxdEvents.Send(strconv.Itoa(c.Id()), 
eventType, eventMessage)
 }
 
 var handlers = []devLxdHandler{
diff --git a/lxd/events.go b/lxd/events.go
index 215a6646f7..14f7eb7374 100644
--- a/lxd/events.go
+++ b/lxd/events.go
@@ -5,7 +5,6 @@ import (
        "strings"
 
        "github.com/lxc/lxd/lxd/db"
-       "github.com/lxc/lxd/lxd/events"
        "github.com/lxc/lxd/lxd/response"
        "github.com/lxc/lxd/shared"
        "github.com/lxc/lxd/shared/logger"
@@ -58,9 +57,10 @@ func eventsSocket(d *Daemon, r *http.Request, w 
http.ResponseWriter) error {
        // If this request is an internal one initiated by another node wanting
        // to watch the events on this node, set the listener to broadcast only
        // local events.
-       listener := events.NewEventListener(project, c, strings.Split(typeStr, 
","), serverName, isClusterNotification(r))
-
-       events.AddListener(listener)
+       listener, err := d.events.AddListener(project, c, 
strings.Split(typeStr, ","), serverName, isClusterNotification(r))
+       if err != nil {
+               return err
+       }
 
        logger.Debugf("New event listener: %s", listener.ID())
 
diff --git a/lxd/main.go b/lxd/main.go
index ce49cd5dc6..d6f2b97378 100644
--- a/lxd/main.go
+++ b/lxd/main.go
@@ -42,9 +42,6 @@ func (c *cmdGlobal) Run(cmd *cobra.Command, args []string) 
error {
        debug = c.flagLogDebug
        verbose = c.flagLogVerbose
 
-       // Set debug and verbose for the events package
-       events.Init(debug, verbose)
-
        // Set debug for the operations package
        operations.Init(debug)
 
diff --git a/lxd/operations/operations.go b/lxd/operations/operations.go
index 38bb4e77fc..56fc45565f 100644
--- a/lxd/operations/operations.go
+++ b/lxd/operations/operations.go
@@ -7,7 +7,6 @@ import (
        "time"
 
        "github.com/lxc/lxd/lxd/db"
-       "github.com/lxc/lxd/lxd/events"
        "github.com/lxc/lxd/lxd/response"
        "github.com/lxc/lxd/lxd/state"
        "github.com/lxc/lxd/shared"
@@ -168,7 +167,7 @@ func OperationCreate(state *state.State, project string, 
opClass operationClass,
 
        logger.Debugf("New %s Operation: %s", op.class.String(), op.id)
        _, md, _ := op.Render()
-       events.Send(op.project, "Operation", md)
+       op.state.Events.Send(op.project, "Operation", md)
 
        return &op, nil
 }
@@ -232,7 +231,7 @@ func (op *Operation) Run() (chan error, error) {
                                logger.Debugf("Failure for %s operation: %s: 
%s", op.class.String(), op.id, err)
 
                                _, md, _ := op.Render()
-                               events.Send(op.project, "operation", md)
+                               op.state.Events.Send(op.project, "operation", 
md)
                                return
                        }
 
@@ -245,7 +244,7 @@ func (op *Operation) Run() (chan error, error) {
                        op.lock.Lock()
                        logger.Debugf("Success for %s operation: %s", 
op.class.String(), op.id)
                        _, md, _ := op.Render()
-                       events.Send(op.project, "operation", md)
+                       op.state.Events.Send(op.project, "operation", md)
                        op.lock.Unlock()
                }(op, chanRun)
        }
@@ -253,7 +252,7 @@ func (op *Operation) Run() (chan error, error) {
 
        logger.Debugf("Started %s operation: %s", op.class.String(), op.id)
        _, md, _ := op.Render()
-       events.Send(op.project, "operation", md)
+       op.state.Events.Send(op.project, "operation", md)
 
        return chanRun, nil
 }
@@ -287,7 +286,7 @@ func (op *Operation) Cancel() (chan error, error) {
 
                                logger.Debugf("Failed to cancel %s Operation: 
%s: %s", op.class.String(), op.id, err)
                                _, md, _ := op.Render()
-                               events.Send(op.project, "Operation", md)
+                               op.state.Events.Send(op.project, "Operation", 
md)
                                return
                        }
 
@@ -299,13 +298,13 @@ func (op *Operation) Cancel() (chan error, error) {
 
                        logger.Debugf("Cancelled %s Operation: %s", 
op.class.String(), op.id)
                        _, md, _ := op.Render()
-                       events.Send(op.project, "Operation", md)
+                       op.state.Events.Send(op.project, "Operation", md)
                }(op, oldStatus, chanCancel)
        }
 
        logger.Debugf("Cancelling %s Operation: %s", op.class.String(), op.id)
        _, md, _ := op.Render()
-       events.Send(op.project, "Operation", md)
+       op.state.Events.Send(op.project, "Operation", md)
 
        if op.canceler != nil {
                err := op.canceler.Cancel()
@@ -324,7 +323,7 @@ func (op *Operation) Cancel() (chan error, error) {
 
        logger.Debugf("Cancelled %s Operation: %s", op.class.String(), op.id)
        _, md, _ = op.Render()
-       events.Send(op.project, "Operation", md)
+       op.state.Events.Send(op.project, "Operation", md)
 
        return chanCancel, nil
 }
@@ -470,7 +469,7 @@ func (op *Operation) UpdateResources(opResources 
map[string][]string) error {
 
        logger.Debugf("Updated resources for %s Operation: %s", 
op.class.String(), op.id)
        _, md, _ := op.Render()
-       events.Send(op.project, "Operation", md)
+       op.state.Events.Send(op.project, "Operation", md)
 
        return nil
 }
@@ -498,7 +497,7 @@ func (op *Operation) UpdateMetadata(opMetadata interface{}) 
error {
 
        logger.Debugf("Updated metadata for %s Operation: %s", 
op.class.String(), op.id)
        _, md, _ := op.Render()
-       events.Send(op.project, "Operation", md)
+       op.state.Events.Send(op.project, "Operation", md)
 
        return nil
 }
_______________________________________________
lxc-devel mailing list
lxc-devel@lists.linuxcontainers.org
http://lists.linuxcontainers.org/listinfo/lxc-devel

Reply via email to