The following pull request was submitted through Github. It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/6528
This e-mail was sent by the LXC bot, direct replies will not reach the author unless they happen to be subscribed to this list. === Description (from pull-request) === Restructures migration sink Do() function to allow linking of new storage layer more cleanly.
From d5f578718e650c636f7e3282f19c7eeb4cdccba1 Mon Sep 17 00:00:00 2001 From: Thomas Parrott <thomas.parr...@canonical.com> Date: Fri, 29 Nov 2019 11:08:20 +0000 Subject: [PATCH 1/3] lxd/containers/post: Pass state to migration Do function Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com> --- lxd/containers_post.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lxd/containers_post.go b/lxd/containers_post.go index 7bde5d7c67..43bf9ee95c 100644 --- a/lxd/containers_post.go +++ b/lxd/containers_post.go @@ -438,7 +438,7 @@ func createFromMigration(d *Daemon, project string, req *api.InstancesPost) resp }() // And finally run the migration. - err = sink.Do(op) + err = sink.Do(d.State(), op) if err != nil { return fmt.Errorf("Error transferring container data: %s", err) } From 6c1cf7822766b9e4712082481b455e8b915a8b10 Mon Sep 17 00:00:00 2001 From: Thomas Parrott <thomas.parr...@canonical.com> Date: Fri, 29 Nov 2019 11:08:57 +0000 Subject: [PATCH 2/3] lxd/migrate/container: Restructure of migrationSink.Do() - This is to accomodate the forthcoming link to the new storage layer. - Renames the offer and response header variables to align with naming used in custom volume migration. - Gathers together the legacy offer negotiation code so as to be easily segrated when new storage layer is linked. - Passes rsyncFeatures from generated response header rather than offer header, to align with custom volume migration. Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com> --- lxd/migrate_container.go | 199 +++++++++++++++++++-------------------- 1 file changed, 95 insertions(+), 104 deletions(-) diff --git a/lxd/migrate_container.go b/lxd/migrate_container.go index 280a94311f..6ada2233a0 100644 --- a/lxd/migrate_container.go +++ b/lxd/migrate_container.go @@ -21,6 +21,7 @@ import ( "github.com/lxc/lxd/lxd/migration" "github.com/lxc/lxd/lxd/operations" "github.com/lxc/lxd/lxd/rsync" + "github.com/lxc/lxd/lxd/state" "github.com/lxc/lxd/lxd/util" "github.com/lxc/lxd/shared" "github.com/lxc/lxd/shared/api" @@ -783,13 +784,7 @@ func NewMigrationSink(args *MigrationSinkArgs) (*migrationSink, error) { return &sink, nil } -func (c *migrationSink) Do(migrateOp *operations.Operation) error { - if c.src.instance.Type() != instancetype.Container { - return fmt.Errorf("Instance type must be container") - } - - ct := c.src.instance.(*containerLXC) - +func (c *migrationSink) Do(state *state.State, migrateOp *operations.Operation) error { var err error if c.push { @@ -840,22 +835,19 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error { controller = c.dest.sendControl } - header := migration.MigrationHeader{} - if err := receiver(&header); err != nil { + offerHeader := migration.MigrationHeader{} + if err := receiver(&offerHeader); err != nil { controller(err) return err } - // Handle rsync options - rsyncFeatures := header.GetRsyncFeaturesSlice() - live := c.src.live if c.push { live = c.dest.live } criuType := migration.CRIUType_CRIU_RSYNC.Enum() - if header.Criu != nil && *header.Criu == migration.CRIUType_NONE { + if offerHeader.Criu != nil && *offerHeader.Criu == migration.CRIUType_NONE { criuType = migration.CRIUType_NONE.Enum() } else { if !live { @@ -863,64 +855,69 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error { } } - mySink := ct.Storage().MigrationSink - if c.refresh { - mySink = rsyncMigrationSink - } + // The function that will be executed to receive the sender's migration data. + var myTarget func(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error - myType := ct.Storage().MigrationType() - resp := migration.MigrationHeader{ - Fs: &myType, - Criu: criuType, - Snapshots: header.Snapshots, - SnapshotNames: header.SnapshotNames, - Refresh: &c.refresh, - } + // The migration header to be sent back to source with our target options. + var respHeader migration.MigrationHeader - // Return those rsync features we know about (with the value sent by the remote) - if header.RsyncFeatures != nil { - resp.RsyncFeatures = &migration.RsyncFeatures{} - if resp.RsyncFeatures.Xattrs != nil { - resp.RsyncFeatures.Xattrs = header.RsyncFeatures.Xattrs - } + if c.src.instance.Type() == instancetype.Container { + ct := c.src.instance.(*containerLXC) + myTarget = ct.Storage().MigrationSink + myType := ct.Storage().MigrationType() - if resp.RsyncFeatures.Delete != nil { - resp.RsyncFeatures.Delete = header.RsyncFeatures.Delete + respHeader = migration.MigrationHeader{ + Fs: &myType, + Criu: criuType, + Snapshots: offerHeader.Snapshots, + SnapshotNames: offerHeader.SnapshotNames, + Refresh: &c.refresh, } - if resp.RsyncFeatures.Compress != nil { - resp.RsyncFeatures.Compress = header.RsyncFeatures.Compress + // Return those rsync features we know about (with the value sent by the remote). + if offerHeader.RsyncFeatures != nil { + respHeader.RsyncFeatures = &migration.RsyncFeatures{ + Xattrs: offerHeader.RsyncFeatures.Xattrs, + Delete: offerHeader.RsyncFeatures.Delete, + Compress: offerHeader.RsyncFeatures.Compress, + Bidirectional: offerHeader.RsyncFeatures.Bidirectional, + } } - if resp.RsyncFeatures.Bidirectional != nil { - resp.RsyncFeatures.Bidirectional = header.RsyncFeatures.Bidirectional + // Return those ZFS features we know about (with the value sent by the remote). + if len(zfsVersion) >= 3 && zfsVersion[0:3] != "0.6" { + if offerHeader.ZfsFeatures != nil && offerHeader.ZfsFeatures.Compress != nil { + respHeader.ZfsFeatures = &migration.ZfsFeatures{ + Compress: offerHeader.ZfsFeatures.Compress, + } + } } - } - // Return those ZFS features we know about (with the value sent by the remote) - if len(zfsVersion) >= 3 && zfsVersion[0:3] != "0.6" { - if header.ZfsFeatures != nil && header.ZfsFeatures.Compress != nil { - resp.ZfsFeatures = &migration.ZfsFeatures{ - Compress: header.ZfsFeatures.Compress, - } + // If refresh mode or the storage type the source has doesn't match what we have, + // then we have to use rsync. + if c.refresh || *offerHeader.Fs != *respHeader.Fs { + myTarget = rsyncMigrationSink + myType = migration.MigrationFSType_RSYNC } + } else { + return fmt.Errorf("Instance type not supported") } if c.refresh { - // Get our existing snapshots + // Get our existing snapshots. targetSnapshots, err := c.src.instance.Snapshots() if err != nil { controller(err) return err } - // Get the remote snapshots - sourceSnapshots := header.GetSnapshots() + // Get the remote snapshots. + sourceSnapshots := offerHeader.GetSnapshots() - // Compare the two sets + // Compare the two sets. syncSnapshots, deleteSnapshots := migrationCompareSnapshots(sourceSnapshots, targetSnapshots) - // Delete the extra local ones + // Delete the extra local ones. for _, snap := range deleteSnapshots { err := snap.Delete() if err != nil { @@ -934,29 +931,24 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error { snapshotNames = append(snapshotNames, snap.GetName()) } - resp.Snapshots = syncSnapshots - resp.SnapshotNames = snapshotNames - header.Snapshots = syncSnapshots - header.SnapshotNames = snapshotNames - } - - // If the storage type the source has doesn't match what we have, then - // we have to use rsync. - if c.refresh || *header.Fs != *resp.Fs { - mySink = rsyncMigrationSink - myType = migration.MigrationFSType_RSYNC - resp.Fs = &myType + respHeader.Snapshots = syncSnapshots + respHeader.SnapshotNames = snapshotNames + offerHeader.Snapshots = syncSnapshots + offerHeader.SnapshotNames = snapshotNames } - if header.GetPredump() == true { - // If the other side wants pre-dump and if - // this side supports it, let's use it. - resp.Predump = proto.Bool(true) + if offerHeader.GetPredump() == true { + // If the other side wants pre-dump and if this side supports it, let's use it. + respHeader.Predump = proto.Bool(true) } else { - resp.Predump = proto.Bool(false) + respHeader.Predump = proto.Bool(false) } - err = sender(&resp) + // Get rsync options from sender, these are passed into mySink function as part of + // MigrationSinkArgs below. + rsyncFeatures := respHeader.GetRsyncFeaturesSlice() + + err = sender(&respHeader) if err != nil { controller(err) return err @@ -967,7 +959,7 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error { imagesDir := "" srcIdmap := new(idmap.IdmapSet) - for _, idmapSet := range header.Idmap { + for _, idmapSet := range offerHeader.Idmap { e := idmap.IdmapEntry{ Isuid: *idmapSet.Isuid, Isgid: *idmapSet.Isgid, @@ -977,28 +969,24 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error { srcIdmap.Idmap = idmap.Extend(srcIdmap.Idmap, e) } - /* We do the fs receive in parallel so we don't have to reason - * about when to receive what. The sending side is smart enough - * to send the filesystem bits that it can before it seizes the - * container to start checkpointing, so the total transfer time - * will be minimized even if we're dumb here. - */ + // We do the fs receive in parallel so we don't have to reason about when to receive + // what. The sending side is smart enough to send the filesystem bits that it can + // before it seizes the container to start checkpointing, so the total transfer time + // will be minimized even if we're dumb here. fsTransfer := make(chan error) go func() { snapshots := []*migration.Snapshot{} - /* Legacy: we only sent the snapshot names, so we just - * copy the container's config over, same as we used to - * do. - */ - if len(header.SnapshotNames) != len(header.Snapshots) { - for _, name := range header.SnapshotNames { + // Legacy: we only sent the snapshot names, so we just copy the container's + // config over, same as we used to do. + if len(offerHeader.SnapshotNames) != len(offerHeader.Snapshots) { + for _, name := range offerHeader.SnapshotNames { base := snapshotToProtobuf(c.src.instance) base.Name = &name snapshots = append(snapshots, base) } } else { - snapshots = header.Snapshots + snapshots = offerHeader.Snapshots } var fsConn *websocket.Conn @@ -1027,16 +1015,19 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error { Snapshots: snapshots, } - err = mySink(fsConn, migrateOp, args) + err = myTarget(fsConn, migrateOp, args) if err != nil { fsTransfer <- err return } - err = resetContainerDiskIdmap(ct, srcIdmap) - if err != nil { - fsTransfer <- err - return + if c.src.instance.Type() == instancetype.Container { + ct := c.src.instance.(*containerLXC) + err = resetContainerDiskIdmap(ct, srcIdmap) + if err != nil { + fsTransfer <- err + return + } } fsTransfer <- nil @@ -1063,10 +1054,10 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error { FinalPreDump: proto.Bool(false), } - if resp.GetPredump() { + if respHeader.GetPredump() { for !sync.GetFinalPreDump() { logger.Debugf("About to receive rsync") - // Transfer a CRIU pre-dump + // Transfer a CRIU pre-dump. err = rsync.Recv(shared.AddSlash(imagesDir), &shared.WebsocketIO{Conn: criuConn}, nil, rsyncFeatures) if err != nil { restore <- err @@ -1075,8 +1066,8 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error { logger.Debugf("Done receiving from rsync") logger.Debugf("About to receive header") - // Check if this was the last pre-dump - // Only the FinalPreDump element if of interest + // Check if this was the last pre-dump. + // Only the FinalPreDump element if of interest. mtype, data, err := criuConn.ReadMessage() if err != nil { restore <- err @@ -1094,7 +1085,7 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error { } } - // Final CRIU dump + // Final CRIU dump. err = rsync.Recv(shared.AddSlash(imagesDir), &shared.WebsocketIO{Conn: criuConn}, nil, rsyncFeatures) if err != nil { restore <- err @@ -1119,15 +1110,16 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error { preDumpDir: "", } - // Currently we only do a single CRIU pre-dump so we - // can hardcode "final" here since we know that "final" is the - // folder for CRIU's final dump. - err = ct.Migrate(&criuMigrationArgs) - if err != nil { - restore <- err - return + // Currently we only do a single CRIU pre-dump so we can hardcode "final" + // here since we know that "final" is the folder for CRIU's final dump. + if c.src.instance.Type() == instancetype.Container { + ct := c.src.instance.(*containerLXC) + err = ct.Migrate(&criuMigrationArgs) + if err != nil { + restore <- err + return + } } - } restore <- nil @@ -1157,12 +1149,11 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error { if !*msg.Success { disconnector() return fmt.Errorf(*msg.Message) - } else { - // The source can only tell us it failed (e.g. if - // checkpointing failed). We have to tell the source - // whether or not the restore was successful. - logger.Debugf("Unknown message %v from source", msg) } + + // The source can only tell us it failed (e.g. if checkpointing failed). + // We have to tell the source whether or not the restore was successful. + logger.Debugf("Unknown message %v from source", msg) } } } From f11d28bb313b842d029508cead4b1bc45b50c0e4 Mon Sep 17 00:00:00 2001 From: Thomas Parrott <thomas.parr...@canonical.com> Date: Fri, 29 Nov 2019 11:13:33 +0000 Subject: [PATCH 3/3] lxd/migrate/storage/volumes: Comment restructure Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com> --- lxd/migrate_storage_volumes.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/lxd/migrate_storage_volumes.go b/lxd/migrate_storage_volumes.go index a2c95321a1..3423ec8eb2 100644 --- a/lxd/migrate_storage_volumes.go +++ b/lxd/migrate_storage_volumes.go @@ -427,12 +427,10 @@ func (c *migrationSink) DoStorage(state *state.State, poolName string, req *api. restore := make(chan error) go func(c *migrationSink) { - /* We do the fs receive in parallel so we don't have to reason - * about when to receive what. The sending side is smart enough - * to send the filesystem bits that it can before it seizes the - * container to start checkpointing, so the total transfer time - * will be minimized even if we're dumb here. - */ + // We do the fs receive in parallel so we don't have to reason about when to receive + // what. The sending side is smart enough to send the filesystem bits that it can + // before it seizes the container to start checkpointing, so the total transfer time + // will be minimized even if we're dumb here. fsTransfer := make(chan error) go func() {
_______________________________________________ lxc-devel mailing list lxc-devel@lists.linuxcontainers.org http://lists.linuxcontainers.org/listinfo/lxc-devel