The original 'timer_check_lock' mutex lock of struct CompareState is used to protect the 'conn_list' queue and its child queues which are 'primary_list' and 'secondary_list', which is a little abused and confusing
To make it clearer, we rename 'timer_check_lock' to 'conn_list_lock' which is used to protect 'conn_list' queue, use another 'conn_lock' to protect 'primary_list' and 'secondary_list'. Besides, fix some missing places which need these mutex lock. Signed-off-by: zhanghailiang <zhang.zhanghaili...@huawei.com> --- net/colo-compare.c | 33 +++++++++++++++++++++++---------- net/colo.c | 2 ++ net/colo.h | 2 ++ 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/net/colo-compare.c b/net/colo-compare.c index 5a4f335..9bea62a 100644 --- a/net/colo-compare.c +++ b/net/colo-compare.c @@ -79,13 +79,15 @@ typedef struct CompareState { * element type: Connection */ GQueue conn_list; + QemuMutex conn_list_lock; /* hashtable to save connection */ GHashTable *connection_track_table; + /* compare thread, a thread for each NIC */ QemuThread thread; + /* Timer used on the primary to find packets that are never matched */ QEMUTimer *timer; - QemuMutex timer_check_lock; } CompareState; typedef struct CompareClass { @@ -133,6 +135,7 @@ static int packet_enqueue(CompareState *s, int mode) } fill_connection_key(pkt, &key); + qemu_mutex_lock(&s->conn_list_lock); conn = connection_get(s->connection_track_table, &key, &s->conn_list); @@ -141,16 +144,19 @@ static int packet_enqueue(CompareState *s, int mode) g_queue_push_tail(&s->conn_list, conn); conn->processing = true; } + qemu_mutex_unlock(&s->conn_list_lock); if (mode == PRIMARY_IN) { if (g_queue_get_length(&conn->primary_list) <= MAX_QUEUE_SIZE) { + qemu_mutex_lock(&conn->conn_lock); g_queue_push_tail(&conn->primary_list, pkt); if (conn->ip_proto == IPPROTO_TCP) { g_queue_sort(&conn->primary_list, (GCompareDataFunc)seq_sorter, NULL); } + qemu_mutex_unlock(&conn->conn_lock); } else { error_report("colo compare primary queue size too big," "drop packet"); @@ -158,12 +164,14 @@ static int packet_enqueue(CompareState *s, int mode) } else { if (g_queue_get_length(&conn->secondary_list) <= MAX_QUEUE_SIZE) { + qemu_mutex_lock(&conn->conn_lock); g_queue_push_tail(&conn->secondary_list, pkt); if (conn->ip_proto == IPPROTO_TCP) { g_queue_sort(&conn->secondary_list, (GCompareDataFunc)seq_sorter, NULL); } + qemu_mutex_unlock(&conn->conn_lock); } else { error_report("colo compare secondary queue size too big," "drop packet"); @@ -338,10 +346,11 @@ static void colo_old_packet_check_one_conn(void *opaque, GList *result = NULL; int64_t check_time = REGULAR_PACKET_CHECK_MS; + qemu_mutex_lock(&conn->conn_lock); result = g_queue_find_custom(&conn->primary_list, &check_time, (GCompareFunc)colo_old_packet_check_one); - + qemu_mutex_unlock(&conn->conn_lock); if (result) { /* do checkpoint will flush old packet */ /* TODO: colo_notify_checkpoint();*/ @@ -357,7 +366,9 @@ static void colo_old_packet_check(void *opaque) { CompareState *s = opaque; + qemu_mutex_lock(&s->conn_list_lock); g_queue_foreach(&s->conn_list, colo_old_packet_check_one_conn, NULL); + qemu_mutex_unlock(&s->conn_list_lock); } /* @@ -372,11 +383,10 @@ static void colo_compare_connection(void *opaque, void *user_data) GList *result = NULL; int ret; + qemu_mutex_lock(&conn->conn_lock); while (!g_queue_is_empty(&conn->primary_list) && !g_queue_is_empty(&conn->secondary_list)) { - qemu_mutex_lock(&s->timer_check_lock); pkt = g_queue_pop_tail(&conn->primary_list); - qemu_mutex_unlock(&s->timer_check_lock); switch (conn->ip_proto) { case IPPROTO_TCP: result = g_queue_find_custom(&conn->secondary_list, @@ -411,13 +421,12 @@ static void colo_compare_connection(void *opaque, void *user_data) * until next comparison. */ trace_colo_compare_main("packet different"); - qemu_mutex_lock(&s->timer_check_lock); g_queue_push_tail(&conn->primary_list, pkt); - qemu_mutex_unlock(&s->timer_check_lock); /* TODO: colo_notify_checkpoint();*/ break; } } + qemu_mutex_unlock(&conn->conn_lock); } static int compare_chr_send(CharBackend *out, @@ -561,8 +570,10 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs) trace_colo_compare_main("primary: unsupported packet in"); compare_chr_send(&s->chr_out, pri_rs->buf, pri_rs->packet_len); } else { + qemu_mutex_lock(&s->conn_list_lock); /* compare connection */ g_queue_foreach(&s->conn_list, colo_compare_connection, s); + qemu_mutex_unlock(&s->conn_list_lock); } } @@ -573,8 +584,10 @@ static void compare_sec_rs_finalize(SocketReadState *sec_rs) if (packet_enqueue(s, SECONDARY_IN)) { trace_colo_compare_main("secondary: unsupported packet in"); } else { + qemu_mutex_lock(&s->conn_list_lock); /* compare connection */ g_queue_foreach(&s->conn_list, colo_compare_connection, s); + qemu_mutex_unlock(&s->conn_list_lock); } } @@ -618,9 +631,7 @@ static void check_old_packet_regular(void *opaque) * TODO: Make timer handler run in compare thread * like qemu_chr_add_handlers_full. */ - qemu_mutex_lock(&s->timer_check_lock); colo_old_packet_check(s); - qemu_mutex_unlock(&s->timer_check_lock); } /* @@ -665,7 +676,7 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize); g_queue_init(&s->conn_list); - qemu_mutex_init(&s->timer_check_lock); + qemu_mutex_init(&s->conn_list_lock); s->connection_track_table = g_hash_table_new_full(connection_key_hash, connection_key_equal, @@ -718,8 +729,10 @@ static void colo_compare_finalize(Object *obj) g_queue_free(&s->conn_list); if (qemu_thread_is_self(&s->thread)) { + qemu_mutex_lock(&s->conn_list_lock); /* compare connection */ g_queue_foreach(&s->conn_list, colo_compare_connection, s); + qemu_mutex_unlock(&s->conn_list_lock); qemu_thread_join(&s->thread); } @@ -727,7 +740,7 @@ static void colo_compare_finalize(Object *obj) timer_del(s->timer); } - qemu_mutex_destroy(&s->timer_check_lock); + qemu_mutex_destroy(&s->conn_list_lock); g_free(s->pri_indev); g_free(s->sec_indev); diff --git a/net/colo.c b/net/colo.c index 6a6eacd..267f29c 100644 --- a/net/colo.c +++ b/net/colo.c @@ -138,6 +138,7 @@ Connection *connection_new(ConnectionKey *key) conn->syn_flag = 0; g_queue_init(&conn->primary_list); g_queue_init(&conn->secondary_list); + qemu_mutex_init(&conn->conn_lock); return conn; } @@ -151,6 +152,7 @@ void connection_destroy(void *opaque) g_queue_foreach(&conn->secondary_list, packet_destroy, NULL); g_queue_free(&conn->secondary_list); g_slice_free(Connection, conn); + qemu_mutex_destroy(&conn->conn_lock); } Packet *packet_new(const void *data, int size) diff --git a/net/colo.h b/net/colo.h index 7c524f3..2d5f9be 100644 --- a/net/colo.h +++ b/net/colo.h @@ -61,6 +61,8 @@ typedef struct Connection { GQueue secondary_list; /* flag to enqueue unprocessed_connections */ bool processing; + /* Protect the access of primary_list or secondary list */ + QemuMutex conn_lock; uint8_t ip_proto; /* offset = secondary_seq - primary_seq */ tcp_seq offset; -- 1.8.3.1