On Fri, 8 May 2020 02:19:00 +0000 "Zhang, Chen" <chen.zh...@intel.com> wrote: > > > No need to init the notify_sendco each time, because the notify dev is just > > an optional parameter. > > > You can use if (s->notify_dev) here. Only Xen uses the chr_notify_dev. > > > > > > > Ok, I will change that and the code below in the next version. > > > > > Overall, making the chr_send job a coroutine is a good idea. It looks good > > > > > to me. > > > And your patch inspired me, it looks like we can re-use the compare_chr_send > > code on filter mirror/redirector too. > > > > I already have a patch for that, but I don't think it is a good idea, because > > the > > guest then can send packets faster than colo-compare can process. This leads to > > bufferbloat and the performance drops in my tests: > > Client-to-server tcp: > > without patch: ~66 Mbit/s > > with patch: ~59 Mbit/s > > Server-to-client tcp: > > without patch: ~702 Kbit/s > > with patch: ~328 Kbit/s > > Oh, a big performance drop, is that caused by memcpy/zero_copy parts? > > Thanks > Zhang Chen
No, there is no memcpy overhead with this patch, see below. Regards, Lukas Straub --- net/filter-mirror.c | 142 +++++++++++++++++++++++++++++++++----------- 1 file changed, 106 insertions(+), 36 deletions(-) diff --git a/net/filter-mirror.c b/net/filter-mirror.c index d83e815545..6bcd317502 100644 --- a/net/filter-mirror.c +++ b/net/filter-mirror.c @@ -20,6 +20,8 @@ #include "chardev/char-fe.h" #include "qemu/iov.h" #include "qemu/sockets.h" +#include "block/aio-wait.h" +#include "qemu/coroutine.h" #define FILTER_MIRROR(obj) \ OBJECT_CHECK(MirrorState, (obj), TYPE_FILTER_MIRROR) @@ -31,6 +33,18 @@ #define TYPE_FILTER_REDIRECTOR "filter-redirector" #define REDIRECTOR_MAX_LEN NET_BUFSIZE +typedef struct SendCo { + Coroutine *co; + GQueue send_list; + bool done; + int ret; +} SendCo; + +typedef struct SendEntry { + ssize_t size; + uint8_t buf[]; +} SendEntry; + typedef struct MirrorState { NetFilterState parent_obj; char *indev; @@ -38,59 +52,101 @@ typedef struct MirrorState { CharBackend chr_in; CharBackend chr_out; SocketReadState rs; + SendCo sendco; bool vnet_hdr; } MirrorState; -static int filter_send(MirrorState *s, - const struct iovec *iov, - int iovcnt) +static void coroutine_fn _filter_send(void *opaque) { + MirrorState *s = opaque; + SendCo *sendco = &s->sendco; NetFilterState *nf = NETFILTER(s); int ret = 0; - ssize_t size = 0; - uint32_t len = 0; - char *buf; - - size = iov_size(iov, iovcnt); - if (!size) { - return 0; - } - len = htonl(size); - ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len)); - if (ret != sizeof(len)) { - goto err; - } + while (!g_queue_is_empty(&sendco->send_list)) { + SendEntry *entry = g_queue_pop_tail(&sendco->send_list); + uint32_t len = htonl(entry->size); - if (s->vnet_hdr) { - /* - * If vnet_hdr = on, we send vnet header len to make other - * module(like colo-compare) know how to parse net - * packet correctly. 
- */ - ssize_t vnet_hdr_len; + ret = qemu_chr_fe_write_all(&s->chr_out, + (uint8_t *)&len, + sizeof(len)); + if (ret != sizeof(len)) { + g_free(entry); + goto err; + } - vnet_hdr_len = nf->netdev->vnet_hdr_len; + if (s->vnet_hdr) { + /* + * If vnet_hdr = on, we send vnet header len to make other + * module(like colo-compare) know how to parse net + * packet correctly. + */ + + len = htonl(nf->netdev->vnet_hdr_len); + ret = qemu_chr_fe_write_all(&s->chr_out, + (uint8_t *)&len, + sizeof(len)); + if (ret != sizeof(len)) { + g_free(entry); + goto err; + } + } - len = htonl(vnet_hdr_len); - ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len)); - if (ret != sizeof(len)) { + ret = qemu_chr_fe_write_all(&s->chr_out, + (uint8_t *)entry->buf, + entry->size); + if (ret != entry->size) { + g_free(entry); goto err; } - } - buf = g_malloc(size); - iov_to_buf(iov, iovcnt, 0, buf, size); - ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size); - g_free(buf); - if (ret != size) { - goto err; + g_free(entry); } - return 0; + sendco->ret = 0; + goto out; err: - return ret < 0 ? ret : -EIO; + while (!g_queue_is_empty(&sendco->send_list)) { + SendEntry *entry = g_queue_pop_tail(&sendco->send_list); + g_free(entry); + } + sendco->ret = ret < 0 ? 
ret : -EIO; +out: + sendco->co = NULL; + sendco->done = true; + aio_wait_kick(); +} + +static int filter_send(MirrorState *s, + const struct iovec *iov, + int iovcnt) +{ + SendCo *sendco = &s->sendco; + SendEntry *entry; + + ssize_t size = iov_size(iov, iovcnt); + if (!size) { + return 0; + } + + entry = g_malloc(sizeof(SendEntry) + size); + entry->size = size; + iov_to_buf(iov, iovcnt, 0, entry->buf, size); + g_queue_push_head(&sendco->send_list, entry); + + if (sendco->done) { + sendco->co = qemu_coroutine_create(_filter_send, s); + sendco->done = false; + qemu_coroutine_enter(sendco->co); + if (sendco->done) { + /* report early errors */ + return sendco->ret; + } + } + + /* assume success */ + return 0; } static void redirector_to_filter(NetFilterState *nf, @@ -194,6 +250,10 @@ static void filter_mirror_cleanup(NetFilterState *nf) { MirrorState *s = FILTER_MIRROR(nf); + AIO_WAIT_WHILE(NULL, !s->sendco.done); + + g_queue_clear(&s->sendco.send_list); + qemu_chr_fe_deinit(&s->chr_out, false); } @@ -201,6 +261,10 @@ static void filter_redirector_cleanup(NetFilterState *nf) { MirrorState *s = FILTER_REDIRECTOR(nf); + AIO_WAIT_WHILE(NULL, !s->sendco.done); + + g_queue_clear(&s->sendco.send_list); + qemu_chr_fe_deinit(&s->chr_in, false); qemu_chr_fe_deinit(&s->chr_out, false); } @@ -224,6 +288,9 @@ static void filter_mirror_setup(NetFilterState *nf, Error **errp) } qemu_chr_fe_init(&s->chr_out, chr, errp); + + s->sendco.done = true; + g_queue_init(&s->sendco.send_list); } static void redirector_rs_finalize(SocketReadState *rs) @@ -281,6 +348,9 @@ static void filter_redirector_setup(NetFilterState *nf, Error **errp) return; } } + + s->sendco.done = true; + g_queue_init(&s->sendco.send_list); } static void filter_mirror_class_init(ObjectClass *oc, void *data) -- 2.20.1
pgpPj7HBwzX_b.pgp
Description: OpenPGP digital signature