The following pull request was submitted through GitHub. It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/1718
This e-mail was sent by the LXC bot; direct replies will not reach the author unless they happen to be subscribed to this list. === Description (from pull-request) === This breaks up live migrations into two parts: first, while the container is still running, we send a snapshot of its filesystem (and all of its snapshots); then we do the checkpoint and a final incremental send. The goal is to minimize the container's downtime. Signed-off-by: Tycho Andersen <tycho.ander...@canonical.com>
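For readers skimming the patch below, the source-side change boils down to a new per-backend driver interface that is driven in two phases. The sketch that follows is an illustration of that flow, not code lifted from the patch: the real interface sends over a *websocket.Conn and returns []container, and the checkpoint step is the CRIU dump; io.Writer, string snapshot names, and the dummy driver are simplifications so the example is self-contained.

    package main

    import (
        "fmt"
        "io"
        "os"
    )

    // Simplified shape of the MigrationStorageSourceDriver added by the patch.
    type migrationSourceDriver interface {
        Snapshots() []string
        SendWhileRunning(conn io.Writer) error
        SendAfterCheckpoint(conn io.Writer) error
        Cleanup()
    }

    // dummyDriver stands in for the real rsync/btrfs/zfs drivers.
    type dummyDriver struct{ snaps []string }

    func (d *dummyDriver) Snapshots() []string { return d.snaps }

    func (d *dummyDriver) SendWhileRunning(conn io.Writer) error {
        _, err := fmt.Fprintln(conn, "bulk send: container and its snapshots")
        return err
    }

    func (d *dummyDriver) SendAfterCheckpoint(conn io.Writer) error {
        _, err := fmt.Fprintln(conn, "final incremental send: what changed since the bulk send")
        return err
    }

    func (d *dummyDriver) Cleanup() {} // delete any temporary snapshots here

    // migrate mirrors the order of operations in the new migrate.go: bulk
    // transfer while the container runs, then checkpoint, then a small delta.
    func migrate(driver migrationSourceDriver, conn io.Writer, live bool, checkpoint func() error) error {
        defer driver.Cleanup()

        if err := driver.SendWhileRunning(conn); err != nil {
            return err
        }

        if !live {
            return nil
        }

        if err := checkpoint(); err != nil { // the CRIU dump happens here
            return err
        }

        return driver.SendAfterCheckpoint(conn)
    }

    func main() {
        driver := &dummyDriver{snaps: []string{"snap0"}}
        if err := migrate(driver, os.Stdout, true, func() error { return nil }); err != nil {
            fmt.Println("migration failed:", err)
        }
    }

Each storage backend implements the driver so that SendWhileRunning moves the bulk of the data before the container is frozen, leaving SendAfterCheckpoint with only a small delta to transfer during the downtime window.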
From 390975ac7157ed9c229f63c7a53ba625ee86cc18 Mon Sep 17 00:00:00 2001 From: Tycho Andersen <tycho.ander...@canonical.com> Date: Thu, 3 Mar 2016 07:48:19 -0700 Subject: [PATCH] migration: attempt to be slightly smart about moving filesystems This breaks up live migrations into two parts: first we send a snapshot of the filesystem while the container is running (and all of its snapshots), then we do the checkpoint and do a final incremental send. The goal here is to minimize the amount of downtime for the container. Signed-off-by: Tycho Andersen <tycho.ander...@canonical.com> --- lxd/migrate.go | 163 ++++++++++++++++++++--------------------- lxd/storage.go | 86 ++++++++++++++-------- lxd/storage_btrfs.go | 192 ++++++++++++++++++++++++++----------------------- lxd/storage_dir.go | 6 +- lxd/storage_lvm.go | 6 +- lxd/storage_zfs.go | 199 ++++++++++++++++++++++++++++++--------------------- 6 files changed, 368 insertions(+), 284 deletions(-) diff --git a/lxd/migrate.go b/lxd/migrate.go index e6ec1a1..0150e66 100644 --- a/lxd/migrate.go +++ b/lxd/migrate.go @@ -299,25 +299,15 @@ func (s *migrationSourceWs) Do(op *operation) error { } } - sources, fsErr := s.container.Storage().MigrationSource(s.container) + driver, fsErr := s.container.Storage().MigrationSource(s.container) /* the protocol says we have to send a header no matter what, so let's * do that, but then immediately send an error. */ snapshots := []string{} if fsErr == nil { - /* A bit of a special case here: doing lxc launch - * host2:c1/snap1 host1:container we're sending a snapshot, but - * it ends up as the container on the other end. So, we want to - * send it as the main container (i.e. ignore its IsSnapshot()). - */ - if len(sources) > 1 { - for _, snap := range sources { - if !snap.IsSnapshot() { - continue - } - name := shared.ExtractSnapshotName(snap.Name()) - snapshots = append(snapshots, name) - } + fullSnaps := driver.Snapshots() + for _, snap := range fullSnaps { + snapshots = append(snapshots, shared.ExtractSnapshotName(snap.Name())) } } @@ -348,7 +338,14 @@ func (s *migrationSourceWs) Do(op *operation) error { myType = MigrationFSType_RSYNC header.Fs = &myType - sources, _ = rsyncMigrationSource(s.container) + driver, _ = rsyncMigrationSource(s.container) + } + + defer driver.Cleanup() + + if err := driver.SendWhileRunning(s.fsConn); err != nil { + s.sendControl(err) + return err } if s.live { @@ -402,11 +399,8 @@ func (s *migrationSourceWs) Do(op *operation) error { s.sendControl(err) return err } - } - for _, source := range sources { - shared.Debugf("sending fs object %s", source.Name()) - if err := source.Send(s.fsConn); err != nil { + if err := driver.SendAfterCheckpoint(s.fsConn); err != nil { s.sendControl(err) return err } @@ -536,12 +530,68 @@ func (c *migrationSink) do() error { imagesDir := "" srcIdmap := new(shared.IdmapSet) + snapshots := []container{} + for _, snap := range header.Snapshots { + // TODO: we need to propagate snapshot configurations + // as well. Right now the container configuration is + // done through the initial migration post. Should we + // post the snapshots and their configs as well, or do + // it some other way? 
+ name := c.container.Name() + shared.SnapshotDelimiter + snap + args := containerArgs{ + Ctype: cTypeSnapshot, + Config: c.container.LocalConfig(), + Profiles: c.container.Profiles(), + Ephemeral: c.container.IsEphemeral(), + Architecture: c.container.Architecture(), + Devices: c.container.LocalDevices(), + Name: name, + } + + ct, err := containerCreateEmptySnapshot(c.container.Daemon(), args) + if err != nil { + restore <- err + return + } + snapshots = append(snapshots, ct) + } + + for _, idmap := range header.Idmap { + e := shared.IdmapEntry{ + Isuid: *idmap.Isuid, + Isgid: *idmap.Isgid, + Nsid: int(*idmap.Nsid), + Hostid: int(*idmap.Hostid), + Maprange: int(*idmap.Maprange)} + srcIdmap.Idmap = shared.Extend(srcIdmap.Idmap, e) + } + + /* We do the fs receive in parallel so we don't have to reason + * about when to receive what. The sending side is smart enough + * to send the filesystem bits that it can before it seizes the + * container to start checkpointing, so the total transfer time + * will be minimized even if we're dumb here. + */ + fsTransfer := make(chan error) + go func() { + if err := mySink(c.live, c.container, snapshots, c.fsConn); err != nil { + fsTransfer <- err + return + } + + if err := ShiftIfNecessary(c.container, srcIdmap); err != nil { + fsTransfer <- err + return + } + + fsTransfer <- nil + }() + if c.live { var err error imagesDir, err = ioutil.TempDir("", "lxd_restore_") if err != nil { os.RemoveAll(imagesDir) - c.sendControl(err) return } @@ -560,8 +610,6 @@ func (c *migrationSink) do() error { if err := RsyncRecv(shared.AddSlash(imagesDir), c.criuConn); err != nil { restore <- err - os.RemoveAll(imagesDir) - c.sendControl(err) return } @@ -574,70 +622,17 @@ func (c *migrationSink) do() error { if !c.container.IsPrivileged() { if err := c.container.IdmapSet().ShiftRootfs(imagesDir); err != nil { restore <- err - os.RemoveAll(imagesDir) - c.sendControl(err) return } } } - snapshots := []container{} - for _, snap := range header.Snapshots { - // TODO: we need to propagate snapshot configurations - // as well. Right now the container configuration is - // done through the initial migration post. Should we - // post the snapshots and their configs as well, or do - // it some other way? 
- name := c.container.Name() + shared.SnapshotDelimiter + snap - args := containerArgs{ - Ctype: cTypeSnapshot, - Config: c.container.LocalConfig(), - Profiles: c.container.Profiles(), - Ephemeral: c.container.IsEphemeral(), - Architecture: c.container.Architecture(), - Devices: c.container.LocalDevices(), - Name: name, - } - - ct, err := containerCreateEmptySnapshot(c.container.Daemon(), args) - if err != nil { - restore <- err - c.sendControl(err) - return - } - snapshots = append(snapshots, ct) - } - - for _, idmap := range header.Idmap { - e := shared.IdmapEntry{ - Isuid: *idmap.Isuid, - Isgid: *idmap.Isgid, - Nsid: int(*idmap.Nsid), - Hostid: int(*idmap.Hostid), - Maprange: int(*idmap.Maprange)} - srcIdmap.Idmap = shared.Extend(srcIdmap.Idmap, e) - } - - if err := mySink(c.container, snapshots, c.fsConn); err != nil { - restore <- err - c.sendControl(err) - return - } - - if err := ShiftIfNecessary(c.container, srcIdmap); err != nil { + err := <-fsTransfer + if err != nil { restore <- err - c.sendControl(err) return } - for _, snap := range snapshots { - if err := ShiftIfNecessary(snap, srcIdmap); err != nil { - restore <- err - c.sendControl(err) - return - } - } - if c.live { err := c.container.StartFromMigration(imagesDir) if err != nil { @@ -648,12 +643,20 @@ func (c *migrationSink) do() error { log = err.Error() } err = fmt.Errorf("restore failed:\n%s", log) + restore <- err + return } - restore <- err - } else { - restore <- nil } + + for _, snap := range snapshots { + if err := ShiftIfNecessary(snap, srcIdmap); err != nil { + restore <- err + return + } + } + + restore <- nil }(c) source := c.controlChannel() diff --git a/lxd/storage.go b/lxd/storage.go index 2af685a..5b3863a 100644 --- a/lxd/storage.go +++ b/lxd/storage.go @@ -109,10 +109,26 @@ func storageTypeToString(sType storageType) string { return "dir" } -type MigrationStorageSource interface { - Name() string - IsSnapshot() bool - Send(conn *websocket.Conn) error +type MigrationStorageSourceDriver interface { + /* snapshots for this container, if any */ + Snapshots() []container + + /* send any bits of the container/snapshots that are possible while the + * container is still running. + */ + SendWhileRunning(conn *websocket.Conn) error + + /* send the final bits (e.g. a final delta snapshot for zfs, btrfs, or + * do a final rsync) of the fs after the container has been + * checkpointed. This will only be called when a container is actually + * being live migrated. + */ + SendAfterCheckpoint(conn *websocket.Conn) error + + /* Called after either success or failure of a migration, can be used + * to clean up any temporary snapshots, etc. + */ + Cleanup() } type storage interface { @@ -170,8 +186,8 @@ type storage interface { // We leave sending containers which are snapshots of other containers // already present on the target instance as an exercise for the // enterprising developer. 
- MigrationSource(container container) ([]MigrationStorageSource, error) - MigrationSink(container container, objects []container, conn *websocket.Conn) error + MigrationSource(container container) (MigrationStorageSourceDriver, error) + MigrationSink(live bool, container container, objects []container, conn *websocket.Conn) error } func newStorage(d *Daemon, sType storageType) (storage, error) { @@ -521,23 +537,24 @@ func (lw *storageLogWrapper) MigrationType() MigrationFSType { return lw.w.MigrationType() } -func (lw *storageLogWrapper) MigrationSource(container container) ([]MigrationStorageSource, error) { +func (lw *storageLogWrapper) MigrationSource(container container) (MigrationStorageSourceDriver, error) { lw.log.Debug("MigrationSource", log.Ctx{"container": container.Name()}) return lw.w.MigrationSource(container) } -func (lw *storageLogWrapper) MigrationSink(container container, objects []container, conn *websocket.Conn) error { +func (lw *storageLogWrapper) MigrationSink(live bool, container container, objects []container, conn *websocket.Conn) error { objNames := []string{} for _, obj := range objects { objNames = append(objNames, obj.Name()) } lw.log.Debug("MigrationSink", log.Ctx{ + "live": live, "container": container.Name(), "objects": objNames, }) - return lw.w.MigrationSink(container, objects, conn) + return lw.w.MigrationSink(live, container, objects, conn) } func ShiftIfNecessary(container container, srcIdmap *shared.IdmapSet) error { @@ -567,41 +584,47 @@ func ShiftIfNecessary(container container, srcIdmap *shared.IdmapSet) error { return nil } -type rsyncStorageSource struct { +type rsyncStorageSourceDriver struct { container container + snapshots []container } -func (s *rsyncStorageSource) Name() string { - return s.container.Name() +func (s rsyncStorageSourceDriver) Snapshots() []container { + return s.snapshots } -func (s *rsyncStorageSource) IsSnapshot() bool { - return s.container.IsSnapshot() +func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn) error { + toSend := append([]container{s.container}, s.snapshots...) 
+ + for _, send := range toSend { + path := send.Path() + if err := RsyncSend(shared.AddSlash(path), conn); err != nil { + return err + } + } + + return nil } -func (s *rsyncStorageSource) Send(conn *websocket.Conn) error { - path := s.container.Path() - return RsyncSend(shared.AddSlash(path), conn) +func (s rsyncStorageSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) error { + /* resync anything that changed between our first send and the checkpoint */ + return RsyncSend(shared.AddSlash(s.container.Path()), conn) } -func rsyncMigrationSource(container container) ([]MigrationStorageSource, error) { - sources := []MigrationStorageSource{} +func (s rsyncStorageSourceDriver) Cleanup() { + /* no-op */ +} - /* transfer the container, and then all the snapshots */ - sources = append(sources, &rsyncStorageSource{container}) - snaps, err := container.Snapshots() +func rsyncMigrationSource(container container) (MigrationStorageSourceDriver, error) { + snapshots, err := container.Snapshots() if err != nil { return nil, err } - for _, snap := range snaps { - sources = append(sources, &rsyncStorageSource{snap}) - } - - return sources, nil + return rsyncStorageSourceDriver{container, snapshots}, nil } -func rsyncMigrationSink(container container, snapshots []container, conn *websocket.Conn) error { +func rsyncMigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error { /* the first object is the actual container */ if err := RsyncRecv(shared.AddSlash(container.Path()), conn); err != nil { return err @@ -620,5 +643,12 @@ func rsyncMigrationSink(container container, snapshots []container, conn *websoc } } + if live { + /* now receive the final sync */ + if err := RsyncRecv(shared.AddSlash(container.Path()), conn); err != nil { + return err + } + } + return nil } diff --git a/lxd/storage_btrfs.go b/lxd/storage_btrfs.go index e4ca28f..e648e90 100644 --- a/lxd/storage_btrfs.go +++ b/lxd/storage_btrfs.go @@ -807,56 +807,38 @@ func (s *storageBtrfs) getSubVolumes(path string) ([]string, error) { return result, nil } -type btrfsMigrationSource struct { - lxdName string - deleteAfterSending bool - btrfsPath string - btrfsParent string - - btrfs *storageBtrfs -} - -func (s btrfsMigrationSource) Name() string { - return s.lxdName +type btrfsMigrationSourceDriver struct { + container container + snapshots []container + btrfsSnapshotNames []string + btrfs *storageBtrfs + runningSnapName string + stoppedSnapName string } -func (s btrfsMigrationSource) IsSnapshot() bool { - return !s.deleteAfterSending +func (s *btrfsMigrationSourceDriver) Snapshots() []container { + return s.snapshots } -func (s btrfsMigrationSource) Send(conn *websocket.Conn) error { - args := []string{"send", s.btrfsPath} - if s.btrfsParent != "" { - args = append(args, "-p", s.btrfsParent) +func (s *btrfsMigrationSourceDriver) send(conn *websocket.Conn, btrfsPath string, btrfsParent string) error { + args := []string{"send", btrfsPath} + if btrfsParent != "" { + args = append(args, "-p", btrfsParent) } cmd := exec.Command("btrfs", args...) 
- deleteAfterSending := func(path string) { - s.btrfs.subvolsDelete(path) - os.Remove(filepath.Dir(path)) - } - stdout, err := cmd.StdoutPipe() if err != nil { - if s.deleteAfterSending { - deleteAfterSending(s.btrfsPath) - } return err } stderr, err := cmd.StderrPipe() if err != nil { - if s.deleteAfterSending { - deleteAfterSending(s.btrfsPath) - } return err } if err := cmd.Start(); err != nil { - if s.deleteAfterSending { - deleteAfterSending(s.btrfsPath) - } return err } @@ -871,97 +853,125 @@ func (s btrfsMigrationSource) Send(conn *websocket.Conn) error { if err != nil { shared.Log.Error("problem with btrfs send", "output", string(output)) } - if s.deleteAfterSending { - deleteAfterSending(s.btrfsPath) - } return err } -func (s *storageBtrfs) MigrationType() MigrationFSType { - if runningInUserns { - return MigrationFSType_RSYNC - } else { - return MigrationFSType_BTRFS - } -} - -func (s *storageBtrfs) MigrationSource(c container) ([]MigrationStorageSource, error) { - if runningInUserns { - return rsyncMigrationSource(c) - } - - sources := []MigrationStorageSource{} - - /* If the container is a snapshot, let's just send that; we don't need - * to send anything else, because that's all the user asked for. - */ - if c.IsSnapshot() { - tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", c.Name(), uuid.NewRandom().String()), true) +func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) error { + if s.container.IsSnapshot() { + tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", s.container.Name(), uuid.NewRandom().String()), true) err := os.MkdirAll(tmpPath, 0700) if err != nil { - return nil, err + return err } btrfsPath := fmt.Sprintf("%s/.root", tmpPath) - if err := s.subvolSnapshot(c.Path(), btrfsPath, true); err != nil { - return nil, err + if err := s.btrfs.subvolSnapshot(s.container.Path(), btrfsPath, true); err != nil { + return err } - sources = append(sources, btrfsMigrationSource{c.Name(), true, btrfsPath, "", s}) - return sources, nil - } + defer s.btrfs.subvolDelete(btrfsPath) - /* List all the snapshots in order of reverse creation. The idea here - * is that we send the oldest to newest snapshot, hopefully saving on - * xfer costs. Then, after all that, we send the container itself. - */ - snapshots, err := c.Snapshots() - if err != nil { - return nil, err + return s.send(conn, btrfsPath, "") } - for i, snap := range snapshots { - var prev container + for i, snap := range s.snapshots { + prev := "" if i > 0 { - prev = snapshots[i-1] + prev = s.snapshots[i-1].Path() } - btrfsPath := snap.Path() - parentName := "" - if prev != nil { - parentName = prev.Path() + if err := s.send(conn, snap.Path(), prev); err != nil { + return err } - - sources = append(sources, btrfsMigrationSource{snap.Name(), false, btrfsPath, parentName, s}) } /* We can't send running fses, so let's snapshot the fs and send * the snapshot. 
*/ - tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", c.Name(), uuid.NewRandom().String()), true) - err = os.MkdirAll(tmpPath, 0700) + tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", s.container.Name(), uuid.NewRandom().String()), true) + err := os.MkdirAll(tmpPath, 0700) if err != nil { - return nil, err + return err } - btrfsPath := fmt.Sprintf("%s/.root", tmpPath) - if err := s.subvolSnapshot(c.Path(), btrfsPath, true); err != nil { - return nil, err + s.runningSnapName = fmt.Sprintf("%s/.root", tmpPath) + if err := s.btrfs.subvolSnapshot(s.container.Path(), s.runningSnapName, true); err != nil { + return err } btrfsParent := "" - if len(sources) > 0 { - btrfsParent = sources[len(sources)-1].(btrfsMigrationSource).btrfsPath + if len(s.btrfsSnapshotNames) > 0 { + btrfsParent = s.btrfsSnapshotNames[len(s.btrfsSnapshotNames)-1] + } + + return s.send(conn, s.runningSnapName, btrfsParent) +} + +func (s *btrfsMigrationSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) error { + tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", s.container.Name(), uuid.NewRandom().String()), true) + err := os.MkdirAll(tmpPath, 0700) + if err != nil { + return err + } + + s.stoppedSnapName = fmt.Sprintf("%s/.root", tmpPath) + if err := s.btrfs.subvolSnapshot(s.container.Path(), s.stoppedSnapName, true); err != nil { + return err } - sources = append(sources, btrfsMigrationSource{c.Name(), true, btrfsPath, btrfsParent, s}) + return s.send(conn, s.stoppedSnapName, s.runningSnapName) +} + +func (s *btrfsMigrationSourceDriver) Cleanup() { + if s.stoppedSnapName != "" { + s.btrfs.subvolDelete(s.stoppedSnapName) + } + + if s.runningSnapName != "" { + s.btrfs.subvolDelete(s.runningSnapName) + } +} - return sources, nil +func (s *storageBtrfs) MigrationType() MigrationFSType { + if runningInUserns { + return MigrationFSType_RSYNC + } else { + return MigrationFSType_BTRFS + } } -func (s *storageBtrfs) MigrationSink(container container, snapshots []container, conn *websocket.Conn) error { +func (s *storageBtrfs) MigrationSource(c container) (MigrationStorageSourceDriver, error) { if runningInUserns { - return rsyncMigrationSink(container, snapshots, conn) + return rsyncMigrationSource(c) + } + + /* List all the snapshots in order of reverse creation. The idea here + * is that we send the oldest to newest snapshot, hopefully saving on + * xfer costs. Then, after all that, we send the container itself. 
+ */ + snapshots, err := c.Snapshots() + if err != nil { + return nil, err + } + + driver := &btrfsMigrationSourceDriver{ + container: c, + snapshots: snapshots, + btrfsSnapshotNames: []string{}, + btrfs: s, + } + + for _, snap := range snapshots { + btrfsPath := snap.Path() + driver.btrfsSnapshotNames = append(driver.btrfsSnapshotNames, btrfsPath) + } + + return driver, nil +} + +func (s *storageBtrfs) MigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error { + if runningInUserns { + return rsyncMigrationSink(live, container, snapshots, conn) } cName := container.Name() @@ -1041,6 +1051,12 @@ func (s *storageBtrfs) MigrationSink(container container, snapshots []container, return err } + if live { + if err := btrfsRecv(containerPath(cName, true), container.Path(), false); err != nil { + return err + } + } + // Cleanup if ok, _ := shared.PathIsEmpty(snapshotsPath); ok { err := os.Remove(snapshotsPath) diff --git a/lxd/storage_dir.go b/lxd/storage_dir.go index 98d5320..5126c34 100644 --- a/lxd/storage_dir.go +++ b/lxd/storage_dir.go @@ -266,10 +266,10 @@ func (s *storageDir) MigrationType() MigrationFSType { return MigrationFSType_RSYNC } -func (s *storageDir) MigrationSource(container container) ([]MigrationStorageSource, error) { +func (s *storageDir) MigrationSource(container container) (MigrationStorageSourceDriver, error) { return rsyncMigrationSource(container) } -func (s *storageDir) MigrationSink(container container, snapshots []container, conn *websocket.Conn) error { - return rsyncMigrationSink(container, snapshots, conn) +func (s *storageDir) MigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error { + return rsyncMigrationSink(live, container, snapshots, conn) } diff --git a/lxd/storage_lvm.go b/lxd/storage_lvm.go index f0fd62e..cb4f066 100644 --- a/lxd/storage_lvm.go +++ b/lxd/storage_lvm.go @@ -1056,10 +1056,10 @@ func (s *storageLvm) MigrationType() MigrationFSType { return MigrationFSType_RSYNC } -func (s *storageLvm) MigrationSource(container container) ([]MigrationStorageSource, error) { +func (s *storageLvm) MigrationSource(container container) (MigrationStorageSourceDriver, error) { return rsyncMigrationSource(container) } -func (s *storageLvm) MigrationSink(container container, snapshots []container, conn *websocket.Conn) error { - return rsyncMigrationSink(container, snapshots, conn) +func (s *storageLvm) MigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error { + return rsyncMigrationSink(live, container, snapshots, conn) } diff --git a/lxd/storage_zfs.go b/lxd/storage_zfs.go index ec6cdb1..d06f5cc 100644 --- a/lxd/storage_zfs.go +++ b/lxd/storage_zfs.go @@ -1157,66 +1157,38 @@ func storageZFSSetPoolNameConfig(d *Daemon, poolname string) error { return nil } -type zfsMigrationSource struct { - lxdName string - deleteAfterSending bool - zfsName string - zfsParent string - - zfs *storageZfs -} - -func (s zfsMigrationSource) Name() string { - return s.lxdName +type zfsMigrationSourceDriver struct { + container container + snapshots []container + zfsSnapshotNames []string + zfs *storageZfs + runningSnapName string + stoppedSnapName string } -func (s zfsMigrationSource) IsSnapshot() bool { - return !s.deleteAfterSending +func (s *zfsMigrationSourceDriver) Snapshots() []container { + return s.snapshots } -func (s zfsMigrationSource) Send(conn *websocket.Conn) error { - args := []string{"send", fmt.Sprintf("%s/%s", s.zfs.zfsPool, s.zfsName)} - if 
s.zfsParent != "" { - args = append(args, "-i", fmt.Sprintf("%s/%s", s.zfs.zfsPool, s.zfsParent)) +func (s *zfsMigrationSourceDriver) send(conn *websocket.Conn, zfsName string, zfsParent string) error { + args := []string{"send", fmt.Sprintf("%s/containers/%s@%s", s.zfs.zfsPool, s.container.Name(), zfsName)} + if zfsParent != "" { + args = append(args, "-i", fmt.Sprintf("%s/containers/%s@%s", s.zfs.zfsPool, s.container.Name(), zfsParent)) } cmd := exec.Command("zfs", args...) stdout, err := cmd.StdoutPipe() if err != nil { - /* If this is not a lxd snapshot, that means it is the root container. - * The way we zfs send a root container is by taking a temporary zfs - * snapshot and sending that, then deleting that snapshot. Here's where - * we delete it. - * - * Note that we can't use a defer here, because zfsDestroy - * takes some time, and defer doesn't block the current - * goroutine. Due to our retry mechanism for network failures - * (and because zfsDestroy takes a while), we might retry - * moving (and thus creating a temporary snapshot) before the - * last one is deleted, resulting in either a snapshot name - * collision if it was fast enough, or an extra snapshot with - * an odd name on the destination side. Instead, we don't use - * defer so we always block until the snapshot is dead. - */ - if s.deleteAfterSending { - s.zfs.zfsDestroy(s.zfsName) - } return err } stderr, err := cmd.StderrPipe() if err != nil { - if s.deleteAfterSending { - s.zfs.zfsDestroy(s.zfsName) - } return err } if err := cmd.Start(); err != nil { - if s.deleteAfterSending { - s.zfs.zfsDestroy(s.zfsName) - } return err } @@ -1231,39 +1203,97 @@ func (s zfsMigrationSource) Send(conn *websocket.Conn) error { if err != nil { shared.Log.Error("problem with zfs send", "output", string(output)) } - if s.deleteAfterSending { - s.zfs.zfsDestroy(s.zfsName) - } + return err } +func (s *zfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) error { + if s.container.IsSnapshot() { + fields := strings.SplitN(s.container.Name(), shared.SnapshotDelimiter, 2) + snapshotName := fmt.Sprintf("containers/%s@snapshot-%s", fields[0], fields[1]) + return s.send(conn, snapshotName, "") + } + + lastSnap := "" + + for i, snap := range s.zfsSnapshotNames { + + prev := "" + if i > 0 { + prev = s.zfsSnapshotNames[i-1] + } + + lastSnap = snap + + if err := s.send(conn, snap, prev); err != nil { + return err + } + } + + s.runningSnapName = fmt.Sprintf("migration-send-%s", uuid.NewRandom().String()) + if err := s.zfs.zfsSnapshotCreate(fmt.Sprintf("containers/%s", s.container.Name()), s.runningSnapName); err != nil { + return err + } + + if err := s.send(conn, s.runningSnapName, lastSnap); err != nil { + return err + } + + return nil +} + +func (s *zfsMigrationSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) error { + s.stoppedSnapName = fmt.Sprintf("migration-send-%s", uuid.NewRandom().String()) + if err := s.zfs.zfsSnapshotCreate(fmt.Sprintf("containers/%s", s.container.Name()), s.stoppedSnapName); err != nil { + return err + } + + if err := s.send(conn, s.stoppedSnapName, s.runningSnapName); err != nil { + return err + } + + return nil +} + +func (s *zfsMigrationSourceDriver) Cleanup() { + if s.stoppedSnapName != "" { + s.zfs.zfsSnapshotDestroy(fmt.Sprintf("containers/%s", s.container.Name()), s.stoppedSnapName) + } + + if s.runningSnapName != "" { + s.zfs.zfsSnapshotDestroy(fmt.Sprintf("containers/%s", s.container.Name()), s.runningSnapName) + } +} + func (s *storageZfs) MigrationType() MigrationFSType { return 
MigrationFSType_ZFS } -func (s *storageZfs) MigrationSource(container container) ([]MigrationStorageSource, error) { - sources := []MigrationStorageSource{} - +func (s *storageZfs) MigrationSource(ct container) (MigrationStorageSourceDriver, error) { /* If the container is a snapshot, let's just send that; we don't need * to send anything else, because that's all the user asked for. */ - if container.IsSnapshot() { - fields := strings.SplitN(container.Name(), shared.SnapshotDelimiter, 2) - snapshotName := fmt.Sprintf("containers/%s@snapshot-%s", fields[0], fields[1]) - sources = append(sources, zfsMigrationSource{container.Name(), false, snapshotName, "", s}) - return sources, nil + if ct.IsSnapshot() { + return &zfsMigrationSourceDriver{container: ct}, nil + } + + driver := zfsMigrationSourceDriver{ + container: ct, + snapshots: []container{}, + zfsSnapshotNames: []string{}, + zfs: s, } /* List all the snapshots in order of reverse creation. The idea here * is that we send the oldest to newest snapshot, hopefully saving on * xfer costs. Then, after all that, we send the container itself. */ - snapshots, err := s.zfsListSnapshots(fmt.Sprintf("containers/%s", container.Name())) + snapshots, err := s.zfsListSnapshots(fmt.Sprintf("containers/%s", ct.Name())) if err != nil { return nil, err } - for i, snap := range snapshots { + for _, snap := range snapshots { /* In the case of e.g. multiple copies running at the same * time, we will have potentially multiple migration-send * snapshots. (Or in the case of the test suite, sometimes one @@ -1273,41 +1303,22 @@ func (s *storageZfs) MigrationSource(container container) ([]MigrationStorageSou continue } - prev := "" - if i > 0 { - prev = snapshots[i-1] - } + lxdName := fmt.Sprintf("%s%s%s", ct.Name(), shared.SnapshotDelimiter, snap[len("snapshot-"):]) + zfsName := fmt.Sprintf("containers/%s@%s", ct.Name(), snap) - lxdName := fmt.Sprintf("%s%s%s", container.Name(), shared.SnapshotDelimiter, snap[len("snapshot-"):]) - zfsName := fmt.Sprintf("containers/%s@%s", container.Name(), snap) - parentName := "" - if prev != "" { - parentName = fmt.Sprintf("containers/%s@%s", container.Name(), prev) + snapshot, err := containerLoadByName(s.d, lxdName) + if err != nil { + return nil, err } - sources = append(sources, zfsMigrationSource{lxdName, false, zfsName, parentName, s}) + driver.snapshots = append(driver.snapshots, snapshot) + driver.zfsSnapshotNames = append(driver.zfsSnapshotNames, zfsName) } - /* We can't send running fses, so let's snapshot the fs and send - * the snapshot. 
- */ - snapshotName := fmt.Sprintf("migration-send-%s", uuid.NewRandom().String()) - if err := s.zfsSnapshotCreate(fmt.Sprintf("containers/%s", container.Name()), snapshotName); err != nil { - return nil, err - } - - zfsName := fmt.Sprintf("containers/%s@%s", container.Name(), snapshotName) - zfsParent := "" - if len(sources) > 0 { - zfsParent = sources[len(sources)-1].(zfsMigrationSource).zfsName - } - - sources = append(sources, zfsMigrationSource{container.Name(), true, zfsName, zfsParent, s}) - - return sources, nil + return &driver, nil } -func (s *storageZfs) MigrationSink(container container, snapshots []container, conn *websocket.Conn) error { +func (s *storageZfs) MigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error { zfsRecv := func(zfsName string) error { zfsFsName := fmt.Sprintf("%s/%s", s.zfsPool, zfsName) args := []string{"receive", "-F", "-u", zfsFsName} @@ -1384,11 +1395,35 @@ func (s *storageZfs) MigrationSink(container container, snapshots []container, c } } + defer func() { + /* clean up our migration-send snapshots that we got from recv. */ + snapshots, err := s.zfsListSnapshots(fmt.Sprintf("containers/%s", container.Name())) + if err != nil { + shared.Log.Error("failed listing snapshots post migration", "err", err) + return + } + + for _, snap := range snapshots { + if !strings.HasPrefix(snap, "migration-send") { + continue + } + + s.zfsSnapshotDestroy(fmt.Sprintf("containers/%s", container.Name()), snap) + } + }() + /* finally, do the real container */ if err := zfsRecv(zfsName); err != nil { return err } + if live { + /* and again for the post-running snapshot if this was a live migration */ + if err := zfsRecv(zfsName); err != nil { + return err + } + } + /* Sometimes, zfs recv mounts this anyway, even if we pass -u * (https://forums.freebsd.org/threads/zfs-receive-u-shouldnt-mount-received-filesystem-right.36844/) * but sometimes it doesn't. Let's try to mount, but not complain about
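On the receiving end, the patch overlaps the filesystem transfer with the CRIU image transfer by running the former in a goroutine and joining on a channel before restore. Below is a stripped-down sketch of that pattern; receiveFilesystem and receiveCriuImages are hypothetical stand-ins for the real rsync/btrfs/zfs receive paths, not functions from the patch.

    package main

    import "fmt"

    // Hypothetical stand-ins: the real sink receives the filesystem through the
    // storage backend and the CRIU images via rsync.
    func receiveFilesystem() error { return nil }
    func receiveCriuImages() error { return nil }

    // sinkDo starts the filesystem receive in the background so the CRIU images
    // can be pulled at the same time, then joins on the channel before restoring.
    func sinkDo(live bool) error {
        fsTransfer := make(chan error)
        go func() {
            fsTransfer <- receiveFilesystem()
        }()

        if live {
            if err := receiveCriuImages(); err != nil {
                <-fsTransfer // let the goroutine finish before bailing out
                return err
            }
        }

        if err := <-fsTransfer; err != nil {
            return err
        }

        // the container restore/start would happen here
        return nil
    }

    func main() {
        fmt.Println("sink finished, err =", sinkDo(true))
    }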