Allow multifd to open file-backed channels. This will be used when
enabling the fixed-ram migration stream format which expects a
seekable transport.

The QIOChannel read and write methods will use the preadv/pwritev
versions which don't update the file offset at each call so we can
reuse the fd without re-opening for every channel.

Note that this is just setup code and multifd cannot yet make use of
the file channels.

Signed-off-by: Fabiano Rosas <faro...@suse.de>
---
 migration/file.c      | 64 +++++++++++++++++++++++++++++++++++++++++--
 migration/file.h      | 10 +++++--
 migration/migration.c |  2 +-
 migration/multifd.c   | 14 ++++++++--
 migration/options.c   |  7 +++++
 migration/options.h   |  1 +
 6 files changed, 90 insertions(+), 8 deletions(-)

diff --git a/migration/file.c b/migration/file.c
index cf5b1bf365..93b9b7bf5d 100644
--- a/migration/file.c
+++ b/migration/file.c
@@ -17,6 +17,12 @@
 
 #define OFFSET_OPTION ",offset="
 
+static struct FileOutgoingArgs {
+    char *fname;
+    int flags;
+    int mode;
+} outgoing_args;
+
 /* Remove the offset option from @filespec and return it in @offsetp. */
 
 static int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp)
@@ -36,13 +42,62 @@ static int file_parse_offset(char *filespec, uint64_t 
*offsetp, Error **errp)
     return 0;
 }
 
