From: zhangchen <zhangchen.f...@cn.fujitsu.com>

Secondary setup socket server for colo-forward
primary setup connect to secondary for colo-forward
add data structure will be uesed

Signed-off-by: zhangchen <zhangchen.f...@cn.fujitsu.com>
---
 net/colo-proxy.c | 87 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 net/colo-proxy.h | 61 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 147 insertions(+), 1 deletion(-)

diff --git a/net/colo-proxy.c b/net/colo-proxy.c
index 98c2699..89d9616 100644
--- a/net/colo-proxy.c
+++ b/net/colo-proxy.c
@@ -22,6 +22,8 @@
 #define DEBUG(format, ...)
 #endif
 
+static char *mode;
+static bool colo_do_checkpoint;
 
 static ssize_t colo_proxy_receive_iov(NetFilterState *nf,
                                          NetClientState *sender,
@@ -46,13 +48,84 @@ static ssize_t colo_proxy_receive_iov(NetFilterState *nf,
 
 static void colo_proxy_cleanup(NetFilterState *nf)
 {
-     /* cleanup */
+    ColoProxyState *s = FILTER_COLO_PROXY(nf);
+    close(s->sockfd);
+    s->sockfd = -1;
+    g_free(mode);
+    g_free(s->addr);
 }
 
+static void colo_accept_incoming(ColoProxyState *s)
+{
+    DEBUG("into colo_accept_incoming\n");
+    struct sockaddr_in addr;
+    socklen_t addrlen = sizeof(addr);
+    int acceptsock, err;
+
+    do {
+        acceptsock = qemu_accept(s->sockfd, (struct sockaddr *)&addr, 
&addrlen);
+        err = socket_error();
+    } while (acceptsock < 0 && err == EINTR);
+    qemu_set_fd_handler(s->sockfd, NULL, NULL, NULL);
+    closesocket(s->sockfd);
+
+    DEBUG("accept colo proxy\n");
+
+    if (acceptsock < 0) {
+        printf("could not accept colo connection (%s)\n",
+                     strerror(err));
+        return;
+    }
+    s->sockfd = acceptsock;
+    /* TODO: handle the packets that primary forward */
+    return;
+}
+
+/* Return 1 on success, or return -1 if failed */
+static ssize_t colo_start_incoming(ColoProxyState *s)
+{
+    int serversock;
+    serversock = inet_listen(s->addr, NULL, 256, SOCK_STREAM, 0, NULL);
+    if (serversock < 0) {
+        g_free(s->addr);
+        return -1;
+    }
+    s->sockfd = serversock;
+    qemu_set_fd_handler(serversock, (IOHandler *)colo_accept_incoming, NULL,
+                        (void *)s);
+    g_free(s->addr);
+    return 1;
+}
+
+/* Return 1 on success, or return -1 if setup failed */
+static ssize_t colo_proxy_primary_setup(NetFilterState *nf)
+{
+    ColoProxyState *s = FILTER_COLO_PROXY(nf);
+    int sock;
+    sock = inet_connect(s->addr, NULL);
+    if (sock < 0) {
+        printf("colo proxy connect failed\n");
+        g_free(s->addr);
+        return -1;
+    }
+    DEBUG("colo proxy connect success\n");
+    s->sockfd = sock;
+   /* TODO: handle the packets that secondary forward */
+    g_free(s->addr);
+    return 1;
+}
+
+/* Return 1 on success, or return -1 if setup failed */
+static ssize_t colo_proxy_secondary_setup(NetFilterState *nf)
+{
+    ColoProxyState *s = FILTER_COLO_PROXY(nf);
+    return colo_start_incoming(s);
+}
 
 static void colo_proxy_setup(NetFilterState *nf, Error **errp)
 {
     ColoProxyState *s = FILTER_COLO_PROXY(nf);
+    ssize_t ret = 0;
     if (!s->addr) {
         error_setg(errp, "filter colo_proxy needs 'addr' \
                      property set!");
@@ -68,17 +141,29 @@ static void colo_proxy_setup(NetFilterState *nf, Error 
**errp)
     s->sockfd = -1;
     s->has_failover = false;
     colo_do_checkpoint = false;
+    s->incoming_queue = qemu_new_net_queue(qemu_netfilter_pass_to_next, nf);
+    s->unprocessed_packets = g_hash_table_new_full(connection_key_hash,
+                                                       connection_key_equal,
+                                                       g_free,
+                                                       connection_destroy);
     g_queue_init(&s->unprocessed_connections);
 
     if (!strcmp(mode, PRIMARY_MODE)) {
         s->colo_mode = COLO_PRIMARY_MODE;
+        ret = colo_proxy_primary_setup(nf);
     } else if (!strcmp(mode, SECONDARY_MODE)) {
         s->colo_mode = COLO_SECONDARY_MODE;
+        ret = colo_proxy_secondary_setup(nf);
     } else {
         error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "mode",
                     "primary or secondary");
         return;
     }
+    if (ret) {
+        DEBUG("colo_proxy_setup success\n");
+    } else {
+        DEBUG("colo_proxy_setup failed\n");
+    }
 }
 
 static void colo_proxy_class_init(ObjectClass *oc, void *data)
diff --git a/net/colo-proxy.h b/net/colo-proxy.h
index 94afbc7..f77db2f 100644
--- a/net/colo-proxy.h
+++ b/net/colo-proxy.h
@@ -60,4 +60,65 @@ typedef struct ColoProxyState {
     Coroutine *co;
 } ColoProxyState;
 
+struct ip {
+#ifdef HOST_WORDS_BIGENDIAN
+    uint8_t  ip_v:4,                 /* version */
+             ip_hl:4;                /* header length */
+#else
+    uint8_t  ip_hl:4,                /* header length */
+             ip_v:4;                 /* version */
+#endif
+    uint8_t  ip_tos;                 /* type of service */
+    uint16_t ip_len;                 /* total length */
+    uint16_t ip_id;                  /* identification */
+    uint16_t ip_off;                 /* fragment offset field */
+#define    IP_DF 0x4000              /* don't fragment flag */
+#define    IP_MF 0x2000              /* more fragments flag */
+#define    IP_OFFMASK 0x1fff
+/* mask for fragmenting bits */
+    uint8_t  ip_ttl;                 /* time to live */
+    uint8_t  ip_p;                   /* protocol */
+    uint16_t ip_sum;                 /* checksum */
+    uint32_t ip_src, ip_dst;         /* source and dest address */
+} QEMU_PACKED;
+
+typedef struct Packet {
+    void *data;
+    union {
+        uint8_t *network_layer;
+        struct ip *ip;
+    };
+    uint8_t *transport_layer;
+    int size;
+    ColoProxyState *s;
+    bool should_be_sent;
+    NetClientState *sender;
+} Packet;
+
+typedef struct Connection_key {
+    /* (src, dst) must be grouped, in the same way than in IP header */
+    uint32_t src;
+    uint32_t dst;
+    union {
+        uint32_t ports;
+        uint16_t port16[2];
+    };
+    uint8_t ip_proto;
+} QEMU_PACKED Connection_key;
+
+typedef struct Connection {
+    /* connection primary send queue */
+    GQueue primary_list;
+    /* connection secondary send queue */
+    GQueue secondary_list;
+     /* flag to enqueue unprocessed_connections */
+    bool processing;
+} Connection;
+
+typedef enum {
+    PRIMARY_OUTPUT,           /* primary output packet queue */
+    PRIMARY_INPUT,            /* primary input packet queue */
+    SECONDARY_OUTPUT,         /* secondary output packet queue */
+} packet_type;
+
 #endif /* QEMU_COLO_PROXY_H */
-- 
1.9.1




Reply via email to