The following pull request was submitted through GitHub.
It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/1718

This e-mail was sent by the LXC bot; direct replies will not reach the author
unless they happen to be subscribed to this list.

=== Description (from pull-request) ===
This breaks up live migrations into two parts: first, while the container is
still running, we send a snapshot of its filesystem (along with all of the
container's snapshots); then we do the checkpoint and a final incremental
send. The goal here is to minimize the amount of downtime for the container.

Signed-off-by: Tycho Andersen <tycho.ander...@canonical.com>
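
Roughly, the flow on the source side ends up looking like the sketch below.
This is illustrative only, not code from the patch: the driver methods are the
ones the patch adds in lxd/storage.go, while the migrateSource wrapper and the
checkpoint callback are simplified, hypothetical stand-ins for the migrate.go
plumbing (the callback stands in for the CRIU dump step).

package migrationsketch

import "github.com/gorilla/websocket"

// Simplified view of the interface added in lxd/storage.go.
type sourceDriver interface {
	SendWhileRunning(conn *websocket.Conn) error
	SendAfterCheckpoint(conn *websocket.Conn) error
	Cleanup()
}

// migrateSource sends the bulk of the filesystem while the container is still
// running, then (only for live migrations) checkpoints and sends the delta
// accumulated since the first send.
func migrateSource(driver sourceDriver, fsConn *websocket.Conn, live bool, checkpoint func() error) error {
	defer driver.Cleanup()

	// Phase 1: container still running, so no downtime yet.
	if err := driver.SendWhileRunning(fsConn); err != nil {
		return err
	}

	if !live {
		return nil
	}

	// Phase 2: freeze/dump, then send only what changed since phase 1.
	if err := checkpoint(); err != nil {
		return err
	}

	return driver.SendAfterCheckpoint(fsConn)
}
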
From 390975ac7157ed9c229f63c7a53ba625ee86cc18 Mon Sep 17 00:00:00 2001
From: Tycho Andersen <tycho.ander...@canonical.com>
Date: Thu, 3 Mar 2016 07:48:19 -0700
Subject: [PATCH] migration: attempt to be slightly smart about moving
 filesystems

This breaks up live migrations into two parts: first, while the container is
still running, we send a snapshot of its filesystem (along with all of the
container's snapshots); then we do the checkpoint and a final incremental
send. The goal here is to minimize the amount of downtime for the container.

Signed-off-by: Tycho Andersen <tycho.ander...@canonical.com>
---
 lxd/migrate.go       | 163 ++++++++++++++++++++---------------------
 lxd/storage.go       |  86 ++++++++++++++--------
 lxd/storage_btrfs.go | 192 ++++++++++++++++++++++++++-----------------------
 lxd/storage_dir.go   |   6 +-
 lxd/storage_lvm.go   |   6 +-
 lxd/storage_zfs.go   | 199 ++++++++++++++++++++++++++++++---------------------
 6 files changed, 368 insertions(+), 284 deletions(-)
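
On the receiving end, the filesystem transfer now happens in parallel with the
CRIU image transfer. A rough outline of that pattern is below; again this is
just a sketch rather than the literal migrate.go code, and receiveFS and
receiveCRIUImages are hypothetical stand-ins for the mySink/RsyncRecv calls in
the patch.

package migrationsketch

// receiveMigration mirrors the fsTransfer channel pattern: the filesystem
// (container, snapshots, and the post-checkpoint delta when live) is received
// in a goroutine while the CRIU images arrive on their own connection, and
// the restore is only attempted after both have finished.
func receiveMigration(live bool, receiveFS, receiveCRIUImages func() error) error {
	fsTransfer := make(chan error, 1)

	go func() {
		fsTransfer <- receiveFS()
	}()

	if live {
		if err := receiveCRIUImages(); err != nil {
			return err
		}
	}

	// Wait for the filesystem before restoring.
	if err := <-fsTransfer; err != nil {
		return err
	}

	return nil
}
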

diff --git a/lxd/migrate.go b/lxd/migrate.go
index e6ec1a1..0150e66 100644
--- a/lxd/migrate.go
+++ b/lxd/migrate.go
@@ -299,25 +299,15 @@ func (s *migrationSourceWs) Do(op *operation) error {
                }
        }
 
-       sources, fsErr := s.container.Storage().MigrationSource(s.container)
+       driver, fsErr := s.container.Storage().MigrationSource(s.container)
        /* the protocol says we have to send a header no matter what, so let's
         * do that, but then immediately send an error.
         */
        snapshots := []string{}
        if fsErr == nil {
-               /* A bit of a special case here: doing lxc launch
-                * host2:c1/snap1 host1:container we're sending a snapshot, but
-                * it ends up as the container on the other end. So, we want to
-                * send it as the main container (i.e. ignore its IsSnapshot()).
-                */
-               if len(sources) > 1 {
-                       for _, snap := range sources {
-                               if !snap.IsSnapshot() {
-                                       continue
-                               }
-                               name := shared.ExtractSnapshotName(snap.Name())
-                               snapshots = append(snapshots, name)
-                       }
+               fullSnaps := driver.Snapshots()
+               for _, snap := range fullSnaps {
+                       snapshots = append(snapshots, shared.ExtractSnapshotName(snap.Name()))
                }
        }
 
@@ -348,7 +338,14 @@ func (s *migrationSourceWs) Do(op *operation) error {
                myType = MigrationFSType_RSYNC
                header.Fs = &myType
 
-               sources, _ = rsyncMigrationSource(s.container)
+               driver, _ = rsyncMigrationSource(s.container)
+       }
+
+       defer driver.Cleanup()
+
+       if err := driver.SendWhileRunning(s.fsConn); err != nil {
+               s.sendControl(err)
+               return err
        }
 
        if s.live {
@@ -402,11 +399,8 @@ func (s *migrationSourceWs) Do(op *operation) error {
                        s.sendControl(err)
                        return err
                }
-       }
 
-       for _, source := range sources {
-               shared.Debugf("sending fs object %s", source.Name())
-               if err := source.Send(s.fsConn); err != nil {
+               if err := driver.SendAfterCheckpoint(s.fsConn); err != nil {
                        s.sendControl(err)
                        return err
                }
@@ -536,12 +530,68 @@ func (c *migrationSink) do() error {
                imagesDir := ""
                srcIdmap := new(shared.IdmapSet)
 
+               snapshots := []container{}
+               for _, snap := range header.Snapshots {
+                       // TODO: we need to propagate snapshot configurations
+                       // as well. Right now the container configuration is
+                       // done through the initial migration post. Should we
+                       // post the snapshots and their configs as well, or do
+                       // it some other way?
+                       name := c.container.Name() + shared.SnapshotDelimiter + snap
+                       args := containerArgs{
+                               Ctype:        cTypeSnapshot,
+                               Config:       c.container.LocalConfig(),
+                               Profiles:     c.container.Profiles(),
+                               Ephemeral:    c.container.IsEphemeral(),
+                               Architecture: c.container.Architecture(),
+                               Devices:      c.container.LocalDevices(),
+                               Name:         name,
+                       }
+
+                       ct, err := containerCreateEmptySnapshot(c.container.Daemon(), args)
+                       if err != nil {
+                               restore <- err
+                               return
+                       }
+                       snapshots = append(snapshots, ct)
+               }
+
+               for _, idmap := range header.Idmap {
+                       e := shared.IdmapEntry{
+                               Isuid:    *idmap.Isuid,
+                               Isgid:    *idmap.Isgid,
+                               Nsid:     int(*idmap.Nsid),
+                               Hostid:   int(*idmap.Hostid),
+                               Maprange: int(*idmap.Maprange)}
+                       srcIdmap.Idmap = shared.Extend(srcIdmap.Idmap, e)
+               }
+
+               /* We do the fs receive in parallel so we don't have to reason
+                * about when to receive what. The sending side is smart enough
+                * to send the filesystem bits that it can before it seizes the
+                * container to start checkpointing, so the total transfer time
+                * will be minimized even if we're dumb here.
+                */
+               fsTransfer := make(chan error)
+               go func() {
+                       if err := mySink(c.live, c.container, snapshots, c.fsConn); err != nil {
+                               fsTransfer <- err
+                               return
+                       }
+
+                       if err := ShiftIfNecessary(c.container, srcIdmap); err != nil {
+                               fsTransfer <- err
+                               return
+                       }
+
+                       fsTransfer <- nil
+               }()
+
                if c.live {
                        var err error
                        imagesDir, err = ioutil.TempDir("", "lxd_restore_")
                        if err != nil {
                                os.RemoveAll(imagesDir)
-                               c.sendControl(err)
                                return
                        }
 
@@ -560,8 +610,6 @@ func (c *migrationSink) do() error {
 
                        if err := RsyncRecv(shared.AddSlash(imagesDir), c.criuConn); err != nil {
                                restore <- err
-                               os.RemoveAll(imagesDir)
-                               c.sendControl(err)
                                return
                        }
 
@@ -574,70 +622,17 @@ func (c *migrationSink) do() error {
                        if !c.container.IsPrivileged() {
                                if err := c.container.IdmapSet().ShiftRootfs(imagesDir); err != nil {
                                        restore <- err
-                                       os.RemoveAll(imagesDir)
-                                       c.sendControl(err)
                                        return
                                }
                        }
                }
 
-               snapshots := []container{}
-               for _, snap := range header.Snapshots {
-                       // TODO: we need to propagate snapshot configurations
-                       // as well. Right now the container configuration is
-                       // done through the initial migration post. Should we
-                       // post the snapshots and their configs as well, or do
-                       // it some other way?
-                       name := c.container.Name() + shared.SnapshotDelimiter + snap
-                       args := containerArgs{
-                               Ctype:        cTypeSnapshot,
-                               Config:       c.container.LocalConfig(),
-                               Profiles:     c.container.Profiles(),
-                               Ephemeral:    c.container.IsEphemeral(),
-                               Architecture: c.container.Architecture(),
-                               Devices:      c.container.LocalDevices(),
-                               Name:         name,
-                       }
-
-                       ct, err := containerCreateEmptySnapshot(c.container.Daemon(), args)
-                       if err != nil {
-                               restore <- err
-                               c.sendControl(err)
-                               return
-                       }
-                       snapshots = append(snapshots, ct)
-               }
-
-               for _, idmap := range header.Idmap {
-                       e := shared.IdmapEntry{
-                               Isuid:    *idmap.Isuid,
-                               Isgid:    *idmap.Isgid,
-                               Nsid:     int(*idmap.Nsid),
-                               Hostid:   int(*idmap.Hostid),
-                               Maprange: int(*idmap.Maprange)}
-                       srcIdmap.Idmap = shared.Extend(srcIdmap.Idmap, e)
-               }
-
-               if err := mySink(c.container, snapshots, c.fsConn); err != nil {
-                       restore <- err
-                       c.sendControl(err)
-                       return
-               }
-
-               if err := ShiftIfNecessary(c.container, srcIdmap); err != nil {
+               err := <-fsTransfer
+               if err != nil {
                        restore <- err
-                       c.sendControl(err)
                        return
                }
 
-               for _, snap := range snapshots {
-                       if err := ShiftIfNecessary(snap, srcIdmap); err != nil {
-                               restore <- err
-                               c.sendControl(err)
-                               return
-                       }
-               }
-
                if c.live {
                        err := c.container.StartFromMigration(imagesDir)
                        if err != nil {
@@ -648,12 +643,20 @@ func (c *migrationSink) do() error {
                                        log = err.Error()
                                }
                                err = fmt.Errorf("restore failed:\n%s", log)
+                               restore <- err
+                               return
                        }
 
-                       restore <- err
-               } else {
-                       restore <- nil
                }
+
+               for _, snap := range snapshots {
+                       if err := ShiftIfNecessary(snap, srcIdmap); err != nil {
+                               restore <- err
+                               return
+                       }
+               }
+
+               restore <- nil
        }(c)
 
        source := c.controlChannel()
diff --git a/lxd/storage.go b/lxd/storage.go
index 2af685a..5b3863a 100644
--- a/lxd/storage.go
+++ b/lxd/storage.go
@@ -109,10 +109,26 @@ func storageTypeToString(sType storageType) string {
        return "dir"
 }
 
-type MigrationStorageSource interface {
-       Name() string
-       IsSnapshot() bool
-       Send(conn *websocket.Conn) error
+type MigrationStorageSourceDriver interface {
+       /* snapshots for this container, if any */
+       Snapshots() []container
+
+       /* send any bits of the container/snapshots that are possible while the
+        * container is still running.
+        */
+       SendWhileRunning(conn *websocket.Conn) error
+
+       /* send the final bits (e.g. a final delta snapshot for zfs, btrfs, or
+        * do a final rsync) of the fs after the container has been
+        * checkpointed. This will only be called when a container is actually
+        * being live migrated.
+        */
+       SendAfterCheckpoint(conn *websocket.Conn) error
+
+       /* Called after either success or failure of a migration, can be used
+        * to clean up any temporary snapshots, etc.
+        */
+       Cleanup()
 }
 
 type storage interface {
@@ -170,8 +186,8 @@ type storage interface {
        // We leave sending containers which are snapshots of other containers
        // already present on the target instance as an exercise for the
        // enterprising developer.
-       MigrationSource(container container) ([]MigrationStorageSource, error)
-       MigrationSink(container container, objects []container, conn *websocket.Conn) error
+       MigrationSource(container container) (MigrationStorageSourceDriver, error)
+       MigrationSink(live bool, container container, objects []container, conn *websocket.Conn) error
 }
 
 func newStorage(d *Daemon, sType storageType) (storage, error) {
@@ -521,23 +537,24 @@ func (lw *storageLogWrapper) MigrationType() MigrationFSType {
        return lw.w.MigrationType()
 }
 
-func (lw *storageLogWrapper) MigrationSource(container container) ([]MigrationStorageSource, error) {
+func (lw *storageLogWrapper) MigrationSource(container container) (MigrationStorageSourceDriver, error) {
        lw.log.Debug("MigrationSource", log.Ctx{"container": container.Name()})
        return lw.w.MigrationSource(container)
 }
 
-func (lw *storageLogWrapper) MigrationSink(container container, objects []container, conn *websocket.Conn) error {
+func (lw *storageLogWrapper) MigrationSink(live bool, container container, objects []container, conn *websocket.Conn) error {
        objNames := []string{}
        for _, obj := range objects {
                objNames = append(objNames, obj.Name())
        }
 
        lw.log.Debug("MigrationSink", log.Ctx{
+               "live":      live,
                "container": container.Name(),
                "objects":   objNames,
        })
 
-       return lw.w.MigrationSink(container, objects, conn)
+       return lw.w.MigrationSink(live, container, objects, conn)
 }
 
 func ShiftIfNecessary(container container, srcIdmap *shared.IdmapSet) error {
@@ -567,41 +584,47 @@ func ShiftIfNecessary(container container, srcIdmap *shared.IdmapSet) error {
        return nil
 }
 
-type rsyncStorageSource struct {
+type rsyncStorageSourceDriver struct {
        container container
+       snapshots []container
 }
 
-func (s *rsyncStorageSource) Name() string {
-       return s.container.Name()
+func (s rsyncStorageSourceDriver) Snapshots() []container {
+       return s.snapshots
 }
 
-func (s *rsyncStorageSource) IsSnapshot() bool {
-       return s.container.IsSnapshot()
+func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn) error {
+       toSend := append([]container{s.container}, s.snapshots...)
+
+       for _, send := range toSend {
+               path := send.Path()
+               if err := RsyncSend(shared.AddSlash(path), conn); err != nil {
+                       return err
+               }
+       }
+
+       return nil
 }
 
-func (s *rsyncStorageSource) Send(conn *websocket.Conn) error {
-       path := s.container.Path()
-       return RsyncSend(shared.AddSlash(path), conn)
+func (s rsyncStorageSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) error {
+       /* resync anything that changed between our first send and the checkpoint */
+       return RsyncSend(shared.AddSlash(s.container.Path()), conn)
 }
 
-func rsyncMigrationSource(container container) ([]MigrationStorageSource, error) {
-       sources := []MigrationStorageSource{}
+func (s rsyncStorageSourceDriver) Cleanup() {
+       /* no-op */
+}
 
-       /* transfer the container, and then all the snapshots */
-       sources = append(sources, &rsyncStorageSource{container})
-       snaps, err := container.Snapshots()
+func rsyncMigrationSource(container container) (MigrationStorageSourceDriver, error) {
+       snapshots, err := container.Snapshots()
        if err != nil {
                return nil, err
        }
 
-       for _, snap := range snaps {
-               sources = append(sources, &rsyncStorageSource{snap})
-       }
-
-       return sources, nil
+       return rsyncStorageSourceDriver{container, snapshots}, nil
 }
 
-func rsyncMigrationSink(container container, snapshots []container, conn *websocket.Conn) error {
+func rsyncMigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error {
        /* the first object is the actual container */
        if err := RsyncRecv(shared.AddSlash(container.Path()), conn); err != nil {
                return err
@@ -620,5 +643,12 @@ func rsyncMigrationSink(container container, snapshots []container, conn *websoc
                }
        }
 
+       if live {
+               /* now receive the final sync */
+               if err := RsyncRecv(shared.AddSlash(container.Path()), conn); err != nil {
+                       return err
+               }
+       }
+
        return nil
 }
diff --git a/lxd/storage_btrfs.go b/lxd/storage_btrfs.go
index e4ca28f..e648e90 100644
--- a/lxd/storage_btrfs.go
+++ b/lxd/storage_btrfs.go
@@ -807,56 +807,38 @@ func (s *storageBtrfs) getSubVolumes(path string) ([]string, error) {
        return result, nil
 }
 
-type btrfsMigrationSource struct {
-       lxdName            string
-       deleteAfterSending bool
-       btrfsPath          string
-       btrfsParent        string
-
-       btrfs *storageBtrfs
-}
-
-func (s btrfsMigrationSource) Name() string {
-       return s.lxdName
+type btrfsMigrationSourceDriver struct {
+       container          container
+       snapshots          []container
+       btrfsSnapshotNames []string
+       btrfs              *storageBtrfs
+       runningSnapName    string
+       stoppedSnapName    string
 }
 
-func (s btrfsMigrationSource) IsSnapshot() bool {
-       return !s.deleteAfterSending
+func (s *btrfsMigrationSourceDriver) Snapshots() []container {
+       return s.snapshots
 }
 
-func (s btrfsMigrationSource) Send(conn *websocket.Conn) error {
-       args := []string{"send", s.btrfsPath}
-       if s.btrfsParent != "" {
-               args = append(args, "-p", s.btrfsParent)
+func (s *btrfsMigrationSourceDriver) send(conn *websocket.Conn, btrfsPath string, btrfsParent string) error {
+       args := []string{"send", btrfsPath}
+       if btrfsParent != "" {
+               args = append(args, "-p", btrfsParent)
        }
 
        cmd := exec.Command("btrfs", args...)
 
-       deleteAfterSending := func(path string) {
-               s.btrfs.subvolsDelete(path)
-               os.Remove(filepath.Dir(path))
-       }
-
        stdout, err := cmd.StdoutPipe()
        if err != nil {
-               if s.deleteAfterSending {
-                       deleteAfterSending(s.btrfsPath)
-               }
                return err
        }
 
        stderr, err := cmd.StderrPipe()
        if err != nil {
-               if s.deleteAfterSending {
-                       deleteAfterSending(s.btrfsPath)
-               }
                return err
        }
 
        if err := cmd.Start(); err != nil {
-               if s.deleteAfterSending {
-                       deleteAfterSending(s.btrfsPath)
-               }
                return err
        }
 
@@ -871,97 +853,125 @@ func (s btrfsMigrationSource) Send(conn *websocket.Conn) error {
        if err != nil {
                shared.Log.Error("problem with btrfs send", "output", string(output))
        }
-       if s.deleteAfterSending {
-               deleteAfterSending(s.btrfsPath)
-       }
        return err
 }
 
-func (s *storageBtrfs) MigrationType() MigrationFSType {
-       if runningInUserns {
-               return MigrationFSType_RSYNC
-       } else {
-               return MigrationFSType_BTRFS
-       }
-}
-
-func (s *storageBtrfs) MigrationSource(c container) ([]MigrationStorageSource, error) {
-       if runningInUserns {
-               return rsyncMigrationSource(c)
-       }
-
-       sources := []MigrationStorageSource{}
-
-       /* If the container is a snapshot, let's just send that; we don't need
-        * to send anything else, because that's all the user asked for.
-        */
-       if c.IsSnapshot() {
-               tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", c.Name(), uuid.NewRandom().String()), true)
+func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) error {
+       if s.container.IsSnapshot() {
+               tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", s.container.Name(), uuid.NewRandom().String()), true)
                err := os.MkdirAll(tmpPath, 0700)
                if err != nil {
-                       return nil, err
+                       return err
                }
 
                btrfsPath := fmt.Sprintf("%s/.root", tmpPath)
-               if err := s.subvolSnapshot(c.Path(), btrfsPath, true); err != nil {
-                       return nil, err
+               if err := s.btrfs.subvolSnapshot(s.container.Path(), btrfsPath, true); err != nil {
+                       return err
                }
 
-               sources = append(sources, btrfsMigrationSource{c.Name(), true, btrfsPath, "", s})
-               return sources, nil
-       }
+               defer s.btrfs.subvolDelete(btrfsPath)
 
-       /* List all the snapshots in order of reverse creation. The idea here
-        * is that we send the oldest to newest snapshot, hopefully saving on
-        * xfer costs. Then, after all that, we send the container itself.
-        */
-       snapshots, err := c.Snapshots()
-       if err != nil {
-               return nil, err
+               return s.send(conn, btrfsPath, "")
        }
 
-       for i, snap := range snapshots {
-               var prev container
+       for i, snap := range s.snapshots {
+               prev := ""
                if i > 0 {
-                       prev = snapshots[i-1]
+                       prev = s.snapshots[i-1].Path()
                }
 
-               btrfsPath := snap.Path()
-               parentName := ""
-               if prev != nil {
-                       parentName = prev.Path()
+               if err := s.send(conn, snap.Path(), prev); err != nil {
+                       return err
                }
-
-               sources = append(sources, btrfsMigrationSource{snap.Name(), false, btrfsPath, parentName, s})
        }
 
        /* We can't send running fses, so let's snapshot the fs and send
         * the snapshot.
         */
-       tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", c.Name(), uuid.NewRandom().String()), true)
-       err = os.MkdirAll(tmpPath, 0700)
+       tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", s.container.Name(), uuid.NewRandom().String()), true)
+       err := os.MkdirAll(tmpPath, 0700)
        if err != nil {
-               return nil, err
+               return err
        }
 
-       btrfsPath := fmt.Sprintf("%s/.root", tmpPath)
-       if err := s.subvolSnapshot(c.Path(), btrfsPath, true); err != nil {
-               return nil, err
+       s.runningSnapName = fmt.Sprintf("%s/.root", tmpPath)
+       if err := s.btrfs.subvolSnapshot(s.container.Path(), s.runningSnapName, true); err != nil {
+               return err
        }
 
        btrfsParent := ""
-       if len(sources) > 0 {
-               btrfsParent = sources[len(sources)-1].(btrfsMigrationSource).btrfsPath
+       if len(s.btrfsSnapshotNames) > 0 {
+               btrfsParent = s.btrfsSnapshotNames[len(s.btrfsSnapshotNames)-1]
+       }
+
+       return s.send(conn, s.runningSnapName, btrfsParent)
+}
+
+func (s *btrfsMigrationSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) error {
+       tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", s.container.Name(), uuid.NewRandom().String()), true)
+       err := os.MkdirAll(tmpPath, 0700)
+       if err != nil {
+               return err
+       }
+
+       s.stoppedSnapName = fmt.Sprintf("%s/.root", tmpPath)
+       if err := s.btrfs.subvolSnapshot(s.container.Path(), s.stoppedSnapName, true); err != nil {
+               return err
        }
 
-       sources = append(sources, btrfsMigrationSource{c.Name(), true, btrfsPath, btrfsParent, s})
+       return s.send(conn, s.stoppedSnapName, s.runningSnapName)
+}
+
+func (s *btrfsMigrationSourceDriver) Cleanup() {
+       if s.stoppedSnapName != "" {
+               s.btrfs.subvolDelete(s.stoppedSnapName)
+       }
+
+       if s.runningSnapName != "" {
+               s.btrfs.subvolDelete(s.runningSnapName)
+       }
+}
 
-       return sources, nil
+func (s *storageBtrfs) MigrationType() MigrationFSType {
+       if runningInUserns {
+               return MigrationFSType_RSYNC
+       } else {
+               return MigrationFSType_BTRFS
+       }
 }
 
-func (s *storageBtrfs) MigrationSink(container container, snapshots []container, conn *websocket.Conn) error {
+func (s *storageBtrfs) MigrationSource(c container) (MigrationStorageSourceDriver, error) {
        if runningInUserns {
-               return rsyncMigrationSink(container, snapshots, conn)
+               return rsyncMigrationSource(c)
+       }
+
+       /* List all the snapshots in order of reverse creation. The idea here
+        * is that we send the oldest to newest snapshot, hopefully saving on
+        * xfer costs. Then, after all that, we send the container itself.
+        */
+       snapshots, err := c.Snapshots()
+       if err != nil {
+               return nil, err
+       }
+
+       driver := &btrfsMigrationSourceDriver{
+               container:          c,
+               snapshots:          snapshots,
+               btrfsSnapshotNames: []string{},
+               btrfs:              s,
+       }
+
+       for _, snap := range snapshots {
+               btrfsPath := snap.Path()
+               driver.btrfsSnapshotNames = append(driver.btrfsSnapshotNames, btrfsPath)
+       }
+
+       return driver, nil
+}
+
+func (s *storageBtrfs) MigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error {
+       if runningInUserns {
+               return rsyncMigrationSink(live, container, snapshots, conn)
        }
 
        cName := container.Name()
@@ -1041,6 +1051,12 @@ func (s *storageBtrfs) MigrationSink(container container, snapshots []container,
                return err
        }
 
+       if live {
+               if err := btrfsRecv(containerPath(cName, true), container.Path(), false); err != nil {
+                       return err
+               }
+       }
+
        // Cleanup
        if ok, _ := shared.PathIsEmpty(snapshotsPath); ok {
                err := os.Remove(snapshotsPath)
diff --git a/lxd/storage_dir.go b/lxd/storage_dir.go
index 98d5320..5126c34 100644
--- a/lxd/storage_dir.go
+++ b/lxd/storage_dir.go
@@ -266,10 +266,10 @@ func (s *storageDir) MigrationType() MigrationFSType {
        return MigrationFSType_RSYNC
 }
 
-func (s *storageDir) MigrationSource(container container) ([]MigrationStorageSource, error) {
+func (s *storageDir) MigrationSource(container container) (MigrationStorageSourceDriver, error) {
        return rsyncMigrationSource(container)
 }
 
-func (s *storageDir) MigrationSink(container container, snapshots []container, conn *websocket.Conn) error {
-       return rsyncMigrationSink(container, snapshots, conn)
+func (s *storageDir) MigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error {
+       return rsyncMigrationSink(live, container, snapshots, conn)
 }
diff --git a/lxd/storage_lvm.go b/lxd/storage_lvm.go
index f0fd62e..cb4f066 100644
--- a/lxd/storage_lvm.go
+++ b/lxd/storage_lvm.go
@@ -1056,10 +1056,10 @@ func (s *storageLvm) MigrationType() MigrationFSType {
        return MigrationFSType_RSYNC
 }
 
-func (s *storageLvm) MigrationSource(container container) ([]MigrationStorageSource, error) {
+func (s *storageLvm) MigrationSource(container container) (MigrationStorageSourceDriver, error) {
        return rsyncMigrationSource(container)
 }
 
-func (s *storageLvm) MigrationSink(container container, snapshots []container, conn *websocket.Conn) error {
-       return rsyncMigrationSink(container, snapshots, conn)
+func (s *storageLvm) MigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error {
+       return rsyncMigrationSink(live, container, snapshots, conn)
 }
diff --git a/lxd/storage_zfs.go b/lxd/storage_zfs.go
index ec6cdb1..d06f5cc 100644
--- a/lxd/storage_zfs.go
+++ b/lxd/storage_zfs.go
@@ -1157,66 +1157,38 @@ func storageZFSSetPoolNameConfig(d *Daemon, poolname string) error {
        return nil
 }
 
-type zfsMigrationSource struct {
-       lxdName            string
-       deleteAfterSending bool
-       zfsName            string
-       zfsParent          string
-
-       zfs *storageZfs
-}
-
-func (s zfsMigrationSource) Name() string {
-       return s.lxdName
+type zfsMigrationSourceDriver struct {
+       container        container
+       snapshots        []container
+       zfsSnapshotNames []string
+       zfs              *storageZfs
+       runningSnapName  string
+       stoppedSnapName  string
 }
 
-func (s zfsMigrationSource) IsSnapshot() bool {
-       return !s.deleteAfterSending
+func (s *zfsMigrationSourceDriver) Snapshots() []container {
+       return s.snapshots
 }
 
-func (s zfsMigrationSource) Send(conn *websocket.Conn) error {
-       args := []string{"send", fmt.Sprintf("%s/%s", s.zfs.zfsPool, s.zfsName)}
-       if s.zfsParent != "" {
-               args = append(args, "-i", fmt.Sprintf("%s/%s", s.zfs.zfsPool, s.zfsParent))
+func (s *zfsMigrationSourceDriver) send(conn *websocket.Conn, zfsName string, zfsParent string) error {
+       args := []string{"send", fmt.Sprintf("%s/containers/%s@%s", s.zfs.zfsPool, s.container.Name(), zfsName)}
+       if zfsParent != "" {
+               args = append(args, "-i", fmt.Sprintf("%s/containers/%s@%s", s.zfs.zfsPool, s.container.Name(), zfsParent))
        }
 
        cmd := exec.Command("zfs", args...)
 
        stdout, err := cmd.StdoutPipe()
        if err != nil {
-               /* If this is not a lxd snapshot, that means it is the root container.
-                * The way we zfs send a root container is by taking a temporary zfs
-                * snapshot and sending that, then deleting that snapshot. Here's where
-                * we delete it.
-                *
-                * Note that we can't use a defer here, because zfsDestroy
-                * takes some time, and defer doesn't block the current
-                * goroutine. Due to our retry mechanism for network failures
-                * (and because zfsDestroy takes a while), we might retry
-                * moving (and thus creating a temporary snapshot) before the
-                * last one is deleted, resulting in either a snapshot name
-                * collision if it was fast enough, or an extra snapshot with
-                * an odd name on the destination side. Instead, we don't use
-                * defer so we always block until the snapshot is dead.
-                */
-               if s.deleteAfterSending {
-                       s.zfs.zfsDestroy(s.zfsName)
-               }
                return err
        }
 
        stderr, err := cmd.StderrPipe()
        if err != nil {
-               if s.deleteAfterSending {
-                       s.zfs.zfsDestroy(s.zfsName)
-               }
                return err
        }
 
        if err := cmd.Start(); err != nil {
-               if s.deleteAfterSending {
-                       s.zfs.zfsDestroy(s.zfsName)
-               }
                return err
        }
 
@@ -1231,39 +1203,97 @@ func (s zfsMigrationSource) Send(conn *websocket.Conn) error {
        if err != nil {
                shared.Log.Error("problem with zfs send", "output", string(output))
        }
-       if s.deleteAfterSending {
-               s.zfs.zfsDestroy(s.zfsName)
-       }
+
        return err
 }
 
+func (s *zfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) error {
+       if s.container.IsSnapshot() {
+               fields := strings.SplitN(s.container.Name(), shared.SnapshotDelimiter, 2)
+               snapshotName := fmt.Sprintf("containers/%s@snapshot-%s", fields[0], fields[1])
+               return s.send(conn, snapshotName, "")
+       }
+
+       lastSnap := ""
+
+       for i, snap := range s.zfsSnapshotNames {
+
+               prev := ""
+               if i > 0 {
+                       prev = s.zfsSnapshotNames[i-1]
+               }
+
+               lastSnap = snap
+
+               if err := s.send(conn, snap, prev); err != nil {
+                       return err
+               }
+       }
+
+       s.runningSnapName = fmt.Sprintf("migration-send-%s", uuid.NewRandom().String())
+       if err := s.zfs.zfsSnapshotCreate(fmt.Sprintf("containers/%s", s.container.Name()), s.runningSnapName); err != nil {
+               return err
+       }
+
+       if err := s.send(conn, s.runningSnapName, lastSnap); err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (s *zfsMigrationSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) error {
+       s.stoppedSnapName = fmt.Sprintf("migration-send-%s", uuid.NewRandom().String())
+       if err := s.zfs.zfsSnapshotCreate(fmt.Sprintf("containers/%s", s.container.Name()), s.stoppedSnapName); err != nil {
+               return err
+       }
+
+       if err := s.send(conn, s.stoppedSnapName, s.runningSnapName); err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (s *zfsMigrationSourceDriver) Cleanup() {
+       if s.stoppedSnapName != "" {
+               s.zfs.zfsSnapshotDestroy(fmt.Sprintf("containers/%s", s.container.Name()), s.stoppedSnapName)
+       }
+
+       if s.runningSnapName != "" {
+               s.zfs.zfsSnapshotDestroy(fmt.Sprintf("containers/%s", s.container.Name()), s.runningSnapName)
+       }
+}
+
 func (s *storageZfs) MigrationType() MigrationFSType {
        return MigrationFSType_ZFS
 }
 
-func (s *storageZfs) MigrationSource(container container) ([]MigrationStorageSource, error) {
-       sources := []MigrationStorageSource{}
-
+func (s *storageZfs) MigrationSource(ct container) (MigrationStorageSourceDriver, error) {
        /* If the container is a snapshot, let's just send that; we don't need
         * to send anything else, because that's all the user asked for.
         */
-       if container.IsSnapshot() {
-               fields := strings.SplitN(container.Name(), shared.SnapshotDelimiter, 2)
-               snapshotName := fmt.Sprintf("containers/%s@snapshot-%s", fields[0], fields[1])
-               sources = append(sources, zfsMigrationSource{container.Name(), false, snapshotName, "", s})
-               return sources, nil
+       if ct.IsSnapshot() {
+               return &zfsMigrationSourceDriver{container: ct}, nil
+       }
+
+       driver := zfsMigrationSourceDriver{
+               container:        ct,
+               snapshots:        []container{},
+               zfsSnapshotNames: []string{},
+               zfs:              s,
        }
 
        /* List all the snapshots in order of reverse creation. The idea here
         * is that we send the oldest to newest snapshot, hopefully saving on
         * xfer costs. Then, after all that, we send the container itself.
         */
-       snapshots, err := s.zfsListSnapshots(fmt.Sprintf("containers/%s", container.Name()))
+       snapshots, err := s.zfsListSnapshots(fmt.Sprintf("containers/%s", ct.Name()))
        if err != nil {
                return nil, err
        }
 
-       for i, snap := range snapshots {
+       for _, snap := range snapshots {
                /* In the case of e.g. multiple copies running at the same
                 * time, we will have potentially multiple migration-send
                 * snapshots. (Or in the case of the test suite, sometimes one
@@ -1273,41 +1303,22 @@ func (s *storageZfs) MigrationSource(container container) ([]MigrationStorageSou
                        continue
                }
 
-               prev := ""
-               if i > 0 {
-                       prev = snapshots[i-1]
-               }
+               lxdName := fmt.Sprintf("%s%s%s", ct.Name(), shared.SnapshotDelimiter, snap[len("snapshot-"):])
+               zfsName := fmt.Sprintf("containers/%s@%s", ct.Name(), snap)
 
-               lxdName := fmt.Sprintf("%s%s%s", container.Name(), shared.SnapshotDelimiter, snap[len("snapshot-"):])
-               zfsName := fmt.Sprintf("containers/%s@%s", container.Name(), snap)
-               parentName := ""
-               if prev != "" {
-                       parentName = fmt.Sprintf("containers/%s@%s", container.Name(), prev)
+               snapshot, err := containerLoadByName(s.d, lxdName)
+               if err != nil {
+                       return nil, err
                }
 
-               sources = append(sources, zfsMigrationSource{lxdName, false, zfsName, parentName, s})
+               driver.snapshots = append(driver.snapshots, snapshot)
+               driver.zfsSnapshotNames = append(driver.zfsSnapshotNames, zfsName)
        }
 
-       /* We can't send running fses, so let's snapshot the fs and send
-        * the snapshot.
-        */
-       snapshotName := fmt.Sprintf("migration-send-%s", uuid.NewRandom().String())
-       if err := s.zfsSnapshotCreate(fmt.Sprintf("containers/%s", container.Name()), snapshotName); err != nil {
-               return nil, err
-       }
-
-       zfsName := fmt.Sprintf("containers/%s@%s", container.Name(), snapshotName)
-       zfsParent := ""
-       if len(sources) > 0 {
-               zfsParent = sources[len(sources)-1].(zfsMigrationSource).zfsName
-       }
-
-       sources = append(sources, zfsMigrationSource{container.Name(), true, zfsName, zfsParent, s})
-
-       return sources, nil
+       return &driver, nil
 }
 
-func (s *storageZfs) MigrationSink(container container, snapshots []container, conn *websocket.Conn) error {
+func (s *storageZfs) MigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error {
        zfsRecv := func(zfsName string) error {
                zfsFsName := fmt.Sprintf("%s/%s", s.zfsPool, zfsName)
                args := []string{"receive", "-F", "-u", zfsFsName}
@@ -1384,11 +1395,35 @@ func (s *storageZfs) MigrationSink(container container, snapshots []container, c
                }
        }
 
+       defer func() {
+               /* clean up our migration-send snapshots that we got from recv. */
+               snapshots, err := s.zfsListSnapshots(fmt.Sprintf("containers/%s", container.Name()))
+               if err != nil {
+                       shared.Log.Error("failed listing snapshots post migration", "err", err)
+                       return
+               }
+
+               for _, snap := range snapshots {
+                       if !strings.HasPrefix(snap, "migration-send") {
+                               continue
+                       }
+
+                       s.zfsSnapshotDestroy(fmt.Sprintf("containers/%s", container.Name()), snap)
+               }
+       }()
+
        /* finally, do the real container */
        if err := zfsRecv(zfsName); err != nil {
                return err
        }
 
+       if live {
+               /* and again for the post-running snapshot if this was a live migration */
+               if err := zfsRecv(zfsName); err != nil {
+                       return err
+               }
+       }
+
        /* Sometimes, zfs recv mounts this anyway, even if we pass -u
         * (https://forums.freebsd.org/threads/zfs-receive-u-shouldnt-mount-received-filesystem-right.36844/)
         * but sometimes it doesn't. Let's try to mount, but not complain about