The following pull request was submitted through GitHub. It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/6236
This e-mail was sent by the LXC bot; direct replies will not reach the author unless they happen to be subscribed to this list. === Description (from pull-request) ===
From 5662fd6289f09760d1bc500023ebd9b1cab95c9a Mon Sep 17 00:00:00 2001 From: Thomas Hipp <thomas.h...@canonical.com> Date: Wed, 25 Sep 2019 09:31:45 +0200 Subject: [PATCH 1/2] lxd: Move events to new events package This moves part of the event handling into a separate event package. Signed-off-by: Thomas Hipp <thomas.h...@canonical.com> --- lxd/container.go | 3 +- lxd/container_lxc.go | 27 ++--- lxd/daemon.go | 3 +- lxd/devlxd.go | 41 +++---- lxd/events.go | 191 +------------------------------ lxd/events/events.go | 259 +++++++++++++++++++++++++++++++++++++++++++ lxd/main.go | 7 +- lxd/main_forkdns.go | 3 +- lxd/operations.go | 21 ++-- 9 files changed, 317 insertions(+), 238 deletions(-) create mode 100644 lxd/events/events.go diff --git a/lxd/container.go b/lxd/container.go index 45c7687dba..722db5fcef 100644 --- a/lxd/container.go +++ b/lxd/container.go @@ -20,6 +20,7 @@ import ( "github.com/lxc/lxd/lxd/db" "github.com/lxc/lxd/lxd/device" "github.com/lxc/lxd/lxd/device/config" + "github.com/lxc/lxd/lxd/events" "github.com/lxc/lxd/lxd/instance" "github.com/lxc/lxd/lxd/state" "github.com/lxc/lxd/lxd/sys" @@ -681,7 +682,7 @@ func containerCreateAsSnapshot(s *state.State, args db.ContainerArgs, sourceInst os.RemoveAll(sourceInstance.StatePath()) } - eventSendLifecycle(sourceInstance.Project(), "container-snapshot-created", + events.SendLifecycle(sourceInstance.Project(), "container-snapshot-created", fmt.Sprintf("/1.0/containers/%s", sourceInstance.Name()), map[string]interface{}{ "snapshot_name": args.Name, diff --git a/lxd/container_lxc.go b/lxd/container_lxc.go index 67e2e5f2d6..c75d4774da 100644 --- a/lxd/container_lxc.go +++ b/lxd/container_lxc.go @@ -30,6 +30,7 @@ import ( "github.com/lxc/lxd/lxd/db/query" "github.com/lxc/lxd/lxd/device" "github.com/lxc/lxd/lxd/device/config" + "github.com/lxc/lxd/lxd/events" "github.com/lxc/lxd/lxd/instance" "github.com/lxc/lxd/lxd/maas" "github.com/lxc/lxd/lxd/project" @@ -517,7 +518,7 @@ func containerLXCCreate(s 
*state.State, args db.ContainerArgs) (container, error } logger.Info("Created container", ctxMap) - eventSendLifecycle(c.project, "container-created", + events.SendLifecycle(c.project, "container-created", fmt.Sprintf("/1.0/containers/%s", c.name), nil) return c, nil @@ -2688,7 +2689,7 @@ func (c *containerLXC) Start(stateful bool) error { } logger.Info("Started container", ctxMap) - eventSendLifecycle(c.project, "container-started", + events.SendLifecycle(c.project, "container-started", fmt.Sprintf("/1.0/containers/%s", c.name), nil) return nil @@ -2856,7 +2857,7 @@ func (c *containerLXC) Stop(stateful bool) error { op.Done(nil) logger.Info("Stopped container", ctxMap) - eventSendLifecycle(c.project, "container-stopped", + events.SendLifecycle(c.project, "container-stopped", fmt.Sprintf("/1.0/containers/%s", c.name), nil) return nil } else if shared.PathExists(c.StatePath()) { @@ -2912,7 +2913,7 @@ func (c *containerLXC) Stop(stateful bool) error { } logger.Info("Stopped container", ctxMap) - eventSendLifecycle(c.project, "container-stopped", + events.SendLifecycle(c.project, "container-stopped", fmt.Sprintf("/1.0/containers/%s", c.name), nil) return nil @@ -2973,7 +2974,7 @@ func (c *containerLXC) Shutdown(timeout time.Duration) error { } logger.Info("Shut down container", ctxMap) - eventSendLifecycle(c.project, "container-shutdown", + events.SendLifecycle(c.project, "container-shutdown", fmt.Sprintf("/1.0/containers/%s", c.name), nil) return nil @@ -3166,7 +3167,7 @@ func (c *containerLXC) Freeze() error { } logger.Info("Froze container", ctxMap) - eventSendLifecycle(c.project, "container-paused", + events.SendLifecycle(c.project, "container-paused", fmt.Sprintf("/1.0/containers/%s", c.name), nil) return err @@ -3211,7 +3212,7 @@ func (c *containerLXC) Unfreeze() error { } logger.Info("Unfroze container", ctxMap) - eventSendLifecycle(c.project, "container-resumed", + events.SendLifecycle(c.project, "container-resumed", fmt.Sprintf("/1.0/containers/%s", c.name), 
nil) return err @@ -3605,7 +3606,7 @@ func (c *containerLXC) Restore(sourceContainer Instance, stateful bool) error { return nil } - eventSendLifecycle(c.project, "container-snapshot-restored", + events.SendLifecycle(c.project, "container-snapshot-restored", fmt.Sprintf("/1.0/containers/%s", c.name), map[string]interface{}{ "snapshot_name": c.name, }) @@ -3762,12 +3763,12 @@ func (c *containerLXC) Delete() error { logger.Info("Deleted container", ctxMap) if c.IsSnapshot() { - eventSendLifecycle(c.project, "container-snapshot-deleted", + events.SendLifecycle(c.project, "container-snapshot-deleted", fmt.Sprintf("/1.0/containers/%s", c.name), map[string]interface{}{ "snapshot_name": c.name, }) } else { - eventSendLifecycle(c.project, "container-deleted", + events.SendLifecycle(c.project, "container-deleted", fmt.Sprintf("/1.0/containers/%s", c.name), nil) } @@ -3928,13 +3929,13 @@ func (c *containerLXC) Rename(newName string) error { logger.Info("Renamed container", ctxMap) if c.IsSnapshot() { - eventSendLifecycle(c.project, "container-snapshot-renamed", + events.SendLifecycle(c.project, "container-snapshot-renamed", fmt.Sprintf("/1.0/containers/%s", oldName), map[string]interface{}{ "new_name": newName, "snapshot_name": oldName, }) } else { - eventSendLifecycle(c.project, "container-renamed", + events.SendLifecycle(c.project, "container-renamed", fmt.Sprintf("/1.0/containers/%s", oldName), map[string]interface{}{ "new_name": newName, }) @@ -4832,7 +4833,7 @@ func (c *containerLXC) Update(args db.ContainerArgs, userRequested bool) error { endpoint = fmt.Sprintf("/1.0/containers/%s", c.name) } - eventSendLifecycle(c.project, "container-updated", endpoint, nil) + events.SendLifecycle(c.project, "container-updated", endpoint, nil) return nil } diff --git a/lxd/daemon.go b/lxd/daemon.go index d9a0f1d3a7..2d2d433ee5 100644 --- a/lxd/daemon.go +++ b/lxd/daemon.go @@ -32,6 +32,7 @@ import ( "github.com/lxc/lxd/lxd/db" "github.com/lxc/lxd/lxd/device" 
"github.com/lxc/lxd/lxd/endpoints" + "github.com/lxc/lxd/lxd/events" "github.com/lxc/lxd/lxd/maas" "github.com/lxc/lxd/lxd/node" "github.com/lxc/lxd/lxd/rbac" @@ -919,7 +920,7 @@ func (d *Daemon) startClusterTasks() { d.clusterTasks.Add(cluster.HeartbeatTask(d.gateway)) // Events - d.clusterTasks.Add(cluster.Events(d.endpoints, d.cluster, eventForward)) + d.clusterTasks.Add(cluster.Events(d.endpoints, d.cluster, events.Forward)) // Auto-sync images across the cluster (daily) d.clusterTasks.Add(autoSyncImagesTask(d)) diff --git a/lxd/devlxd.go b/lxd/devlxd.go index 7ceeb96685..60bbdae845 100644 --- a/lxd/devlxd.go +++ b/lxd/devlxd.go @@ -17,8 +17,8 @@ import ( "github.com/gorilla/mux" "github.com/gorilla/websocket" - "github.com/pborman/uuid" + "github.com/lxc/lxd/lxd/events" "github.com/lxc/lxd/lxd/instance" "github.com/lxc/lxd/lxd/util" "github.com/lxc/lxd/shared" @@ -104,7 +104,7 @@ var devlxdMetadataGet = devLxdHandler{"/1.0/meta-data", func(d *Daemon, c contai }} var devlxdEventsLock sync.Mutex -var devlxdEventListeners map[int]map[string]*eventListener = make(map[int]map[string]*eventListener) +var devlxdEventListeners map[int]map[string]*events.Listener = make(map[int]map[string]*events.Listener) var devlxdEventsGet = devLxdHandler{"/1.0/events", func(d *Daemon, c container, w http.ResponseWriter, r *http.Request) *devLxdResponse { typeStr := r.FormValue("type") @@ -117,26 +117,20 @@ var devlxdEventsGet = devLxdHandler{"/1.0/events", func(d *Daemon, c container, return &devLxdResponse{"internal server error", http.StatusInternalServerError, "raw"} } - listener := eventListener{ - project: c.Project(), - active: make(chan bool, 1), - connection: conn, - id: uuid.NewRandom().String(), - messageTypes: strings.Split(typeStr, ","), - } + listener := events.NewEventListener(c.Project(), conn, strings.Split(typeStr, ","), "", false) devlxdEventsLock.Lock() cid := c.Id() _, ok := devlxdEventListeners[cid] if !ok { - devlxdEventListeners[cid] = 
map[string]*eventListener{} + devlxdEventListeners[cid] = map[string]*events.Listener{} } - devlxdEventListeners[cid][listener.id] = &listener + devlxdEventListeners[cid][listener.ID()] = listener devlxdEventsLock.Unlock() - logger.Debugf("New container event listener for '%s': %s", c.Name(), listener.id) + logger.Debugf("New container event listener for '%s': %s", c.Name(), listener.ID()) - <-listener.active + listener.Wait() return &devLxdResponse{"websocket", http.StatusOK, "websocket"} }} @@ -161,37 +155,36 @@ func devlxdEventSend(c container, eventType string, eventMessage interface{}) er } for _, listener := range listeners { - if !shared.StringInSlice(eventType, listener.messageTypes) { + if !shared.StringInSlice(eventType, listener.MessageTypes()) { continue } - go func(listener *eventListener, body []byte) { + go func(listener *events.Listener, body []byte) { // Check that the listener still exists if listener == nil { return } // Ensure there is only a single even going out at the time - listener.lock.Lock() - defer listener.lock.Unlock() + listener.Lock() + defer listener.Unlock() // Make sure we're not done already - if listener.done { + if listener.IsDone() { return } - err = listener.connection.WriteMessage(websocket.TextMessage, body) + err = listener.Connection().WriteMessage(websocket.TextMessage, body) if err != nil { // Remove the listener from the list devlxdEventsLock.Lock() - delete(devlxdEventListeners[cid], listener.id) + delete(devlxdEventListeners[cid], listener.ID()) devlxdEventsLock.Unlock() // Disconnect the listener - listener.connection.Close() - listener.active <- false - listener.done = true - logger.Debugf("Disconnected container event listener for '%s': %s", c.Name(), listener.id) + listener.Connection().Close() + listener.Deactivate() + logger.Debugf("Disconnected container event listener for '%s': %s", c.Name(), listener.ID()) } }(listener, body) } diff --git a/lxd/events.go b/lxd/events.go index 8040d20a4a..471e56c1c4 100644 
--- a/lxd/events.go +++ b/lxd/events.go @@ -1,20 +1,12 @@ package main import ( - "encoding/json" - "fmt" "net/http" "strings" - "sync" - "time" - - "github.com/gorilla/websocket" - log "github.com/lxc/lxd/shared/log15" - "github.com/pborman/uuid" "github.com/lxc/lxd/lxd/db" + "github.com/lxc/lxd/lxd/events" "github.com/lxc/lxd/shared" - "github.com/lxc/lxd/shared/api" "github.com/lxc/lxd/shared/logger" ) @@ -24,61 +16,6 @@ var eventsCmd = APIEndpoint{ Get: APIEndpointAction{Handler: eventsGet, AccessHandler: AllowAuthenticated}, } -type eventsHandler struct { -} - -func logContextMap(ctx []interface{}) map[string]string { - var key string - ctxMap := map[string]string{} - - for _, entry := range ctx { - if key == "" { - key = entry.(string) - } else { - ctxMap[key] = fmt.Sprintf("%v", entry) - key = "" - } - } - - return ctxMap -} - -func (h eventsHandler) Log(r *log.Record) error { - eventSend("", "logging", api.EventLogging{ - Message: r.Msg, - Level: r.Lvl.String(), - Context: logContextMap(r.Ctx)}) - return nil -} - -func eventSendLifecycle(project, action, source string, - context map[string]interface{}) error { - eventSend(project, "lifecycle", api.EventLifecycle{ - Action: action, - Source: source, - Context: context}) - return nil -} - -var eventsLock sync.Mutex -var eventListeners map[string]*eventListener = make(map[string]*eventListener) - -type eventListener struct { - project string - connection *websocket.Conn - messageTypes []string - active chan bool - id string - lock sync.Mutex - done bool - location string - - // If true, this listener won't get events forwarded from other - // nodes. It only used by listeners created internally by LXD nodes - // connecting to other LXD nodes to get their local events only. 
- noForward bool -} - type eventsServe struct { req *http.Request d *Daemon @@ -117,27 +54,16 @@ func eventsSocket(d *Daemon, r *http.Request, w http.ResponseWriter) error { return err } - listener := eventListener{ - project: project, - active: make(chan bool, 1), - connection: c, - id: uuid.NewRandom().String(), - messageTypes: strings.Split(typeStr, ","), - location: serverName, - } - // If this request is an internal one initiated by another node wanting // to watch the events on this node, set the listener to broadcast only // local events. - listener.noForward = isClusterNotification(r) + listener := events.NewEventListener(project, c, strings.Split(typeStr, ","), serverName, isClusterNotification(r)) - eventsLock.Lock() - eventListeners[listener.id] = &listener - eventsLock.Unlock() + events.AddListener(listener) - logger.Debugf("New event listener: %s", listener.id) + logger.Debugf("New event listener: %s", listener.ID()) - <-listener.active + listener.Wait() return nil } @@ -145,110 +71,3 @@ func eventsSocket(d *Daemon, r *http.Request, w http.ResponseWriter) error { func eventsGet(d *Daemon, r *http.Request) Response { return &eventsServe{req: r, d: d} } - -func eventSend(project, eventType string, eventMessage interface{}) error { - encodedMessage, err := json.Marshal(eventMessage) - if err != nil { - return err - } - event := api.Event{ - Type: eventType, - Timestamp: time.Now(), - Metadata: encodedMessage, - } - - return eventBroadcast(project, event, false) -} - -func eventBroadcast(project string, event api.Event, isForward bool) error { - eventsLock.Lock() - listeners := eventListeners - for _, listener := range listeners { - if project != "" && listener.project != "*" && project != listener.project { - continue - } - - if isForward && listener.noForward { - continue - } - - if !shared.StringInSlice(event.Type, listener.messageTypes) { - continue - } - - go func(listener *eventListener, event api.Event) { - // Check that the listener still exists - 
if listener == nil { - return - } - - // Ensure there is only a single even going out at the time - listener.lock.Lock() - defer listener.lock.Unlock() - - // Make sure we're not done already - if listener.done { - return - } - - // Set the Location to the expected serverName - if event.Location == "" { - eventCopy := api.Event{} - err := shared.DeepCopy(&event, &eventCopy) - if err != nil { - return - } - eventCopy.Location = listener.location - - event = eventCopy - } - - body, err := json.Marshal(event) - if err != nil { - return - } - - err = listener.connection.WriteMessage(websocket.TextMessage, body) - if err != nil { - // Remove the listener from the list - eventsLock.Lock() - delete(eventListeners, listener.id) - eventsLock.Unlock() - - // Disconnect the listener - listener.connection.Close() - listener.active <- false - listener.done = true - logger.Debugf("Disconnected event listener: %s", listener.id) - } - }(listener, event) - } - eventsLock.Unlock() - - return nil -} - -// Forward to the local events dispatcher an event received from another node . 
-func eventForward(id int64, event api.Event) { - if event.Type == "logging" { - // Parse the message - logEntry := api.EventLogging{} - err := json.Unmarshal(event.Metadata, &logEntry) - if err != nil { - return - } - - if !debug && logEntry.Level == "dbug" { - return - } - - if !debug && !verbose && logEntry.Level == "info" { - return - } - } - - err := eventBroadcast("", event, true) - if err != nil { - logger.Warnf("Failed to forward event from node %d: %v", id, err) - } -} diff --git a/lxd/events/events.go b/lxd/events/events.go new file mode 100644 index 0000000000..f2ae3fd5c8 --- /dev/null +++ b/lxd/events/events.go @@ -0,0 +1,259 @@ +package events + +import ( + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/pborman/uuid" + + "github.com/gorilla/websocket" + "github.com/lxc/lxd/shared" + "github.com/lxc/lxd/shared/api" + log "github.com/lxc/lxd/shared/log15" + "github.com/lxc/lxd/shared/logger" +) + +var debug bool +var verbose bool + +var eventsLock sync.Mutex +var eventListeners map[string]*Listener = make(map[string]*Listener) + +// Listener describes an event listener. +type Listener struct { + project string + connection *websocket.Conn + messageTypes []string + active chan bool + id string + lock sync.Mutex + done bool + location string + + // If true, this listener won't get events forwarded from other + // nodes. It only used by listeners created internally by LXD nodes + // connecting to other LXD nodes to get their local events only. + noForward bool +} + +// NewEventListener creates and returns a new event listener. 
+func NewEventListener(project string, connection *websocket.Conn, messageTypes []string, location string, noForward bool) *Listener { + return &Listener{ + project: project, + connection: connection, + messageTypes: messageTypes, + location: location, + noForward: noForward, + active: make(chan bool, 1), + id: uuid.NewRandom().String(), + } +} + +// MessageTypes returns a list of message types the listener will be notified of. +func (e *Listener) MessageTypes() []string { + return e.messageTypes +} + +// IsDone returns true if the listener is done. +func (e *Listener) IsDone() bool { + return e.done +} + +// Connection returns the underlying websocket connection. +func (e *Listener) Connection() *websocket.Conn { + return e.connection +} + +// ID returns the listener ID. +func (e *Listener) ID() string { + return e.id +} + +// Wait waits for a message on its active channel, then returns. +func (e *Listener) Wait() { + <-e.active +} + +// Lock locks the internal mutex. +func (e *Listener) Lock() { + e.lock.Lock() +} + +// Unlock unlocks the internal mutex. +func (e *Listener) Unlock() { + e.lock.Unlock() +} + +// Deactivate deactivates the event listener. +func (e *Listener) Deactivate() { + e.active <- false + e.done = true +} + +// Handler describes an event handler. +type Handler struct { +} + +// NewEventHandler creates and returns a new event handler. +func NewEventHandler() *Handler { + return &Handler{} +} + +// Log sends a new logging event. +func (h Handler) Log(r *log.Record) error { + Send("", "logging", api.EventLogging{ + Message: r.Msg, + Level: r.Lvl.String(), + Context: logContextMap(r.Ctx)}) + return nil +} + +// Init sets the debug and verbose flags. +func Init(d bool, v bool) { + debug = d + verbose = v +} + +// AddListener adds the given listener to the internal list of listeners which +// are notified when events are broadcasted. 
+func AddListener(listener *Listener) { + eventsLock.Lock() + eventListeners[listener.id] = listener + eventsLock.Unlock() +} + +// SendLifecycle broadcasts a lifecycle event. +func SendLifecycle(project, action, source string, + context map[string]interface{}) error { + Send(project, "lifecycle", api.EventLifecycle{ + Action: action, + Source: source, + Context: context}) + return nil +} + +// Send broadcasts a custom event. +func Send(project, eventType string, eventMessage interface{}) error { + encodedMessage, err := json.Marshal(eventMessage) + if err != nil { + return err + } + event := api.Event{ + Type: eventType, + Timestamp: time.Now(), + Metadata: encodedMessage, + } + + return broadcast(project, event, false) +} + +// Forward to the local events dispatcher an event received from another node. +func Forward(id int64, event api.Event) { + if event.Type == "logging" { + // Parse the message + logEntry := api.EventLogging{} + err := json.Unmarshal(event.Metadata, &logEntry) + if err != nil { + return + } + + if !debug && logEntry.Level == "dbug" { + return + } + + if !debug && !verbose && logEntry.Level == "info" { + return + } + } + + err := broadcast("", event, true) + if err != nil { + logger.Warnf("Failed to forward event from node %d: %v", id, err) + } +} + +func logContextMap(ctx []interface{}) map[string]string { + var key string + ctxMap := map[string]string{} + + for _, entry := range ctx { + if key == "" { + key = entry.(string) + } else { + ctxMap[key] = fmt.Sprintf("%v", entry) + key = "" + } + } + + return ctxMap +} + +func broadcast(project string, event api.Event, isForward bool) error { + eventsLock.Lock() + listeners := eventListeners + for _, listener := range listeners { + if project != "" && listener.project != "*" && project != listener.project { + continue + } + + if isForward && listener.noForward { + continue + } + + if !shared.StringInSlice(event.Type, listener.messageTypes) { + continue + } + + go func(listener *Listener, event 
api.Event) { + // Check that the listener still exists + if listener == nil { + return + } + + // Ensure there is only a single even going out at the time + listener.lock.Lock() + defer listener.lock.Unlock() + + // Make sure we're not done already + if listener.done { + return + } + + // Set the Location to the expected serverName + if event.Location == "" { + eventCopy := api.Event{} + err := shared.DeepCopy(&event, &eventCopy) + if err != nil { + return + } + eventCopy.Location = listener.location + + event = eventCopy + } + + body, err := json.Marshal(event) + if err != nil { + return + } + + err = listener.connection.WriteMessage(websocket.TextMessage, body) + if err != nil { + // Remove the listener from the list + eventsLock.Lock() + delete(eventListeners, listener.id) + eventsLock.Unlock() + + // Disconnect the listener + listener.connection.Close() + listener.active <- false + listener.done = true + logger.Debugf("Disconnected event listener: %s", listener.id) + } + }(listener, event) + } + eventsLock.Unlock() + + return nil +} diff --git a/lxd/main.go b/lxd/main.go index af52a93423..476ad9ecb4 100644 --- a/lxd/main.go +++ b/lxd/main.go @@ -7,6 +7,7 @@ import ( "github.com/spf13/cobra" + "github.com/lxc/lxd/lxd/events" "github.com/lxc/lxd/shared/logger" "github.com/lxc/lxd/shared/logging" "github.com/lxc/lxd/shared/version" @@ -39,14 +40,16 @@ func (c *cmdGlobal) Run(cmd *cobra.Command, args []string) error { debug = c.flagLogDebug verbose = c.flagLogVerbose + // Set debug and verbose for the events package + events.Init(debug, verbose) + // Setup logger syslog := "" if c.flagLogSyslog { syslog = "lxd" } - handler := eventsHandler{} - log, err := logging.GetLogger(syslog, c.flagLogFile, c.flagLogVerbose, c.flagLogDebug, handler) + log, err := logging.GetLogger(syslog, c.flagLogFile, c.flagLogVerbose, c.flagLogDebug, events.NewEventHandler()) if err != nil { return err } diff --git a/lxd/main_forkdns.go b/lxd/main_forkdns.go index e339593c49..0ba1d322b9 
100644 --- a/lxd/main_forkdns.go +++ b/lxd/main_forkdns.go @@ -14,6 +14,7 @@ import ( "github.com/spf13/cobra" "gopkg.in/fsnotify.v0" + "github.com/lxc/lxd/lxd/events" "github.com/lxc/lxd/shared" "github.com/lxc/lxd/shared/dnsutil" "github.com/lxc/lxd/shared/logger" @@ -442,7 +443,7 @@ func (c *cmdForkDNS) Run(cmd *cobra.Command, args []string) error { return fmt.Errorf("Missing required arguments") } - log, err := logging.GetLogger("lxd-forkdns", "", false, false, eventsHandler{}) + log, err := logging.GetLogger("lxd-forkdns", "", false, false, events.NewEventHandler()) if err != nil { return err } diff --git a/lxd/operations.go b/lxd/operations.go index d0908e31e3..c96409b58e 100644 --- a/lxd/operations.go +++ b/lxd/operations.go @@ -14,6 +14,7 @@ import ( "github.com/lxc/lxd/lxd/cluster" "github.com/lxc/lxd/lxd/db" + "github.com/lxc/lxd/lxd/events" "github.com/lxc/lxd/lxd/node" "github.com/lxc/lxd/lxd/util" "github.com/lxc/lxd/shared" @@ -154,7 +155,7 @@ func (op *operation) Run() (chan error, error) { logger.Debugf("Failure for %s operation: %s: %s", op.class.String(), op.id, err) _, md, _ := op.Render() - eventSend(op.project, "operation", md) + events.Send(op.project, "operation", md) return } @@ -167,7 +168,7 @@ func (op *operation) Run() (chan error, error) { op.lock.Lock() logger.Debugf("Success for %s operation: %s", op.class.String(), op.id) _, md, _ := op.Render() - eventSend(op.project, "operation", md) + events.Send(op.project, "operation", md) op.lock.Unlock() }(op, chanRun) } @@ -175,7 +176,7 @@ func (op *operation) Run() (chan error, error) { logger.Debugf("Started %s operation: %s", op.class.String(), op.id) _, md, _ := op.Render() - eventSend(op.project, "operation", md) + events.Send(op.project, "operation", md) return chanRun, nil } @@ -207,7 +208,7 @@ func (op *operation) Cancel() (chan error, error) { logger.Debugf("Failed to cancel %s operation: %s: %s", op.class.String(), op.id, err) _, md, _ := op.Render() - eventSend(op.project, 
"operation", md) + events.Send(op.project, "operation", md) return } @@ -219,13 +220,13 @@ func (op *operation) Cancel() (chan error, error) { logger.Debugf("Cancelled %s operation: %s", op.class.String(), op.id) _, md, _ := op.Render() - eventSend(op.project, "operation", md) + events.Send(op.project, "operation", md) }(op, oldStatus, chanCancel) } logger.Debugf("Cancelling %s operation: %s", op.class.String(), op.id) _, md, _ := op.Render() - eventSend(op.project, "operation", md) + events.Send(op.project, "operation", md) if op.canceler != nil { err := op.canceler.Cancel() @@ -244,7 +245,7 @@ func (op *operation) Cancel() (chan error, error) { logger.Debugf("Cancelled %s operation: %s", op.class.String(), op.id) _, md, _ = op.Render() - eventSend(op.project, "operation", md) + events.Send(op.project, "operation", md) return chanCancel, nil } @@ -383,7 +384,7 @@ func (op *operation) UpdateResources(opResources map[string][]string) error { logger.Debugf("Updated resources for %s operation: %s", op.class.String(), op.id) _, md, _ := op.Render() - eventSend(op.project, "operation", md) + events.Send(op.project, "operation", md) return nil } @@ -409,7 +410,7 @@ func (op *operation) UpdateMetadata(opMetadata interface{}) error { logger.Debugf("Updated metadata for %s operation: %s", op.class.String(), op.id) _, md, _ := op.Render() - eventSend(op.project, "operation", md) + events.Send(op.project, "operation", md) return nil } @@ -472,7 +473,7 @@ func operationCreate(cluster *db.Cluster, project string, opClass operationClass logger.Debugf("New %s operation: %s", op.class.String(), op.id) _, md, _ := op.Render() - eventSend(op.project, "operation", md) + events.Send(op.project, "operation", md) return &op, nil } From 841007fabda56bb6b4755a0606a137bddadd711b Mon Sep 17 00:00:00 2001 From: Thomas Hipp <thomas.h...@canonical.com> Date: Wed, 25 Sep 2019 09:39:50 +0200 Subject: [PATCH 2/2] test: Add events package to static analysis test Signed-off-by: Thomas Hipp 
<thomas.h...@canonical.com> --- test/suites/static_analysis.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/test/suites/static_analysis.sh b/test/suites/static_analysis.sh index 8f40270444..21291beeb6 100644 --- a/test/suites/static_analysis.sh +++ b/test/suites/static_analysis.sh @@ -78,6 +78,7 @@ test_static_analysis() { golint -set_exit_status lxd/db/query golint -set_exit_status lxd/db/schema golint -set_exit_status lxd/endpoints + golint -set_exit_status lxd/events golint -set_exit_status lxd/maas #golint -set_exit_status lxd/migration golint -set_exit_status lxd/node
_______________________________________________ lxc-devel mailing list lxc-devel@lists.linuxcontainers.org http://lists.linuxcontainers.org/listinfo/lxc-devel