[Qemu-devel] [PULL 22/57] Return path: Source handling of return path

2015-11-09 Thread Juan Quintela
From: "Dr. David Alan Gilbert" 

Open a return path, and handle messages that are received upon it.

Signed-off-by: Dr. David Alan Gilbert 
Reviewed-by: Juan Quintela 
Signed-off-by: Juan Quintela 
---
 include/migration/migration.h |   8 +++
 migration/migration.c | 159 +-
 trace-events  |  10 +++
 3 files changed, 175 insertions(+), 2 deletions(-)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 3ce3fda..571466b 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -80,6 +80,14 @@ struct MigrationState

 int state;
 MigrationParams params;
+
+/* State related to return path */
+struct {
+QEMUFile *from_dst_file;
+QemuThread rp_thread;
+bool  error;
+} rp_state;
+
 double mbps;
 int64_t total_time;
 int64_t downtime;
diff --git a/migration/migration.c b/migration/migration.c
index 4317bab..b0782c6 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -696,6 +696,11 @@ static void migrate_fd_cancel(MigrationState *s)
 QEMUFile *f = migrate_get_current()->file;
 trace_migrate_fd_cancel();

+if (s->rp_state.from_dst_file) {
+/* shutdown the rp socket, so causing the rp thread to shutdown */
+qemu_file_shutdown(s->rp_state.from_dst_file);
+}
+
 do {
 old_state = s->state;
 if (!migration_is_setup_or_active(old_state)) {
@@ -1030,6 +1035,154 @@ int64_t migrate_xbzrle_cache_size(void)
 return s->xbzrle_cache_size;
 }

+/* migration thread support */
+/*
+ * Something bad happened to the RP stream, mark an error.
+ * Sets s->rp_state.error; the return path thread tests that flag at the
+ * top of its loop, before reading the next message header.
+ * The caller shall print or trace something to indicate why.
+ *
+ * @s: migration state whose return path is being marked as failed
+ */
+static void mark_source_rp_bad(MigrationState *s)
+{
+s->rp_state.error = true;
+}
+
+static struct rp_cmd_args {
+ssize_t len; /* payload length the header must announce; -1 = variable */
+const char *name; /* printable name, used when reporting a bad length */
+} rp_cmd_args[] = { /* indexed by the MIG_RP_MSG_* message type */
+[MIG_RP_MSG_INVALID]= { .len = -1, .name = "INVALID" },
+[MIG_RP_MSG_SHUT]   = { .len =  4, .name = "SHUT" },
+[MIG_RP_MSG_PONG]   = { .len =  4, .name = "PONG" },
+[MIG_RP_MSG_MAX]= { .len = -1, .name = "MAX" },
+};
+
+/*
+ * Handles messages sent on the return path towards the source VM
+ *
+ */
+static void *source_return_path_thread(void *opaque)
+{
+MigrationState *ms = opaque;
+QEMUFile *rp = ms->rp_state.from_dst_file;
+uint16_t header_len, header_type;
+const int max_len = 512;
+uint8_t buf[max_len];
+uint32_t tmp32, sibling_error;
+int res;
+
+trace_source_return_path_thread_entry();
+while (!ms->rp_state.error && !qemu_file_get_error(rp) &&
+   migration_is_setup_or_active(ms->state)) {
+trace_source_return_path_thread_loop_top();
+header_type = qemu_get_be16(rp);
+header_len = qemu_get_be16(rp);
+
+if (header_type >= MIG_RP_MSG_MAX ||
+header_type == MIG_RP_MSG_INVALID) {
+error_report("RP: Received invalid message 0x%04x length 0x%04x",
+header_type, header_len);
+mark_source_rp_bad(ms);
+goto out;
+}
+
+if ((rp_cmd_args[header_type].len != -1 &&
+header_len != rp_cmd_args[header_type].len) ||
+header_len > max_len) {
+error_report("RP: Received '%s' message (0x%04x) with"
+"incorrect length %d expecting %zd",
+rp_cmd_args[header_type].name, header_type, header_len,
+rp_cmd_args[header_type].len);
+mark_source_rp_bad(ms);
+goto out;
+}
+
+/* We know we've got a valid header by this point */
+res = qemu_get_buffer(rp, buf, header_len);
+if (res != header_len) {
+error_report("RP: Failed reading data for message 0x%04x"
+ " read %d expected %d",
+ header_type, res, header_len);
+mark_source_rp_bad(ms);
+goto out;
+}
+
+/* OK, we have the message and the data */
+switch (header_type) {
+case MIG_RP_MSG_SHUT:
+sibling_error = be32_to_cpup((uint32_t *)buf);
+trace_source_return_path_thread_shut(sibling_error);
+if (sibling_error) {
+error_report("RP: Sibling indicated error %d", sibling_error);
+mark_source_rp_bad(ms);
+}
+/*
+ * We'll let the main thread deal with closing the RP
+ * we could do a shutdown(2) on it, but we're the only user
+ * anyway, so there's nothing gained.
+ */
+goto out;
+
+case MIG_RP_MSG_PONG:
+tmp32 = be32_to_cpup((uint32_t *)buf);
+trace_source_return_path_thread_pong(tmp32);
+break;
+
+default:
+break;
+}
+}
+if (rp && qemu_file_get_error(rp)) {

[Qemu-devel] [PULL 22/57] Return path: Source handling of return path

2015-11-10 Thread Juan Quintela
From: "Dr. David Alan Gilbert" 

Open a return path, and handle messages that are received upon it.

Signed-off-by: Dr. David Alan Gilbert 
Reviewed-by: Juan Quintela 
Signed-off-by: Juan Quintela 
---
 include/migration/migration.h |   8 +++
 migration/migration.c | 159 +-
 trace-events  |  10 +++
 3 files changed, 175 insertions(+), 2 deletions(-)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 3ce3fda..571466b 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -80,6 +80,14 @@ struct MigrationState

 int state;
 MigrationParams params;
+
+/* State related to return path */
+struct {
+QEMUFile *from_dst_file;
+QemuThread rp_thread;
+bool  error;
+} rp_state;
+
 double mbps;
 int64_t total_time;
 int64_t downtime;
diff --git a/migration/migration.c b/migration/migration.c
index 4317bab..295deb8 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -696,6 +696,11 @@ static void migrate_fd_cancel(MigrationState *s)
 QEMUFile *f = migrate_get_current()->file;
 trace_migrate_fd_cancel();

+if (s->rp_state.from_dst_file) {
+/* shutdown the rp socket, so causing the rp thread to shutdown */
+qemu_file_shutdown(s->rp_state.from_dst_file);
+}
+
 do {
 old_state = s->state;
 if (!migration_is_setup_or_active(old_state)) {
@@ -1030,6 +1035,154 @@ int64_t migrate_xbzrle_cache_size(void)
 return s->xbzrle_cache_size;
 }

+/* migration thread support */
+/*
+ * Something bad happened to the RP stream, mark an error.
+ * Sets s->rp_state.error; the return path thread tests that flag at the
+ * top of its loop, before reading the next message header.
+ * The caller shall print or trace something to indicate why.
+ *
+ * @s: migration state whose return path is being marked as failed
+ */
+static void mark_source_rp_bad(MigrationState *s)
+{
+s->rp_state.error = true;
+}
+
+static struct rp_cmd_args {
+ssize_t len; /* payload length the header must announce; -1 = variable */
+const char *name; /* printable name, used when reporting a bad length */
+} rp_cmd_args[] = { /* indexed by the MIG_RP_MSG_* message type */
+[MIG_RP_MSG_INVALID]= { .len = -1, .name = "INVALID" },
+[MIG_RP_MSG_SHUT]   = { .len =  4, .name = "SHUT" },
+[MIG_RP_MSG_PONG]   = { .len =  4, .name = "PONG" },
+[MIG_RP_MSG_MAX]= { .len = -1, .name = "MAX" },
+};
+
+/*
+ * Handles messages sent on the return path towards the source VM
+ *
+ */
+static void *source_return_path_thread(void *opaque)
+{
+MigrationState *ms = opaque;
+QEMUFile *rp = ms->rp_state.from_dst_file;
+uint16_t header_len, header_type;
+const int max_len = 512;
+uint8_t buf[max_len];
+uint32_t tmp32, sibling_error;
+int res;
+
+trace_source_return_path_thread_entry();
+while (!ms->rp_state.error && !qemu_file_get_error(rp) &&
+   migration_is_setup_or_active(ms->state)) {
+trace_source_return_path_thread_loop_top();
+header_type = qemu_get_be16(rp);
+header_len = qemu_get_be16(rp);
+
+if (header_type >= MIG_RP_MSG_MAX ||
+header_type == MIG_RP_MSG_INVALID) {
+error_report("RP: Received invalid message 0x%04x length 0x%04x",
+header_type, header_len);
+mark_source_rp_bad(ms);
+goto out;
+}
+
+if ((rp_cmd_args[header_type].len != -1 &&
+header_len != rp_cmd_args[header_type].len) ||
+header_len > max_len) {
+error_report("RP: Received '%s' message (0x%04x) with"
+"incorrect length %d expecting %zu",
+rp_cmd_args[header_type].name, header_type, header_len,
+(size_t)rp_cmd_args[header_type].len);
+mark_source_rp_bad(ms);
+goto out;
+}
+
+/* We know we've got a valid header by this point */
+res = qemu_get_buffer(rp, buf, header_len);
+if (res != header_len) {
+error_report("RP: Failed reading data for message 0x%04x"
+ " read %d expected %d",
+ header_type, res, header_len);
+mark_source_rp_bad(ms);
+goto out;
+}
+
+/* OK, we have the message and the data */
+switch (header_type) {
+case MIG_RP_MSG_SHUT:
+sibling_error = be32_to_cpup((uint32_t *)buf);
+trace_source_return_path_thread_shut(sibling_error);
+if (sibling_error) {
+error_report("RP: Sibling indicated error %d", sibling_error);
+mark_source_rp_bad(ms);
+}
+/*
+ * We'll let the main thread deal with closing the RP
+ * we could do a shutdown(2) on it, but we're the only user
+ * anyway, so there's nothing gained.
+ */
+goto out;
+
+case MIG_RP_MSG_PONG:
+tmp32 = be32_to_cpup((uint32_t *)buf);
+trace_source_return_path_thread_pong(tmp32);
+break;
+
+default:
+break;
+}
+}
+if (rp && qemu_file_get_erro