The following pull request was submitted through GitHub. It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/6360
This e-mail was sent by the LXC bot; direct replies will not reach the author unless they happen to be subscribed to this list. === Description (from pull-request) ===
From 5f631ad4eff8345ba3835c426f1e8c71a91bf0be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Fri, 25 Oct 2019 15:12:35 -0400 Subject: [PATCH 1/7] lxd/rsync: Switch to using io.ReadWriteCloser MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- lxd/rsync/rsync.go | 68 ++++++++++++++++++---------------------------- 1 file changed, 27 insertions(+), 41 deletions(-) diff --git a/lxd/rsync/rsync.go b/lxd/rsync/rsync.go index 0fed6d5f44..17cf13f8f6 100644 --- a/lxd/rsync/rsync.go +++ b/lxd/rsync/rsync.go @@ -10,11 +10,11 @@ import ( "syscall" "time" - "github.com/gorilla/websocket" "github.com/pborman/uuid" "github.com/lxc/lxd/lxd/daemon" "github.com/lxc/lxd/shared" + "github.com/lxc/lxd/shared/ioprogress" "github.com/lxc/lxd/shared/logger" ) @@ -180,30 +180,35 @@ func sendSetup(name string, path string, bwlimit string, execPath string, featur // Send sets up the sending half of an rsync, to recursively send the // directory pointed to by path over the websocket. 
-func Send(name string, path string, conn *websocket.Conn, readWrapper func(io.ReadCloser) io.ReadCloser, features []string, bwlimit string, execPath string) error { - cmd, dataSocket, stderr, err := sendSetup(name, path, bwlimit, execPath, features) +func Send(name string, path string, conn io.ReadWriteCloser, tracker *ioprogress.ProgressTracker, features []string, bwlimit string, execPath string) error { + cmd, netcatConn, stderr, err := sendSetup(name, path, bwlimit, execPath, features) if err != nil { return err } + defer netcatConn.Close() - if dataSocket != nil { - defer dataSocket.Close() + // Setup progress tracker + readPipe := io.ReadCloser(netcatConn) + if tracker != nil { + readPipe = &ioprogress.ProgressReader{netcatConn, tracker} } - readPipe := io.ReadCloser(dataSocket) - if readWrapper != nil { - readPipe = readWrapper(dataSocket) - } - - readDone, writeDone := shared.WebsocketMirror(conn, dataSocket, readPipe, nil, nil) + // Forward from netcat to target + chCopy := make(chan error, 1) + go func() { + _, err := io.Copy(conn, readPipe) + chCopy <- err + }() + // Wait for rsync to complete chError := make(chan error, 1) go func() { err = cmd.Wait() if err != nil { - dataSocket.Close() + netcatConn.Close() readPipe.Close() } + chError <- err }() @@ -217,8 +222,7 @@ func Send(name string, path string, conn *websocket.Conn, readWrapper func(io.Re logger.Errorf("Rsync send failed: %s: %s: %s", path, err, string(output)) } - <-readDone - <-writeDone + <-chCopy return err } @@ -226,7 +230,7 @@ func Send(name string, path string, conn *websocket.Conn, readWrapper func(io.Re // Recv sets up the receiving half of the websocket to rsync (the other // half set up by rsync.Send), putting the contents in the directory specified // by path. 
-func Recv(path string, conn *websocket.Conn, writeWrapper func(io.WriteCloser) io.WriteCloser, features []string) error { +func Recv(path string, conn io.ReadWriteCloser, tracker *ioprogress.ProgressTracker, features []string) error { args := []string{ "--server", "-vlogDtpre.iLsfx", @@ -243,48 +247,30 @@ func Recv(path string, conn *websocket.Conn, writeWrapper func(io.WriteCloser) i args = append(args, []string{".", path}...) cmd := exec.Command("rsync", args...) - - stdin, err := cmd.StdinPipe() - if err != nil { - return err - } - - stdout, err := cmd.StdoutPipe() - if err != nil { - return err + if tracker != nil { + cmd.Stdin = &ioprogress.ProgressReader{conn, tracker} } + cmd.Stdin = conn + cmd.Stdout = conn stderr, err := cmd.StderrPipe() if err != nil { return err } - if err := cmd.Start(); err != nil { - return err - } - - writePipe := io.WriteCloser(stdin) - if writeWrapper != nil { - writePipe = writeWrapper(stdin) - } - - readDone, writeDone := shared.WebsocketMirror(conn, writePipe, stdout, nil, nil) - output, err := ioutil.ReadAll(stderr) + err = cmd.Start() if err != nil { - cmd.Process.Kill() - cmd.Wait() return err } err = cmd.Wait() + output, err := ioutil.ReadAll(stderr) if err != nil { logger.Errorf("Rsync receive failed: %s: %s: %s", path, err, string(output)) + return err } - <-readDone - <-writeDone - - return err + return nil } func rsyncFeatureArgs(features []string) []string { From 4ea34114bc033e39c5a8d2ad0d4a7bef4f7c7b91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Fri, 25 Oct 2019 15:46:34 -0400 Subject: [PATCH 2/7] shared: Implement a WebsocketIO ReadWriteCloser abstraction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- shared/network.go | 66 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/shared/network.go b/shared/network.go index 
00e200001e..c73677df93 100644 --- a/shared/network.go +++ b/shared/network.go @@ -363,6 +363,72 @@ func DefaultWriter(conn *websocket.Conn, w io.WriteCloser, writeDone chan<- bool w.Close() } +// WebsocketIO is a wrapper implementing ReadWriteCloser on top of websocket +type WebsocketIO struct { + Conn *websocket.Conn + reader io.Reader +} + +func (w *WebsocketIO) Read(p []byte) (n int, err error) { + for { + // First read from this message + if w.reader == nil { + var mt int + + mt, w.reader, err = w.Conn.NextReader() + if err != nil { + return -1, err + } + + if mt == websocket.CloseMessage { + return 0, io.EOF + } + + if mt == websocket.TextMessage { + return 0, io.EOF + } + } + + // Perform the read itself + n, err := w.reader.Read(p) + if err == io.EOF { + + // At the end of the message, reset reader + w.reader = nil + + return n, nil + } + + if err != nil { + return -1, err + } + + return n, nil + } +} + +func (w *WebsocketIO) Write(p []byte) (n int, err error) { + + wr, err := w.Conn.NextWriter(websocket.BinaryMessage) + if err != nil { + return -1, err + } + + n, err = wr.Write(p) + if err != nil { + return -1, err + } + wr.Close() + + return n, nil +} + +func (w *WebsocketIO) Close() error { + closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") + w.Conn.WriteMessage(websocket.CloseMessage, closeMsg) + return w.Conn.Close() +} + // WebsocketMirror allows mirroring a reader to a websocket and taking the // result and writing it to a writer. This function allows for multiple // mirrorings and correctly negotiates stream endings. 
However, it means any From f6653d302022dbb06846e9366d249dacb2527dc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Fri, 25 Oct 2019 15:46:53 -0400 Subject: [PATCH 3/7] lxd/migration: Introduce ProgressTracker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- lxd/migration/migration_volumes.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/lxd/migration/migration_volumes.go b/lxd/migration/migration_volumes.go index b7ecb3aebc..1fd63a502b 100644 --- a/lxd/migration/migration_volumes.go +++ b/lxd/migration/migration_volumes.go @@ -188,3 +188,16 @@ func ProgressWriter(op *operations.Operation, key string, description string) fu return writePipe } } + +// ProgressTracker returns a migration I/O tracker +func ProgressTracker(op *operations.Operation, key string, description string) *ioprogress.ProgressTracker { + progress := func(progressInt int64, speedInt int64) { + progressWrapperRender(op, key, description, progressInt, speedInt) + } + + tracker := &ioprogress.ProgressTracker{ + Handler: progress, + } + + return tracker +} From ea252b4616c02e38ae4f8f8f168caf4b61287232 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Fri, 25 Oct 2019 15:47:16 -0400 Subject: [PATCH 4/7] lxd/migration: Switch over to ReadWriteCloser for rsync MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- lxd/migrate_container.go | 8 +++---- lxd/storage_migration.go | 46 ++++++++++++++++++++-------------------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/lxd/migrate_container.go b/lxd/migrate_container.go index 69b5d362ff..fda13de567 100644 --- a/lxd/migrate_container.go +++ b/lxd/migrate_container.go @@ -267,7 +267,7 @@ func (s *migrationSourceWs) preDumpLoop(args 
*preDumpLoopArgs) (bool, error) { // Send the pre-dump. ctName, _, _ := shared.ContainerGetParentAndSnapshotName(s.instance.Name()) state := s.instance.DaemonState() - err = rsync.Send(ctName, shared.AddSlash(args.checkpointDir), s.criuConn, nil, args.rsyncFeatures, args.bwlimit, state.OS.ExecPath) + err = rsync.Send(ctName, shared.AddSlash(args.checkpointDir), &shared.WebsocketIO{Conn: s.criuConn}, nil, args.rsyncFeatures, args.bwlimit, state.OS.ExecPath) if err != nil { return final, err } @@ -682,7 +682,7 @@ func (s *migrationSourceWs) Do(migrateOp *operations.Operation) error { */ ctName, _, _ := shared.ContainerGetParentAndSnapshotName(s.instance.Name()) state := s.instance.DaemonState() - err = rsync.Send(ctName, shared.AddSlash(checkpointDir), s.criuConn, nil, rsyncFeatures, bwlimit, state.OS.ExecPath) + err = rsync.Send(ctName, shared.AddSlash(checkpointDir), &shared.WebsocketIO{Conn: s.criuConn}, nil, rsyncFeatures, bwlimit, state.OS.ExecPath) if err != nil { return abort(err) } @@ -1062,7 +1062,7 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error { for !sync.GetFinalPreDump() { logger.Debugf("About to receive rsync") // Transfer a CRIU pre-dump - err = rsync.Recv(shared.AddSlash(imagesDir), criuConn, nil, rsyncFeatures) + err = rsync.Recv(shared.AddSlash(imagesDir), &shared.WebsocketIO{Conn: criuConn}, nil, rsyncFeatures) if err != nil { restore <- err return @@ -1090,7 +1090,7 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error { } // Final CRIU dump - err = rsync.Recv(shared.AddSlash(imagesDir), criuConn, nil, rsyncFeatures) + err = rsync.Recv(shared.AddSlash(imagesDir), &shared.WebsocketIO{Conn: criuConn}, nil, rsyncFeatures) if err != nil { restore <- err return diff --git a/lxd/storage_migration.go b/lxd/storage_migration.go index 5dae246681..6011f304ce 100644 --- a/lxd/storage_migration.go +++ b/lxd/storage_migration.go @@ -68,23 +68,23 @@ func (s rsyncStorageSourceDriver) SendStorageVolume(conn *websocket.Conn, op 
*op } for _, snap := range snapshots { - wrapper := migration.ProgressReader(op, "fs_progress", snap.Name) + wrapper := migration.ProgressTracker(op, "fs_progress", snap.Name) path := driver.GetStoragePoolVolumeSnapshotMountPoint(pool.Name, snap.Name) path = shared.AddSlash(path) logger.Debugf("Starting to send storage volume snapshot %s on storage pool %s from %s", snap.Name, pool.Name, path) - err = rsync.Send(volume.Name, path, conn, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath) + err = rsync.Send(volume.Name, path, &shared.WebsocketIO{Conn: conn}, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath) if err != nil { return err } } } - wrapper := migration.ProgressReader(op, "fs_progress", volume.Name) + wrapper := migration.ProgressTracker(op, "fs_progress", volume.Name) path := driver.GetStoragePoolVolumeMountPoint(pool.Name, volume.Name) path = shared.AddSlash(path) logger.Debugf("Starting to send storage volume %s on storage pool %s from %s", volume.Name, pool.Name, path) - err = rsync.Send(volume.Name, path, conn, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath) + err = rsync.Send(volume.Name, path, &shared.WebsocketIO{Conn: conn}, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath) if err != nil { return err } @@ -106,16 +106,16 @@ func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn, op *ope } path := send.Path() - wrapper := migration.ProgressReader(op, "fs_progress", send.Name()) + wrapper := migration.ProgressTracker(op, "fs_progress", send.Name()) state := s.container.DaemonState() - err = rsync.Send(project.Prefix(s.container.Project(), ctName), shared.AddSlash(path), conn, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath) + err = rsync.Send(project.Prefix(s.container.Project(), ctName), shared.AddSlash(path), &shared.WebsocketIO{Conn: conn}, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath) if err != nil { return err } } } - wrapper := migration.ProgressReader(op, "fs_progress", s.container.Name()) + wrapper 
:= migration.ProgressTracker(op, "fs_progress", s.container.Name()) state := s.container.DaemonState() // Attempt to freeze the container to avoid changing files during transfer @@ -128,14 +128,14 @@ func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn, op *ope } } - return rsync.Send(project.Prefix(s.container.Project(), ctName), shared.AddSlash(s.container.Path()), conn, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath) + return rsync.Send(project.Prefix(s.container.Project(), ctName), shared.AddSlash(s.container.Path()), &shared.WebsocketIO{Conn: conn}, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath) } func (s rsyncStorageSourceDriver) SendAfterCheckpoint(conn *websocket.Conn, bwlimit string) error { ctName, _, _ := shared.ContainerGetParentAndSnapshotName(s.container.Name()) // resync anything that changed between our first send and the checkpoint state := s.container.DaemonState() - return rsync.Send(project.Prefix(s.container.Project(), ctName), shared.AddSlash(s.container.Path()), conn, nil, s.rsyncFeatures, bwlimit, state.OS.ExecPath) + return rsync.Send(project.Prefix(s.container.Project(), ctName), shared.AddSlash(s.container.Path()), &shared.WebsocketIO{Conn: conn}, nil, s.rsyncFeatures, bwlimit, state.OS.ExecPath) } func (s rsyncStorageSourceDriver) Cleanup() { @@ -259,12 +259,12 @@ func rsyncStorageMigrationSink(conn *websocket.Conn, op *operations.Operation, a return err } - wrapper := migration.ProgressWriter(op, "fs_progress", target.Name) + wrapper := migration.ProgressTracker(op, "fs_progress", target.Name) path := driver.GetStoragePoolVolumeMountPoint(pool.Name, volume.Name) path = shared.AddSlash(path) logger.Debugf("Starting to receive storage volume snapshot %s on storage pool %s into %s", target.Name, pool.Name, path) - err = rsync.Recv(path, conn, wrapper, args.RsyncFeatures) + err = rsync.Recv(path, &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures) if err != nil { return err } @@ -276,11 +276,11 @@ 
func rsyncStorageMigrationSink(conn *websocket.Conn, op *operations.Operation, a } } - wrapper := migration.ProgressWriter(op, "fs_progress", volume.Name) + wrapper := migration.ProgressTracker(op, "fs_progress", volume.Name) path := driver.GetStoragePoolVolumeMountPoint(pool.Name, volume.Name) path = shared.AddSlash(path) logger.Debugf("Starting to receive storage volume %s on storage pool %s into %s", volume.Name, pool.Name, path) - return rsync.Recv(path, conn, wrapper, args.RsyncFeatures) + return rsync.Recv(path, &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures) } func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error { @@ -356,8 +356,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig } } - wrapper := migration.ProgressWriter(op, "fs_progress", s.Name()) - if err := rsync.Recv(shared.AddSlash(s.Path()), conn, wrapper, args.RsyncFeatures); err != nil { + wrapper := migration.ProgressTracker(op, "fs_progress", s.Name()) + if err := rsync.Recv(shared.AddSlash(s.Path()), &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures); err != nil { return err } @@ -371,8 +371,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig } } - wrapper := migration.ProgressWriter(op, "fs_progress", args.Instance.Name()) - err = rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, wrapper, args.RsyncFeatures) + wrapper := migration.ProgressTracker(op, "fs_progress", args.Instance.Name()) + err = rsync.Recv(shared.AddSlash(args.Instance.Path()), &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures) if err != nil { return err } @@ -409,8 +409,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig } } - wrapper := migration.ProgressWriter(op, "fs_progress", snap.GetName()) - err := rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, wrapper, args.RsyncFeatures) + wrapper := migration.ProgressTracker(op, 
"fs_progress", snap.GetName()) + err := rsync.Recv(shared.AddSlash(args.Instance.Path()), &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures) if err != nil { return err } @@ -434,8 +434,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig } } - wrapper := migration.ProgressWriter(op, "fs_progress", args.Instance.Name()) - err = rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, wrapper, args.RsyncFeatures) + wrapper := migration.ProgressTracker(op, "fs_progress", args.Instance.Name()) + err = rsync.Recv(shared.AddSlash(args.Instance.Path()), &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures) if err != nil { return err } @@ -443,8 +443,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig if args.Live { /* now receive the final sync */ - wrapper := migration.ProgressWriter(op, "fs_progress", args.Instance.Name()) - err := rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, wrapper, args.RsyncFeatures) + wrapper := migration.ProgressTracker(op, "fs_progress", args.Instance.Name()) + err := rsync.Recv(shared.AddSlash(args.Instance.Path()), &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures) if err != nil { return err } From 489b82dc0d6ebaf8aea4cacf7af7de4cdc4ce863 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Mon, 28 Oct 2019 10:39:46 +0100 Subject: [PATCH 5/7] lxd/rsync: DEBUG MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- lxd/rsync/rsync.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lxd/rsync/rsync.go b/lxd/rsync/rsync.go index 17cf13f8f6..241915f7ee 100644 --- a/lxd/rsync/rsync.go +++ b/lxd/rsync/rsync.go @@ -231,6 +231,7 @@ func Send(name string, path string, conn io.ReadWriteCloser, tracker *ioprogress // half set up by rsync.Send), putting the contents in the directory specified // by path. 
func Recv(path string, conn io.ReadWriteCloser, tracker *ioprogress.ProgressTracker, features []string) error { + logger.Errorf("stgraber: in rsync recv") args := []string{ "--server", "-vlogDtpre.iLsfx", @@ -258,18 +259,23 @@ func Recv(path string, conn io.ReadWriteCloser, tracker *ioprogress.ProgressTrac return err } + logger.Errorf("stgraber: starting") err = cmd.Start() if err != nil { return err } + logger.Errorf("stgraber: started, waiting") err = cmd.Wait() + logger.Errorf("stgraber: done waiting, reading stderr") output, err := ioutil.ReadAll(stderr) + logger.Errorf("stgraber: done reading stderr") if err != nil { logger.Errorf("Rsync receive failed: %s: %s: %s", path, err, string(output)) return err } + logger.Errorf("stgraber: out") return nil } From 9ae17d4816cb97817a4ce2e9e2a666d5f1d9e274 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Mon, 28 Oct 2019 10:41:10 +0100 Subject: [PATCH 6/7] shared: DEBUG MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- shared/network.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/shared/network.go b/shared/network.go index c73677df93..2eb885e3c8 100644 --- a/shared/network.go +++ b/shared/network.go @@ -370,6 +370,7 @@ type WebsocketIO struct { } func (w *WebsocketIO) Read(p []byte) (n int, err error) { + logger.Errorf("stgraber: read: entering: %v", w.Conn.LocalAddr()) for { // First read from this message if w.reader == nil { @@ -377,14 +378,18 @@ func (w *WebsocketIO) Read(p []byte) (n int, err error) { mt, w.reader, err = w.Conn.NextReader() if err != nil { + logger.Errorf("stgraber: read: got bad reader") return -1, err } + logger.Errorf("stgraber: read: new reader: %v", mt) if mt == websocket.CloseMessage { + logger.Errorf("stgraber: read: got close message") return 0, io.EOF } if mt == websocket.TextMessage { + logger.Errorf("stgraber: read: got 
barrier") return 0, io.EOF } } @@ -392,38 +397,47 @@ func (w *WebsocketIO) Read(p []byte) (n int, err error) { // Perform the read itself n, err := w.reader.Read(p) if err == io.EOF { + logger.Errorf("stgraber: read: got message EOF") // At the end of the message, reset reader w.reader = nil + logger.Errorf("stgraber: read: EOF forwarded: %d", n) return n, nil } if err != nil { + logger.Errorf("stgraber: read: failed to read") return -1, err } + logger.Errorf("stgraber: read: forwarded: %d", n) return n, nil } } func (w *WebsocketIO) Write(p []byte) (n int, err error) { + logger.Errorf("stgraber: write: entering: %v", w.Conn.LocalAddr()) wr, err := w.Conn.NextWriter(websocket.BinaryMessage) if err != nil { + logger.Errorf("stgraber: write: failed to setup writer: %v", err) return -1, err } n, err = wr.Write(p) if err != nil { + logger.Errorf("stgraber: write: failed to write: %v", err) return -1, err } wr.Close() + logger.Errorf("stgraber: write: sent: %d", n) return n, nil } func (w *WebsocketIO) Close() error { + logger.Errorf("stgraber: close") closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") w.Conn.WriteMessage(websocket.CloseMessage, closeMsg) return w.Conn.Close() From 1d7e3075bf65bdefc929e2fdf033eb35ced355fe Mon Sep 17 00:00:00 2001 From: Thomas Parrott <thomas.parr...@canonical.com> Date: Mon, 28 Oct 2019 14:25:06 +0000 Subject: [PATCH 7/7] lxd/rsync/rsync: Updates Send() to work with ReadWriteClose Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com> --- lxd/rsync/rsync.go | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/lxd/rsync/rsync.go b/lxd/rsync/rsync.go index 241915f7ee..86e1248fc8 100644 --- a/lxd/rsync/rsync.go +++ b/lxd/rsync/rsync.go @@ -186,30 +186,37 @@ func Send(name string, path string, conn io.ReadWriteCloser, tracker *ioprogress return err } defer netcatConn.Close() + defer conn.Close() - // Setup progress tracker - readPipe := 
io.ReadCloser(netcatConn) + // Setup progress tracker. + readNetcatPipe := io.ReadCloser(netcatConn) if tracker != nil { - readPipe = &ioprogress.ProgressReader{netcatConn, tracker} + readNetcatPipe = &ioprogress.ProgressReader{netcatConn, tracker} } - // Forward from netcat to target - chCopy := make(chan error, 1) + // Forward from netcat to target. + chCopyNetcat := make(chan error, 1) go func() { - _, err := io.Copy(conn, readPipe) - chCopy <- err + _, err := io.Copy(conn, readNetcatPipe) + chCopyNetcat <- err + readNetcatPipe.Close() }() - // Wait for rsync to complete + // Forward from target to netcat. + writeNetcatPipe := io.WriteCloser(netcatConn) + chCopyTarget := make(chan error, 1) + go func() { + _, err := io.Copy(writeNetcatPipe, conn) + chCopyTarget <- err + writeNetcatPipe.Close() + }() + + // Wait for rsync to complete. chError := make(chan error, 1) go func() { err = cmd.Wait() - if err != nil { - netcatConn.Close() - readPipe.Close() - } - chError <- err + netcatConn.Close() }() output, err := ioutil.ReadAll(stderr) @@ -222,7 +229,8 @@ func Send(name string, path string, conn io.ReadWriteCloser, tracker *ioprogress logger.Errorf("Rsync send failed: %s: %s: %s", path, err, string(output)) } - <-chCopy + <-chCopyNetcat + <-chCopyTarget return err }
_______________________________________________ lxc-devel mailing list lxc-devel@lists.linuxcontainers.org http://lists.linuxcontainers.org/listinfo/lxc-devel