Introduce migrate_ft_trans_put_ready(), which kicks off the FT transaction
cycle.  When ft_mode is on, migrate_fd_put_ready() opens ft_trans_file
and turns on event_tap.  To end or cancel the FT transaction, ft_mode and
event_tap are turned off.  migrate_ft_trans_get_ready() is called to
receive the ack from the receiver.

Signed-off-by: Yoshiaki Tamura <tamura.yoshi...@lab.ntt.co.jp>
---
 migration.c |  256 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 migration.h |    2 +-
 2 files changed, 249 insertions(+), 9 deletions(-)

diff --git a/migration.c b/migration.c
index 40e4945..c03d660 100644
--- a/migration.c
+++ b/migration.c
@@ -21,6 +21,7 @@
 #include "qemu_socket.h"
 #include "block-migration.h"
 #include "qemu-objects.h"
+#include "event-tap.h"
 
 //#define DEBUG_MIGRATION
 
@@ -307,6 +308,28 @@ void migrate_fd_put_notify(void *opaque)
     qemu_file_put_notify(s->file);
 }
 
+/*
+ * Read handler registered by migrate_fd_get_buffer() when a read returns
+ * -EAGAIN.  Clears the fd's handlers, passes the notification on to the
+ * QEMUFile, and tears the FT session down if the file has an error.
+ */
+static void migrate_fd_get_notify(void *opaque)
+{
+    FdMigrationState *ms = opaque;
+
+    qemu_set_fd_handler2(ms->fd, NULL, NULL, NULL, NULL);
+    qemu_file_get_notify(ms->file);
+    if (!qemu_file_has_error(ms->file)) {
+        return;
+    }
+
+    /* unrecoverable: leave FT mode and report the failure */
+    ft_mode = FT_ERROR;
+    qemu_savevm_state_cancel(ms->mon, ms->file);
+    migrate_fd_error(ms);
+    event_tap_unregister();
+}
+
 ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
 {
     FdMigrationState *s = opaque;
@@ -331,15 +346,27 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
     return ret;
 }
 
-int migrate_fd_get_buffer(void *opaque, uint8_t *data, int64_t pos, int size)
+/*
+ * Read up to size bytes into data via s->read() (pos is not used).
+ * Returns s->read()'s result, or the negated s->get_error() value if it
+ * returned -1.  On -EAGAIN the fd read handler is armed so that
+ * migrate_fd_get_notify() runs once the fd becomes readable.
+ */
+int migrate_fd_get_buffer(void *opaque, uint8_t *data, int64_t pos,
+                          size_t size)
 {
     FdMigrationState *s = opaque;
     ssize_t ret;
     ret = s->read(s, data, size);
     
-    if (ret == -1)
+    if (ret == -1) {
         ret = -(s->get_error(s));
-    
+    }
+
+    if (ret == -EAGAIN) {
+        qemu_set_fd_handler2(s->fd, NULL, migrate_fd_get_notify, NULL, s);
+    }
+
     return ret;
 }
 
@@ -366,6 +386,244 @@ void migrate_fd_connect(FdMigrationState *s)
     migrate_fd_put_ready(s);
 }
 
