The following pull request was submitted through Github.
It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/7018

This e-mail was sent by the LXC bot, direct replies will not reach the author
unless they happen to be subscribed to this list.

=== Description (from pull-request) ===
The websocket read go routines were being started before the instance.Exec process was confirmed to have started.

This was leaking go routines and leaving websocket readers running on closed connections which could result in infinite loops in the websocket reader functions.

Changes made:

1. Re-orders the code so that the websocket mirroring go routines are not started until the process being execed has started.
2. Changes any channels that were used for coordination only (i.e. not actually sending any data) from bool to struct{} to indicate that the value has no meaning, and instead uses close(ch) to signal whatever coordination point was needed. This has the added benefit of not needing a buffered channel, as closing a channel never blocks, even when nothing is receiving from it.
3. Properly uses the finisher() function for returning from the Do() function so that sockets and channels are closed.
4. Improves logging with instance name context.

Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com>
From 3f7441e7a34a92cad426941a82a7778248dfc590 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parr...@canonical.com>
Date: Fri, 13 Mar 2020 10:28:38 +0000
Subject: [PATCH 1/5] shared/util/linux: Updates ExecReaderToChannel to accept
 a finisher chan as struct{}

This is because the action of closing such a channel suffices for indicating 
the channel is finished.

It also means that the channel no longer needs to be created as a buffered 
channel: closing a channel never blocks, even when nothing is receiving, and 
every receive on a closed channel (by any number of readers) returns immediately.

The empty struct{} type is used to indicate that the values carried by the 
channel have no meaning; only the act of receiving from it (or closing it) is the signal.

Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com>
---
 shared/util_linux_cgo.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/shared/util_linux_cgo.go b/shared/util_linux_cgo.go
index 45f7c03e22..41d84b80ff 100644
--- a/shared/util_linux_cgo.go
+++ b/shared/util_linux_cgo.go
@@ -289,7 +289,7 @@ again:
 // Extensively commented directly in the code. Please leave the comments!
 // Looking at this in a couple of months noone will know why and how this works
 // anymore.
