From: zhangchen <zhangchen.f...@cn.fujitsu.com>

The secondary sets up a socket server for colo-forward.
The primary sets up a connection to the secondary for colo-forward.
Add the data structures that will be used.

Signed-off-by: zhangchen <zhangchen.f...@cn.fujitsu.com>
---
 net/colo-proxy.c | 115 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 net/colo-proxy.h |  61 +++++++++++++++++++++++++++++
 2 files changed, 175 insertions(+), 1 deletion(-)

diff --git a/net/colo-proxy.c b/net/colo-proxy.c
index 98c2699..89d9616 100644
--- a/net/colo-proxy.c
+++ b/net/colo-proxy.c
@@ -22,6 +22,8 @@
 #define DEBUG(format, ...)
 #endif
 
+static char *mode;
+static bool colo_do_checkpoint;
 
 static ssize_t colo_proxy_receive_iov(NetFilterState *nf,
                                       NetClientState *sender,
@@ -46,13 +48,110 @@ static ssize_t colo_proxy_receive_iov(NetFilterState *nf,
 
 static void colo_proxy_cleanup(NetFilterState *nf)
 {
-    /* cleanup */
+    ColoProxyState *s = FILTER_COLO_PROXY(nf);
+
+    if (s->sockfd >= 0) {
+        /* Drop any handler still registered before the fd goes away. */
+        qemu_set_fd_handler(s->sockfd, NULL, NULL, NULL);
+        closesocket(s->sockfd);
+        s->sockfd = -1;
+    }
+    /* Clear the pointers so a repeated cleanup cannot double free. */
+    g_free(mode);
+    mode = NULL;
+    g_free(s->addr);
+    s->addr = NULL;
 }
 
+/*
+ * fd handler for the secondary's listening socket: accept one connection
+ * and replace the listening fd in s->sockfd with the accepted one.
+ * @opaque is the ColoProxyState registered in colo_start_incoming().
+ */
+static void colo_accept_incoming(void *opaque)
+{
+    ColoProxyState *s = opaque;
+    struct sockaddr_in addr;
+    socklen_t addrlen = sizeof(addr);
+    int acceptsock, err;
+
+    DEBUG("into colo_accept_incoming\n");
+
+    do {
+        acceptsock = qemu_accept(s->sockfd, (struct sockaddr *)&addr, &addrlen);
+        err = socket_error();
+    } while (acceptsock < 0 && err == EINTR);
+    /* The listening socket is closed whether or not accept succeeded. */
+    qemu_set_fd_handler(s->sockfd, NULL, NULL, NULL);
+    closesocket(s->sockfd);
+    /* Forget the closed fd so cleanup does not close it a second time. */
+    s->sockfd = -1;
+
+    DEBUG("accept colo proxy\n");
+
+    if (acceptsock < 0) {
+        printf("could not accept colo connection (%s)\n",
+               strerror(err));
+        return;
+    }
+    s->sockfd = acceptsock;
+    /* TODO: handle the packets that primary forward */
+}
+
+/*
+ * Listen on s->addr and register colo_accept_incoming() for the listening fd.
+ * s->addr stays owned by the state and is released in colo_proxy_cleanup().
+ * Return 1 on success, or return -1 if failed
+ */
+static ssize_t colo_start_incoming(ColoProxyState *s)
+{
+    int serversock;
+
+    serversock = inet_listen(s->addr, NULL, 256, SOCK_STREAM, 0, NULL);
+    if (serversock < 0) {
+        return -1;
+    }
+    s->sockfd = serversock;
+    qemu_set_fd_handler(serversock, colo_accept_incoming, NULL, s);
+    return 1;
+}
+
+/*
+ * Connect to the secondary at s->addr and store the socket in s->sockfd.
+ * s->addr stays owned by the state and is released in colo_proxy_cleanup().
+ * Return 1 on success, or return -1 if setup failed
+ */
+static ssize_t colo_proxy_primary_setup(NetFilterState *nf)
+{
+    ColoProxyState *s = FILTER_COLO_PROXY(nf);
+    int sock;
+
+    sock = inet_connect(s->addr, NULL);
+    if (sock < 0) {
+        printf("colo proxy connect failed\n");
+        return -1;
+    }
+    DEBUG("colo proxy connect success\n");
+    s->sockfd = sock;
+    /* TODO: handle the packets that secondary forward */
+    return 1;
+}
+
+/*
+ * Start listening for the primary's connection.
+ * Return 1 on success, or return -1 if setup failed
+ */
+static ssize_t colo_proxy_secondary_setup(NetFilterState *nf)
+{
+    ColoProxyState *s = FILTER_COLO_PROXY(nf);
+
+    return colo_start_incoming(s);
+}
 
 static void colo_proxy_setup(NetFilterState *nf, Error **errp)
 {
     ColoProxyState *s = FILTER_COLO_PROXY(nf);
+    ssize_t ret = 0;
     if (!s->addr) {
         error_setg(errp, "filter colo_proxy needs 'addr' \
                      property set!");
@@ -68,17 +167,31 @@ static void colo_proxy_setup(NetFilterState *nf, Error **errp)
     s->sockfd = -1;
     s->has_failover = false;
     colo_do_checkpoint = false;
+    s->incoming_queue = qemu_new_net_queue(qemu_netfilter_pass_to_next, nf);
+    s->unprocessed_packets = g_hash_table_new_full(connection_key_hash,
+                                                   connection_key_equal,
+                                                   g_free,
+                                                   connection_destroy);
     g_queue_init(&s->unprocessed_connections);
 
     if (!strcmp(mode, PRIMARY_MODE)) {
         s->colo_mode = COLO_PRIMARY_MODE;
+        ret = colo_proxy_primary_setup(nf);
     } else if (!strcmp(mode, SECONDARY_MODE)) {
         s->colo_mode = COLO_SECONDARY_MODE;
+        ret = colo_proxy_secondary_setup(nf);
     } else {
         error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "mode",
                     "primary or secondary");
         return;
     }
+    /* The setup helpers return 1 on success and -1 on failure. */
+    if (ret < 0) {
+        DEBUG("colo_proxy_setup failed\n");
+        error_setg(errp, "colo proxy setup failed");
+        return;
+    }
+    DEBUG("colo_proxy_setup success\n");
 }
 
 static void colo_proxy_class_init(ObjectClass *oc, void *data)
diff --git a/net/colo-proxy.h b/net/colo-proxy.h
index 94afbc7..f77db2f 100644
--- a/net/colo-proxy.h
+++ b/net/colo-proxy.h
@@ -60,4 +60,65 @@ typedef struct ColoProxyState {
     Coroutine *co;
 } ColoProxyState;
 
+struct ip {
+#ifdef HOST_WORDS_BIGENDIAN
+    uint8_t ip_v:4,                 /* version */
+            ip_hl:4;                /* header length */
+#else
+    uint8_t ip_hl:4,                /* header length */
+            ip_v:4;                 /* version */
+#endif
+    uint8_t ip_tos;                 /* type of service */
+    uint16_t ip_len;                /* total length */
+    uint16_t ip_id;                 /* identification */
+    uint16_t ip_off;                /* fragment offset field */
+#define IP_DF 0x4000                /* don't fragment flag */
+#define IP_MF 0x2000                /* more fragments flag */
+#define IP_OFFMASK 0x1fff
+/* mask for fragmenting bits */
+    uint8_t ip_ttl;                 /* time to live */
+    uint8_t ip_p;                   /* protocol */
+    uint16_t ip_sum;                /* checksum */
+    uint32_t ip_src, ip_dst;        /* source and dest address */
+} QEMU_PACKED;
+
+typedef struct Packet {
+    void *data;
+    union {
+        uint8_t *network_layer;
+        struct ip *ip;
+    };
+    uint8_t *transport_layer;
+    int size;
+    ColoProxyState *s;
+    bool should_be_sent;
+    NetClientState *sender;
+} Packet;
+
+typedef struct Connection_key {
+    /* (src, dst) must be grouped, in the same way than in IP header */
+    uint32_t src;
+    uint32_t dst;
+    union {
+        uint32_t ports;
+        uint16_t port16[2];
+    };
+    uint8_t ip_proto;
+} QEMU_PACKED Connection_key;
+
+typedef struct Connection {
+    /* connection primary send queue */
+    GQueue primary_list;
+    /* connection secondary send queue */
+    GQueue secondary_list;
+    /* flag to enqueue unprocessed_connections */
+    bool processing;
+} Connection;
+
+typedef enum {
+    PRIMARY_OUTPUT,     /* primary output packet queue */
+    PRIMARY_INPUT,      /* primary input packet queue */
+    SECONDARY_OUTPUT,   /* secondary output packet queue */
+} packet_type;
+
 #endif /* QEMU_COLO_PROXY_H */
-- 
1.9.1