+/*
+ * Tear down the FT session after an unrecoverable error: set ft_mode to
+ * FT_ERROR, cancel the savevm state, report the migration error and stop
+ * tapping events.
+ */
+static void migrate_ft_trans_error(FdMigrationState *s)
+{
+    ft_mode = FT_ERROR;
+    qemu_savevm_state_cancel(s->mon, s->file);
+    migrate_fd_error(s);
+    event_tap_unregister();
+}
+
+/*
+ * Drive the commit phase of an FT transaction.
+ *
+ * Must be entered with ft_mode == FT_TRANSACTION_ATOMIC (begin a new
+ * transaction first) or FT_TRANSACTION_COMMIT (resume a pending commit).
+ * The VM is stopped and qemu_aio_flush()/bdrv_flush_all() are called
+ * before the saved state is completed and committed.  The loop runs
+ * another transaction while event_tap_flush_one() returns 0, and
+ * finishes with ft_mode set to FT_TRANSACTION_BEGIN and the VM running.
+ *
+ * Returns 0 when the cycle finished and the VM was restarted; 1 when it
+ * cannot progress yet (the fd is not ready, or ft_mode is now
+ * FT_TRANSACTION_RECV and the ack is handled by
+ * migrate_ft_trans_get_ready()), in which case the VM may still be
+ * stopped; a negative value on error.
+ */
+static int migrate_ft_trans_commit(void *opaque)
+{
+    FdMigrationState *s = opaque;
+    int ret = -1;
+
+    if (ft_mode != FT_TRANSACTION_COMMIT && ft_mode != FT_TRANSACTION_ATOMIC) {
+        fprintf(stderr,
+                "migrate_ft_trans_commit: invalid ft_mode %d\n", ft_mode);
+        goto out;
+    }
+
+    do {
+        if (ft_mode == FT_TRANSACTION_ATOMIC) {
+            if (qemu_ft_trans_begin(s->file) < 0) {
+                fprintf(stderr, "qemu_ft_trans_begin failed\n");
+                /* ret may still hold 0 from the previous iteration */
+                ret = -1;
+                goto out;
+            }
+
+            if ((ret = qemu_savevm_trans_begin(s->mon, s->file, 0)) < 0) {
+                fprintf(stderr, "qemu_savevm_trans_begin failed\n");
+                goto out;
+            }
+
+            ft_mode = FT_TRANSACTION_COMMIT;
+            if (ret) {
+                /* fd isn't ready yet; the next call resumes at COMMIT */
+                ret = 1;
+                goto out;
+            }
+        }
+
+        /* make the VM state consistent by flushing outstanding events */
+        vm_stop(0);
+        qemu_aio_flush();
+        bdrv_flush_all();
+
+        if ((ret = qemu_savevm_trans_complete(s->mon, s->file)) < 0) {
+            fprintf(stderr, "qemu_savevm_trans_complete failed\n");
+            goto out;
+        }
+
+        if (ret) {
+            /* fd isn't ready yet; the next call resumes at COMMIT */
+            ret = 1;
+            goto out;
+        }
+
+        if ((ret = qemu_ft_trans_commit(s->file)) < 0) {
+            fprintf(stderr, "qemu_ft_trans_commit failed\n");
+            goto out;
+        }
+
+        if (ret) {
+            /* ack is received via migrate_ft_trans_get_ready() */
+            ft_mode = FT_TRANSACTION_RECV;
+            ret = 1;
+            goto out;
+        }
+
+        /* flush and check if events are remaining */
+        if ((ret = event_tap_flush_one()) < 0) {
+            fprintf(stderr, "event_tap_flush_one failed\n");
+            goto out;
+        }
+
+        ft_mode = ret ? FT_TRANSACTION_BEGIN : FT_TRANSACTION_ATOMIC;
+    } while (ft_mode != FT_TRANSACTION_BEGIN);
+
+    vm_start();
+    ret = 0;
+
+out:
+    return ret;
+}
+
+/*
+ * get_ready callback of the ft_trans file, used after a commit has left
+ * ft_mode at FT_TRANSACTION_RECV.  Calls event_tap_flush_one(): if it
+ * returns non-zero, ft_mode becomes FT_TRANSACTION_BEGIN and the guest
+ * is resumed; if it returns 0, another atomic transaction is committed
+ * right away via migrate_ft_trans_commit().
+ *
+ * Returns 0 on success, 1 while that commit is still pending, or a
+ * negative value after the FT session has been torn down.
+ */
+static int migrate_ft_trans_get_ready(void *opaque)
+{
+    FdMigrationState *s = opaque;
+    int ret = -1;
+
+    if (ft_mode != FT_TRANSACTION_RECV) {
+        fprintf(stderr,
+                "migrate_ft_trans_get_ready: invalid ft_mode %d\n", ft_mode);
+        goto error_out;
+    }
+
+    /* flush and check if events are remaining */
+    if ((ret = event_tap_flush_one()) < 0) {
+        fprintf(stderr, "event_tap_flush_one failed\n");
+        goto error_out;
+    }
+
+    if (ret) {
+        ft_mode = FT_TRANSACTION_BEGIN;
+        vm_start();
+        return 0;
+    }
+
+    ft_mode = FT_TRANSACTION_ATOMIC;
+    if ((ret = migrate_ft_trans_commit(s)) < 0) {
+        goto error_out;
+    }
+
+    /*
+     * ret > 0: the commit is still pending.  ret == 0: the commit finished
+     * and already restarted the VM, so there is no vm_start() here.
+     */
+    return ret;
+
+error_out:
+    migrate_ft_trans_error(s);
+    return ret;
+}
+
+/*
+ * Kick the FT transaction cycle, dispatching on ft_mode:
+ *   FT_INIT, FT_TRANSACTION_BEGIN: begin a transaction (FT_INIT also
+ *     passes init = 1 to qemu_savevm_trans_begin());
+ *   FT_TRANSACTION_ITER: iterate the savevm state until
+ *     qemu_savevm_state_iterate() returns 1 or max_downtime has elapsed
+ *     since the transaction began, then commit;
+ *   FT_TRANSACTION_ATOMIC, FT_TRANSACTION_COMMIT: (re)enter the commit.
+ * Used as the ft_trans file's put_ready callback and registered with
+ * event_tap_register().
+ *
+ * Returns 0 on success, 1 when the ITER phase went on to commit, or a
+ * negative value after the FT session has been torn down.
+ */
+static int migrate_ft_trans_put_ready(void)
+{
+    FdMigrationState *s = migrate_to_fms(current_migration);
+    int ret = -1, init = 0, timeout;
+    /* start time of the current transaction, kept across calls */
+    static int64_t start;
+    int64_t now;
+
+    switch (ft_mode) {
+    case FT_INIT:
+        init = 1;
+        /* fall through */
+    case FT_TRANSACTION_BEGIN:
+        start = qemu_get_clock(vm_clock);
+        if (qemu_ft_trans_begin(s->file) < 0) {
+            fprintf(stderr, "qemu_ft_trans_begin failed\n");
+            goto error_out;
+        }
+
+        if ((ret = qemu_savevm_trans_begin(s->mon, s->file, init)) < 0) {
+            fprintf(stderr, "qemu_savevm_trans_begin failed\n");
+            goto error_out;
+        }
+
+        if (ret) {
+            ft_mode = FT_TRANSACTION_ITER;
+        } else {
+            ft_mode = FT_TRANSACTION_COMMIT;
+            /* assign ret so a failure is not reported as success */
+            if ((ret = migrate_ft_trans_commit(s)) < 0) {
+                goto error_out;
+            }
+        }
+        break;
+
+    case FT_TRANSACTION_ITER:
+        now = qemu_get_clock(vm_clock);
+        timeout = ((now - start) >= max_downtime);
+        if (timeout || qemu_savevm_state_iterate(s->mon, s->file) == 1) {
+            DPRINTF("ft trans iter timeout %d\n", timeout);
+
+            ft_mode = FT_TRANSACTION_COMMIT;
+            if ((ret = migrate_ft_trans_commit(s)) < 0) {
+                goto error_out;
+            }
+            return 1;
+        }
+        break;
+
+    case FT_TRANSACTION_ATOMIC:
+    case FT_TRANSACTION_COMMIT:
+        if ((ret = migrate_ft_trans_commit(s)) < 0) {
+            goto error_out;
+        }
+        break;
+
+    default:
+        fprintf(stderr,
+                "migrate_ft_trans_put_ready: invalid ft_mode %d\n", ft_mode);
+        goto error_out;
+    }
+
+    return 0;
+
+error_out:
+    migrate_ft_trans_error(s);
+    return ret;
+}
+
 void migrate_fd_put_ready(void *opaque)
 {
     FdMigrationState *s = opaque;
@@ -393,13 +602,38 @@ void migrate_fd_put_ready(void *opaque)
         } else {
             state = MIG_STATE_COMPLETED;
         }
