The following pull request was submitted through Github.
It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/7160

This e-mail was sent by the LXC bot, direct replies will not reach the author
unless they happen to be subscribed to this list.

=== Description (from pull-request) ===
- Fixes migration from ZFS storage pools.
- Improves error reporting back to user and avoids hangs when an error occurs.
From 95193963ff710234baee7553b24d6d43dd550250 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parr...@canonical.com>
Date: Wed, 8 Apr 2020 16:56:54 +0100
Subject: [PATCH 1/4] lxd/storage/drivers/generic/vfs: Log when creating
 snapshots

Helpful when dir driver takes a long time creating snapshot.

Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com>
---
 lxd/storage/drivers/generic_vfs.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/lxd/storage/drivers/generic_vfs.go 
b/lxd/storage/drivers/generic_vfs.go
index 56cdd9564b..88d81e8720 100644
--- a/lxd/storage/drivers/generic_vfs.go
+++ b/lxd/storage/drivers/generic_vfs.go
@@ -361,6 +361,7 @@ func genericVFSCreateVolumeFromMigration(d Driver, 
initVolume func(vol Volume) (
                        }
 
                        // Create the snapshot itself.
+                       d.Logger().Debug("Creating snapshot", 
log.Ctx{"volName": snapVol.Name()})
                        err = d.CreateVolumeSnapshot(snapVol, op)
                        if err != nil {
                                return err
@@ -860,6 +861,7 @@ func genericVFSCopyVolume(d Driver, initVolume func(vol 
Volume) (func(), error),
                                snapVol := NewVolume(d, d.Name(), vol.volType, 
vol.contentType, fullSnapName, vol.config, vol.poolConfig)
 
                                // Create the snapshot itself.
+                               d.Logger().Debug("Creating snapshot", 
log.Ctx{"volName": snapVol.Name()})
                                err = d.CreateVolumeSnapshot(snapVol, op)
                                if err != nil {
                                        return err

From 81bf97b9220eebca8915a40f6ae7f68d7521a90e Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parr...@canonical.com>
Date: Wed, 8 Apr 2020 18:17:16 +0100
Subject: [PATCH 2/4] lxd/storage/drivers/driver/zfs/volumes: Fix migrating VM
 block volumes in MigrateVolume

Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com>
---
 lxd/storage/drivers/driver_zfs_volumes.go | 26 +++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/lxd/storage/drivers/driver_zfs_volumes.go 
b/lxd/storage/drivers/driver_zfs_volumes.go
index e1e11fdddc..c2b7ed1996 100644
--- a/lxd/storage/drivers/driver_zfs_volumes.go
+++ b/lxd/storage/drivers/driver_zfs_volumes.go
@@ -1121,6 +1121,32 @@ func (d *zfs) RenameVolume(vol Volume, newVolName 
string, op *operations.Operati
 func (d *zfs) MigrateVolume(vol Volume, conn io.ReadWriteCloser, volSrcArgs 
*migration.VolumeSourceArgs, op *operations.Operation) error {
        // Handle simple rsync and block_and_rsync through generic.
        if volSrcArgs.MigrationType.FSType == migration.MigrationFSType_RSYNC 
|| volSrcArgs.MigrationType.FSType == migration.MigrationFSType_BLOCK_AND_RSYNC 
{
+               // We need to mount the parent volume before calling 
genericVFSMigrateVolume for two reasons.
+               // 1. In order to get the block device disk path to read from 
the device must be activated.
+               // 2. If copying snapshots the parent volume must be activated 
before the snapshot volume's block
+               // device can be made visible.
+               parent, _, _ := 
shared.InstanceGetParentAndSnapshotName(vol.Name())
+               parentVol := NewVolume(d, d.Name(), vol.volType, 
vol.contentType, parent, vol.config, vol.poolConfig)
+               ourMount, err := d.MountVolume(parentVol, op)
+               if err != nil {
+                       return err
+               }
+               if ourMount {
+                       defer d.UnmountVolume(parentVol, op)
+               }
+
+               // In addition to above, if the volume we are sending is a 
snapshot, we also need to mount that
+               // so that genericVFSMigrateVolume can discover its block 
device (same reason as 1. above).
+               if vol.IsSnapshot() {
+                       ourMount, err = d.MountVolumeSnapshot(vol, op)
+                       if err != nil {
+                               return err
+                       }
+                       if ourMount {
+                               defer d.UnmountVolumeSnapshot(vol, op)
+                       }
+               }
+
                return genericVFSMigrateVolume(d, d.state, vol, conn, 
volSrcArgs, op)
        } else if volSrcArgs.MigrationType.FSType != 
migration.MigrationFSType_ZFS {
                return ErrNotSupported

From 4639ebf9fc73522537cf57094de17be5def76105 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parr...@canonical.com>
Date: Wed, 8 Apr 2020 18:17:52 +0100
Subject: [PATCH 3/4] lxd/storage/memorypipe: Adds context support for
 cancellation

Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com>
---
 lxd/storage/memorypipe/memory_pipe.go | 44 ++++++++++++++++++++-------
 1 file changed, 33 insertions(+), 11 deletions(-)

diff --git a/lxd/storage/memorypipe/memory_pipe.go 
b/lxd/storage/memorypipe/memory_pipe.go
index 79a1b03c6e..d8628816b3 100644
--- a/lxd/storage/memorypipe/memory_pipe.go
+++ b/lxd/storage/memorypipe/memory_pipe.go
@@ -1,6 +1,8 @@
 package memorypipe
 
 import (
+       "context"
+       "fmt"
        "io"
 )
 
@@ -21,17 +23,22 @@ type msg struct {
 // connection to be used for multiple sessions.
 type pipe struct {
        ch       chan msg
+       ctx      context.Context
        otherEnd *pipe
 }
 
 // Read reads from the pipe into p. Returns number of bytes read and any 
errors.
 func (p *pipe) Read(b []byte) (int, error) {
-       msg := <-p.ch
-       if msg.err == io.EOF {
-               return -1, msg.err
+       select {
+       case msg := <-p.ch:
+               if msg.err == io.EOF {
+                       return -1, msg.err
+               }
+               n := copy(b, msg.data)
+               return n, msg.err
+       case <-p.ctx.Done():
+               return -1, fmt.Errorf("Context done")
        }
-       n := copy(b, msg.data)
-       return n, msg.err
 }
 
 // Write writes to the pipe from p. Returns number of bytes written and any 
errors.
@@ -40,8 +47,13 @@ func (p *pipe) Write(b []byte) (int, error) {
                data: append(b[:0:0], b...), // Create copy of b in case it is 
modified externally.
                err:  nil,
        }
-       p.otherEnd.ch <- msg // Send msg to the other side's Read function.
-       return len(msg.data), msg.err
+
+       select {
+       case p.otherEnd.ch <- msg: // Sent msg to the other side's Read 
function.
+               return len(msg.data), msg.err
+       case <-p.ctx.Done():
+               return -1, fmt.Errorf("Context done")
+       }
 }
 
 // Close is unusual in that it doesn't actually close the pipe. Instead it 
sends an io.EOF error
@@ -49,23 +61,33 @@ func (p *pipe) Write(b []byte) (int, error) {
 // Each call to Close will indicate to the other side that a session has 
ended, whilst allowing the
 // reuse of a single persistent pipe for multiple sessions.
 func (p *pipe) Close() error {
-       p.otherEnd.ch <- msg{
+       msg := msg{
                data: nil,
                err:  io.EOF, // Indicates to the other side's Read function 
that session has ended.
        }
+
+       select {
+       case p.otherEnd.ch <- msg: // Sent msg to the other side's Read 
function.
+               return nil
+       case <-p.ctx.Done():
+               return fmt.Errorf("Context done")
+       }
+
        return nil
 }
 
 // NewPipePair returns a pair of io.ReadWriterCloser pipes that are connected 
together such that
 // writes to one will appear as reads on the other and vice versa. Calling 
Close() on one end will
 // indicate to the other end that the session has ended.
-func NewPipePair() (io.ReadWriteCloser, io.ReadWriteCloser) {
+func NewPipePair(ctx context.Context) (io.ReadWriteCloser, io.ReadWriteCloser) 
{
        aEnd := &pipe{
-               ch: make(chan msg, bufferSize),
+               ch:  make(chan msg, bufferSize),
+               ctx: ctx,
        }
 
        bEnd := &pipe{
-               ch: make(chan msg, bufferSize),
+               ch:  make(chan msg, bufferSize),
+               ctx: ctx,
        }
 
        aEnd.otherEnd = bEnd

From d2a58f17e570e76517a74204cffd45dcb3d73c0e Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parr...@canonical.com>
Date: Wed, 8 Apr 2020 18:18:14 +0100
Subject: [PATCH 4/4] lxd/storage/backend/lxd: memorypipe cancellation usage

Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com>
---
 lxd/storage/backend_lxd.go | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/lxd/storage/backend_lxd.go b/lxd/storage/backend_lxd.go
index 4621290a7f..06db8dd65a 100644
--- a/lxd/storage/backend_lxd.go
+++ b/lxd/storage/backend_lxd.go
@@ -1,6 +1,7 @@
 package storage
 
 import (
+       "context"
        "fmt"
        "io"
        "os"
@@ -679,8 +680,10 @@ func (b *lxdBackend) CreateInstanceFromCopy(inst 
instance.Instance, src instance
                        }
                }
 
+               ctx, cancel := context.WithCancel(context.Background())
+
                // Use in-memory pipe pair to simulate a connection between the 
sender and receiver.
-               aEnd, bEnd := memorypipe.NewPipePair()
+               aEnd, bEnd := memorypipe.NewPipePair(ctx)
 
                // Negotiate the migration type to use.
                offeredTypes := srcPool.MigrationTypes(contentType, false)
@@ -701,6 +704,7 @@ func (b *lxdBackend) CreateInstanceFromCopy(inst 
instance.Instance, src instance
                                TrackProgress: true, // Do use a progress 
tracker on sender.
                        }, op)
 
+                       cancel()
                        aEndErrCh <- err
                }()
 
@@ -712,6 +716,7 @@ func (b *lxdBackend) CreateInstanceFromCopy(inst 
instance.Instance, src instance
                                TrackProgress: false, // Do not use a progress 
tracker on receiver.
                        }, op)
 
+                       cancel()
                        bEndErrCh <- err
                }()
 
@@ -820,8 +825,10 @@ func (b *lxdBackend) RefreshInstance(inst 
instance.Instance, src instance.Instan
                        snapshotNames = append(snapshotNames, snapShotName)
                }
 
+               ctx, cancel := context.WithCancel(context.Background())
+
                // Use in-memory pipe pair to simulate a connection between the 
sender and receiver.
-               aEnd, bEnd := memorypipe.NewPipePair()
+               aEnd, bEnd := memorypipe.NewPipePair(ctx)
 
                // Negotiate the migration type to use.
                offeredTypes := srcPool.MigrationTypes(contentType, true)
@@ -842,6 +849,7 @@ func (b *lxdBackend) RefreshInstance(inst 
instance.Instance, src instance.Instan
                                TrackProgress: true, // Do use a progress 
tracker on sender.
                        }, op)
 
+                       cancel()
                        aEndErrCh <- err
                }()
 
@@ -854,6 +862,7 @@ func (b *lxdBackend) RefreshInstance(inst 
instance.Instance, src instance.Instan
                                TrackProgress: false, // Do not use a progress 
tracker on receiver.
                        }, op)
 
+                       cancel()
                        bEndErrCh <- err
                }()
 
@@ -2261,8 +2270,10 @@ func (b *lxdBackend) 
CreateCustomVolumeFromCopy(projectName string, volName stri
        // to negotiate a common transfer method between pool types.
        logger.Debug("CreateCustomVolumeFromCopy cross-pool mode detected")
 
+       ctx, cancel := context.WithCancel(context.Background())
+
        // Use in-memory pipe pair to simulate a connection between the sender 
and receiver.
-       aEnd, bEnd := memorypipe.NewPipePair()
+       aEnd, bEnd := memorypipe.NewPipePair(ctx)
 
        // Negotiate the migration type to use.
        offeredTypes := srcPool.MigrationTypes(drivers.ContentTypeFS, false)
@@ -2283,6 +2294,7 @@ func (b *lxdBackend) 
CreateCustomVolumeFromCopy(projectName string, volName stri
                        TrackProgress: true, // Do use a progress tracker on 
sender.
                }, op)
 
+               cancel()
                aEndErrCh <- err
        }()
 
@@ -2297,6 +2309,7 @@ func (b *lxdBackend) 
CreateCustomVolumeFromCopy(projectName string, volName stri
 
                }, op)
 
+               cancel()
                bEndErrCh <- err
        }()
 
_______________________________________________
lxc-devel mailing list
lxc-devel@lists.linuxcontainers.org
http://lists.linuxcontainers.org/listinfo/lxc-devel

Reply via email to