The following pull request was submitted through Github.
It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/6528

This e-mail was sent by the LXC bot; direct replies will not reach the author
unless they happen to be subscribed to this list.

=== Description (from pull-request) ===
Restructures the migration sink Do() function to allow linking of the new storage layer more cleanly.
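
As a rough sketch of the shape of the change (not the authoritative code, which is in patch 2 below): the restructured sink receives the source's "offer" header, builds a "response" header with its own target options, echoes back only the rsync features it recognises, and falls back to rsync when refreshing or when the storage types differ. The self-contained Go program below illustrates that negotiation pattern with simplified stand-in types; header, negotiate() and the feature strings are invented for illustration and are not the real migration.MigrationHeader, state.State or any LXD API.

package main

import "fmt"

// header is a simplified stand-in for migration.MigrationHeader.
type header struct {
        Fs            string
        RsyncFeatures []string
        Snapshots     []string
}

// negotiate mirrors the restructured flow: take the source's offer, build a
// response with our target options, echo back only the rsync features we
// recognise, and fall back to rsync when refreshing or when the storage types
// do not match.
func negotiate(offer header, localFs string, refresh bool) header {
        resp := header{
                Fs:        localFs,
                Snapshots: offer.Snapshots,
        }

        // Return those rsync features we know about (with the value sent by the remote).
        for _, f := range offer.RsyncFeatures {
                switch f {
                case "xattrs", "delete", "compress", "bidirectional":
                        resp.RsyncFeatures = append(resp.RsyncFeatures, f)
                }
        }

        // If refresh mode or the storage type the source has doesn't match what
        // we have, then we have to use rsync.
        if refresh || offer.Fs != resp.Fs {
                resp.Fs = "rsync"
        }

        return resp
}

func main() {
        offer := header{Fs: "zfs", RsyncFeatures: []string{"xattrs", "compress"}, Snapshots: []string{"snap0"}}
        resp := negotiate(offer, "btrfs", false)
        fmt.Printf("respond with: %+v\n", resp) // Falls back to rsync because zfs != btrfs.
}

In the real patch the same decision additionally switches the receive function (myTarget) to rsyncMigrationSink.
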
From d5f578718e650c636f7e3282f19c7eeb4cdccba1 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parr...@canonical.com>
Date: Fri, 29 Nov 2019 11:08:20 +0000
Subject: [PATCH 1/3] lxd/containers/post: Pass state to migration Do function

Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com>
---
 lxd/containers_post.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lxd/containers_post.go b/lxd/containers_post.go
index 7bde5d7c67..43bf9ee95c 100644
--- a/lxd/containers_post.go
+++ b/lxd/containers_post.go
@@ -438,7 +438,7 @@ func createFromMigration(d *Daemon, project string, req *api.InstancesPost) resp
                }()
 
                // And finally run the migration.
-               err = sink.Do(op)
+               err = sink.Do(d.State(), op)
                if err != nil {
                        return fmt.Errorf("Error transferring container data: %s", err)
                }

From 6c1cf7822766b9e4712082481b455e8b915a8b10 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parr...@canonical.com>
Date: Fri, 29 Nov 2019 11:08:57 +0000
Subject: [PATCH 2/3] lxd/migrate/container: Restructure of migrationSink.Do()

- This is to accommodate the forthcoming link to the new storage layer.
- Renames the offer and response header variables to align with naming used in custom volume migration.
- Gathers together the legacy offer negotiation code so that it can be easily segregated when the new storage layer is linked.
- Passes rsyncFeatures from the generated response header rather than the offer header, to align with custom volume migration.

Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com>
---
 lxd/migrate_container.go | 199 +++++++++++++++++++--------------------
 1 file changed, 95 insertions(+), 104 deletions(-)

diff --git a/lxd/migrate_container.go b/lxd/migrate_container.go
index 280a94311f..6ada2233a0 100644
--- a/lxd/migrate_container.go
+++ b/lxd/migrate_container.go
@@ -21,6 +21,7 @@ import (
        "github.com/lxc/lxd/lxd/migration"
        "github.com/lxc/lxd/lxd/operations"
        "github.com/lxc/lxd/lxd/rsync"
+       "github.com/lxc/lxd/lxd/state"
        "github.com/lxc/lxd/lxd/util"
        "github.com/lxc/lxd/shared"
        "github.com/lxc/lxd/shared/api"
@@ -783,13 +784,7 @@ func NewMigrationSink(args *MigrationSinkArgs) (*migrationSink, error) {
        return &sink, nil
 }
 
-func (c *migrationSink) Do(migrateOp *operations.Operation) error {
-       if c.src.instance.Type() != instancetype.Container {
-               return fmt.Errorf("Instance type must be container")
-       }
-
-       ct := c.src.instance.(*containerLXC)
-
+func (c *migrationSink) Do(state *state.State, migrateOp *operations.Operation) error {
        var err error
 
        if c.push {
@@ -840,22 +835,19 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
                controller = c.dest.sendControl
        }
 
-       header := migration.MigrationHeader{}
-       if err := receiver(&header); err != nil {
+       offerHeader := migration.MigrationHeader{}
+       if err := receiver(&offerHeader); err != nil {
                controller(err)
                return err
        }
 
-       // Handle rsync options
-       rsyncFeatures := header.GetRsyncFeaturesSlice()
-
        live := c.src.live
        if c.push {
                live = c.dest.live
        }
 
        criuType := migration.CRIUType_CRIU_RSYNC.Enum()
-       if header.Criu != nil && *header.Criu == migration.CRIUType_NONE {
+       if offerHeader.Criu != nil && *offerHeader.Criu == migration.CRIUType_NONE {
                criuType = migration.CRIUType_NONE.Enum()
        } else {
                if !live {
@@ -863,64 +855,69 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
                }
        }
 
-       mySink := ct.Storage().MigrationSink
-       if c.refresh {
-               mySink = rsyncMigrationSink
-       }
+       // The function that will be executed to receive the sender's migration data.
+       var myTarget func(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error
 
-       myType := ct.Storage().MigrationType()
-       resp := migration.MigrationHeader{
-               Fs:            &myType,
-               Criu:          criuType,
-               Snapshots:     header.Snapshots,
-               SnapshotNames: header.SnapshotNames,
-               Refresh:       &c.refresh,
-       }
+       // The migration header to be sent back to source with our target options.
+       var respHeader migration.MigrationHeader
 
-       // Return those rsync features we know about (with the value sent by the remote)
-       if header.RsyncFeatures != nil {
-               resp.RsyncFeatures = &migration.RsyncFeatures{}
-               if resp.RsyncFeatures.Xattrs != nil {
-                       resp.RsyncFeatures.Xattrs = header.RsyncFeatures.Xattrs
-               }
+       if c.src.instance.Type() == instancetype.Container {
+               ct := c.src.instance.(*containerLXC)
+               myTarget = ct.Storage().MigrationSink
+               myType := ct.Storage().MigrationType()
 
-               if resp.RsyncFeatures.Delete != nil {
-                       resp.RsyncFeatures.Delete = header.RsyncFeatures.Delete
+               respHeader = migration.MigrationHeader{
+                       Fs:            &myType,
+                       Criu:          criuType,
+                       Snapshots:     offerHeader.Snapshots,
+                       SnapshotNames: offerHeader.SnapshotNames,
+                       Refresh:       &c.refresh,
                }
 
-               if resp.RsyncFeatures.Compress != nil {
-                       resp.RsyncFeatures.Compress = header.RsyncFeatures.Compress
+               // Return those rsync features we know about (with the value sent by the remote).
+               if offerHeader.RsyncFeatures != nil {
+                       respHeader.RsyncFeatures = &migration.RsyncFeatures{
+                               Xattrs:        offerHeader.RsyncFeatures.Xattrs,
+                               Delete:        offerHeader.RsyncFeatures.Delete,
+                               Compress:      offerHeader.RsyncFeatures.Compress,
+                               Bidirectional: offerHeader.RsyncFeatures.Bidirectional,
+                       }
                }
 
-               if resp.RsyncFeatures.Bidirectional != nil {
-                       resp.RsyncFeatures.Bidirectional = header.RsyncFeatures.Bidirectional
+               // Return those ZFS features we know about (with the value sent by the remote).
+               if len(zfsVersion) >= 3 && zfsVersion[0:3] != "0.6" {
+                       if offerHeader.ZfsFeatures != nil && offerHeader.ZfsFeatures.Compress != nil {
+                               respHeader.ZfsFeatures = &migration.ZfsFeatures{
+                                       Compress: offerHeader.ZfsFeatures.Compress,
+                               }
+                       }
                }
-       }
 
-       // Return those ZFS features we know about (with the value sent by the remote)
-       if len(zfsVersion) >= 3 && zfsVersion[0:3] != "0.6" {
-               if header.ZfsFeatures != nil && header.ZfsFeatures.Compress != nil {
-                       resp.ZfsFeatures = &migration.ZfsFeatures{
-                               Compress: header.ZfsFeatures.Compress,
-                       }
+               // If refresh mode or the storage type the source has doesn't match what we have,
+               // then we have to use rsync.
+               if c.refresh || *offerHeader.Fs != *respHeader.Fs {
+                       myTarget = rsyncMigrationSink
+                       myType = migration.MigrationFSType_RSYNC
                }
+       } else {
+               return fmt.Errorf("Instance type not supported")
        }
 
        if c.refresh {
-               // Get our existing snapshots
+               // Get our existing snapshots.
                targetSnapshots, err := c.src.instance.Snapshots()
                if err != nil {
                        controller(err)
                        return err
                }
 
-               // Get the remote snapshots
-               sourceSnapshots := header.GetSnapshots()
+               // Get the remote snapshots.
+               sourceSnapshots := offerHeader.GetSnapshots()
 
-               // Compare the two sets
+               // Compare the two sets.
                syncSnapshots, deleteSnapshots := migrationCompareSnapshots(sourceSnapshots, targetSnapshots)
 
-               // Delete the extra local ones
+               // Delete the extra local ones.
                for _, snap := range deleteSnapshots {
                        err := snap.Delete()
                        if err != nil {
@@ -934,29 +931,24 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
                        snapshotNames = append(snapshotNames, snap.GetName())
                }
 
-               resp.Snapshots = syncSnapshots
-               resp.SnapshotNames = snapshotNames
-               header.Snapshots = syncSnapshots
-               header.SnapshotNames = snapshotNames
-       }
-
-       // If the storage type the source has doesn't match what we have, then
-       // we have to use rsync.
-       if c.refresh || *header.Fs != *resp.Fs {
-               mySink = rsyncMigrationSink
-               myType = migration.MigrationFSType_RSYNC
-               resp.Fs = &myType
+               respHeader.Snapshots = syncSnapshots
+               respHeader.SnapshotNames = snapshotNames
+               offerHeader.Snapshots = syncSnapshots
+               offerHeader.SnapshotNames = snapshotNames
        }
 
-       if header.GetPredump() == true {
-               // If the other side wants pre-dump and if
-               // this side supports it, let's use it.
-               resp.Predump = proto.Bool(true)
+       if offerHeader.GetPredump() == true {
+               // If the other side wants pre-dump and if this side supports it, let's use it.
+               respHeader.Predump = proto.Bool(true)
        } else {
-               resp.Predump = proto.Bool(false)
+               respHeader.Predump = proto.Bool(false)
        }
 
-       err = sender(&resp)
+       // Get rsync options from sender, these are passed into mySink function as part of
+       // MigrationSinkArgs below.
+       rsyncFeatures := respHeader.GetRsyncFeaturesSlice()
+
+       err = sender(&respHeader)
        if err != nil {
                controller(err)
                return err
@@ -967,7 +959,7 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
                imagesDir := ""
                srcIdmap := new(idmap.IdmapSet)
 
-               for _, idmapSet := range header.Idmap {
+               for _, idmapSet := range offerHeader.Idmap {
                        e := idmap.IdmapEntry{
                                Isuid:    *idmapSet.Isuid,
                                Isgid:    *idmapSet.Isgid,
@@ -977,28 +969,24 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
                        srcIdmap.Idmap = idmap.Extend(srcIdmap.Idmap, e)
                }
 
-               /* We do the fs receive in parallel so we don't have to reason
-                * about when to receive what. The sending side is smart enough
-                * to send the filesystem bits that it can before it seizes the
-                * container to start checkpointing, so the total transfer time
-                * will be minimized even if we're dumb here.
-                */
+               // We do the fs receive in parallel so we don't have to reason about when to receive
+               // what. The sending side is smart enough to send the filesystem bits that it can
+               // before it seizes the container to start checkpointing, so the total transfer time
+               // will be minimized even if we're dumb here.
                fsTransfer := make(chan error)
                go func() {
                        snapshots := []*migration.Snapshot{}
 
-                       /* Legacy: we only sent the snapshot names, so we just
-                        * copy the container's config over, same as we used to
-                        * do.
-                        */
-                       if len(header.SnapshotNames) != len(header.Snapshots) {
-                               for _, name := range header.SnapshotNames {
+                       // Legacy: we only sent the snapshot names, so we just copy the container's
+                       // config over, same as we used to do.
+                               if len(offerHeader.SnapshotNames) != len(offerHeader.Snapshots) {
+                               for _, name := range offerHeader.SnapshotNames {
                                        base := snapshotToProtobuf(c.src.instance)
                                        base.Name = &name
                                        snapshots = append(snapshots, base)
                                }
                        } else {
-                               snapshots = header.Snapshots
+                               snapshots = offerHeader.Snapshots
                        }
 
                        var fsConn *websocket.Conn
@@ -1027,16 +1015,19 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
                                Snapshots:     snapshots,
                        }
 
-                       err = mySink(fsConn, migrateOp, args)
+                       err = myTarget(fsConn, migrateOp, args)
                        if err != nil {
                                fsTransfer <- err
                                return
                        }
 
-                       err = resetContainerDiskIdmap(ct, srcIdmap)
-                       if err != nil {
-                               fsTransfer <- err
-                               return
+                       if c.src.instance.Type() == instancetype.Container {
+                               ct := c.src.instance.(*containerLXC)
+                               err = resetContainerDiskIdmap(ct, srcIdmap)
+                               if err != nil {
+                                       fsTransfer <- err
+                                       return
+                               }
                        }
 
                        fsTransfer <- nil
@@ -1063,10 +1054,10 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
                                FinalPreDump: proto.Bool(false),
                        }
 
-                       if resp.GetPredump() {
+                       if respHeader.GetPredump() {
                                for !sync.GetFinalPreDump() {
                                        logger.Debugf("About to receive rsync")
-                                       // Transfer a CRIU pre-dump
+                                       // Transfer a CRIU pre-dump.
                                        err = rsync.Recv(shared.AddSlash(imagesDir), &shared.WebsocketIO{Conn: criuConn}, nil, rsyncFeatures)
                                        if err != nil {
                                                restore <- err
@@ -1075,8 +1066,8 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
                                        logger.Debugf("Done receiving from rsync")
 
                                        logger.Debugf("About to receive header")
-                                       // Check if this was the last pre-dump
-                                       // Only the FinalPreDump element if of interest
+                                       // Check if this was the last pre-dump.
+                                       // Only the FinalPreDump element if of interest.
                                        mtype, data, err := criuConn.ReadMessage()
                                        if err != nil {
                                                restore <- err
@@ -1094,7 +1085,7 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
                                }
                        }
 
-                       // Final CRIU dump
+                       // Final CRIU dump.
                        err = rsync.Recv(shared.AddSlash(imagesDir), &shared.WebsocketIO{Conn: criuConn}, nil, rsyncFeatures)
                        if err != nil {
                                restore <- err
@@ -1119,15 +1110,16 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
                                preDumpDir:   "",
                        }
 
-                       // Currently we only do a single CRIU pre-dump so we
-                       // can hardcode "final" here since we know that "final" is the
-                       // folder for CRIU's final dump.
-                       err = ct.Migrate(&criuMigrationArgs)
-                       if err != nil {
-                               restore <- err
-                               return
+                       // Currently we only do a single CRIU pre-dump so we can hardcode "final"
+                       // here since we know that "final" is the folder for CRIU's final dump.
+                       if c.src.instance.Type() == instancetype.Container {
+                               ct := c.src.instance.(*containerLXC)
+                               err = ct.Migrate(&criuMigrationArgs)
+                               if err != nil {
+                                       restore <- err
+                                       return
+                               }
                        }
-
                }
 
                restore <- nil
@@ -1157,12 +1149,11 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
                        if !*msg.Success {
                                disconnector()
                                return fmt.Errorf(*msg.Message)
-                       } else {
-                               // The source can only tell us it failed (e.g. if
-                               // checkpointing failed). We have to tell the source
-                               // whether or not the restore was successful.
-                               logger.Debugf("Unknown message %v from source", msg)
                        }
+
+                       // The source can only tell us it failed (e.g. if checkpointing failed).
+                       // We have to tell the source whether or not the restore was successful.
+                       logger.Debugf("Unknown message %v from source", msg)
                }
        }
 }
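
A note on the last bullet of the patch above (taking rsyncFeatures from the generated response header rather than the offer header): as I read it, one practical effect is that the feature list handed to rsync.Recv() only contains features this sink explicitly echoed back, i.e. features both ends understand. The toy program below illustrates that with simplified stand-ins; the features struct and asSlice() helper are invented for illustration and are not the real migration.RsyncFeatures or GetRsyncFeaturesSlice().

package main

import "fmt"

// features is a simplified stand-in for migration.RsyncFeatures.
type features struct {
        Xattrs, Delete, Compress, Bidirectional bool
}

// asSlice loosely mimics what a GetRsyncFeaturesSlice-style helper does: turn
// the boolean flags into the string list that is later handed to rsync.
func asSlice(f features) []string {
        out := []string{}
        if f.Xattrs {
                out = append(out, "xattrs")
        }
        if f.Delete {
                out = append(out, "delete")
        }
        if f.Compress {
                out = append(out, "compress")
        }
        if f.Bidirectional {
                out = append(out, "bidirectional")
        }
        return out
}

func main() {
        // What the source offered.
        offer := features{Xattrs: true, Delete: true, Compress: true, Bidirectional: true}

        // What this sink echoed back in its response header; pretend this build
        // does not know about bidirectional support.
        resp := features{Xattrs: offer.Xattrs, Delete: offer.Delete, Compress: offer.Compress}

        fmt.Println("offer features:   ", asSlice(offer))
        fmt.Println("response features:", asSlice(resp)) // What rsync is given after this patch.
}
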

From f11d28bb313b842d029508cead4b1bc45b50c0e4 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parr...@canonical.com>
Date: Fri, 29 Nov 2019 11:13:33 +0000
Subject: [PATCH 3/3] lxd/migrate/storage/volumes: Comment restructure

Signed-off-by: Thomas Parrott <thomas.parr...@canonical.com>
---
 lxd/migrate_storage_volumes.go | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/lxd/migrate_storage_volumes.go b/lxd/migrate_storage_volumes.go
index a2c95321a1..3423ec8eb2 100644
--- a/lxd/migrate_storage_volumes.go
+++ b/lxd/migrate_storage_volumes.go
@@ -427,12 +427,10 @@ func (c *migrationSink) DoStorage(state *state.State, poolName string, req *api.
        restore := make(chan error)
 
        go func(c *migrationSink) {
-               /* We do the fs receive in parallel so we don't have to reason
-                * about when to receive what. The sending side is smart enough
-                * to send the filesystem bits that it can before it seizes the
-                * container to start checkpointing, so the total transfer time
-                * will be minimized even if we're dumb here.
-                */
+               // We do the fs receive in parallel so we don't have to reason about when to receive
+               // what. The sending side is smart enough to send the filesystem bits that it can
+               // before it seizes the container to start checkpointing, so the total transfer time
+               // will be minimized even if we're dumb here.
                fsTransfer := make(chan error)
 
                go func() {