-        if (migrate_fd_cleanup(s) < 0) {
+
+        if (ft_mode && state == MIG_STATE_COMPLETED) {
+            /* close buffered_file and open ft_trans_file
+             * NB: fd won't get closed, and reused by ft_trans_file
+             */
+            qemu_fclose(s->file);
+
+            s->file = qemu_fopen_ops_ft_trans(s,
+                                              migrate_fd_put_buffer,
+                                              migrate_fd_get_buffer,
+                                              migrate_ft_trans_put_ready,
+                                              migrate_ft_trans_get_ready,
+                                              migrate_fd_wait_for_unfreeze,
+                                              migrate_fd_close,
+                                              1);
+            socket_set_nodelay(s->fd);
+
+            /* events are tapped from now */
+            event_tap_register(migrate_ft_trans_put_ready);
+
             if (old_vm_running) {
                 vm_start();
             }
-            state = MIG_STATE_ERROR;
+        } else {
+            if (migrate_fd_cleanup(s) < 0) {
+                if (old_vm_running) {
+                    vm_start();
+                }
+                state = MIG_STATE_ERROR;
+            }
+            s->state = state;
         }
-        s->state = state;
     }
 }
 
@@ -419,8 +653,14 @@ void migrate_fd_cancel(MigrationState *mig_state)
     DPRINTF("cancelling migration\n");
 
     s->state = MIG_STATE_CANCELLED;
-    qemu_savevm_state_cancel(s->mon, s->file);
 
+    if (ft_mode) {
+        qemu_ft_trans_cancel(s->file);
+        ft_mode = FT_OFF;
+        event_tap_unregister();
+    }
+
+    qemu_savevm_state_cancel(s->mon, s->file);
     migrate_fd_cleanup(s);
 }
 
diff --git a/migration.h b/migration.h
index f033262..7bf6747 100644
--- a/migration.h
+++ b/migration.h
@@ -116,7 +116,8 @@ void migrate_fd_put_notify(void *opaque);
 
 ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size);
 
-int migrate_fd_get_buffer(void *opaque, uint8_t *data, int64_t pos, int size);
+int migrate_fd_get_buffer(void *opaque, uint8_t *data, int64_t pos,
+                          size_t size);
 
 void migrate_fd_connect(FdMigrationState *s);
 
-- 
1.7.1.2


Reply via email to