The following pull request was submitted through GitHub. It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/6265
This e-mail was sent by the LXC bot; direct replies will not reach the author unless they happen to be subscribed to this list. === Description (from pull-request) === This allows the events package to have multiple users, each with its own list of listeners and its own configuration.
From 7770da74f3d9199d7985e783bdbee215dfb95824 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Mon, 30 Sep 2019 15:05:13 -0400 Subject: [PATCH 1/5] lxd/main_forkdns: Don't setup event logger MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- lxd/main_forkdns.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lxd/main_forkdns.go b/lxd/main_forkdns.go index 0ba1d322b9..7a6833a174 100644 --- a/lxd/main_forkdns.go +++ b/lxd/main_forkdns.go @@ -14,7 +14,6 @@ import ( "github.com/spf13/cobra" "gopkg.in/fsnotify.v0" - "github.com/lxc/lxd/lxd/events" "github.com/lxc/lxd/shared" "github.com/lxc/lxd/shared/dnsutil" "github.com/lxc/lxd/shared/logger" @@ -443,7 +442,7 @@ func (c *cmdForkDNS) Run(cmd *cobra.Command, args []string) error { return fmt.Errorf("Missing required arguments") } - log, err := logging.GetLogger("lxd-forkdns", "", false, false, events.NewEventHandler()) + log, err := logging.GetLogger("lxd-forkdns", "", false, false, nil) if err != nil { return err } From 66495c83fbfab65a95068f6b00dcf8a60316d7ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Mon, 30 Sep 2019 15:08:58 -0400 Subject: [PATCH 2/5] lxd/events: redesign MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- lxd/events/events.go | 208 ++++++++++++++++++------------------------ lxd/events/logging.go | 50 ++++++++++ 2 files changed, 137 insertions(+), 121 deletions(-) create mode 100644 lxd/events/logging.go diff --git a/lxd/events/events.go b/lxd/events/events.go index f2ae3fd5c8..2d1b46e081 100644 --- a/lxd/events/events.go +++ b/lxd/events/events.go @@ -11,36 +11,21 @@ import ( "github.com/gorilla/websocket" "github.com/lxc/lxd/shared" "github.com/lxc/lxd/shared/api" - log 
"github.com/lxc/lxd/shared/log15" "github.com/lxc/lxd/shared/logger" ) -var debug bool -var verbose bool +// Server represents an instance of an event server. +type Server struct { + debug bool + verbose bool -var eventsLock sync.Mutex -var eventListeners map[string]*Listener = make(map[string]*Listener) - -// Listener describes an event listener. -type Listener struct { - project string - connection *websocket.Conn - messageTypes []string - active chan bool - id string - lock sync.Mutex - done bool - location string - - // If true, this listener won't get events forwarded from other - // nodes. It only used by listeners created internally by LXD nodes - // connecting to other LXD nodes to get their local events only. - noForward bool + listeners map[string]*Listener + lock sync.Mutex } // NewEventListener creates and returns a new event listener. -func NewEventListener(project string, connection *websocket.Conn, messageTypes []string, location string, noForward bool) *Listener { - return &Listener{ +func (s *Server) AddListener(project string, connection *websocket.Conn, messageTypes []string, location string, noForward bool) error { + listener := &Listener{ project: project, connection: connection, messageTypes: messageTypes, @@ -49,85 +34,23 @@ func NewEventListener(project string, connection *websocket.Conn, messageTypes [ active: make(chan bool, 1), id: uuid.NewRandom().String(), } -} - -// MessageTypes returns a list of message types the listener will be notified of. -func (e *Listener) MessageTypes() []string { - return e.messageTypes -} - -// IsDone returns true if the listener is done. -func (e *Listener) IsDone() bool { - return e.done -} - -// Connection returns the underlying websocket connection. -func (e *Listener) Connection() *websocket.Conn { - return e.connection -} -// ID returns the listener ID. -func (e *Listener) ID() string { - return e.id -} - -// Wait waits for a message on its active channel, then returns. 
-func (e *Listener) Wait() { - <-e.active -} - -// Lock locks the internal mutex. -func (e *Listener) Lock() { - e.lock.Lock() -} - -// Unlock unlocks the internal mutex. -func (e *Listener) Unlock() { - e.lock.Unlock() -} - -// Deactivate deactivates the event listener. -func (e *Listener) Deactivate() { - e.active <- false - e.done = true -} + s.lock.Lock() + defer s.lock.Unlock() -// Handler describes an event handler. -type Handler struct { -} + if s.listeners[listener.id] != nil { + return fmt.Errorf("A listener with id '%s' already exists", listener.id) + } -// NewEventHandler creates and returns a new event handler. -func NewEventHandler() *Handler { - return &Handler{} -} + s.listeners[listener.id] = listener -// Log sends a new logging event. -func (h Handler) Log(r *log.Record) error { - Send("", "logging", api.EventLogging{ - Message: r.Msg, - Level: r.Lvl.String(), - Context: logContextMap(r.Ctx)}) return nil } -// Init sets the debug and verbose flags. -func Init(d bool, v bool) { - debug = d - verbose = v -} - -// AddListener adds the given listener to the internal list of listeners which -// are notified when events are broadcasted. -func AddListener(listener *Listener) { - eventsLock.Lock() - eventListeners[listener.id] = listener - eventsLock.Unlock() -} - // SendLifecycle broadcasts a lifecycle event. -func SendLifecycle(project, action, source string, +func (s *Server) SendLifecycle(project, action, source string, context map[string]interface{}) error { - Send(project, "lifecycle", api.EventLifecycle{ + s.Send(project, "lifecycle", api.EventLifecycle{ Action: action, Source: source, Context: context}) @@ -135,7 +58,7 @@ func SendLifecycle(project, action, source string, } // Send broadcasts a custom event. 
-func Send(project, eventType string, eventMessage interface{}) error { +func (s *Server) Send(project, eventType string, eventMessage interface{}) error { encodedMessage, err := json.Marshal(eventMessage) if err != nil { return err @@ -146,11 +69,11 @@ func Send(project, eventType string, eventMessage interface{}) error { Metadata: encodedMessage, } - return broadcast(project, event, false) + return s.broadcast(project, event, false) } // Forward to the local events dispatcher an event received from another node. -func Forward(id int64, event api.Event) { +func (s *Server) Forward(id int64, event api.Event) { if event.Type == "logging" { // Parse the message logEntry := api.EventLogging{} @@ -159,40 +82,25 @@ func Forward(id int64, event api.Event) { return } - if !debug && logEntry.Level == "dbug" { + if !s.debug && logEntry.Level == "dbug" { return } - if !debug && !verbose && logEntry.Level == "info" { + if !s.debug && !s.verbose && logEntry.Level == "info" { return } } - err := broadcast("", event, true) + err := s.broadcast("", event, true) if err != nil { logger.Warnf("Failed to forward event from node %d: %v", id, err) } } -func logContextMap(ctx []interface{}) map[string]string { - var key string - ctxMap := map[string]string{} - for _, entry := range ctx { - if key == "" { - key = entry.(string) - } else { - ctxMap[key] = fmt.Sprintf("%v", entry) - key = "" - } - } - - return ctxMap -} - -func broadcast(project string, event api.Event, isForward bool) error { - eventsLock.Lock() - listeners := eventListeners +func (s *Server) broadcast(project string, event api.Event, isForward bool) error { + s.lock.Lock() + listeners := s.listeners for _, listener := range listeners { if project != "" && listener.project != "*" && project != listener.project { continue @@ -241,9 +149,9 @@ func broadcast(project string, event api.Event, isForward bool) error { err = listener.connection.WriteMessage(websocket.TextMessage, body) if err != nil { // Remove the listener from 
the list - eventsLock.Lock() - delete(eventListeners, listener.id) - eventsLock.Unlock() + s.lock.Lock() + delete(s.listeners, listener.id) + s.lock.Unlock() // Disconnect the listener listener.connection.Close() @@ -253,7 +161,65 @@ func broadcast(project string, event api.Event, isForward bool) error { } }(listener, event) } - eventsLock.Unlock() + s.lock.Unlock() return nil } + +// Listener describes an event listener. +type Listener struct { + project string + connection *websocket.Conn + messageTypes []string + active chan bool + id string + lock sync.Mutex + done bool + location string + + // If true, this listener won't get events forwarded from other + // nodes. It only used by listeners created internally by LXD nodes + // connecting to other LXD nodes to get their local events only. + noForward bool +} + +// MessageTypes returns a list of message types the listener will be notified of. +func (e *Listener) MessageTypes() []string { + return e.messageTypes +} + +// IsDone returns true if the listener is done. +func (e *Listener) IsDone() bool { + return e.done +} + +// Connection returns the underlying websocket connection. +func (e *Listener) Connection() *websocket.Conn { + return e.connection +} + +// ID returns the listener ID. +func (e *Listener) ID() string { + return e.id +} + +// Wait waits for a message on its active channel, then returns. +func (e *Listener) Wait() { + <-e.active +} + +// Lock locks the internal mutex. +func (e *Listener) Lock() { + e.lock.Lock() +} + +// Unlock unlocks the internal mutex. +func (e *Listener) Unlock() { + e.lock.Unlock() +} + +// Deactivate deactivates the event listener. 
+func (e *Listener) Deactivate() { + e.active <- false + e.done = true +} diff --git a/lxd/events/logging.go b/lxd/events/logging.go new file mode 100644 index 0000000000..fed38c4a8a --- /dev/null +++ b/lxd/events/logging.go @@ -0,0 +1,50 @@ +package events + +import ( + "fmt" + + log "github.com/lxc/lxd/shared/log15" + + "github.com/lxc/lxd/shared/api" +) + +// LoggingServer controls what server to use for messages coming from the logger. +var LoggingServer *Server + +// Handler describes an event handler. +type Handler struct { +} + +// NewEventHandler creates and returns a new event handler. +func NewEventHandler() *Handler { + return &Handler{} +} + +// Log sends a new logging event. +func (h Handler) Log(r *log.Record) error { + if LoggingServer == nil { + return fmt.Errorf("No configured event server for logging messages") + } + + LoggingServer.Send("", "logging", api.EventLogging{ + Message: r.Msg, + Level: r.Lvl.String(), + Context: logContextMap(r.Ctx)}) + return nil +} + +func logContextMap(ctx []interface{}) map[string]string { + var key string + ctxMap := map[string]string{} + + for _, entry := range ctx { + if key == "" { + key = entry.(string) + } else { + ctxMap[key] = fmt.Sprintf("%v", entry) + key = "" + } + } + + return ctxMap +} From c0e64888ed89a8931a42016fb97928e76a2d87d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Mon, 30 Sep 2019 15:56:49 -0400 Subject: [PATCH 3/5] lxd/events: Add event server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- lxd/events/events.go | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/lxd/events/events.go b/lxd/events/events.go index 2d1b46e081..02396ddf4c 100644 --- a/lxd/events/events.go +++ b/lxd/events/events.go @@ -16,17 +16,28 @@ import ( // Server represents an instance of an event server. 
type Server struct { - debug bool + debug bool verbose bool listeners map[string]*Listener - lock sync.Mutex + lock sync.Mutex +} + +// NewServer returns a new event server. +func NewServer(debug bool, verbose bool) *Server { + server := &Server{ + debug: debug, + verbose: verbose, + listeners: map[string]*Listener{}, + } + + return server } // NewEventListener creates and returns a new event listener. -func (s *Server) AddListener(project string, connection *websocket.Conn, messageTypes []string, location string, noForward bool) error { +func (s *Server) AddListener(group string, connection *websocket.Conn, messageTypes []string, location string, noForward bool) (*Listener, error) { listener := &Listener{ - project: project, + group: group, connection: connection, messageTypes: messageTypes, location: location, @@ -39,18 +50,18 @@ func (s *Server) AddListener(project string, connection *websocket.Conn, message defer s.lock.Unlock() if s.listeners[listener.id] != nil { - return fmt.Errorf("A listener with id '%s' already exists", listener.id) + return nil, fmt.Errorf("A listener with id '%s' already exists", listener.id) } s.listeners[listener.id] = listener - return nil + return listener, nil } // SendLifecycle broadcasts a lifecycle event. -func (s *Server) SendLifecycle(project, action, source string, +func (s *Server) SendLifecycle(group, action, source string, context map[string]interface{}) error { - s.Send(project, "lifecycle", api.EventLifecycle{ + s.Send(group, "lifecycle", api.EventLifecycle{ Action: action, Source: source, Context: context}) @@ -58,7 +69,7 @@ func (s *Server) SendLifecycle(project, action, source string, } // Send broadcasts a custom event. 
-func (s *Server) Send(project, eventType string, eventMessage interface{}) error { +func (s *Server) Send(group, eventType string, eventMessage interface{}) error { encodedMessage, err := json.Marshal(eventMessage) if err != nil { return err @@ -69,7 +80,7 @@ func (s *Server) Send(project, eventType string, eventMessage interface{}) error Metadata: encodedMessage, } - return s.broadcast(project, event, false) + return s.broadcast(group, event, false) } // Forward to the local events dispatcher an event received from another node. @@ -97,12 +108,11 @@ func (s *Server) Forward(id int64, event api.Event) { } } - -func (s *Server) broadcast(project string, event api.Event, isForward bool) error { +func (s *Server) broadcast(group string, event api.Event, isForward bool) error { s.lock.Lock() listeners := s.listeners for _, listener := range listeners { - if project != "" && listener.project != "*" && project != listener.project { + if group != "" && listener.group != "*" && group != listener.group { continue } @@ -168,7 +178,7 @@ func (s *Server) broadcast(project string, event api.Event, isForward bool) erro // Listener describes an event listener. 
type Listener struct { - project string + group string connection *websocket.Conn messageTypes []string active chan bool From 9993ebf503b805281ce79106d20131e6c53e02c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Mon, 30 Sep 2019 15:57:16 -0400 Subject: [PATCH 4/5] lxd/state: Carry event server instances MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- lxd/state/state.go | 34 ++++++++++++++++++++++++---------- lxd/state/testing.go | 2 +- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/lxd/state/state.go b/lxd/state/state.go index 48ae76e10e..6ff4c17656 100644 --- a/lxd/state/state.go +++ b/lxd/state/state.go @@ -3,6 +3,7 @@ package state import ( "github.com/lxc/lxd/lxd/db" "github.com/lxc/lxd/lxd/endpoints" + "github.com/lxc/lxd/lxd/events" "github.com/lxc/lxd/lxd/maas" "github.com/lxc/lxd/lxd/sys" ) @@ -11,21 +12,34 @@ import ( // and the operating system. It's typically used by model entities such as // containers, volumes, etc. in order to perform changes. type State struct { - Node *db.Node - Cluster *db.Cluster - MAAS *maas.Controller - OS *sys.OS + // Databases + Node *db.Node + Cluster *db.Cluster + + // MAAS server + MAAS *maas.Controller + + // OS access + OS *sys.OS + + // LXD server Endpoints *endpoints.Endpoints + + // Event server + DevlxdEvents *events.Server + Events *events.Server } // NewState returns a new State object with the given database and operating // system components. 
-func NewState(node *db.Node, cluster *db.Cluster, maas *maas.Controller, os *sys.OS, endpoints *endpoints.Endpoints) *State { +func NewState(node *db.Node, cluster *db.Cluster, maas *maas.Controller, os *sys.OS, endpoints *endpoints.Endpoints, events *events.Server, devlxdEvents *events.Server) *State { return &State{ - Node: node, - Cluster: cluster, - MAAS: maas, - OS: os, - Endpoints: endpoints, + Node: node, + Cluster: cluster, + MAAS: maas, + OS: os, + Endpoints: endpoints, + DevlxdEvents: devlxdEvents, + Events: events, } } diff --git a/lxd/state/testing.go b/lxd/state/testing.go index f4733458de..c53111c02f 100644 --- a/lxd/state/testing.go +++ b/lxd/state/testing.go @@ -23,7 +23,7 @@ func NewTestState(t *testing.T) (*State, func()) { osCleanup() } - state := NewState(node, cluster, nil, os, nil) + state := NewState(node, cluster, nil, os, nil, nil, nil) return state, cleanup } From b2e1ddaf2907114479c826b0f1ba712f07582091 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Mon, 30 Sep 2019 15:57:35 -0400 Subject: [PATCH 5/5] lxd: Switch to new event structure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- lxd/container.go | 3 +- lxd/container_lxc.go | 27 +++++++------- lxd/daemon.go | 13 ++++++- lxd/devlxd.go | 72 +++--------------------------------- lxd/events.go | 8 ++-- lxd/main.go | 3 -- lxd/operations/operations.go | 21 +++++------ 7 files changed, 45 insertions(+), 102 deletions(-) diff --git a/lxd/container.go b/lxd/container.go index 832cc368bb..dce1e8f191 100644 --- a/lxd/container.go +++ b/lxd/container.go @@ -20,7 +20,6 @@ import ( "github.com/lxc/lxd/lxd/db" "github.com/lxc/lxd/lxd/device" deviceConfig "github.com/lxc/lxd/lxd/device/config" - "github.com/lxc/lxd/lxd/events" "github.com/lxc/lxd/lxd/instance/instancetype" "github.com/lxc/lxd/lxd/operations" "github.com/lxc/lxd/lxd/state" @@ -683,7 +682,7 @@ 
func containerCreateAsSnapshot(s *state.State, args db.InstanceArgs, sourceInsta os.RemoveAll(sourceInstance.StatePath()) } - events.SendLifecycle(sourceInstance.Project(), "container-snapshot-created", + s.Events.SendLifecycle(sourceInstance.Project(), "container-snapshot-created", fmt.Sprintf("/1.0/containers/%s", sourceInstance.Name()), map[string]interface{}{ "snapshot_name": args.Name, diff --git a/lxd/container_lxc.go b/lxd/container_lxc.go index 2dbb6ebd72..f2c7195b69 100644 --- a/lxd/container_lxc.go +++ b/lxd/container_lxc.go @@ -31,7 +31,6 @@ import ( "github.com/lxc/lxd/lxd/db/query" "github.com/lxc/lxd/lxd/device" deviceConfig "github.com/lxc/lxd/lxd/device/config" - "github.com/lxc/lxd/lxd/events" "github.com/lxc/lxd/lxd/instance/instancetype" "github.com/lxc/lxd/lxd/instance/operationlock" "github.com/lxc/lxd/lxd/maas" @@ -433,7 +432,7 @@ func containerLXCCreate(s *state.State, args db.InstanceArgs) (container, error) } logger.Info("Created container", ctxMap) - events.SendLifecycle(c.project, "container-created", + c.state.Events.SendLifecycle(c.project, "container-created", fmt.Sprintf("/1.0/containers/%s", c.name), nil) return c, nil @@ -2554,7 +2553,7 @@ func (c *containerLXC) Start(stateful bool) error { } logger.Info("Started container", ctxMap) - events.SendLifecycle(c.project, "container-started", + c.state.Events.SendLifecycle(c.project, "container-started", fmt.Sprintf("/1.0/containers/%s", c.name), nil) return nil @@ -2722,7 +2721,7 @@ func (c *containerLXC) Stop(stateful bool) error { op.Done(nil) logger.Info("Stopped container", ctxMap) - events.SendLifecycle(c.project, "container-stopped", + c.state.Events.SendLifecycle(c.project, "container-stopped", fmt.Sprintf("/1.0/containers/%s", c.name), nil) return nil } else if shared.PathExists(c.StatePath()) { @@ -2778,7 +2777,7 @@ func (c *containerLXC) Stop(stateful bool) error { } logger.Info("Stopped container", ctxMap) - events.SendLifecycle(c.project, "container-stopped", + 
c.state.Events.SendLifecycle(c.project, "container-stopped", fmt.Sprintf("/1.0/containers/%s", c.name), nil) return nil @@ -2839,7 +2838,7 @@ func (c *containerLXC) Shutdown(timeout time.Duration) error { } logger.Info("Shut down container", ctxMap) - events.SendLifecycle(c.project, "container-shutdown", + c.state.Events.SendLifecycle(c.project, "container-shutdown", fmt.Sprintf("/1.0/containers/%s", c.name), nil) return nil @@ -3032,7 +3031,7 @@ func (c *containerLXC) Freeze() error { } logger.Info("Froze container", ctxMap) - events.SendLifecycle(c.project, "container-paused", + c.state.Events.SendLifecycle(c.project, "container-paused", fmt.Sprintf("/1.0/containers/%s", c.name), nil) return err @@ -3077,7 +3076,7 @@ func (c *containerLXC) Unfreeze() error { } logger.Info("Unfroze container", ctxMap) - events.SendLifecycle(c.project, "container-resumed", + c.state.Events.SendLifecycle(c.project, "container-resumed", fmt.Sprintf("/1.0/containers/%s", c.name), nil) return err @@ -3471,7 +3470,7 @@ func (c *containerLXC) Restore(sourceContainer Instance, stateful bool) error { return nil } - events.SendLifecycle(c.project, "container-snapshot-restored", + c.state.Events.SendLifecycle(c.project, "container-snapshot-restored", fmt.Sprintf("/1.0/containers/%s", c.name), map[string]interface{}{ "snapshot_name": c.name, }) @@ -3628,12 +3627,12 @@ func (c *containerLXC) Delete() error { logger.Info("Deleted container", ctxMap) if c.IsSnapshot() { - events.SendLifecycle(c.project, "container-snapshot-deleted", + c.state.Events.SendLifecycle(c.project, "container-snapshot-deleted", fmt.Sprintf("/1.0/containers/%s", c.name), map[string]interface{}{ "snapshot_name": c.name, }) } else { - events.SendLifecycle(c.project, "container-deleted", + c.state.Events.SendLifecycle(c.project, "container-deleted", fmt.Sprintf("/1.0/containers/%s", c.name), nil) } @@ -3794,13 +3793,13 @@ func (c *containerLXC) Rename(newName string) error { logger.Info("Renamed container", ctxMap) if 
c.IsSnapshot() { - events.SendLifecycle(c.project, "container-snapshot-renamed", + c.state.Events.SendLifecycle(c.project, "container-snapshot-renamed", fmt.Sprintf("/1.0/containers/%s", oldName), map[string]interface{}{ "new_name": newName, "snapshot_name": oldName, }) } else { - events.SendLifecycle(c.project, "container-renamed", + c.state.Events.SendLifecycle(c.project, "container-renamed", fmt.Sprintf("/1.0/containers/%s", oldName), map[string]interface{}{ "new_name": newName, }) @@ -4698,7 +4697,7 @@ func (c *containerLXC) Update(args db.InstanceArgs, userRequested bool) error { endpoint = fmt.Sprintf("/1.0/containers/%s", c.name) } - events.SendLifecycle(c.project, "container-updated", endpoint, nil) + c.state.Events.SendLifecycle(c.project, "container-updated", endpoint, nil) return nil } diff --git a/lxd/daemon.go b/lxd/daemon.go index dc50d1adaf..319a0a8839 100644 --- a/lxd/daemon.go +++ b/lxd/daemon.go @@ -62,6 +62,10 @@ type Daemon struct { readyChan chan struct{} // Closed when LXD is fully ready shutdownChan chan struct{} + // Event servers + devlxdEvents *events.Server + events *events.Server + // Tasks registry for long-running background tasks // Keep clustering tasks separate as they cause a lot of CPU wakeups tasks task.Group @@ -132,8 +136,13 @@ func (m *IdentityClientWrapper) DeclaredIdentity(ctx context.Context, declared m // NewDaemon returns a new Daemon object with the given configuration. func NewDaemon(config *DaemonConfig, os *sys.OS) *Daemon { + lxdEvents := events.NewServer(debug, verbose) + devlxdEvents := events.NewServer(debug, verbose) + return &Daemon{ config: config, + devlxdEvents: devlxdEvents, + events: lxdEvents, os: os, setupChan: make(chan struct{}), readyChan: make(chan struct{}), @@ -333,7 +342,7 @@ func isJSONRequest(r *http.Request) bool { // State creates a new State instance linked to our internal db and os. 
func (d *Daemon) State() *state.State { - return state.NewState(d.db, d.cluster, d.maas, d.os, d.endpoints) + return state.NewState(d.db, d.cluster, d.maas, d.os, d.endpoints, d.events, d.devlxdEvents) } // UnixSocket returns the full path to the unix.socket file that this daemon is @@ -924,7 +933,7 @@ func (d *Daemon) startClusterTasks() { d.clusterTasks.Add(cluster.HeartbeatTask(d.gateway)) // Events - d.clusterTasks.Add(cluster.Events(d.endpoints, d.cluster, events.Forward)) + d.clusterTasks.Add(cluster.Events(d.endpoints, d.cluster, d.events.Forward)) // Auto-sync images across the cluster (daily) d.clusterTasks.Add(autoSyncImagesTask(d)) diff --git a/lxd/devlxd.go b/lxd/devlxd.go index f5f2baab2d..7862f06aa9 100644 --- a/lxd/devlxd.go +++ b/lxd/devlxd.go @@ -1,7 +1,6 @@ package main import ( - "encoding/json" "fmt" "io/ioutil" "net" @@ -16,10 +15,9 @@ import ( "unsafe" "github.com/gorilla/mux" - "github.com/gorilla/websocket" - "github.com/lxc/lxd/lxd/events" "github.com/lxc/lxd/lxd/instance/instancetype" + "github.com/lxc/lxd/lxd/project" "github.com/lxc/lxd/lxd/state" "github.com/lxc/lxd/lxd/ucred" "github.com/lxc/lxd/lxd/util" @@ -105,9 +103,6 @@ var devlxdMetadataGet = devLxdHandler{"/1.0/meta-data", func(d *Daemon, c contai return okResponse(fmt.Sprintf("#cloud-config\ninstance-id: %s\nlocal-hostname: %s\n%s", c.Name(), c.Name(), value), "raw") }} -var devlxdEventsLock sync.Mutex -var devlxdEventListeners map[int]map[string]*events.Listener = make(map[int]map[string]*events.Listener) - var devlxdEventsGet = devLxdHandler{"/1.0/events", func(d *Daemon, c container, w http.ResponseWriter, r *http.Request) *devLxdResponse { typeStr := r.FormValue("type") if typeStr == "" { @@ -119,18 +114,12 @@ var devlxdEventsGet = devLxdHandler{"/1.0/events", func(d *Daemon, c container, return &devLxdResponse{"internal server error", http.StatusInternalServerError, "raw"} } - listener := events.NewEventListener(c.Project(), conn, strings.Split(typeStr, ","), "", false) - 
- devlxdEventsLock.Lock() - cid := c.Id() - _, ok := devlxdEventListeners[cid] - if !ok { - devlxdEventListeners[cid] = map[string]*events.Listener{} + listener, err := d.devlxdEvents.AddListener(strconv.Itoa(c.Id()), conn, strings.Split(typeStr, ","), "", false) + if err != nil { + return &devLxdResponse{"internal server error", http.StatusInternalServerError, "raw"} } - devlxdEventListeners[cid][listener.ID()] = listener - devlxdEventsLock.Unlock() - logger.Debugf("New container event listener for '%s': %s", c.Name(), listener.ID()) + logger.Debugf("New container event listener for '%s': %s", project.Prefix(c.Project(), c.Name()), listener.ID()) listener.Wait() @@ -143,56 +132,7 @@ func devlxdEventSend(c container, eventType string, eventMessage interface{}) er event["timestamp"] = time.Now() event["metadata"] = eventMessage - body, err := json.Marshal(event) - if err != nil { - return err - } - - devlxdEventsLock.Lock() - cid := c.Id() - listeners, ok := devlxdEventListeners[cid] - if !ok { - devlxdEventsLock.Unlock() - return nil - } - - for _, listener := range listeners { - if !shared.StringInSlice(eventType, listener.MessageTypes()) { - continue - } - - go func(listener *events.Listener, body []byte) { - // Check that the listener still exists - if listener == nil { - return - } - - // Ensure there is only a single even going out at the time - listener.Lock() - defer listener.Unlock() - - // Make sure we're not done already - if listener.IsDone() { - return - } - - err = listener.Connection().WriteMessage(websocket.TextMessage, body) - if err != nil { - // Remove the listener from the list - devlxdEventsLock.Lock() - delete(devlxdEventListeners[cid], listener.ID()) - devlxdEventsLock.Unlock() - - // Disconnect the listener - listener.Connection().Close() - listener.Deactivate() - logger.Debugf("Disconnected container event listener for '%s': %s", c.Name(), listener.ID()) - } - }(listener, body) - } - devlxdEventsLock.Unlock() - - return nil + return 
c.DaemonState().DevlxdEvents.Send(strconv.Itoa(c.Id()), eventType, eventMessage) } var handlers = []devLxdHandler{ diff --git a/lxd/events.go b/lxd/events.go index 215a6646f7..14f7eb7374 100644 --- a/lxd/events.go +++ b/lxd/events.go @@ -5,7 +5,6 @@ import ( "strings" "github.com/lxc/lxd/lxd/db" - "github.com/lxc/lxd/lxd/events" "github.com/lxc/lxd/lxd/response" "github.com/lxc/lxd/shared" "github.com/lxc/lxd/shared/logger" @@ -58,9 +57,10 @@ func eventsSocket(d *Daemon, r *http.Request, w http.ResponseWriter) error { // If this request is an internal one initiated by another node wanting // to watch the events on this node, set the listener to broadcast only // local events. - listener := events.NewEventListener(project, c, strings.Split(typeStr, ","), serverName, isClusterNotification(r)) - - events.AddListener(listener) + listener, err := d.events.AddListener(project, c, strings.Split(typeStr, ","), serverName, isClusterNotification(r)) + if err != nil { + return err + } logger.Debugf("New event listener: %s", listener.ID()) diff --git a/lxd/main.go b/lxd/main.go index ce49cd5dc6..d6f2b97378 100644 --- a/lxd/main.go +++ b/lxd/main.go @@ -42,9 +42,6 @@ func (c *cmdGlobal) Run(cmd *cobra.Command, args []string) error { debug = c.flagLogDebug verbose = c.flagLogVerbose - // Set debug and verbose for the events package - events.Init(debug, verbose) - // Set debug for the operations package operations.Init(debug) diff --git a/lxd/operations/operations.go b/lxd/operations/operations.go index 38bb4e77fc..56fc45565f 100644 --- a/lxd/operations/operations.go +++ b/lxd/operations/operations.go @@ -7,7 +7,6 @@ import ( "time" "github.com/lxc/lxd/lxd/db" - "github.com/lxc/lxd/lxd/events" "github.com/lxc/lxd/lxd/response" "github.com/lxc/lxd/lxd/state" "github.com/lxc/lxd/shared" @@ -168,7 +167,7 @@ func OperationCreate(state *state.State, project string, opClass operationClass, logger.Debugf("New %s Operation: %s", op.class.String(), op.id) _, md, _ := op.Render() - 
events.Send(op.project, "Operation", md) + op.state.Events.Send(op.project, "Operation", md) return &op, nil } @@ -232,7 +231,7 @@ func (op *Operation) Run() (chan error, error) { logger.Debugf("Failure for %s operation: %s: %s", op.class.String(), op.id, err) _, md, _ := op.Render() - events.Send(op.project, "operation", md) + op.state.Events.Send(op.project, "operation", md) return } @@ -245,7 +244,7 @@ func (op *Operation) Run() (chan error, error) { op.lock.Lock() logger.Debugf("Success for %s operation: %s", op.class.String(), op.id) _, md, _ := op.Render() - events.Send(op.project, "operation", md) + op.state.Events.Send(op.project, "operation", md) op.lock.Unlock() }(op, chanRun) } @@ -253,7 +252,7 @@ func (op *Operation) Run() (chan error, error) { logger.Debugf("Started %s operation: %s", op.class.String(), op.id) _, md, _ := op.Render() - events.Send(op.project, "operation", md) + op.state.Events.Send(op.project, "operation", md) return chanRun, nil } @@ -287,7 +286,7 @@ func (op *Operation) Cancel() (chan error, error) { logger.Debugf("Failed to cancel %s Operation: %s: %s", op.class.String(), op.id, err) _, md, _ := op.Render() - events.Send(op.project, "Operation", md) + op.state.Events.Send(op.project, "Operation", md) return } @@ -299,13 +298,13 @@ func (op *Operation) Cancel() (chan error, error) { logger.Debugf("Cancelled %s Operation: %s", op.class.String(), op.id) _, md, _ := op.Render() - events.Send(op.project, "Operation", md) + op.state.Events.Send(op.project, "Operation", md) }(op, oldStatus, chanCancel) } logger.Debugf("Cancelling %s Operation: %s", op.class.String(), op.id) _, md, _ := op.Render() - events.Send(op.project, "Operation", md) + op.state.Events.Send(op.project, "Operation", md) if op.canceler != nil { err := op.canceler.Cancel() @@ -324,7 +323,7 @@ func (op *Operation) Cancel() (chan error, error) { logger.Debugf("Cancelled %s Operation: %s", op.class.String(), op.id) _, md, _ = op.Render() - events.Send(op.project, 
"Operation", md) + op.state.Events.Send(op.project, "Operation", md) return chanCancel, nil } @@ -470,7 +469,7 @@ func (op *Operation) UpdateResources(opResources map[string][]string) error { logger.Debugf("Updated resources for %s Operation: %s", op.class.String(), op.id) _, md, _ := op.Render() - events.Send(op.project, "Operation", md) + op.state.Events.Send(op.project, "Operation", md) return nil } @@ -498,7 +497,7 @@ func (op *Operation) UpdateMetadata(opMetadata interface{}) error { logger.Debugf("Updated metadata for %s Operation: %s", op.class.String(), op.id) _, md, _ := op.Render() - events.Send(op.project, "Operation", md) + op.state.Events.Send(op.project, "Operation", md) return nil }
_______________________________________________ lxc-devel mailing list lxc-devel@lists.linuxcontainers.org http://lists.linuxcontainers.org/listinfo/lxc-devel