---
net/colo-proxy.c | 114 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 114 insertions(+)
diff --git a/net/colo-proxy.c b/net/colo-proxy.c
index ba2bbe7..2347bbf 100644
--- a/net/colo-proxy.c
+++ b/net/colo-proxy.c
@@ -172,6 +172,69 @@ bool colo_proxy_query_checkpoint(void)
return colo_do_checkpoint;
}
+/*
+ * send a packet to peer
+ * >=0: success
+ * <0: fail
+ */
+static ssize_t colo_proxy_sock_send(NetFilterState *nf,
+ const struct iovec *iov,
+ int iovcnt)
+{
+ COLOProxyState *s = FILTER_COLO_PROXY(nf);
+ ssize_t ret = 0;
+ ssize_t size = 0;
+ struct iovec sizeiov = {
+ .iov_base = &size,
+ .iov_len = sizeof(size)
+ };
+ size = iov_size(iov, iovcnt);
+ if (!size) {
+ return 0;
+ }
+
+ ret = iov_send(s->sockfd, &sizeiov, 1, 0, sizeof(size));
+ if (ret < 0) {
+ return ret;
+ }
+ ret = iov_send(s->sockfd, iov, iovcnt, 0, size);
+ return ret;
+}
+
+/*
+ * receive a packet from peer
+ * in primary: enqueue packet to secondary_list
+ * in secondary: pass packet to next
+ */
+static void colo_proxy_sock_receive(void *opaque)
+{
+ NetFilterState *nf = opaque;
+ COLOProxyState *s = FILTER_COLO_PROXY(nf);
+ ssize_t len = 0;
+ struct iovec sizeiov = {
+ .iov_base = &len,
+ .iov_len = sizeof(len)
+ };
+
+ iov_recv(s->sockfd, &sizeiov, 1, 0, sizeof(len));
+ if (len > 0 && len < NET_BUFSIZE) {
+ char *buf = g_malloc0(len);
+ struct iovec iov = {
+ .iov_base = buf,
+ .iov_len = len
+ };
+
+ iov_recv(s->sockfd, &iov, 1, 0, len);
+ if (s->colo_mode == COLO_MODE_PRIMARY) {
+ colo_proxy_enqueue_secondary_packet(nf, buf, len);
+ /* buf will be release when pakcet destroy */
+ } else {
+ qemu_net_queue_send(s->incoming_queue, nf->netdev,
+ 0, (const uint8_t *)buf, len, NULL);
+ }
+ }
+}
+
static ssize_t colo_proxy_receive_iov(NetFilterState *nf,
NetClientState *sender,
unsigned flags,
@@ -208,6 +271,57 @@ static void colo_proxy_cleanup(NetFilterState *nf)
qemu_event_destroy(&s->need_compare_ev);
}
+/* wait for peer connecting
+ * NOTE: this function will block the caller
+ * 0 on success, otherwise returns -1
+ */
+static int colo_wait_incoming(COLOProxyState *s)
+{
+ struct sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+ int accept_sock, err;
+ int fd = inet_listen(s->addr, NULL, 256, SOCK_STREAM, 0, NULL);
+
+ if (fd < 0) {
+ error_report("colo proxy listen failed");
+ return -1;
+ }
+
+ do {
+ accept_sock = qemu_accept(fd, (struct sockaddr *)&addr, &addrlen);
+ err = socket_error();
+ } while (accept_sock < 0 && err == EINTR);
+ closesocket(fd);
+
+ if (accept_sock < 0) {
+ error_report("colo proxy accept failed(%s)", strerror(err));
+ return -1;
+ }
+ s->sockfd = accept_sock;
+
+ qemu_set_fd_handler(s->sockfd, colo_proxy_sock_receive, NULL, (void *)s);
+
+ return 0;
+}
+
+/* try to connect listening server
+ * 0 on success, otherwise something wrong
+ */
+static ssize_t colo_proxy_connect(COLOProxyState *s)
+{
+ int sock;
+ sock = inet_connect(s->addr, NULL);
+
+ if (sock < 0) {
+ error_report("colo proxy inet_connect failed");
+ return -1;
+ }
+ s->sockfd = sock;
+ qemu_set_fd_handler(s->sockfd, colo_proxy_sock_receive, NULL, (void *)s);
+
+ return 0;
+}
+
static void colo_proxy_notify_checkpoint(void)
{
trace_colo_proxy("colo_proxy_notify_checkpoint");
--
1.9.1