On 04/18/2016 07:11 PM, Zhang Chen wrote: > if packets are same, we send primary packet and drop secondary > packet, otherwise notify COLO do checkpoint. > > Signed-off-by: Zhang Chen <zhangchen.f...@cn.fujitsu.com> > Signed-off-by: Li Zhijian <lizhij...@cn.fujitsu.com> > Signed-off-by: Wen Congyang <we...@cn.fujitsu.com> > --- > net/colo-compare.c | 126 > +++++++++++++++++++++++++++++++++++++++++++++++++++++ > trace-events | 2 + > 2 files changed, 128 insertions(+) > > diff --git a/net/colo-compare.c b/net/colo-compare.c > index dc57eac..4b5a2d4 100644 > --- a/net/colo-compare.c > +++ b/net/colo-compare.c > @@ -26,6 +26,7 @@ > #include "qemu/jhash.h" > #include "net/eth.h" > > +#define DEBUG_TCP_COMPARE 1 > #define TYPE_COLO_COMPARE "colo-compare" > #define COLO_COMPARE(obj) \ > OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) > @@ -90,6 +91,13 @@ typedef struct CompareState { > GQueue unprocessed_connections; > /* proxy current hash size */ > uint32_t hashtable_size; > + > + /* notify compare thread */ > + QemuEvent event; > + /* compare thread, a thread for each NIC */ > + QemuThread thread; > + int thread_status; > + > } CompareState; > > typedef struct CompareClass { > @@ -132,6 +140,15 @@ enum { > SECONDARY_IN, > }; > > +enum { > + /* compare thread isn't started */ > + COMPARE_THREAD_NONE, > + /* compare thread is running */ > + COMPARE_THREAD_RUNNING, > + /* compare thread exit */ > + COMPARE_THREAD_EXIT, > +}; > + > static void packet_destroy(void *opaque, void *user_data); > static int compare_chr_send(CharDriverState *out, > const uint8_t *buf, > @@ -336,6 +353,94 @@ static void packet_destroy(void *opaque, void *user_data) > g_slice_free(Packet, pkt); > } > > +static inline void colo_dump_packet(Packet *pkt) > +{ > + int i; > + for (i = 0; i < pkt->size; i++) { > + printf("%02x ", ((uint8_t *)pkt->data)[i]); > + } > + printf("\n");
Can we use something like qemu_hexdump() here? > +} > + > +/* > + * The IP packets sent by primary and secondary > + * will be compared in here > + * TODO support ip fragment, Out-Of-Order > + * return: 0 means packet same > + * > 0 || < 0 means packet different > + */ > +static int colo_packet_compare(Packet *ppkt, Packet *spkt) > +{ > + trace_colo_compare_with_int("ppkt size", ppkt->size); > + trace_colo_compare_with_char("ppkt ip_src", inet_ntoa(ppkt->ip->ip_src)); > + trace_colo_compare_with_char("ppkt ip_dst", inet_ntoa(ppkt->ip->ip_dst)); > + trace_colo_compare_with_int("spkt size", spkt->size); > + trace_colo_compare_with_char("spkt ip_src", inet_ntoa(spkt->ip->ip_src)); > + trace_colo_compare_with_char("spkt ip_dst", inet_ntoa(spkt->ip->ip_dst)); Can we use a single tracepoint here instead? > + > + if (ppkt->size == spkt->size) { > + return memcmp(ppkt->data, spkt->data, spkt->size); > + } else { > + return -1; > + } > +} > + > +static int colo_packet_compare_all(Packet *spkt, Packet *ppkt) > +{ > + trace_colo_compare_main("compare all"); > + return colo_packet_compare(ppkt, spkt); Why is this wrapper function needed? 
> +} > + > +/* > + * called from the compare thread on the primary > + * for compare connection > + */ > +static void colo_compare_connection(void *opaque, void *user_data) > +{ > + Connection *conn = opaque; > + Packet *pkt = NULL; > + GList *result = NULL; > + int ret; > + > + qemu_mutex_lock(&conn->list_lock); > + while (!g_queue_is_empty(&conn->primary_list) && > + !g_queue_is_empty(&conn->secondary_list)) { > + pkt = g_queue_pop_head(&conn->primary_list); > + result = g_queue_find_custom(&conn->secondary_list, > + pkt, (GCompareFunc)colo_packet_compare_all); > + > + if (result) { > + ret = compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size); > + if (ret < 0) { > + error_report("colo_send_primary_packet failed"); > + } > + trace_colo_compare_main("packet same and release packet"); > + g_queue_remove(&conn->secondary_list, result->data); > + } else { > + trace_colo_compare_main("packet different"); > + g_queue_push_head(&conn->primary_list, pkt); Is it possible that the packet from the secondary has simply not arrived yet? If so, do we still need to notify the checkpoint here? 
> + /* TODO: colo_notify_checkpoint();*/ > + break; > + } > + } > + qemu_mutex_unlock(&conn->list_lock); > +} > + > +static void *colo_compare_thread(void *opaque) > +{ > + CompareState *s = opaque; > + > + while (s->thread_status == COMPARE_THREAD_RUNNING) { > + qemu_event_wait(&s->event); > + qemu_event_reset(&s->event); > + qemu_mutex_lock(&s->conn_list_lock); > + g_queue_foreach(&s->conn_list, colo_compare_connection, NULL); > + qemu_mutex_unlock(&s->conn_list_lock); > + } > + > + return NULL; > +} > + > static int compare_chr_send(CharDriverState *out, > const uint8_t *buf, > uint32_t size) > @@ -440,6 +545,8 @@ static void compare_pri_chr_in(void *opaque, const > uint8_t *buf, int size) > if (packet_enqueue(s, PRIMARY_IN)) { > trace_colo_compare_main("primary: unsupported packet in"); > compare_chr_send(s->chr_out, s->pri_rs.buf, > s->pri_rs.packet_len); > + } else { > + qemu_event_set(&s->event); > } > } else if (ret == -1) { > qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL); > @@ -461,6 +568,8 @@ static void compare_sec_chr_in(void *opaque, const > uint8_t *buf, int size) > trace_colo_compare_main("secondary: unsupported packet in"); > /* should we send sec arp pkt? 
*/ > compare_chr_send(s->chr_out, s->sec_rs.buf, > s->sec_rs.packet_len); > + } else { > + qemu_event_set(&s->event); > } > } else if (ret == -1) { > qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL); > @@ -519,6 +628,8 @@ static void compare_set_outdev(Object *obj, const char > *value, Error **errp) > static void colo_compare_complete(UserCreatable *uc, Error **errp) > { > CompareState *s = COLO_COMPARE(uc); > + char thread_name[64]; > + static int compare_id; > > if (!s->pri_indev || !s->sec_indev || !s->outdev) { > error_setg(errp, "colo compare needs 'primary_in' ," > @@ -564,6 +675,7 @@ static void colo_compare_complete(UserCreatable *uc, > Error **errp) > QTAILQ_INSERT_TAIL(&net_compares, s, next); > > g_queue_init(&s->conn_list); > + qemu_event_init(&s->event, false); > qemu_mutex_init(&s->conn_list_lock); > s->hashtable_size = 0; > > @@ -572,6 +684,13 @@ static void colo_compare_complete(UserCreatable *uc, > Error **errp) > g_free, > connection_destroy); > > + s->thread_status = COMPARE_THREAD_RUNNING; > + sprintf(thread_name, "compare %d", compare_id); > + qemu_thread_create(&s->thread, thread_name, > + colo_compare_thread, s, > + QEMU_THREAD_JOINABLE); > + compare_id++; > + > return; > } > > @@ -607,6 +726,13 @@ static void colo_compare_class_finalize(ObjectClass *oc, > void *data) > QTAILQ_REMOVE(&net_compares, s, next); > } > qemu_mutex_destroy(&s->conn_list_lock); > + > + if (s->thread.thread) { > + s->thread_status = COMPARE_THREAD_EXIT; > + qemu_event_set(&s->event); > + qemu_thread_join(&s->thread); > + } > + qemu_event_destroy(&s->event); > } > > static void colo_compare_init(Object *obj) > diff --git a/trace-events b/trace-events > index 8862288..978c47f 100644 > --- a/trace-events > +++ b/trace-events > @@ -1919,3 +1919,5 @@ aspeed_vic_write(uint64_t offset, unsigned size, > uint32_t data) "To 0x%" PRIx64 > > # net/colo-compare.c > colo_compare_main(const char *chr) "chr: %s" > +colo_compare_with_int(const char *sta, int size) ": %s = 
%d" > +colo_compare_with_char(const char *sta, const char *stb) ": %s = %s"