On Mon, Mar 31, 2014 at 03:39:19PM +0100, anton.iva...@kot-begemot.co.uk wrote: > +static void net_l2tpv3_process_queue(NetL2TPV3State *s) > +{ > + int size = 0; > + struct iovec *vec; > + bool bad_read; > + int data_size; > + struct mmsghdr *msgvec; > + > + /* go into ring mode only if there is a "pending" tail */ > + if (s->queue_depth > 0) { > + do { > + msgvec = s->msgvec + s->queue_tail; > + if (msgvec->msg_len > 0) { > + data_size = msgvec->msg_len - s->header_size; > + vec = msgvec->msg_hdr.msg_iov; > + if ((data_size > 0) && > + (l2tpv3_verify_header(s, vec->iov_base) == 0)) { > + vec++; > + /* Use the legacy delivery for now, we will > + * switch to using our own ring as a queueing mechanism > + * at a later date > + */ > + size = qemu_send_packet_async( > + &s->nc, > + vec->iov_base, > + data_size, > + l2tpv3_send_completed > + ); > + bad_read = false; > + } else { > + bad_read = true; > + if (!s->header_mismatch) { > + /* report error only once */ > + error_report("l2tpv3 header verification failed"); > + s->header_mismatch = true; > + } > + } > + } else { > + bad_read = true; > + } > + if ((bad_read) || (size > 0)) { > + s->queue_tail = (s->queue_tail + 1) % MAX_L2TPV3_MSGCNT; > + s->queue_depth--; > + } > + } while ( > + (s->queue_depth > 0) && > + qemu_can_send_packet(&s->nc) && > + ((size > 0) || bad_read) > + );
This doesn't handle the qemu_send_packet_async() return 0 case correctly: When qemu_send_packet_async() returns 0 this function simply returns but doesn't turn off read poll. The packet is now queued in the net layer waiting for the peer to re-enable receive. When the socket becomes readable again, we will resend the same packet again. It will be queued again by the net layer. This can happen many times so the net layer's queue may fill up. Since read poll wasn't disabled we also burn CPU calling net_l2tpv3_send() from the event loop while the peer refuses to receive. In order to fix this: 1. Change the do ... while loop into a while loop. This guarantees we will only attempt to send a packet if the peer can receive. This is a safety measure. 2. Increment queue_tail when size == 0 because the net layer has queued the packet - we are done with it! 3. Disable read poll when size == 0 so that net_l2tpv3_send() will not be called until the peer re-enables receive again. We'll know because the queue will be flushed and l2tpv3_send_completed() gets called. I hope that once these changes are made the performance will also be much closer to what you had when l2tpv3 implemented its own buffering. > +int net_init_l2tpv3(const NetClientOptions *opts, > + const char *name, > + NetClientState *peer) > +{ > + > + > + const NetdevL2TPv3Options *l2tpv3; > + NetL2TPV3State *s; > + NetClientState *nc; > + int fd = -1, gairet; > + struct addrinfo hints; > + struct addrinfo *result = NULL; > + char *srcport, *dstport; > + > + nc = qemu_new_net_client(&net_l2tpv3_info, peer, "l2tpv3", name); > + > + s = DO_UPCAST(NetL2TPV3State, nc, nc); > + The file descriptor should be initialized to -1 so we don't accidentally close stdin (0) when a goto outerr path is taken in this function. 
s->fd = -1; > + s->queue_head = 0; > + s->queue_tail = 0; > + s->header_mismatch = false; > + > + assert(opts->kind == NET_CLIENT_OPTIONS_KIND_L2TPV3); > + l2tpv3 = opts->l2tpv3; > + > + if (l2tpv3->has_ipv6 && l2tpv3->ipv6) { > + s->ipv6 = l2tpv3->ipv6; > + } else { > + s->ipv6 = false; > + } > + > + if (l2tpv3->has_rxcookie || l2tpv3->has_txcookie) { > + if (l2tpv3->has_rxcookie && l2tpv3->has_txcookie) { > + s->cookie = true; > + } else { > + goto outerr; > + } > + } else { > + s->cookie = false; > + } > + > + if (l2tpv3->has_cookie64 || l2tpv3->cookie64) { > + s->cookie_is_64 = true; > + } else { > + s->cookie_is_64 = false; > + } > + > + if (l2tpv3->has_udp && l2tpv3->udp) { > + s->udp = true; > + if (!(l2tpv3->has_srcport && l2tpv3->has_dstport)) { > + error_report("l2tpv3_open : need both src and dst port for udp"); > + goto outerr; > + } else { > + srcport = l2tpv3->srcport; > + dstport = l2tpv3->dstport; > + } > + } else { > + s->udp = false; > + srcport = NULL; > + dstport = NULL; > + } > + > + > + s->offset = 4; > + s->session_offset = 0; > + s->cookie_offset = 4; > + s->counter_offset = 4; > + > + s->tx_session = l2tpv3->txsession; > + if (l2tpv3->has_rxsession) { > + s->rx_session = l2tpv3->rxsession; > + } else { > + s->rx_session = s->tx_session; > + } > + > + if (s->cookie) { > + s->rx_cookie = l2tpv3->rxcookie; > + s->tx_cookie = l2tpv3->txcookie; > + if (s->cookie_is_64 == true) { > + /* 64 bit cookie */ > + s->offset += 8; > + s->counter_offset += 8; > + } else { > + /* 32 bit cookie */ > + s->offset += 4; > + s->counter_offset += 4; > + } > + } > + > + memset(&hints, 0, sizeof(hints)); > + > + if (s->ipv6) { > + hints.ai_family = AF_INET6; > + } else { > + hints.ai_family = AF_INET; > + } > + if (s->udp) { > + hints.ai_socktype = SOCK_DGRAM; > + hints.ai_protocol = 0; > + s->offset += 4; > + s->counter_offset += 4; > + s->session_offset += 4; > + s->cookie_offset += 4; > + } else { > + hints.ai_socktype = SOCK_RAW; > + hints.ai_protocol = 
IPPROTO_L2TP; > + } > + > + gairet = getaddrinfo(l2tpv3->src, srcport, &hints, &result); > + > + if ((gairet != 0) || (result == NULL)) { > + error_report( > + "l2tpv3_open : could not resolve src, errno = %s", > + gai_strerror(gairet) > + ); > + goto outerr; > + } > + fd = socket(result->ai_family, result->ai_socktype, result->ai_protocol); > + if (fd == -1) { > + fd = -errno; > + error_report("l2tpv3_open : socket creation failed, errno = %d", > -fd); > + freeaddrinfo(result); > + goto outerr; > + } > + if (bind(fd, (struct sockaddr *) result->ai_addr, result->ai_addrlen)) { > + error_report("l2tpv3_open : could not bind socket err=%i", errno); > + goto outerr; result is leaked > + } > + > + freeaddrinfo(result); > + > + memset(&hints, 0, sizeof(hints)); > + > + if (s->ipv6) { > + hints.ai_family = AF_INET6; > + } else { > + hints.ai_family = AF_INET; > + } > + if (s->udp) { > + hints.ai_socktype = SOCK_DGRAM; > + hints.ai_protocol = 0; > + } else { > + hints.ai_socktype = SOCK_RAW; > + hints.ai_protocol = IPPROTO_L2TP; > + } > + > + gairet = getaddrinfo(l2tpv3->dst, dstport, &hints, &result); > + if ((gairet != 0) || (result == NULL)) { > + error_report( > + "l2tpv3_open : could not resolve dst, error = %s", > + gai_strerror(gairet) > + ); > + goto outerr; > + } > + > + s->dgram_dst = g_malloc(sizeof(struct sockaddr_storage)); > + memset(s->dgram_dst, '\0' , sizeof(struct sockaddr_storage)); > + memcpy(s->dgram_dst, result->ai_addr, result->ai_addrlen); > + s->dst_size = result->ai_addrlen; > + > + freeaddrinfo(result); > + > + if (l2tpv3->has_counter && l2tpv3->counter) { > + s->has_counter = true; > + s->offset += 4; > + } else { > + s->has_counter = false; > + } > + > + if (l2tpv3->has_pincounter && l2tpv3->pincounter) { > + s->has_counter = true; /* pin counter implies that there is counter > */ > + s->pin_counter = true; > + } else { > + s->pin_counter = false; > + } > + > + if (l2tpv3->has_offset) { > + /* extra offset */ > + s->offset += l2tpv3->offset; > 
+ } This input needs to be validated to prevent huge memory allocations. Something like 2 KB should be plenty; even with jumbo frames, 8 KB would be more than enough. if (l2tpv3->offset > 2 * 1024) { error_report("l2tpv3_open : offset must not exceed 2 KB"); goto outerr; } > + > + if ((s->ipv6) || (s->udp)) { > + s->header_size = s->offset; > + } else { > + s->header_size = s->offset + sizeof(struct iphdr); > + } > + > + s->msgvec = build_l2tpv3_vector(s, MAX_L2TPV3_MSGCNT); > + s->vec = g_malloc(sizeof(struct iovec) * MAX_L2TPV3_IOVCNT); This is never freed. > + s->header_buf = g_malloc(s->header_size); > + > + qemu_set_nonblock(fd); > + > + s->fd = fd; We must avoid double-closing the file descriptor since net_l2tpv3_cleanup() will close s->fd: fd = -1; > + s->counter = 0; > + > + l2tpv3_read_poll(s, true); > + > + if (!s) { > + error_report("l2tpv3_open : failed to set fd handler"); > + goto outerr; > + } > + snprintf(s->nc.info_str, sizeof(s->nc.info_str), > + "l2tpv3: connected"); > + return 0; > +outerr: > + qemu_del_net_client(nc); > + if (fd > 0) { > + close(fd); > + } > + return -1; > +}