The following pull request was submitted through Github.
It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/6360

This e-mail was sent by the LXC bot, direct replies will not reach the author
unless they happen to be subscribed to this list.

=== Description (from pull-request) ===

From 5f631ad4eff8345ba3835c426f1e8c71a91bf0be Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com>
Date: Fri, 25 Oct 2019 15:12:35 -0400
Subject: [PATCH 1/7] lxd/rsync: Switch to using io.ReadWriteCloser
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgra...@ubuntu.com>
---
 lxd/rsync/rsync.go | 68 ++++++++++++++++++----------------------------
 1 file changed, 27 insertions(+), 41 deletions(-)

diff --git a/lxd/rsync/rsync.go b/lxd/rsync/rsync.go
index 0fed6d5f44..17cf13f8f6 100644
--- a/lxd/rsync/rsync.go
+++ b/lxd/rsync/rsync.go
@@ -10,11 +10,11 @@ import (
        "syscall"
        "time"
 
-       "github.com/gorilla/websocket"
        "github.com/pborman/uuid"
 
        "github.com/lxc/lxd/lxd/daemon"
        "github.com/lxc/lxd/shared"
+       "github.com/lxc/lxd/shared/ioprogress"
        "github.com/lxc/lxd/shared/logger"
 )
 
@@ -180,30 +180,35 @@ func sendSetup(name string, path string, bwlimit string, 
execPath string, featur
 
 // Send sets up the sending half of an rsync, to recursively send the
 // directory pointed to by path over the websocket.
-func Send(name string, path string, conn *websocket.Conn, readWrapper 
func(io.ReadCloser) io.ReadCloser, features []string, bwlimit string, execPath 
string) error {
-       cmd, dataSocket, stderr, err := sendSetup(name, path, bwlimit, 
execPath, features)
+func Send(name string, path string, conn io.ReadWriteCloser, tracker 
*ioprogress.ProgressTracker, features []string, bwlimit string, execPath 
string) error {
+       cmd, netcatConn, stderr, err := sendSetup(name, path, bwlimit, 
execPath, features)
        if err != nil {
                return err
        }
+       defer netcatConn.Close()
 
-       if dataSocket != nil {
-               defer dataSocket.Close()
+       // Setup progress tracker
+       readPipe := io.ReadCloser(netcatConn)
+       if tracker != nil {
+               readPipe = &ioprogress.ProgressReader{netcatConn, tracker}
        }
 
-       readPipe := io.ReadCloser(dataSocket)
-       if readWrapper != nil {
-               readPipe = readWrapper(dataSocket)
-       }
-
-       readDone, writeDone := shared.WebsocketMirror(conn, dataSocket, 
readPipe, nil, nil)
+       // Forward from netcat to target
+       chCopy := make(chan error, 1)
+       go func() {
+               _, err := io.Copy(conn, readPipe)
+               chCopy <- err
+       }()
 
+       // Wait for rsync to complete
        chError := make(chan error, 1)
        go func() {
                err = cmd.Wait()
                if err != nil {
-                       dataSocket.Close()
+                       netcatConn.Close()
                        readPipe.Close()
                }
+
                chError <- err
        }()
 
@@ -217,8 +222,7 @@ func Send(name string, path string, conn *websocket.Conn, 
readWrapper func(io.Re
                logger.Errorf("Rsync send failed: %s: %s: %s", path, err, 
string(output))
        }
 
-       <-readDone
-       <-writeDone
+       <-chCopy
 
        return err
 }
@@ -226,7 +230,7 @@ func Send(name string, path string, conn *websocket.Conn, 
readWrapper func(io.Re
 // Recv sets up the receiving half of the websocket to rsync (the other
 // half set up by rsync.Send), putting the contents in the directory specified
 // by path.
-func Recv(path string, conn *websocket.Conn, writeWrapper func(io.WriteCloser) 
io.WriteCloser, features []string) error {
+func Recv(path string, conn io.ReadWriteCloser, tracker 
*ioprogress.ProgressTracker, features []string) error {
        args := []string{
                "--server",
                "-vlogDtpre.iLsfx",
@@ -243,48 +247,30 @@ func Recv(path string, conn *websocket.Conn, writeWrapper 
func(io.WriteCloser) i
        args = append(args, []string{".", path}...)
 
        cmd := exec.Command("rsync", args...)
-
-       stdin, err := cmd.StdinPipe()
-       if err != nil {
-               return err
-       }
-
-       stdout, err := cmd.StdoutPipe()
-       if err != nil {
-               return err
+       if tracker != nil {
+               cmd.Stdin = &ioprogress.ProgressReader{conn, tracker}
        }
+       cmd.Stdin = conn
+       cmd.Stdout = conn
 
        stderr, err := cmd.StderrPipe()
        if err != nil {
                return err
        }
 
-       if err := cmd.Start(); err != nil {
-               return err
-       }
-
-       writePipe := io.WriteCloser(stdin)
-       if writeWrapper != nil {
-               writePipe = writeWrapper(stdin)
-       }
-
-       readDone, writeDone := shared.WebsocketMirror(conn, writePipe, stdout, 
nil, nil)
-       output, err := ioutil.ReadAll(stderr)
+       err = cmd.Start()
        if err != nil {
-               cmd.Process.Kill()
-               cmd.Wait()
                return err
        }
 
        err = cmd.Wait()
+       output, err := ioutil.ReadAll(stderr)
        if err != nil {
                logger.Errorf("Rsync receive failed: %s: %s: %s", path, err, 
string(output))
+               return err
        }
 
-       <-readDone
-       <-writeDone
-
-       return err
+       return nil
 }
 
 func rsyncFeatureArgs(features []string) []string {

From 4ea34114bc033e39c5a8d2ad0d4a7bef4f7c7b91 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com>
Date: Fri, 25 Oct 2019 15:46:34 -0400
Subject: [PATCH 2/7] shared: Implement a WebsocketIO ReadWriteCloser
 abstraction
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgra...@ubuntu.com>
---
 shared/network.go | 66 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 66 insertions(+)

diff --git a/shared/network.go b/shared/network.go
index 00e200001e..c73677df93 100644
--- a/shared/network.go
+++ b/shared/network.go
@@ -363,6 +363,72 @@ func DefaultWriter(conn *websocket.Conn, w io.WriteCloser, 
writeDone chan<- bool
        w.Close()
 }
 
+// WebsocketIO is a wrapper implementing ReadWriteCloser on top of websocket
+type WebsocketIO struct {
+       Conn   *websocket.Conn
+       reader io.Reader
+}
+
+func (w *WebsocketIO) Read(p []byte) (n int, err error) {
+       for {
+               // First read from this message
+               if w.reader == nil {
+                       var mt int
+
+                       mt, w.reader, err = w.Conn.NextReader()
+                       if err != nil {
+                               return -1, err
+                       }
+
+                       if mt == websocket.CloseMessage {
+                               return 0, io.EOF
+                       }
+
+                       if mt == websocket.TextMessage {
+                               return 0, io.EOF
+                       }
+               }
+
+               // Perform the read itself
+               n, err := w.reader.Read(p)
+               if err == io.EOF {
+
+                       // At the end of the message, reset reader
+                       w.reader = nil
+
+                       return n, nil
+               }
+
+               if err != nil {
+                       return -1, err
+               }
+
+               return n, nil
+       }
+}
+
+func (w *WebsocketIO) Write(p []byte) (n int, err error) {
+
+       wr, err := w.Conn.NextWriter(websocket.BinaryMessage)
+       if err != nil {
+               return -1, err
+       }
+
+       n, err = wr.Write(p)
+       if err != nil {
+               return -1, err
+       }
+       wr.Close()
+
+       return n, nil
+}
+
+func (w *WebsocketIO) Close() error {
+       closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, 
"")
+       w.Conn.WriteMessage(websocket.CloseMessage, closeMsg)
+       return w.Conn.Close()
+}
+
 // WebsocketMirror allows mirroring a reader to a websocket and taking the
 // result and writing it to a writer. This function allows for multiple
 // mirrorings and correctly negotiates stream endings. However, it means any

From f6653d302022dbb06846e9366d249dacb2527dc9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com>
Date: Fri, 25 Oct 2019 15:46:53 -0400
Subject: [PATCH 3/7] lxd/migration: Introduce ProgressTracker
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgra...@ubuntu.com>
---
 lxd/migration/migration_volumes.go | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/lxd/migration/migration_volumes.go 
b/lxd/migration/migration_volumes.go
index b7ecb3aebc..1fd63a502b 100644
--- a/lxd/migration/migration_volumes.go
+++ b/lxd/migration/migration_volumes.go
@@ -188,3 +188,16 @@ func ProgressWriter(op *operations.Operation, key string, 
description string) fu
                return writePipe
        }
 }
+
+// ProgressTracker returns a migration I/O tracker
+func ProgressTracker(op *operations.Operation, key string, description string) 
*ioprogress.ProgressTracker {
+       progress := func(progressInt int64, speedInt int64) {
+               progressWrapperRender(op, key, description, progressInt, 
speedInt)
+       }
+
+       tracker := &ioprogress.ProgressTracker{
+               Handler: progress,
+       }
+
+       return tracker
+}

From ea252b4616c02e38ae4f8f8f168caf4b61287232 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com>
Date: Fri, 25 Oct 2019 15:47:16 -0400
Subject: [PATCH 4/7] lxd/migration: Switch over to ReadWriteCloser for rsync
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgra...@ubuntu.com>
---
 lxd/migrate_container.go |  8 +++----
 lxd/storage_migration.go | 46 ++++++++++++++++++++--------------------
 2 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/lxd/migrate_container.go b/lxd/migrate_container.go
index 69b5d362ff..fda13de567 100644
--- a/lxd/migrate_container.go
+++ b/lxd/migrate_container.go
@@ -267,7 +267,7 @@ func (s *migrationSourceWs) preDumpLoop(args 
*preDumpLoopArgs) (bool, error) {
        // Send the pre-dump.
        ctName, _, _ := 
shared.ContainerGetParentAndSnapshotName(s.instance.Name())
        state := s.instance.DaemonState()
-       err = rsync.Send(ctName, shared.AddSlash(args.checkpointDir), 
s.criuConn, nil, args.rsyncFeatures, args.bwlimit, state.OS.ExecPath)
+       err = rsync.Send(ctName, shared.AddSlash(args.checkpointDir), 
&shared.WebsocketIO{Conn: s.criuConn}, nil, args.rsyncFeatures, args.bwlimit, 
state.OS.ExecPath)
        if err != nil {
                return final, err
        }
@@ -682,7 +682,7 @@ func (s *migrationSourceWs) Do(migrateOp 
*operations.Operation) error {
                 */
                ctName, _, _ := 
shared.ContainerGetParentAndSnapshotName(s.instance.Name())
                state := s.instance.DaemonState()
-               err = rsync.Send(ctName, shared.AddSlash(checkpointDir), 
s.criuConn, nil, rsyncFeatures, bwlimit, state.OS.ExecPath)
+               err = rsync.Send(ctName, shared.AddSlash(checkpointDir), 
&shared.WebsocketIO{Conn: s.criuConn}, nil, rsyncFeatures, bwlimit, 
state.OS.ExecPath)
                if err != nil {
                        return abort(err)
                }
@@ -1062,7 +1062,7 @@ func (c *migrationSink) Do(migrateOp 
*operations.Operation) error {
                                for !sync.GetFinalPreDump() {
                                        logger.Debugf("About to receive rsync")
                                        // Transfer a CRIU pre-dump
-                                       err = 
rsync.Recv(shared.AddSlash(imagesDir), criuConn, nil, rsyncFeatures)
+                                       err = 
rsync.Recv(shared.AddSlash(imagesDir), &shared.WebsocketIO{Conn: criuConn}, 
nil, rsyncFeatures)
                                        if err != nil {
                                                restore <- err
                                                return
@@ -1090,7 +1090,7 @@ func (c *migrationSink) Do(migrateOp 
*operations.Operation) error {
                        }
 
                        // Final CRIU dump
-                       err = rsync.Recv(shared.AddSlash(imagesDir), criuConn, 
nil, rsyncFeatures)
+                       err = rsync.Recv(shared.AddSlash(imagesDir), 
&shared.WebsocketIO{Conn: criuConn}, nil, rsyncFeatures)
                        if err != nil {
                                restore <- err
                                return
diff --git a/lxd/storage_migration.go b/lxd/storage_migration.go
index 5dae246681..6011f304ce 100644
--- a/lxd/storage_migration.go
+++ b/lxd/storage_migration.go
@@ -68,23 +68,23 @@ func (s rsyncStorageSourceDriver) SendStorageVolume(conn 
*websocket.Conn, op *op
                }
 
                for _, snap := range snapshots {
-                       wrapper := migration.ProgressReader(op, "fs_progress", 
snap.Name)
+                       wrapper := migration.ProgressTracker(op, "fs_progress", 
snap.Name)
                        path := 
driver.GetStoragePoolVolumeSnapshotMountPoint(pool.Name, snap.Name)
                        path = shared.AddSlash(path)
                        logger.Debugf("Starting to send storage volume snapshot 
%s on storage pool %s from %s", snap.Name, pool.Name, path)
 
-                       err = rsync.Send(volume.Name, path, conn, wrapper, 
s.rsyncFeatures, bwlimit, state.OS.ExecPath)
+                       err = rsync.Send(volume.Name, path, 
&shared.WebsocketIO{Conn: conn}, wrapper, s.rsyncFeatures, bwlimit, 
state.OS.ExecPath)
                        if err != nil {
                                return err
                        }
                }
        }
 
-       wrapper := migration.ProgressReader(op, "fs_progress", volume.Name)
+       wrapper := migration.ProgressTracker(op, "fs_progress", volume.Name)
        path := driver.GetStoragePoolVolumeMountPoint(pool.Name, volume.Name)
        path = shared.AddSlash(path)
        logger.Debugf("Starting to send storage volume %s on storage pool %s 
from %s", volume.Name, pool.Name, path)
-       err = rsync.Send(volume.Name, path, conn, wrapper, s.rsyncFeatures, 
bwlimit, state.OS.ExecPath)
+       err = rsync.Send(volume.Name, path, &shared.WebsocketIO{Conn: conn}, 
wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
        if err != nil {
                return err
        }
@@ -106,16 +106,16 @@ func (s rsyncStorageSourceDriver) SendWhileRunning(conn 
*websocket.Conn, op *ope
                        }
 
                        path := send.Path()
-                       wrapper := migration.ProgressReader(op, "fs_progress", 
send.Name())
+                       wrapper := migration.ProgressTracker(op, "fs_progress", 
send.Name())
                        state := s.container.DaemonState()
-                       err = rsync.Send(project.Prefix(s.container.Project(), 
ctName), shared.AddSlash(path), conn, wrapper, s.rsyncFeatures, bwlimit, 
state.OS.ExecPath)
+                       err = rsync.Send(project.Prefix(s.container.Project(), 
ctName), shared.AddSlash(path), &shared.WebsocketIO{Conn: conn}, wrapper, 
s.rsyncFeatures, bwlimit, state.OS.ExecPath)
                        if err != nil {
                                return err
                        }
                }
        }
 
-       wrapper := migration.ProgressReader(op, "fs_progress", 
s.container.Name())
+       wrapper := migration.ProgressTracker(op, "fs_progress", 
s.container.Name())
        state := s.container.DaemonState()
 
        // Attempt to freeze the container to avoid changing files during 
transfer
@@ -128,14 +128,14 @@ func (s rsyncStorageSourceDriver) SendWhileRunning(conn 
*websocket.Conn, op *ope
                }
        }
 
-       return rsync.Send(project.Prefix(s.container.Project(), ctName), 
shared.AddSlash(s.container.Path()), conn, wrapper, s.rsyncFeatures, bwlimit, 
state.OS.ExecPath)
+       return rsync.Send(project.Prefix(s.container.Project(), ctName), 
shared.AddSlash(s.container.Path()), &shared.WebsocketIO{Conn: conn}, wrapper, 
s.rsyncFeatures, bwlimit, state.OS.ExecPath)
 }
 
 func (s rsyncStorageSourceDriver) SendAfterCheckpoint(conn *websocket.Conn, 
bwlimit string) error {
        ctName, _, _ := 
shared.ContainerGetParentAndSnapshotName(s.container.Name())
        // resync anything that changed between our first send and the 
checkpoint
        state := s.container.DaemonState()
-       return rsync.Send(project.Prefix(s.container.Project(), ctName), 
shared.AddSlash(s.container.Path()), conn, nil, s.rsyncFeatures, bwlimit, 
state.OS.ExecPath)
+       return rsync.Send(project.Prefix(s.container.Project(), ctName), 
shared.AddSlash(s.container.Path()), &shared.WebsocketIO{Conn: conn}, nil, 
s.rsyncFeatures, bwlimit, state.OS.ExecPath)
 }
 
 func (s rsyncStorageSourceDriver) Cleanup() {
@@ -259,12 +259,12 @@ func rsyncStorageMigrationSink(conn *websocket.Conn, op 
*operations.Operation, a
                                return err
                        }
 
-                       wrapper := migration.ProgressWriter(op, "fs_progress", 
target.Name)
+                       wrapper := migration.ProgressTracker(op, "fs_progress", 
target.Name)
                        path := 
driver.GetStoragePoolVolumeMountPoint(pool.Name, volume.Name)
                        path = shared.AddSlash(path)
                        logger.Debugf("Starting to receive storage volume 
snapshot %s on storage pool %s into %s", target.Name, pool.Name, path)
 
-                       err = rsync.Recv(path, conn, wrapper, 
args.RsyncFeatures)
+                       err = rsync.Recv(path, &shared.WebsocketIO{Conn: conn}, 
wrapper, args.RsyncFeatures)
                        if err != nil {
                                return err
                        }
@@ -276,11 +276,11 @@ func rsyncStorageMigrationSink(conn *websocket.Conn, op 
*operations.Operation, a
                }
        }
 
-       wrapper := migration.ProgressWriter(op, "fs_progress", volume.Name)
+       wrapper := migration.ProgressTracker(op, "fs_progress", volume.Name)
        path := driver.GetStoragePoolVolumeMountPoint(pool.Name, volume.Name)
        path = shared.AddSlash(path)
        logger.Debugf("Starting to receive storage volume %s on storage pool %s 
into %s", volume.Name, pool.Name, path)
-       return rsync.Recv(path, conn, wrapper, args.RsyncFeatures)
+       return rsync.Recv(path, &shared.WebsocketIO{Conn: conn}, wrapper, 
args.RsyncFeatures)
 }
 
 func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args 
MigrationSinkArgs) error {
@@ -356,8 +356,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op 
*operations.Operation, args Mig
                                        }
                                }
 
-                               wrapper := migration.ProgressWriter(op, 
"fs_progress", s.Name())
-                               if err := rsync.Recv(shared.AddSlash(s.Path()), 
conn, wrapper, args.RsyncFeatures); err != nil {
+                               wrapper := migration.ProgressTracker(op, 
"fs_progress", s.Name())
+                               if err := rsync.Recv(shared.AddSlash(s.Path()), 
&shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures); err != nil {
                                        return err
                                }
 
@@ -371,8 +371,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op 
*operations.Operation, args Mig
                        }
                }
 
-               wrapper := migration.ProgressWriter(op, "fs_progress", 
args.Instance.Name())
-               err = rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, 
wrapper, args.RsyncFeatures)
+               wrapper := migration.ProgressTracker(op, "fs_progress", 
args.Instance.Name())
+               err = rsync.Recv(shared.AddSlash(args.Instance.Path()), 
&shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures)
                if err != nil {
                        return err
                }
@@ -409,8 +409,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op 
*operations.Operation, args Mig
                                        }
                                }
 
-                               wrapper := migration.ProgressWriter(op, 
"fs_progress", snap.GetName())
-                               err := 
rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, wrapper, 
args.RsyncFeatures)
+                               wrapper := migration.ProgressTracker(op, 
"fs_progress", snap.GetName())
+                               err := 
rsync.Recv(shared.AddSlash(args.Instance.Path()), &shared.WebsocketIO{Conn: 
conn}, wrapper, args.RsyncFeatures)
                                if err != nil {
                                        return err
                                }
@@ -434,8 +434,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op 
*operations.Operation, args Mig
                        }
                }
 
-               wrapper := migration.ProgressWriter(op, "fs_progress", 
args.Instance.Name())
-               err = rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, 
wrapper, args.RsyncFeatures)
+               wrapper := migration.ProgressTracker(op, "fs_progress", 
args.Instance.Name())
+               err = rsync.Recv(shared.AddSlash(args.Instance.Path()), 
&shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures)
                if err != nil {
                        return err
                }
@@ -443,8 +443,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op 
*operations.Operation, args Mig
 
        if args.Live {
                /* now receive the final sync */
-               wrapper := migration.ProgressWriter(op, "fs_progress", 
args.Instance.Name())
-               err := rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, 
wrapper, args.RsyncFeatures)
+               wrapper := migration.ProgressTracker(op, "fs_progress", 
args.Instance.Name())
+               err := rsync.Recv(shared.AddSlash(args.Instance.Path()), 
&shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures)
                if err != nil {
                        return err
                }

From 489b82dc0d6ebaf8aea4cacf7af7de4cdc4ce863 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com>
Date: Mon, 28 Oct 2019 10:39:46 +0100
Subject: [PATCH 5/7] lxd/rsync: DEBUG
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgra...@ubuntu.com>
---
 lxd/rsync/rsync.go | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/lxd/rsync/rsync.go b/lxd/rsync/rsync.go
index 17cf13f8f6..241915f7ee 100644
--- a/lxd/rsync/rsync.go
+++ b/lxd/rsync/rsync.go
@@ -231,6 +231,7 @@ func Send(name string, path string, conn 
io.ReadWriteCloser, tracker *ioprogress
 // half set up by rsync.Send), putting the contents in the directory specified
 // by path.
 func Recv(path string, conn io.ReadWriteCloser, tracker 
*ioprogress.ProgressTracker, features []string) error {
+       logger.Errorf("stgraber: in rsync recv")
        args := []string{
                "--server",
                "-vlogDtpre.iLsfx",
@@ -258,18 +259,23 @@ func Recv(path string, conn io.ReadWriteCloser, tracker 
*ioprogress.ProgressTrac
                return err
        }
 
+       logger.Errorf("stgraber: starting")
        err = cmd.Start()
        if err != nil {
                return err
        }
+       logger.Errorf("stgraber: started, waiting")
 
        err = cmd.Wait()
+       logger.Errorf("stgraber: done waiting, reading stderr")
        output, err := ioutil.ReadAll(stderr)
+       logger.Errorf("stgraber: done reading stderr")
        if err != nil {
                logger.Errorf("Rsync receive failed: %s: %s: %s", path, err, 
string(output))
                return err
        }
 
+       logger.Errorf("stgraber: out")
        return nil
 }
 

From 9ae17d4816cb97817a4ce2e9e2a666d5f1d9e274 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com>
Date: Mon, 28 Oct 2019 10:41:10 +0100
Subject: [PATCH 6/7] shared: DEBUG
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgra...@ubuntu.com>
---
 shared/network.go | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/shared/network.go b/shared/network.go
index c73677df93..2eb885e3c8 100644
--- a/shared/network.go
+++ b/shared/network.go
@@ -370,6 +370,7 @@ type WebsocketIO struct {
 }
 
 func (w *WebsocketIO) Read(p []byte) (n int, err error) {
+       logger.Errorf("stgraber: read: entering: %v", w.Conn.LocalAddr())
        for {
                // First read from this message
                if w.reader == nil {
@@ -377,14 +378,18 @@ func (w *WebsocketIO) Read(p []byte) (n int, err error) {
 
                        mt, w.reader, err = w.Conn.NextReader()
                        if err != nil {
+                               logger.Errorf("stgraber: read: got bad reader")
                                return -1, err
                        }
+                       logger.Errorf("stgraber: read: new reader: %v", mt)
 
                        if mt == websocket.CloseMessage {
+                               logger.Errorf("stgraber: read: got close 
message")
                                return 0, io.EOF
                        }
 
                        if mt == websocket.TextMessage {
+                               logger.Errorf("stgraber: read: got barrier")
                                return 0, io.EOF
                        }
                }
@@ -392,38 +397,47 @@ func (w *WebsocketIO) Read(p []byte) (n int, err error) {
                // Perform the read itself
                n, err := w.reader.Read(p)
                if err == io.EOF {
+                       logger.Errorf("stgraber: read: got message EOF")
 
                        // At the end of the message, reset reader
                        w.reader = nil
 
+                       logger.Errorf("stgraber: read: EOF forwarded: %d", n)
                        return n, nil
                }
 
                if err != nil {
+                       logger.Errorf("stgraber: read: failed to read")
                        return -1, err
                }
 
+               logger.Errorf("stgraber: read: forwarded: %d", n)
                return n, nil
        }
 }
 
 func (w *WebsocketIO) Write(p []byte) (n int, err error) {
+       logger.Errorf("stgraber: write: entering: %v", w.Conn.LocalAddr())
 
        wr, err := w.Conn.NextWriter(websocket.BinaryMessage)
        if err != nil {
+               logger.Errorf("stgraber: write: failed to setup writer: %v", 
err)
                return -1, err
        }
 
        n, err = wr.Write(p)
        if err != nil {
+               logger.Errorf("stgraber: write: failed to write: %v", err)
                return -1, err
        }
        wr.Close()
 
+       logger.Errorf("stgraber: write: sent: %d", n)
        return n, nil
 }
 
 func (w *WebsocketIO) Close() error {
+       logger.Errorf("stgraber: close")
        closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, 
"")
        w.Conn.WriteMessage(websocket.CloseMessage, closeMsg)
        return w.Conn.Close()

From 1d7e3075bf65bdefc929e2fdf033eb35ced355fe Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parr...@canonical.com>
Date: Mon, 28 Oct 2019 14:25:06 +0000
Subject: [PATCH 7/7] lxd/rsync/rsync: Updates Send() to work with
 ReadWriteClose

Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com>
---
 lxd/rsync/rsync.go | 36 ++++++++++++++++++++++--------------
 1 file changed, 22 insertions(+), 14 deletions(-)

diff --git a/lxd/rsync/rsync.go b/lxd/rsync/rsync.go
index 241915f7ee..86e1248fc8 100644
--- a/lxd/rsync/rsync.go
+++ b/lxd/rsync/rsync.go
@@ -186,30 +186,37 @@ func Send(name string, path string, conn 
io.ReadWriteCloser, tracker *ioprogress
                return err
        }
        defer netcatConn.Close()
+       defer conn.Close()
 
-       // Setup progress tracker
-       readPipe := io.ReadCloser(netcatConn)
+       // Setup progress tracker.
+       readNetcatPipe := io.ReadCloser(netcatConn)
        if tracker != nil {
-               readPipe = &ioprogress.ProgressReader{netcatConn, tracker}
+               readNetcatPipe = &ioprogress.ProgressReader{netcatConn, tracker}
        }
 
-       // Forward from netcat to target
-       chCopy := make(chan error, 1)
+       // Forward from netcat to target.
+       chCopyNetcat := make(chan error, 1)
        go func() {
-               _, err := io.Copy(conn, readPipe)
-               chCopy <- err
+               _, err := io.Copy(conn, readNetcatPipe)
+               chCopyNetcat <- err
+               readNetcatPipe.Close()
        }()
 
-       // Wait for rsync to complete
+       // Forward from target to netcat.
+       writeNetcatPipe := io.WriteCloser(netcatConn)
+       chCopyTarget := make(chan error, 1)
+       go func() {
+               _, err := io.Copy(writeNetcatPipe, conn)
+               chCopyTarget <- err
+               writeNetcatPipe.Close()
+       }()
+
+       // Wait for rsync to complete.
        chError := make(chan error, 1)
        go func() {
                err = cmd.Wait()
-               if err != nil {
-                       netcatConn.Close()
-                       readPipe.Close()
-               }
-
                chError <- err
+               netcatConn.Close()
        }()
 
        output, err := ioutil.ReadAll(stderr)
@@ -222,7 +229,8 @@ func Send(name string, path string, conn 
io.ReadWriteCloser, tracker *ioprogress
                logger.Errorf("Rsync send failed: %s: %s: %s", path, err, 
string(output))
        }
 
-       <-chCopy
+       <-chCopyNetcat
+       <-chCopyTarget
 
        return err
 }
_______________________________________________
lxc-devel mailing list
lxc-devel@lists.linuxcontainers.org
http://lists.linuxcontainers.org/listinfo/lxc-devel

Reply via email to