-func ExecReaderToChannel(r io.Reader, bufferSize int, exited <-chan bool, fd 
int) <-chan []byte {
+func ExecReaderToChannel(r io.Reader, bufferSize int, exited <-chan struct{}, 
fd int) <-chan []byte {
        if bufferSize <= (128 * 1024) {
                bufferSize = (128 * 1024)
        }

From 03a71e64dd2dee8ab39cbcb25816a2d2e5238207 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parr...@canonical.com>
Date: Fri, 13 Mar 2020 10:30:14 +0000
Subject: [PATCH 2/5] lxd-agent/exec: Updates usage of ExecReaderToChannel
 channel definitions

Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com>
---
 lxd-agent/exec.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/lxd-agent/exec.go b/lxd-agent/exec.go
index 1feddb8325..23a3fa5104 100644
--- a/lxd-agent/exec.go
+++ b/lxd-agent/exec.go
@@ -253,7 +253,7 @@ func (s *execWs) Do(op *operations.Operation) error {
 
        controlExit := make(chan bool, 1)
        attachedChildIsBorn := make(chan int)
-       attachedChildIsDead := make(chan bool, 1)
+       attachedChildIsDead := make(chan struct{})
        var wgEOF sync.WaitGroup
 
        if s.interactive {
@@ -400,7 +400,7 @@ func (s *execWs) Do(op *operations.Operation) error {
                        conn.Close()
                }
 
-               attachedChildIsDead <- true
+               close(attachedChildIsDead)
 
                wgEOF.Wait()
 

From ad51d1488bed7e3911901eb8942a95cf4e14b58b Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parr...@canonical.com>
Date: Fri, 13 Mar 2020 10:33:56 +0000
Subject: [PATCH 3/5] shared/network: Removes logging internal state of
 websocket in WebsocketRecvStream

Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com>
---
 shared/network.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/shared/network.go b/shared/network.go
index 71e5f09551..3a49052850 100644
--- a/shared/network.go
+++ b/shared/network.go
@@ -211,7 +211,7 @@ func WebsocketRecvStream(w io.Writer, conn *websocket.Conn) 
chan bool {
                        }
 
                        if err != nil {
-                               logger.Debugf("Got error getting next reader 
%s, %s", err, w)
+                               logger.Debugf("Got error getting next reader 
%s", err)
                                break
                        }
 

From 7e4468edad2977201ea0fc8ce41bb48a31b7a512 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parr...@canonical.com>
Date: Fri, 13 Mar 2020 10:34:20 +0000
Subject: [PATCH 4/5] shared/netutils/network/linux: Updates
 WebsocketExecMirror to use struct{} exited indicator channel

Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com>
---
 shared/netutils/network_linux.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/shared/netutils/network_linux.go b/shared/netutils/network_linux.go
index 0dc7bb72b0..2f31ef70e6 100644
--- a/shared/netutils/network_linux.go
+++ b/shared/netutils/network_linux.go
@@ -173,7 +173,7 @@ func NetnsGetifaddrs(initPID int32) 
(map[string]api.InstanceStateNetwork, error)
 }
 
 // WebsocketExecMirror mirrors a websocket connection with a set of 
Writer/Reader.
-func WebsocketExecMirror(conn *websocket.Conn, w io.WriteCloser, r 
io.ReadCloser, exited chan bool, fd int) (chan bool, chan bool) {
+func WebsocketExecMirror(conn *websocket.Conn, w io.WriteCloser, r 
io.ReadCloser, exited chan struct{}, fd int) (chan bool, chan bool) {
        readDone := make(chan bool, 1)
        writeDone := make(chan bool, 1)
 

From ef63dd9af90281e2c484376ab619ced1ee37b8fd Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parr...@canonical.com>
Date: Fri, 13 Mar 2020 10:35:19 +0000
Subject: [PATCH 5/5] lxd/instance/exec: Fixes VM read loop when agent not
 started

The websocket read go routines were being started before the instance.Exec 
process was confirmed to have started.

This was leaking go routines and leaving websocket readers running on closed 
connections which could result in infinite loops in the websocket reader 
functions.

Changes made:

1. Re-orders the code so that the websocket mirroring go routines are not 
started until the process being execed has started.
2. Changes any channels that were used for coordination only (i.e. not actually 
sending any data) from bool to struct{} to indicate that the value has no 
meaning, and instead uses close(ch) to signal whatever coordination point was 
needed. This has the added benefit of not needing a buffered channel, as 
closing a channel never blocks, even when nothing is receiving from it.
3. Properly uses the finisher() function for returning from the Do() function 
so that sockets and channels are closed.
4. Improves logging with instance name context.

Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com>
---
 lxd/instance_exec.go | 146 +++++++++++++++++++++----------------------
 1 file changed, 70 insertions(+), 76 deletions(-)

diff --git a/lxd/instance_exec.go b/lxd/instance_exec.go
index 66b95f19fd..8f3e3d41a0 100644
--- a/lxd/instance_exec.go
+++ b/lxd/instance_exec.go
@@ -25,6 +25,7 @@ import (
        "github.com/lxc/lxd/shared/api"
        log "github.com/lxc/lxd/shared/log15"
        "github.com/lxc/lxd/shared/logger"
+       "github.com/lxc/lxd/shared/logging"
        "github.com/lxc/lxd/shared/netutils"
        "github.com/lxc/lxd/shared/version"
 )
@@ -37,8 +38,8 @@ type execWs struct {
        rootGid          int64
        conns            map[int]*websocket.Conn
        connsLock        sync.Mutex
-       allConnected     chan bool
-       controlConnected chan bool
+       allConnected     chan struct{}
+       controlConnected chan struct{}
        fds              map[int]string
 }
 
@@ -78,7 +79,7 @@ func (s *execWs) Connect(op *operations.Operation, r 
*http.Request, w http.Respo
                        s.connsLock.Unlock()
 
                        if fd == -1 {
-                               s.controlConnected <- true
+                               close(s.controlConnected) // Control WS is now 
connected.
                                return nil
                        }
 
@@ -91,7 +92,7 @@ func (s *execWs) Connect(op *operations.Operation, r 
*http.Request, w http.Respo
                        }
                        s.connsLock.Unlock()
 
-                       s.allConnected <- true
+                       close(s.allConnected) // All WS not connected.
                        return nil
                }
        }
@@ -142,17 +143,59 @@ func (s *execWs) Do(op *operations.Operation) error {
                stderr = ttys[2]
        }
 
-       controlExit := make(chan bool)
-       attachedChildIsBorn := make(chan instance.Cmd)
-       attachedChildIsDead := make(chan bool, 1)
+       controlExit := make(chan struct{})
+       attachedChildIsDead := make(chan struct{})
        var wgEOF sync.WaitGroup
 
+       // Define a function to clean up TTYs and sockets when done.
+       finisher := func(cmdResult int, cmdErr error) error {
+               for _, tty := range ttys {
+                       tty.Close()
+               }
+
+               s.connsLock.Lock()
+               conn := s.conns[-1]
+               s.connsLock.Unlock()
+
+               if conn == nil {
+                       if s.req.Interactive {
+                               close(controlExit)
+                       }
+               } else {
+                       conn.Close()
+               }
+
+               close(attachedChildIsDead)
+
+               wgEOF.Wait()
+
+               for _, pty := range ptys {
+                       pty.Close()
+               }
+
+               metadata := shared.Jmap{"return": cmdResult}
+               err = op.UpdateMetadata(metadata)
+               if err != nil {
+                       return err
+               }
+
+               return cmdErr
+       }
+
+       cmd, err := s.instance.Exec(s.req, stdin, stdout, stderr)
+       if err != nil {
+               return finisher(-1, err)
+       }
+
+       logger := logging.AddContext(logger.Log, log.Ctx{"instance": 
s.instance.Name(), "PID": cmd.PID()})
+       logger.Debug("Instance process started")
+
+       // Now that process has started, we can start the mirroring of the 
process channels and websockets.
        if s.req.Interactive {
                wgEOF.Add(1)
                go func() {
-                       logger.Debugf("Interactive child process handler 
waiting")
-                       defer logger.Debugf("Interactive child process handler 
finished")
-                       attachedChild := <-attachedChildIsBorn
+                       logger.Debug("Interactive child process handler 
started")
+                       defer logger.Debug("Interactive child process handler 
finished")
 
                        select {
                        case <-s.controlConnected:
@@ -162,7 +205,6 @@ func (s *execWs) Do(op *operations.Operation) error {
                                return
                        }
 
-                       logger.Debugf(`Interactive child process handler 
started for child PID "%d"`, attachedChild.PID())
                        for {
                                s.connsLock.Lock()
                                conn := s.conns[-1]
@@ -174,7 +216,7 @@ func (s *execWs) Do(op *operations.Operation) error {
                                }
 
                                if err != nil {
-                                       logger.Debugf("Got error getting next 
reader: %v", err)
+                                       logger.Debug("Got error getting next 
reader", log.Ctx{"err": err})
                                        er, ok := err.(*websocket.CloseError)
                                        if !ok {
                                                break
@@ -185,50 +227,50 @@ func (s *execWs) Do(op *operations.Operation) error {
                                        }
 
                                        // If an abnormal closure occurred, 
kill the attached child.
-                                       err := 
attachedChild.Signal(unix.SIGKILL)
+                                       err := cmd.Signal(unix.SIGKILL)
                                        if err != nil {
-                                               logger.Debugf(`Failed to send 
SIGKILL to PID "%d": %v`, attachedChild.PID(), err)
+                                               logger.Debug("Failed to send 
SIGKILL signal", log.Ctx{"err": err})
                                        } else {
-                                               logger.Debugf(`Sent SIGKILL to 
PID "%d"`, attachedChild.PID())
+                                               logger.Debug("Sent SIGKILL 
signal")
                                        }
                                        return
                                }
 
                                buf, err := ioutil.ReadAll(r)
                                if err != nil {
-                                       logger.Debugf("Failed to read message: 
%v", err)
+                                       logger.Debug("Failed to read message", 
log.Ctx{"err": err})
                                        break
                                }
 
                                command := api.InstanceExecControl{}
 
                                if err := json.Unmarshal(buf, &command); err != 
nil {
-                                       logger.Debugf("Failed to unmarshal 
control socket command: %v", err)
+                                       logger.Debug("Failed to unmarshal 
control socket command", log.Ctx{"err": err})
                                        continue
                                }
 
                                if command.Command == "window-resize" {
                                        winchWidth, err := 
strconv.Atoi(command.Args["width"])
                                        if err != nil {
-                                               logger.Debugf("Unable to 
extract window width: %v", err)
+                                               logger.Debug("Unable to extract 
window width", log.Ctx{"err": err})
                                                continue
                                        }
 
                                        winchHeight, err := 
strconv.Atoi(command.Args["height"])
                                        if err != nil {
-                                               logger.Debugf("Unable to 
extract window height: %v", err)
+                                               logger.Debug("Unable to extract 
window height", log.Ctx{"err": err})
                                                continue
                                        }
 
-                                       err = 
attachedChild.WindowResize(int(ptys[0].Fd()), winchWidth, winchHeight)
+                                       err = 
cmd.WindowResize(int(ptys[0].Fd()), winchWidth, winchHeight)
                                        if err != nil {
-                                               logger.Debugf(`Failed to set 
window size to "%dx%d": %v`, winchWidth, winchHeight, err)
+                                               logger.Debug("Failed to set 
window size", winchWidth, winchHeight, log.Ctx{"err": err, "width": winchWidth, 
"height": winchHeight})
                                                continue
                                        }
                                } else if command.Command == "signal" {
-                                       err := 
attachedChild.Signal(unix.Signal(command.Signal))
+                                       err := 
cmd.Signal(unix.Signal(command.Signal))
                                        if err != nil {
-                                               logger.Debugf(`Failed 
forwarding signal "%d" to PID "%d": %v`, command.Signal, attachedChild.PID(), 
err)
+                                               logger.Debug("Failed forwarding 
signal", log.Ctx{"err": err, "signal": command.Signal})
                                                continue
                                        }
                                }
@@ -240,8 +282,8 @@ func (s *execWs) Do(op *operations.Operation) error {
                        conn := s.conns[0]
                        s.connsLock.Unlock()
 
-                       logger.Debugf("Started mirroring websocket")
-                       defer logger.Debugf("Finished mirroring websocket")
+                       logger.Debug("Started mirroring websocket")
+                       defer logger.Debug("Finished mirroring websocket")
                        readDone, writeDone := 
netutils.WebsocketExecMirror(conn, ptys[0], ptys[0], attachedChildIsDead, 
int(ptys[0].Fd()))
 
                        <-readDone
@@ -249,7 +291,6 @@ func (s *execWs) Do(op *operations.Operation) error {
                        conn.Close()
                        wgEOF.Done()
                }()
-
        } else {
                wgEOF.Add(len(ttys) - 1)
                for i := 0; i < len(ttys); i++ {
@@ -274,55 +315,8 @@ func (s *execWs) Do(op *operations.Operation) error {
                }
        }
 
-       finisher := func(cmdResult int, cmdErr error) error {
-               for _, tty := range ttys {
-                       tty.Close()
-               }
-
-               s.connsLock.Lock()
-               conn := s.conns[-1]
-               s.connsLock.Unlock()
-
-               if conn == nil {
-                       if s.req.Interactive {
-                               controlExit <- true
-                       }
-               } else {
-                       conn.Close()
-               }
-
-               attachedChildIsDead <- true
-
-               wgEOF.Wait()
-
-               for _, pty := range ptys {
-                       pty.Close()
-               }
-
-               metadata := shared.Jmap{"return": cmdResult}
-               err = op.UpdateMetadata(metadata)
-               if err != nil {
-                       return err
-               }
-
-               return cmdErr
-       }
-
-       cmd, err := s.instance.Exec(s.req, stdin, stdout, stderr)
-       if err != nil {
-               return err
-       }
-
-       if s.req.Interactive {
-               // Start the interactive process handler.
-               attachedChildIsBorn <- cmd
-       }
-
        exitCode, err := cmd.Wait()
-       if err != nil {
-               return err
-       }
-
+       logger.Debug("Instance process stopped")
        return finisher(exitCode, err)
 }
 
@@ -444,8 +438,8 @@ func containerExecPost(d *Daemon, r *http.Request) 
response.Response {
                        ws.conns[1] = nil
                        ws.conns[2] = nil
                }
-               ws.allConnected = make(chan bool, 1)
-               ws.controlConnected = make(chan bool, 1)
+               ws.allConnected = make(chan struct{})
+               ws.controlConnected = make(chan struct{})
                for i := -1; i < len(ws.conns)-1; i++ {
                        ws.fds[i], err = shared.RandomCryptoString()
                        if err != nil {
_______________________________________________
lxc-devel mailing list
lxc-devel@lists.linuxcontainers.org
http://lists.linuxcontainers.org/listinfo/lxc-devel

Reply via email to