This patch adds tests/vhost-user-cc.c. It is largely a rework of tests/vhost-user-bridge.c, adapted to support vhost-user sockets on both sides.
--- diff -urN a/tests/Makefile.include b/tests/Makefile.include --- a/tests/Makefile.include 2019-11-14 19:06:21.000000000 +0100 +++ b/tests/Makefile.include 2020-01-07 19:10:21.000000000 +0100 @@ -821,6 +821,7 @@ tests/test-x86-cpuid-compat$(EXESUF): tests/test-x86-cpuid-compat.o $(qtest-obj-y) tests/ivshmem-test$(EXESUF): tests/ivshmem-test.o contrib/ivshmem-server/ivshmem-server.o $(libqos-pc-obj-y) $(libqos-spapr-obj-y) tests/vhost-user-bridge$(EXESUF): tests/vhost-user-bridge.o $(test-util-obj-y) libvhost-user.a +tests/vhost-user-cc$(EXESUF): tests/vhost-user-cc.o $(test-util-obj-y) libvhost-user.a tests/test-uuid$(EXESUF): tests/test-uuid.o $(test-util-obj-y) tests/test-arm-mptimer$(EXESUF): tests/test-arm-mptimer.o tests/test-qapi-util$(EXESUF): tests/test-qapi-util.o $(test-util-obj-y) diff -urN a/tests/vhost-user-cc.c b/tests/vhost-user-cc.c --- a/tests/vhost-user-cc.c 1970-01-01 01:00:00.000000000 +0100 +++ b/tests/vhost-user-cc.c 2020-01-08 01:04:19.000000000 +0100 @@ -0,0 +1,782 @@ +/* + * Vhost User Cross Cable + * + * Copyright (c) 2020 V. <m...@winaoe.org> + * + * Refractored from and based on vhost-user-bridge.c + * Copyright (c) 2015 Red Hat, Inc. + * Victor Kaplansky <vict...@redhat.com> + * + * All bugs however are proudly my own. + * + * This work is licensed under the terms of the GNU GPL, version 2 or + * later. See the COPYING file in the top-level directory. + */ + +/* + * TODO: + * - allow pairs of -l + -r's to make a vhost-user-patch-panel + * - implement all request handlers. Still not implemented: + * vucc_get_queue_num_exec() + * vucc_send_rarp_exec() + * - test for broken requests and virtqueue. + * - implement features defined by Virtio 1.0 spec. + * - support mergeable buffers and indirect descriptors. + * - implement clean shutdown. + * - implement non-blocking writes to UDP backend. + * - implement polling strategy. 
+ * - implement clean starting/stopping of vq processing + * - implement clean starting/stopping of used and buffers + * dirty page logging. + */ + +#define _FILE_OFFSET_BITS 64 + +#include "qemu/osdep.h" +#include "qemu/atomic.h" +#include "qemu/ctype.h" +#include "qemu/iov.h" +#include "standard-headers/linux/virtio_net.h" +#include "contrib/libvhost-user/libvhost-user.h" + + +#define VHOST_USER_CC_DEBUG 0 + +#define VUCC_LEFT 0 +#define VUCC_RIGHT 1 + +#define DPRINT(...) \ + do { \ + if (VHOST_USER_CC_DEBUG) { \ + printf(__VA_ARGS__); \ + } \ + } while (0) + +#define VUCC_SIDE(side) (side == VUCC_LEFT ? "LEFT" : "RIGHT") + +enum { + VHOST_USER_CC_MAX_QUEUES = 8, +}; + +typedef void (*CallbackFunc)(int sock, void *ctx, int side); + +typedef struct Event { + void *ctx; + int side; + CallbackFunc callback; +} Event; + +typedef struct Dispatcher { + int max_sock; + fd_set fdset; + Event events[FD_SETSIZE]; +} Dispatcher; + +// FIXME: Hack to get container_of working. +// Can we get a context in vu callbacks to not +// have to rely on globals or hacks like this? +typedef struct VuccVuDev { + VuDev vudev; + struct VuccState *state; + int side; +} VuccVuDev; + +typedef struct VuccState { + VuccVuDev *vudev[2]; + int quit; + Dispatcher dispatcher; + int hdrlen[2]; + int sock[2]; + int ready; + const char *ud_socket_path[2]; + bool client[2]; + bool host_notifier[2]; + struct { + int fd; + void *addr; + pthread_t thread; + } notifier[2]; +} VuccState; + +static void +vucc_die(const char *s) +{ + perror(s); + exit(1); +} + +static int +dispatcher_init(Dispatcher *dispr) +{ + FD_ZERO(&dispr->fdset); + dispr->max_sock = -1; + return 0; +} + +static int +dispatcher_add(Dispatcher *dispr, int sock, void *ctx, int side, CallbackFunc cb) +{ + if (sock >= FD_SETSIZE) { + fprintf(stderr, + "Error: Failed to add new event. 
sock %d should be less than %d\n", + sock, FD_SETSIZE); + return -1; + } + + dispr->events[sock].ctx = ctx; + dispr->events[sock].side = side; + dispr->events[sock].callback = cb; + + FD_SET(sock, &dispr->fdset); + if (sock > dispr->max_sock) { + dispr->max_sock = sock; + } + DPRINT("Added sock %d for watching. max_sock: %d\n", + sock, dispr->max_sock); + return 0; +} + +static int +dispatcher_remove(Dispatcher *dispr, int sock) +{ + if (sock >= FD_SETSIZE) { + fprintf(stderr, + "Error: Failed to remove event. sock %d should be less than %d\n", + sock, FD_SETSIZE); + return -1; + } + + FD_CLR(sock, &dispr->fdset); + DPRINT("Sock %d removed from dispatcher watch.\n", sock); + return 0; +} + +/* timeout in us */ +static int +dispatcher_wait(Dispatcher *dispr, uint32_t timeout) +{ + struct timeval tv; + tv.tv_sec = timeout / 1000000; + tv.tv_usec = timeout % 1000000; + + fd_set fdset = dispr->fdset; + + /* wait until some of sockets become readable. */ + int rc = select(dispr->max_sock + 1, &fdset, 0, 0, &tv); + + if (rc == -1) { + vucc_die("select"); + } + + /* Timeout */ + if (rc == 0) { + return 0; + } + + /* Now call callback for every ready socket. 
*/ + + int sock; + for (sock = 0; sock < dispr->max_sock + 1; sock++) { + /* The callback on a socket can remove other sockets from the + * dispatcher, thus we have to check that the socket is + * still not removed from dispatcher's list + */ + if (FD_ISSET(sock, &fdset) && FD_ISSET(sock, &dispr->fdset)) { + Event *e = &dispr->events[sock]; + e->callback(sock, e->ctx, e->side); + } + } + + return 0; +} + +/* this function reverse the effect of iov_discard_front() it must be + * called with 'front' being the original struct iovec and 'bytes' + * being the number of bytes you shaved off + */ +static void +iov_restore_front(struct iovec *front, struct iovec *iov, size_t bytes) +{ + struct iovec *cur; + + for (cur = front; cur != iov; cur++) { + assert(bytes >= cur->iov_len); + bytes -= cur->iov_len; + } + + cur->iov_base -= bytes; + cur->iov_len += bytes; +} + +static void +iov_truncate(struct iovec *iov, unsigned iovc, size_t bytes) +{ + unsigned i; + + for (i = 0; i < iovc; i++, iov++) { + if (bytes < iov->iov_len) { + iov->iov_len = bytes; + return; + } + + bytes -= iov->iov_len; + } + + assert(!"couldn't truncate iov"); +} + +static void +vucc_tx(VuccState *state, int side, struct iovec *out_sg, unsigned int out_num) +{ + VuDev *dev = &state->vudev[side]->vudev; + VuVirtq *vq = vu_get_queue(dev, 0); + VuVirtqElement *elem = NULL; + struct iovec tx_sg[VIRTQUEUE_MAX_SIZE]; + struct virtio_net_hdr_mrg_rxbuf tx; + unsigned tx_cnt = 0; + int i = 0; + int hdrlen = state->hdrlen[side]; + struct virtio_net_hdr hdr = { + .flags = 0, + .gso_type = VIRTIO_NET_HDR_GSO_NONE + }; + // FIXME: 1 MB copy buffer. 
+ static unsigned char speed_killer[1024*1024]; + + if (VHOST_USER_CC_DEBUG) { + DPRINT("\n *** IN VUCC_DO_TX %s ***\n", VUCC_SIDE(side)); + DPRINT(" hdrlen = %d\n", state->hdrlen[side]); + iov_hexdump(out_sg, out_num, stderr, "", 2048); + } + + if (!vu_queue_enabled(dev, vq) || + !vu_queue_started(dev, vq) || + !vu_queue_avail_bytes(dev, vq, hdrlen, 0)) { + DPRINT("Doing TX, but no available descriptors on TX virtq.\n"); + return; + } + + while (1) { + struct iovec *sg; + ssize_t ret, total = 0; + unsigned int num; + + elem = vu_queue_pop(dev, vq, sizeof(VuVirtqElement)); + if (!elem) { + break; + } + + if (elem->in_num < 1) { + fprintf(stderr, "virtio-net contains no in buffers\n"); + break; + } + + sg = elem->in_sg; + num = elem->in_num; + if (i == 0) { + if (hdrlen== 12) { + tx_cnt = iov_copy(tx_sg, ARRAY_SIZE(tx_sg), + sg, num, + offsetof(typeof(tx), num_buffers), + sizeof(tx.num_buffers)); + } + iov_from_buf(sg, num, 0, &hdr, sizeof hdr); + total += hdrlen; + ret = iov_discard_front(&sg, &num, hdrlen); + assert(ret == hdrlen); + } + + // FIXME: Unfortunately this does not work. Vhost does not do any data copying itself. + // ret = iov_copy(sg, num, out_sg, out_num, 0, + // MIN(iov_size(sg, num), iov_size(out_sg, out_num))); + // + // This does, but is ugly, slow and does mem copying twice. + ret = iov_from_buf(sg, num, 0, speed_killer, + iov_to_buf(out_sg, out_num, 0, speed_killer, + MIN(iov_size(out_sg, out_num), sizeof speed_killer) + ) + ); + // Is there an iov_copy which does copy data? + // Or can we do this in a zero copy way as vhost is supposed to be? + + if (i == 0) { + iov_restore_front(elem->in_sg, sg, hdrlen); + } + + total += ret; + iov_truncate(elem->in_sg, elem->in_num, total); + vu_queue_fill(dev, vq, elem, total, i++); + + free(elem); + elem = NULL; + + break; /* could loop if DONTWAIT worked? 
*/ + } + + if (tx_cnt) { + tx.num_buffers = i; + iov_from_buf(tx_sg, tx_cnt, + 0, + &tx.num_buffers, sizeof tx.num_buffers); + } + + vu_queue_flush(dev, vq, i); + vu_queue_notify(dev, vq); + + free(elem); +} + +static void +vucc_handle_rx(VuDev *dev, int qidx) +{ + VuVirtq *vq = vu_get_queue(dev, qidx); + VuccVuDev *vudev = container_of(dev, VuccVuDev, vudev); + VuccState *state = vudev->state; + int hdrlen = state->hdrlen[vudev->side]; + VuVirtqElement *elem = NULL; + + assert(qidx % 2); + + for (;;) { + unsigned int out_num; + struct iovec sg[VIRTQUEUE_MAX_SIZE], *out_sg; + + elem = vu_queue_pop(dev, vq, sizeof(VuVirtqElement)); + if (!elem) { + break; + } + + out_num = elem->out_num; + out_sg = elem->out_sg; + if (out_num < 1) { + fprintf(stderr, "virtio-net header not in first element\n"); + break; + } + if (VHOST_USER_CC_DEBUG) { + DPRINT("\n *** IN VUCC_HANDLE_RX %s ***\n", VUCC_SIDE(vudev->side)); + iov_hexdump(out_sg, out_num, stderr, "", 2048); + } + + if (hdrlen) { + unsigned sg_num = iov_copy(sg, ARRAY_SIZE(sg), + out_sg, out_num, + hdrlen, -1); + out_num = sg_num; + out_sg = sg; + } + + vucc_tx(state, (vudev->side == VUCC_LEFT ? 
VUCC_RIGHT : VUCC_LEFT), out_sg, out_num); + + vu_queue_push(dev, vq, elem, 0); + vu_queue_notify(dev, vq); + + free(elem); + elem = NULL; + } + + free(elem); +} + +static void +vucc_receive_cb(int sock, void *ctx, int side) +{ + VuccState *state = (VuccState *)ctx; + + if (!vu_dispatch(&state->vudev[side]->vudev)) { + fprintf(stderr, "Error while dispatching\n"); + } +} + +typedef struct WatchData { + VuDev *dev; + vu_watch_cb cb; + void *data; +} WatchData; + +static void +watch_cb(int sock, void *ctx, int side) +{ + struct WatchData *wd = ctx; + + wd->cb(wd->dev, VU_WATCH_IN, wd->data); +} + +static void +vucc_set_watch(VuDev *dev, int fd, int condition, + vu_watch_cb cb, void *data) +{ + VuccVuDev *vudev = container_of(dev, VuccVuDev, vudev); + VuccState *state = vudev->state; + static WatchData watches[FD_SETSIZE]; + struct WatchData *wd = &watches[fd]; + + wd->cb = cb; + wd->data = data; + wd->dev = dev; + dispatcher_add(&state->dispatcher, fd, wd, -1, watch_cb); +} + +static void +vucc_remove_watch(VuDev *dev, int fd) +{ + VuccVuDev *vudev = container_of(dev, VuccVuDev, vudev); + VuccState *state = vudev->state; + + dispatcher_remove(&state->dispatcher, fd); +} + +static int +vucc_send_rarp_exec(VuDev *dev, VhostUserMsg *vmsg) +{ + DPRINT("Function %s() not implemented yet.\n", __func__); + return 0; +} + +static int +vucc_process_msg(VuDev *dev, VhostUserMsg *vmsg, int *do_reply) +{ + switch (vmsg->request) { + case VHOST_USER_SEND_RARP: + *do_reply = vucc_send_rarp_exec(dev, vmsg); + return 1; + default: + /* let the library handle the rest */ + return 0; + } + + return 0; +} + +static void +vucc_set_features(VuDev *dev, uint64_t features) +{ + VuccVuDev *vudev = container_of(dev, VuccVuDev, vudev); + VuccState *state = vudev->state; + + if ((features & (1ULL << VIRTIO_F_VERSION_1)) || + (features & (1ULL << VIRTIO_NET_F_MRG_RXBUF))) { + state->hdrlen[vudev->side] = 12; + } else { + state->hdrlen[vudev->side] = 10; + } +} + +static uint64_t 
+vucc_get_features(VuDev *dev) +{ + return 1ULL << VIRTIO_NET_F_GUEST_ANNOUNCE | + 1ULL << VIRTIO_NET_F_MRG_RXBUF | + 1ULL << VIRTIO_F_VERSION_1; +} + +static void +vucc_queue_set_started(VuDev *dev, int qidx, bool started) +{ + VuccVuDev *vudev = container_of(dev, VuccVuDev, vudev); + VuccState *state = vudev->state; + VuVirtq *vq = vu_get_queue(dev, qidx); + + if (started && state->notifier[vudev->side].fd >= 0) { + vu_set_queue_host_notifier(dev, vq, state->notifier[vudev->side].fd, + getpagesize(), + qidx * getpagesize()); + } + + if (qidx % 2 == 1) { + vu_set_queue_handler(dev, vq, started ? vucc_handle_rx : NULL); + } +} + +static void +vucc_panic(VuDev *dev, const char *msg) +{ + VuccVuDev *vudev = container_of(dev, VuccVuDev, vudev); + VuccState *state = vudev->state; + + fprintf(stderr, "PANIC: %s\n", msg); + + dispatcher_remove(&state->dispatcher, state->sock[vudev->side]); + state->quit = 1; +} + +static bool +vucc_queue_is_processed_in_order(VuDev *dev, int qidx) +{ + return true; +} + +static const VuDevIface vuiface = { + .get_features = vucc_get_features, + .set_features = vucc_set_features, + .process_msg = vucc_process_msg, + .queue_set_started = vucc_queue_set_started, + .queue_is_processed_in_order = vucc_queue_is_processed_in_order, +}; + +static void +vucc_accept_cb(int sock, void *ctx, int side) +{ + VuccState *state = (VuccState *)ctx; + int conn_fd; + struct sockaddr_un un; + socklen_t len = sizeof(un); + + conn_fd = accept(sock, (struct sockaddr *) &un, &len); + if (conn_fd == -1) { + vucc_die("accept()"); + } + DPRINT("Got connection from remote peer on sock %d\n", conn_fd); + + if (!vu_init(&state->vudev[side]->vudev, + VHOST_USER_CC_MAX_QUEUES, + conn_fd, + vucc_panic, + vucc_set_watch, + vucc_remove_watch, + &vuiface)) { + fprintf(stderr, "Failed to initialize libvhost-user\n"); + exit(1); + } + + dispatcher_add(&state->dispatcher, conn_fd, ctx, side, vucc_receive_cb); + dispatcher_remove(&state->dispatcher, sock); +} + +static void 
+vucc_new(VuccState *state, int side) +{ + struct sockaddr_un un; + CallbackFunc cb; + size_t len; + + /* Get a UNIX socket. */ + state->sock[side] = socket(AF_UNIX, SOCK_STREAM, 0); + if (state->sock[side] == -1) { + vucc_die("socket"); + } + + state->notifier[side].fd = -1; + + un.sun_family = AF_UNIX; + strcpy(un.sun_path, state->ud_socket_path[side]); + len = sizeof(un.sun_family) + strlen(state->ud_socket_path[side]); + + if (!state->client[side]) { + unlink(state->ud_socket_path[side]); + + if (bind(state->sock[side], (struct sockaddr *) &un, len) == -1) { + vucc_die("bind"); + } + + if (listen(state->sock[side], 1) == -1) { + vucc_die("listen"); + } + cb = vucc_accept_cb; + + DPRINT("Waiting for connections on UNIX socket %s ...\n", state->ud_socket_path[side]); + } else { + if (connect(state->sock[side], (struct sockaddr *)&un, len) == -1) { + vucc_die("connect"); + } + + if (!vu_init(&state->vudev[side]->vudev, + VHOST_USER_CC_MAX_QUEUES, + state->sock[side], + vucc_panic, + vucc_set_watch, + vucc_remove_watch, + &vuiface)) { + fprintf(stderr, "Failed to initialize libvhost-user\n"); + exit(1); + } + + cb = vucc_receive_cb; + } + + dispatcher_add(&state->dispatcher, state->sock[side], (void *)state, side, cb); +} + +typedef struct ThreadArgs { + VuccState *state; + int side; +} ThreadArgs; + +static void *notifier_thread(void *arg) +{ + ThreadArgs *args = (ThreadArgs *)arg; + VuccState *state = args->state; + int side = args->side; + int pagesize = getpagesize(); + int qidx; + + while (true) { + for (qidx = 0; qidx < VHOST_USER_CC_MAX_QUEUES; qidx++) { + uint16_t *n = state->notifier[side].addr + pagesize * qidx; + + if (*n == qidx) { + *n = 0xffff; + /* We won't miss notifications if we reset + * the memory first. 
*/ + smp_mb(); + + DPRINT("Got a notification for queue%d via host notifier.\n", + qidx); + + if (qidx % 2 == 1) { + vucc_handle_rx(&state->vudev[side]->vudev, qidx); + } + } + usleep(1000); + } + } + + return NULL; +} + +static void +vucc_host_notifier_setup(VuccState *state, int side) +{ + char template[] = "/tmp/vucc-XXXXXX"; + pthread_t thread; + size_t length; + void *addr; + int fd; + ThreadArgs args; + + length = getpagesize() * VHOST_USER_CC_MAX_QUEUES; + + fd = mkstemp(template); + if (fd < 0) { + vucc_die("mkstemp()"); + } + + if (posix_fallocate(fd, 0, length) != 0) { + vucc_die("posix_fallocate()"); + } + + addr = mmap(NULL, length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (addr == MAP_FAILED) { + vucc_die("mmap()"); + } + + memset(addr, 0xff, length); + + args.state = state; + args.side = side; + + if (pthread_create(&thread, NULL, notifier_thread, &args) != 0) { + vucc_die("pthread_create()"); + } + + state->notifier[side].fd = fd; + state->notifier[side].addr = addr; + state->notifier[side].thread = thread; +} + +static void +vucc_setup(VuccState *state, int side) +{ + vucc_new(state, side); + + if (state->host_notifier[side]) { + vucc_host_notifier_setup(state, side); + } +} + +static int +vucc_parse_options(VuccState *state, int argc, char *argv[]) +{ + int opt; + bool client = false; + bool host_notifier = false; + + while ((opt = getopt(argc, argv, "l:r:scH")) != -1) { + + switch (opt) { + case 'l': + state->ud_socket_path[VUCC_LEFT] = strdup(optarg); + state->client[VUCC_LEFT] = client; + state->host_notifier[VUCC_LEFT] = host_notifier; + client = false; + host_notifier = false; + break; + case 'r': + state->ud_socket_path[VUCC_RIGHT] = strdup(optarg); + state->client[VUCC_RIGHT] = client; + state->host_notifier[VUCC_RIGHT] = host_notifier; + client = false; + host_notifier = false; + break; + case 's': + client = false; + break; + case 'c': + client = true; + break; + case 'H': + host_notifier = true; + break; + default: + goto out; + } 
+ } + + if (!state->ud_socket_path[VUCC_LEFT] || !state->ud_socket_path[VUCC_RIGHT]) { + goto out; + } + + return 0; + +out: + fprintf(stderr, "Usage: %s", argv[0]); + fprintf(stderr, "[-s or -c] [-H] <-l left_ud_socket_path> [-s or -c] [-H] <-r right_ud_socket_path>\n"); + fprintf(stderr, "\t-l/-r path to unix domain sockets.\n"); + fprintf(stderr, "\t-s server mode (default) for the next specified socket.\n"); + fprintf(stderr, "\t-c client mode for the next socket.\n"); + fprintf(stderr, "\t-H use host notifier for the next socket.\n"); + + return 1; +} + +int +main(int argc, char *argv[]) +{ + VuccState *state = (VuccState *) calloc(1, sizeof(VuccState)); + + state->vudev[VUCC_LEFT] = (VuccVuDev *) calloc(1, sizeof(VuccVuDev)); + state->vudev[VUCC_RIGHT] = (VuccVuDev *) calloc(1, sizeof(VuccVuDev)); + + state->vudev[VUCC_LEFT]->state = state->vudev[VUCC_RIGHT]->state = state; + state->vudev[VUCC_LEFT]->side = VUCC_LEFT; + state->vudev[VUCC_RIGHT]->side = VUCC_RIGHT; + + if (!state || !state->vudev[VUCC_LEFT] || !state->vudev[VUCC_LEFT]) { + fprintf(stderr, "State calloc() failed. Out of memory?"); + return 1; + } + + if (vucc_parse_options(state, argc, argv)) { + return 1; + } + + dispatcher_init(&state->dispatcher); + + vucc_setup(state, VUCC_LEFT); + vucc_setup(state, VUCC_RIGHT); + + DPRINT("ud left socket: %s (%s)\n", state->ud_socket_path[VUCC_LEFT], + state->client[VUCC_LEFT] ? "client" : "server"); + DPRINT("ud right socket: %s (%s)\n", state->ud_socket_path[VUCC_RIGHT], + state->client[VUCC_RIGHT] ? "client" : "server"); + + while (!state->quit) { + /* timeout 200ms */ + dispatcher_wait(&state->dispatcher, 200000); + /* Here one can try polling strategy. */ + } + + vu_deinit(&state->vudev[VUCC_LEFT]->vudev); + vu_deinit(&state->vudev[VUCC_RIGHT]->vudev); + + return 0; +}