If the packets are the same, we send the primary packet and drop the secondary packet; otherwise, we notify COLO to do a checkpoint.
Signed-off-by: Zhang Chen <zhangchen.f...@cn.fujitsu.com> Signed-off-by: Li Zhijian <lizhij...@cn.fujitsu.com> Signed-off-by: Wen Congyang <we...@cn.fujitsu.com> --- net/colo-compare.c | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 121 insertions(+), 1 deletion(-) diff --git a/net/colo-compare.c b/net/colo-compare.c index 0bb5a51..1debc0e 100644 --- a/net/colo-compare.c +++ b/net/colo-compare.c @@ -36,6 +36,7 @@ static QTAILQ_HEAD(, CompareState) net_compares = QTAILQ_HEAD_INITIALIZER(net_compares); static ssize_t hashtable_max_size; +static int colo_need_checkpoint; typedef struct ReadState { int state; /* 0 = getting length, 1 = getting data */ @@ -91,6 +92,13 @@ typedef struct CompareState { GQueue unprocessed_connections; /* proxy current hash size */ ssize_t hashtable_size; + + /* notify compare thread */ + QemuEvent event; + /* compare thread, a thread for each NIC */ + QemuThread thread; + int thread_status; + } CompareState; typedef struct Packet { @@ -129,6 +137,15 @@ enum { SECONDARY_IN, }; +enum { + /* compare thread isn't started */ + COMPARE_THREAD_NONE, + /* compare thread is running */ + COMPARE_THREAD_RUNNING, + /* compare thread exit */ + COMPARE_THREAD_EXIT, +}; + static void packet_destroy(void *opaque, void *user_data); static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size); @@ -340,6 +357,88 @@ static inline void colo_flush_connection(void *opaque, void *user_data) qemu_mutex_unlock(&conn->list_lock); } +static void colo_notify_checkpoint(void) +{ + colo_need_checkpoint = true; +} + +/* TODO colo_do_checkpoint() { + * we flush the connections and reset 'colo_need_checkpoint' + * } + */ + +static inline void colo_dump_packet(Packet *pkt) +{ + int i; + for (i = 0; i < pkt->size; i++) { + printf("%02x ", ((uint8_t *)pkt->data)[i]); + } + printf("\n"); +} + +/* + * The IP packets sent by primary and secondary + * will be compared in here + * TODO support ip fragment, Out-Of-Order + * return: 0 
means packet same + * > 0 || < 0 means packet different + */ +static int colo_packet_compare(Packet *ppkt, Packet *spkt) +{ + colo_dump_packet(ppkt); + colo_dump_packet(spkt); + + if (ppkt->size == spkt->size) { + return memcmp(ppkt->data, spkt->data, spkt->size); + } else { + return -1; + } +} + +static void colo_compare_connection(void *opaque, void *user_data) +{ + Connection *conn = opaque; + Packet *pkt = NULL; + GList *result = NULL; + int ret; + + qemu_mutex_lock(&conn->list_lock); + while (!g_queue_is_empty(&conn->primary_list) && + !g_queue_is_empty(&conn->secondary_list)) { + pkt = g_queue_pop_head(&conn->primary_list); + result = g_queue_find_custom(&conn->secondary_list, + pkt, (GCompareFunc)colo_packet_compare); + + if (result) { + ret = compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size); + if (ret < 0) { + error_report("colo_send_primary_packet failed"); + } + g_queue_remove(&conn->secondary_list, result); + } else { + g_queue_push_head(&conn->primary_list, pkt); + colo_notify_checkpoint(); + break; + } + } + qemu_mutex_unlock(&conn->list_lock); +} + +static void *colo_compare_thread(void *opaque) +{ + CompareState *s = opaque; + + while (s->thread_status == COMPARE_THREAD_RUNNING) { + qemu_event_wait(&s->event); + qemu_event_reset(&s->event); + qemu_mutex_lock(&s->conn_list_lock); + g_queue_foreach(&s->conn_list, colo_compare_connection, NULL); + qemu_mutex_unlock(&s->conn_list_lock); + } + + return NULL; +} + static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size) { int ret = 0; @@ -433,7 +532,9 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size) if (ret == 1) { if (packet_enqueue(s, PRIMARY_IN)) { error_report("primary: unsupported packet in"); - compare_chr_send(s->chr_out, buf, size); + compare_chr_send(s->chr_out, s->pri_rs.buf, s->pri_rs.packet_len); + } else { + qemu_event_set(&s->event); } } else if (ret == -1) { qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL); @@ -449,6 
+550,8 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size) if (ret == 1) { if (packet_enqueue(s, SECONDARY_IN)) { error_report("secondary: unsupported packet in"); + } else { + qemu_event_set(&s->event); } } else if (ret == -1) { qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL); @@ -504,6 +607,8 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) { CompareState *s = COLO_COMPARE(uc); struct sysinfo si; + char thread_name[64]; + static int compare_id; if (!s->pri_indev || !s->sec_indev || !s->outdev) { error_setg(errp, "colo compare needs 'primary_in' ," @@ -552,6 +657,7 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) g_queue_init(&s->conn_list); qemu_mutex_init(&s->conn_list_lock); + colo_need_checkpoint = false; s->hashtable_size = 0; /* * Idea from kernel tcp.c: use 1/16384 of memory. On i386: 32MB @@ -572,6 +678,13 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) g_free, connection_destroy); + s->thread_status = COMPARE_THREAD_RUNNING; + sprintf(thread_name, "proxy compare %d", compare_id); + qemu_thread_create(&s->thread, thread_name, + colo_compare_thread, s, + QEMU_THREAD_JOINABLE); + compare_id++; + return; out: @@ -615,6 +728,13 @@ static void colo_compare_class_finalize(ObjectClass *oc, void *data) QTAILQ_REMOVE(&net_compares, s, next); } qemu_mutex_destroy(&s->conn_list_lock); + + if (s->thread.thread) { + s->thread_status = COMPARE_THREAD_EXIT; + qemu_event_set(&s->event); + qemu_thread_join(&s->thread); + } + qemu_event_destroy(&s->event); } static void colo_compare_init(Object *obj) -- 1.9.1