+static void qio_channel_file_connect_worker(QIOTask *task, gpointer opaque)
+{
+    /* noop */
+}
+
+static void file_migration_cancel(Error *errp)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
+                      MIGRATION_STATUS_FAILED);
+    migration_cancel(errp);
+}
+
+int file_send_channel_destroy(QIOChannel *ioc)
+{
+    if (ioc) {
+        qio_channel_close(ioc, NULL);
+        object_unref(OBJECT(ioc));
+    }
+    g_free(outgoing_args.fname);
+    outgoing_args.fname = NULL;
+
+    return 0;
+}
+
+void file_send_channel_create(QIOTaskFunc f, void *data)
+{
+    QIOChannelFile *ioc;
+    QIOTask *task;
+    Error *errp = NULL;
+
+    ioc = qio_channel_file_new_path(outgoing_args.fname,
+                                    outgoing_args.flags,
+                                    outgoing_args.mode, &errp);
+    if (!ioc) {
+        file_migration_cancel(errp);
+        return;
+    }
+
+    task = qio_task_new(OBJECT(ioc), f, (gpointer)data, NULL);
+    qio_task_run_in_thread(task, qio_channel_file_connect_worker,
+                           (gpointer)data, NULL, NULL);
+}
+
 void file_start_outgoing_migration(MigrationState *s, const char *filespec,
                                    Error **errp)
 {
-    g_autofree char *filename = g_strdup(filespec);
     g_autoptr(QIOChannelFile) fioc = NULL;
+    g_autofree char *filename = g_strdup(filespec);
     uint64_t offset = 0;
     QIOChannel *ioc;
+    int flags = O_CREAT | O_TRUNC | O_WRONLY;
+    mode_t mode = 0660;
 
     trace_migration_file_outgoing(filename);
 
@@ -50,12 +105,15 @@ void file_start_outgoing_migration(MigrationState *s, 
const char *filespec,
         return;
     }
 
-    fioc = qio_channel_file_new_path(filename, O_CREAT | O_WRONLY | O_TRUNC,
-                                     0600, errp);
+    fioc = qio_channel_file_new_path(filename, flags, mode, errp);
     if (!fioc) {
         return;
     }
 
+    outgoing_args.fname = g_strdup(filename);
+    outgoing_args.flags = flags;
+    outgoing_args.mode = mode;
+
     ioc = QIO_CHANNEL(fioc);
     if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) {
         return;
diff --git a/migration/file.h b/migration/file.h
index 90fa4849e0..10148233c5 100644
--- a/migration/file.h
+++ b/migration/file.h
@@ -7,8 +7,14 @@
 
 #ifndef QEMU_MIGRATION_FILE_H
 #define QEMU_MIGRATION_FILE_H
-void file_start_incoming_migration(const char *filename, Error **errp);
 
-void file_start_outgoing_migration(MigrationState *s, const char *filename,
+#include "io/task.h"
+#include "channel.h"
+
+void file_start_incoming_migration(const char *filespec, Error **errp);
+
+void file_start_outgoing_migration(MigrationState *s, const char *filespec,
                                    Error **errp);
+void file_send_channel_create(QIOTaskFunc f, void *data);
+int file_send_channel_destroy(QIOChannel *ioc);
 #endif
diff --git a/migration/migration.c b/migration/migration.c
index cabb3ad3a5..ba806cea55 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -114,7 +114,7 @@ static bool migration_needs_seekable_channel(void)
 static bool uri_supports_multi_channels(const char *uri)
 {
     return strstart(uri, "tcp:", NULL) || strstart(uri, "unix:", NULL) ||
-           strstart(uri, "vsock:", NULL);
+           strstart(uri, "vsock:", NULL) || strstart(uri, "file:", NULL);
 }
 
 static bool uri_supports_seeking(const char *uri)
diff --git a/migration/multifd.c b/migration/multifd.c
index b912060b32..75a17ea8ab 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -17,6 +17,7 @@
 #include "exec/ramblock.h"
 #include "qemu/error-report.h"
 #include "qapi/error.h"
+#include "file.h"
 #include "ram.h"
 #include "migration.h"
 #include "migration-stats.h"
@@ -28,6 +29,7 @@
 #include "threadinfo.h"
 #include "options.h"
 #include "qemu/yank.h"
+#include "io/channel-file.h"
 #include "io/channel-socket.h"
 #include "yank_functions.h"
 
@@ -512,7 +514,11 @@ static void multifd_send_terminate_threads(Error *err)
 
 static int multifd_send_channel_destroy(QIOChannel *send)
 {
-    return socket_send_channel_destroy(send);
+    if (migrate_to_file()) {
+        return file_send_channel_destroy(send);
+    } else {
+        return socket_send_channel_destroy(send);
+    }
 }
 
 void multifd_save_cleanup(void)
@@ -907,7 +913,11 @@ static void multifd_new_send_channel_async(QIOTask *task, 
gpointer opaque)
 
 static void multifd_new_send_channel_create(gpointer opaque)
 {
-    socket_send_channel_create(multifd_new_send_channel_async, opaque);
+    if (migrate_to_file()) {
+        file_send_channel_create(multifd_new_send_channel_async, opaque);
+    } else {
+        socket_send_channel_create(multifd_new_send_channel_async, opaque);
+    }
 }
 
 int multifd_save_setup(Error **errp)
diff --git a/migration/options.c b/migration/options.c
index bb7a2bbe06..469d5d4c50 100644
--- a/migration/options.c
+++ b/migration/options.c
@@ -414,6 +414,13 @@ bool migrate_tls(void)
     return s->parameters.tls_creds && *s->parameters.tls_creds;
 }
 
+bool migrate_to_file(void)
+{
+    MigrationState *s = migrate_get_current();
+
+    return qemu_file_is_seekable(s->to_dst_file);
+}
+
 typedef enum WriteTrackingSupport {
     WT_SUPPORT_UNKNOWN = 0,
     WT_SUPPORT_ABSENT,
diff --git a/migration/options.h b/migration/options.h
index 4a3e7e36a8..01bba5b928 100644
--- a/migration/options.h
+++ b/migration/options.h
@@ -61,6 +61,7 @@ bool migrate_multifd_packets(void);
 bool migrate_postcopy(void);
 bool migrate_rdma(void);
 bool migrate_tls(void);
+bool migrate_to_file(void);
 
 /* capabilities helpers */
 
-- 
2.35.3


Reply via email to