From: Juha-Matti Tilli <juha-matti.ti...@iki.fi>

ODP has two functions, odp_pktin_recv_tmo() and odp_pktin_recv_mq_tmo(), that wait for packets for a given time. However, the wait was not a genuine interruptible sleep (select/poll/epoll) but a busy loop, which caused high CPU usage whenever these functions were used. This patch gives three pktio mechanisms (socket, socket_mmap and netmap) a way to sleep using the select() system call. Sleeping is used only when every pktio mechanism involved in the call supports it.
This version of the patch handles ODP_PKTIN_WAIT correctly and uses
ODP_TIME_SEC_IN_NS. socket_mmap now also supports genuine sleeping.
Furthermore, the pktio mechanisms provide their own recv_tmo and
recv_mq_tmo implementations, which are used when available, and the
select()-based code of odp_pktin_recv_mq_tmo has been separated from the
polling code into a helper function without the odp_ prefix. Compared to
earlier versions, the code is somewhat further optimized and better
commented.

If a queue cannot be watched with select() (for example because its
descriptor does not fit in an fd_set), the select() code never sleeps on
an incomplete descriptor set: the mixed-pktio path of
odp_pktin_recv_mq_tmo falls back to polling, and the pktio-specific
implementations return 0 without sleeping.

Signed-off-by: Juha-Matti Tilli <juha-matti.ti...@iki.fi>
---
/** Email created from pull request 341 (jmtilli:genuinesleep)
 ** https://github.com/Linaro/odp/pull/341
 ** Patch: https://github.com/Linaro/odp/pull/341.patch
 ** Base sha: 6b5cdc77eb9759a2349b10372a964648559bc92c
 ** Merge commit sha: 4caaf12e4e7af121afb1fe7a6562a8891ed6ec29
 **/
 example/l2fwd_simple/odp_l2fwd_simple.c            |   3 +-
 .../linux-generic/include/odp_packet_io_internal.h |  12 +
 platform/linux-generic/include/odp_packet_socket.h |  26 ++
 platform/linux-generic/odp_packet_io.c             |  42 ++++
 platform/linux-generic/pktio/netmap.c              | 144 +++++++++++++-
 platform/linux-generic/pktio/socket.c              | 226 ++++++++++++++++++++++
 platform/linux-generic/pktio/socket_mmap.c         | 111 +++++++++++
 7 files changed, 562 insertions(+), 2 deletions(-)

diff --git a/example/l2fwd_simple/odp_l2fwd_simple.c b/example/l2fwd_simple/odp_l2fwd_simple.c
index e63814555..fd0cec3cf 100644
--- a/example/l2fwd_simple/odp_l2fwd_simple.c
+++ b/example/l2fwd_simple/odp_l2fwd_simple.c
@@ -88,6 +88,7 @@ static int run_worker(void *arg ODP_UNUSED)
 	odp_packet_t pkt_tbl[MAX_PKT_BURST];
 	int pkts, sent, tx_drops, i;
 	int total_pkts = 0;
+	uint64_t wait_time = odp_pktin_wait_time(ODP_TIME_SEC_IN_NS);
 
 	if (odp_pktio_start(global.if0)) {
 		printf("unable to start input interface\n");
@@ -103,7 +104,7 @@ static int run_worker(void *arg ODP_UNUSED)
 
 	while (!exit_thr) {
 		pkts = odp_pktin_recv_tmo(global.if0in, pkt_tbl, MAX_PKT_BURST,
-					  ODP_PKTIN_NO_WAIT);
+					  wait_time);
 
 		if (odp_unlikely(pkts <= 0))
 			continue;
diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h
index 83c449e81..cc4b36459 100644
--- a/platform/linux-generic/include/odp_packet_io_internal.h
+++ b/platform/linux-generic/include/odp_packet_io_internal.h
@@ -205,6 +205,18 @@ typedef struct pktio_if_ops {
 	odp_time_t (*pktin_ts_from_ns)(pktio_entry_t *pktio_entry, uint64_t ns);
 	int (*recv)(pktio_entry_t *entry, int index, odp_packet_t packets[],
 		    int num);
+	/* Like recv, but wait up to wait_usecs microseconds (forever for
+	 * ODP_PKTIN_WAIT) for packets. Optional. */
+	int (*recv_tmo)(pktio_entry_t *entry, int index, odp_packet_t packets[],
+			int num, uint64_t wait_usecs);
+	/* Multi-queue recv_tmo over queues that all use this implementation.
+	 * Optional. */
+	int (*recv_mq_tmo)(pktio_entry_t *entry[], int index[], int num_q,
+			   odp_packet_t packets[], int num, unsigned *from,
+			   uint64_t wait_usecs);
+	/* Add the queue's descriptors to readfds and return the highest one,
+	 * or <0 if the queue cannot be waited on with select(). Optional. */
+	int (*fd_set)(pktio_entry_t *entry, int index, fd_set *readfds);
 	int (*send)(pktio_entry_t *entry, int index,
 		    const odp_packet_t packets[], int num);
 	uint32_t (*mtu_get)(pktio_entry_t *pktio_entry);
diff --git a/platform/linux-generic/include/odp_packet_socket.h b/platform/linux-generic/include/odp_packet_socket.h
index 0e61f6f0c..43f982c7b 100644
--- a/platform/linux-generic/include/odp_packet_socket.h
+++ b/platform/linux-generic/include/odp_packet_socket.h
@@ -169,4 +169,30 @@ void rss_conf_print(const odp_pktin_hash_proto_t *hash_proto);
  */
 int ethtool_stats_get_fd(int fd, const char *name, odp_pktio_stats_t *stats);
 
+/**
+ * Try to receive from multiple pktin queues by genuinely sleeping
+ *
+ * Sleeps with the queues' own recv_mq_tmo implementation when all queues
+ * share one, otherwise with select() when every queue provides fd_set.
+ * When neither is possible, nothing is received and *trial_successful is
+ * set to 0 so that the caller can fall back to polling.
+ *
+ * @param queues Pktin queues
+ * @param num_q Number of queues
+ * @param from Queue from which the call received packets (may be NULL)
+ * @param packets Output packet slots
+ * @param num Number of output packet slots
+ * @param usecs Microseconds to wait, or ODP_PKTIN_WAIT to wait forever
+ * @param[out] trial_successful Set to 1 if the return value is final,
+ *                              or to 0 if the caller must poll instead
+ *
+ * @return >=0 on success, number of packets received
+ * @return <0 on failure
+ */
+int sock_recv_mq_tmo_try_int_driven(const struct odp_pktin_queue_t queues[],
+				    unsigned num_q, unsigned *from,
+				    odp_packet_t packets[], int num,
+				    uint64_t usecs,
+				    int *trial_successful);
+
 #endif
diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
index 5880a2a12..a56229dcd 100644
--- a/platform/linux-generic/odp_packet_io.c
+++ b/platform/linux-generic/odp_packet_io.c
@@ -1577,10 +1577,34 @@ int odp_pktin_recv_tmo(odp_pktin_queue_t queue, odp_packet_t packets[], int num,
 	odp_time_t t1, t2;
 	struct timespec ts;
 	int started = 0;
+	pktio_entry_t *entry;
 
 	ts.tv_sec = 0;
 	ts.tv_nsec = SLEEP_NSEC;
 
+	entry = get_pktio_entry(queue.pktio);
+	if (entry == NULL) {
+		ODP_DBG("pktio entry %d does not exist\n", queue.pktio);
+		return -1;
+	}
+
+	/* Sleep genuinely if the pktio type implements it */
+	if (entry->s.ops->recv_tmo) {
+		uint64_t usecs;
+
+		/* Round up to whole microseconds. Never map a finite wait to
+		 * ODP_PKTIN_WAIT, and avoid overflow in the multiplication. */
+		if (wait == ODP_PKTIN_WAIT)
+			usecs = ODP_PKTIN_WAIT;
+		else if (wait > (UINT64_MAX - 1000) / SLEEP_NSEC)
+			usecs = UINT64_MAX - 1;
+		else
+			usecs = (wait * SLEEP_NSEC + 999) / 1000;
+
+		return entry->s.ops->recv_tmo(entry, queue.index, packets, num,
+					      usecs);
+	}
+
 	while (1) {
 		ret = odp_pktin_recv(queue, packets, num);
 
@@ -1626,6 +1650,24 @@ int odp_pktin_recv_mq_tmo(const odp_pktin_queue_t queues[], unsigned num_q,
 	odp_time_t t1, t2;
 	struct timespec ts;
 	int started = 0;
+	uint64_t usecs;
+	int trial_successful = 0;
+
+	/* Round up to whole microseconds. Never map a finite wait to
+	 * ODP_PKTIN_WAIT, and avoid overflow in the multiplication. */
+	if (wait == ODP_PKTIN_WAIT)
+		usecs = ODP_PKTIN_WAIT;
+	else if (wait > (UINT64_MAX - 1000) / SLEEP_NSEC)
+		usecs = UINT64_MAX - 1;
+	else
+		usecs = (wait * SLEEP_NSEC + 999) / 1000;
+
+	/* Sleep genuinely if all queues support it, otherwise poll below */
+	ret = sock_recv_mq_tmo_try_int_driven(queues, num_q, from,
+					      packets, num, usecs,
+					      &trial_successful);
+	if (trial_successful)
+		return ret;
 
 	ts.tv_sec = 0;
 	ts.tv_nsec = SLEEP_NSEC;
diff --git a/platform/linux-generic/pktio/netmap.c b/platform/linux-generic/pktio/netmap.c
index cbcf7789d..d19c160f2 100644
--- a/platform/linux-generic/pktio/netmap.c
+++ b/platform/linux-generic/pktio/netmap.c
@@ -732,6 +732,57 @@ static inline int netmap_recv_desc(pktio_entry_t *pktio_entry,
 	return 0;
 }
 
+/**
+ * Add the descriptors of a netmap pktin queue to a select() read set
+ *
+ * @return Highest descriptor added, or -1 if the queue cannot be watched
+ *         with select() (interface not started, no descriptors, or a
+ *         descriptor that does not fit in an fd_set)
+ */
+static int netmap_fd_set(pktio_entry_t *pktio_entry, int index,
+			 fd_set *readfds)
+{
+	struct nm_desc *desc;
+	pkt_netmap_t *pkt_nm = &pktio_entry->s.pkt_nm;
+	unsigned first_desc_id = pkt_nm->rx_desc_ring[index].s.first;
+	unsigned last_desc_id = pkt_nm->rx_desc_ring[index].s.last;
+	unsigned desc_id;
+	int num_desc = pkt_nm->rx_desc_ring[index].s.num;
+	int i;
+	int max_fd = -1;
+
+	if (odp_unlikely(pktio_entry->s.state != PKTIO_STATE_STARTED))
+		return -1;
+
+	if (!pkt_nm->lockless_rx)
+		odp_ticketlock_lock(&pkt_nm->rx_desc_ring[index].s.lock);
+
+	desc_id = pkt_nm->rx_desc_ring[index].s.cur;
+
+	for (i = 0; i < num_desc; i++) {
+		if (desc_id > last_desc_id)
+			desc_id = first_desc_id;
+
+		desc = pkt_nm->rx_desc_ring[index].s.desc[desc_id];
+
+		/* FD_SET() on a descriptor outside fd_set is undefined */
+		if (desc->fd < 0 || desc->fd >= FD_SETSIZE) {
+			max_fd = -1;
+			break;
+		}
+
+		FD_SET(desc->fd, readfds);
+		if (desc->fd > max_fd)
+			max_fd = desc->fd;
+		desc_id++;
+	}
+
+	if (!pkt_nm->lockless_rx)
+		odp_ticketlock_unlock(&pkt_nm->rx_desc_ring[index].s.lock);
+
+	return max_fd;
+}
+
 static int netmap_recv(pktio_entry_t *pktio_entry, int index,
 		       odp_packet_t pkt_table[], int num)
 {
@@ -786,6 +837,94 @@ static int netmap_recv(pktio_entry_t *pktio_entry, int index,
 	return num_rx;
 }
 
+/* Receive from one queue, sleeping in select() for up to usecs
+ * microseconds (forever for ODP_PKTIN_WAIT) if nothing is ready yet. */
+static int netmap_recv_tmo(pktio_entry_t *pktio_entry, int index,
+			   odp_packet_t pkt_table[], int num, uint64_t usecs)
+{
+	struct timeval timeout;
+	int ret;
+	int maxfd;
+	fd_set readfds;
+
+	ret = netmap_recv(pktio_entry, index, pkt_table, num);
+	if (ret != 0)
+		return ret;
+
+	timeout.tv_sec = usecs / 1000 / 1000;
+	timeout.tv_usec = usecs % (1000 * 1000);
+	FD_ZERO(&readfds);
+	maxfd = netmap_fd_set(pktio_entry, index, &readfds);
+
+	/* Cannot watch the queue: select() would sleep without waking up */
+	if (maxfd < 0)
+		return 0;
+
+	if (select(maxfd + 1, &readfds, NULL, NULL,
+		   usecs == ODP_PKTIN_WAIT ? NULL : &timeout) == 0)
+		return 0;
+
+	return netmap_recv(pktio_entry, index, pkt_table, num);
+}
+
+/* Multi-queue variant of netmap_recv_tmo(). *from, if not NULL, is set to
+ * the index of the queue that packets were received from. */
+static int netmap_recv_mq_tmo(pktio_entry_t *pktio_entry[], int index[],
+			      int num_q, odp_packet_t pkt_table[], int num,
+			      unsigned *from, uint64_t usecs)
+{
+	struct timeval timeout;
+	int i;
+	int ret;
+	int maxfd = -1, maxfd2;
+	fd_set readfds;
+
+	timeout.tv_sec = usecs / 1000 / 1000;
+	timeout.tv_usec = usecs % (1000 * 1000);
+
+	for (i = 0; i < num_q; i++) {
+		ret = netmap_recv(pktio_entry[i], index[i], pkt_table, num);
+
+		if (ret > 0 && from)
+			*from = i;
+
+		if (ret != 0)
+			return ret;
+	}
+
+	FD_ZERO(&readfds);
+
+	for (i = 0; i < num_q; i++) {
+		maxfd2 = netmap_fd_set(pktio_entry[i], index[i], &readfds);
+		if (maxfd2 < 0) {
+			maxfd = -1;
+			break;
+		}
+		if (maxfd2 > maxfd)
+			maxfd = maxfd2;
+	}
+
+	/* Do not sleep unless every queue is being watched */
+	if (maxfd < 0)
+		return 0;
+
+	if (select(maxfd + 1, &readfds, NULL, NULL,
+		   usecs == ODP_PKTIN_WAIT ? NULL : &timeout) == 0)
+		return 0;
+
+	for (i = 0; i < num_q; i++) {
+		ret = netmap_recv(pktio_entry[i], index[i], pkt_table, num);
+
+		if (ret > 0 && from)
+			*from = i;
+
+		if (ret != 0)
+			return ret;
+	}
+
+	return 0;
+}
+
 static int netmap_send(pktio_entry_t *pktio_entry, int index,
 		       const odp_packet_t pkt_table[], int num)
 {
@@ -974,7 +1113,10 @@ const pktio_if_ops_t netmap_pktio_ops = {
 	.input_queues_config = netmap_input_queues_config,
 	.output_queues_config = netmap_output_queues_config,
 	.recv = netmap_recv,
-	.send = netmap_send
+	.recv_tmo = netmap_recv_tmo,
+	.recv_mq_tmo = netmap_recv_mq_tmo,
+	.send = netmap_send,
+	.fd_set = netmap_fd_set
 };
 
 #endif /* ODP_NETMAP */
diff --git a/platform/linux-generic/pktio/socket.c b/platform/linux-generic/pktio/socket.c
index 5841e2448..0056359de 100644
--- a/platform/linux-generic/pktio/socket.c
+++ b/platform/linux-generic/pktio/socket.c
@@ -693,6 +693,110 @@ static int sock_mmsg_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
 	return nb_rx;
 }
 
+/* Add the socket to a select() read set. Returns the socket, or -1 if it
+ * cannot be stored in an fd_set. */
+static int sock_fd_set(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
+		       fd_set *readfds)
+{
+	pkt_sock_t *pkt_sock = &pktio_entry->s.pkt_sock;
+	const int sockfd = pkt_sock->sockfd;
+
+	/* FD_SET() on a descriptor outside fd_set is undefined */
+	if (sockfd < 0 || sockfd >= FD_SETSIZE)
+		return -1;
+
+	FD_SET(sockfd, readfds);
+	return sockfd;
+}
+
+/* Receive from one queue, sleeping in select() for up to usecs
+ * microseconds (forever for ODP_PKTIN_WAIT) if nothing is ready yet. */
+static int sock_recv_tmo(pktio_entry_t *pktio_entry, int index,
+			 odp_packet_t pkt_table[], int num, uint64_t usecs)
+{
+	struct timeval timeout;
+	int ret;
+	int maxfd;
+	fd_set readfds;
+
+	ret = sock_mmsg_recv(pktio_entry, index, pkt_table, num);
+	if (ret != 0)
+		return ret;
+
+	timeout.tv_sec = usecs / 1000 / 1000;
+	timeout.tv_usec = usecs % (1000 * 1000);
+	FD_ZERO(&readfds);
+	maxfd = sock_fd_set(pktio_entry, index, &readfds);
+
+	/* Cannot watch the socket: select() would sleep without waking up */
+	if (maxfd < 0)
+		return 0;
+
+	if (select(maxfd + 1, &readfds, NULL, NULL,
+		   usecs == ODP_PKTIN_WAIT ? NULL : &timeout) == 0)
+		return 0;
+
+	return sock_mmsg_recv(pktio_entry, index, pkt_table, num);
+}
+
+/* Multi-queue variant of sock_recv_tmo(). *from, if not NULL, is set to
+ * the index of the queue that packets were received from. */
+static int sock_recv_mq_tmo(pktio_entry_t *pktio_entry[], int index[],
+			    int num_q, odp_packet_t pkt_table[], int num,
+			    unsigned *from, uint64_t usecs)
+{
+	struct timeval timeout;
+	int i;
+	int ret;
+	int maxfd = -1, maxfd2;
+	fd_set readfds;
+
+	for (i = 0; i < num_q; i++) {
+		ret = sock_mmsg_recv(pktio_entry[i], index[i], pkt_table, num);
+
+		if (ret > 0 && from)
+			*from = i;
+
+		if (ret != 0)
+			return ret;
+	}
+
+	timeout.tv_sec = usecs / 1000 / 1000;
+	timeout.tv_usec = usecs % (1000 * 1000);
+
+	FD_ZERO(&readfds);
+
+	for (i = 0; i < num_q; i++) {
+		maxfd2 = sock_fd_set(pktio_entry[i], index[i], &readfds);
+		if (maxfd2 < 0) {
+			maxfd = -1;
+			break;
+		}
+		if (maxfd2 > maxfd)
+			maxfd = maxfd2;
+	}
+
+	/* Do not sleep unless every queue is being watched */
+	if (maxfd < 0)
+		return 0;
+
+	if (select(maxfd + 1, &readfds, NULL, NULL,
+		   usecs == ODP_PKTIN_WAIT ? NULL : &timeout) == 0)
+		return 0;
+
+	for (i = 0; i < num_q; i++) {
+		ret = sock_mmsg_recv(pktio_entry[i], index[i], pkt_table, num);
+
+		if (ret > 0 && from)
+			*from = i;
+
+		if (ret != 0)
+			return ret;
+	}
+
+	return 0;
+}
+
 static uint32_t _tx_pkt_to_iovec(odp_packet_t pkt,
 				 struct iovec iovecs[MAX_SEGS])
 {
@@ -855,6 +959,125 @@ static int sock_init_global(void)
 	return 0;
 }
 
+/*
+ * Receive from queues of possibly different pktio types by sleeping in
+ * select(). readfds must already contain the descriptors of all queues,
+ * maxfd being the highest of them. The queues are polled once before
+ * and once after sleeping. *from, if not NULL, is set to the index of
+ * the queue that packets were received from.
+ */
+static int sock_recv_mq_tmo_select(pktio_entry_t * const *entry,
+				   const int index[],
+				   unsigned num_q, unsigned *from,
+				   odp_packet_t packets[], int num,
+				   uint64_t usecs, fd_set *readfds,
+				   int maxfd)
+{
+	struct timeval timeout;
+	unsigned i;
+	int ret;
+
+	timeout.tv_sec = usecs / 1000 / 1000;
+	timeout.tv_usec = usecs % (1000 * 1000);
+
+	for (i = 0; i < num_q; i++) {
+		ret = entry[i]->s.ops->recv(entry[i], index[i], packets, num);
+
+		if (ret > 0 && from)
+			*from = i;
+
+		if (ret != 0)
+			return ret;
+	}
+
+	if (select(maxfd + 1, readfds, NULL, NULL,
+		   usecs == ODP_PKTIN_WAIT ? NULL : &timeout) == 0)
+		return 0;
+
+	for (i = 0; i < num_q; i++) {
+		ret = entry[i]->s.ops->recv(entry[i], index[i], packets, num);
+
+		if (ret > 0 && from)
+			*from = i;
+
+		if (ret != 0)
+			return ret;
+	}
+
+	return 0;
+}
+
+int sock_recv_mq_tmo_try_int_driven(const struct odp_pktin_queue_t queues[],
+				    unsigned num_q, unsigned *from,
+				    odp_packet_t packets[], int num,
+				    uint64_t usecs, int *trial_successful)
+{
+	unsigned i;
+	/* Zero-length VLAs are undefined, so size them for at least one */
+	pktio_entry_t *entry[num_q ? num_q : 1];
+	int index[num_q ? num_q : 1];
+	fd_set readfds;
+	int maxfd = -1;
+	int same_impl = 1;
+
+	/* Unless a sleeping mechanism is found, the caller must poll */
+	*trial_successful = 0;
+
+	if (num_q == 0)
+		return 0;
+
+	/* Look up every entry before choosing a mechanism, as both need all
+	 * of them. Do not stop at the first mismatching implementation:
+	 * the entries after it would otherwise be left uninitialized. */
+	for (i = 0; i < num_q; i++) {
+		entry[i] = get_pktio_entry(queues[i].pktio);
+		index[i] = queues[i].index;
+		if (entry[i] == NULL) {
+			ODP_DBG("pktio entry %d does not exist\n",
+				queues[i].pktio);
+			*trial_successful = 1;
+			return -1;
+		}
+		if (entry[i]->s.ops->recv_mq_tmo !=
+		    entry[0]->s.ops->recv_mq_tmo)
+			same_impl = 0;
+	}
+
+	/* All queues share a recv_mq_tmo implementation: let it sleep */
+	if (same_impl && entry[0]->s.ops->recv_mq_tmo != NULL) {
+		*trial_successful = 1;
+		return entry[0]->s.ops->recv_mq_tmo(entry, index, num_q,
+						    packets, num, from, usecs);
+	}
+
+	/* Otherwise sleep in select(), but only if every queue can be
+	 * watched: sleeping while ignoring a queue could miss its packets.
+	 * maxfd ends up -1 if any queue cannot be watched. */
+	FD_ZERO(&readfds);
+	for (i = 0; i < num_q; i++) {
+		int fd = -1;
+
+		if (entry[i]->s.ops->fd_set != NULL)
+			fd = entry[i]->s.ops->fd_set(entry[i], index[i],
+						     &readfds);
+		if (fd < 0) {
+			maxfd = -1;
+			break;
+		}
+		if (fd > maxfd)
+			maxfd = fd;
+	}
+
+	/* No mechanism works: leave it to the caller's polling loop */
+	if (maxfd < 0)
+		return 0;
+
+	*trial_successful = 1;
+	return sock_recv_mq_tmo_select(entry, index, num_q, from,
+				       packets, num, usecs,
+				       &readfds, maxfd);
+}
+
 const pktio_if_ops_t sock_mmsg_pktio_ops = {
 	.name = "socket",
 	.print = NULL,
@@ -868,6 +1091,9 @@ const pktio_if_ops_t sock_mmsg_pktio_ops = {
 	.stats = sock_stats,
 	.stats_reset = sock_stats_reset,
 	.recv = sock_mmsg_recv,
+	.recv_tmo = sock_recv_tmo,
+	.recv_mq_tmo = sock_recv_mq_tmo,
+	.fd_set = sock_fd_set,
 	.send = sock_mmsg_send,
 	.mtu_get = sock_mtu_get,
 	.promisc_mode_set = sock_promisc_mode_set,
diff --git a/platform/linux-generic/pktio/socket_mmap.c b/platform/linux-generic/pktio/socket_mmap.c
index 42f4a2628..5a7e1a4dc 100644
--- a/platform/linux-generic/pktio/socket_mmap.c
+++ b/platform/linux-generic/pktio/socket_mmap.c
@@ -600,6 +600,26 @@ static int sock_mmap_open(odp_pktio_t id ODP_UNUSED,
 	return -1;
 }
 
+/* Add the socket to a select() read set. Returns the socket, or -1 if it
+ * cannot be stored in an fd_set. */
+static int sock_mmap_fd_set(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
+			    fd_set *readfds)
+{
+	pkt_sock_mmap_t *const pkt_sock = &pktio_entry->s.pkt_sock_mmap;
+	int fd;
+
+	odp_ticketlock_lock(&pktio_entry->s.rxl);
+	fd = pkt_sock->sockfd;
+	odp_ticketlock_unlock(&pktio_entry->s.rxl);
+
+	/* FD_SET() on a descriptor outside fd_set is undefined */
+	if (fd < 0 || fd >= FD_SETSIZE)
+		return -1;
+
+	FD_SET(fd, readfds);
+	return fd;
+}
+
 static int sock_mmap_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
 			  odp_packet_t pkt_table[], int len)
 {
@@ -614,6 +634,94 @@ static int sock_mmap_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
 	return ret;
 }
 
+/* Receive from one queue, sleeping in select() for up to usecs
+ * microseconds (forever for ODP_PKTIN_WAIT) if nothing is ready yet. */
+static int sock_mmap_recv_tmo(pktio_entry_t *pktio_entry, int index,
+			      odp_packet_t pkt_table[], int num, uint64_t usecs)
+{
+	struct timeval timeout;
+	int ret;
+	int maxfd;
+	fd_set readfds;
+
+	ret = sock_mmap_recv(pktio_entry, index, pkt_table, num);
+	if (ret != 0)
+		return ret;
+
+	timeout.tv_sec = usecs / 1000 / 1000;
+	timeout.tv_usec = usecs % (1000 * 1000);
+	FD_ZERO(&readfds);
+	maxfd = sock_mmap_fd_set(pktio_entry, index, &readfds);
+
+	/* Cannot watch the socket: select() would sleep without waking up */
+	if (maxfd < 0)
+		return 0;
+
+	if (select(maxfd + 1, &readfds, NULL, NULL,
+		   usecs == ODP_PKTIN_WAIT ? NULL : &timeout) == 0)
+		return 0;
+
+	return sock_mmap_recv(pktio_entry, index, pkt_table, num);
+}
+
+/* Multi-queue variant of sock_mmap_recv_tmo(). *from, if not NULL, is set
+ * to the index of the queue that packets were received from. */
+static int sock_mmap_recv_mq_tmo(pktio_entry_t *pktio_entry[], int index[],
+				 int num_q, odp_packet_t pkt_table[], int num,
+				 unsigned *from, uint64_t usecs)
+{
+	struct timeval timeout;
+	int i;
+	int ret;
+	int maxfd = -1, maxfd2;
+	fd_set readfds;
+
+	for (i = 0; i < num_q; i++) {
+		ret = sock_mmap_recv(pktio_entry[i], index[i], pkt_table, num);
+
+		if (ret > 0 && from)
+			*from = i;
+
+		if (ret != 0)
+			return ret;
+	}
+
+	timeout.tv_sec = usecs / 1000 / 1000;
+	timeout.tv_usec = usecs % (1000 * 1000);
+
+	FD_ZERO(&readfds);
+
+	for (i = 0; i < num_q; i++) {
+		maxfd2 = sock_mmap_fd_set(pktio_entry[i], index[i], &readfds);
+		if (maxfd2 < 0) {
+			maxfd = -1;
+			break;
+		}
+		if (maxfd2 > maxfd)
+			maxfd = maxfd2;
+	}
+
+	/* Do not sleep unless every queue is being watched */
+	if (maxfd < 0)
+		return 0;
+
+	if (select(maxfd + 1, &readfds, NULL, NULL,
+		   usecs == ODP_PKTIN_WAIT ? NULL : &timeout) == 0)
+		return 0;
+
+	for (i = 0; i < num_q; i++) {
+		ret = sock_mmap_recv(pktio_entry[i], index[i], pkt_table, num);
+
+		if (ret > 0 && from)
+			*from = i;
+
+		if (ret != 0)
+			return ret;
+	}
+
+	return 0;
+}
+
 static int sock_mmap_send(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
 			  const odp_packet_t pkt_table[], int len)
 {
@@ -725,7 +833,10 @@ const pktio_if_ops_t sock_mmap_pktio_ops = {
 	.stats = sock_mmap_stats,
 	.stats_reset = sock_mmap_stats_reset,
 	.recv = sock_mmap_recv,
+	.recv_tmo = sock_mmap_recv_tmo,
+	.recv_mq_tmo = sock_mmap_recv_mq_tmo,
 	.send = sock_mmap_send,
+	.fd_set = sock_mmap_fd_set,
 	.mtu_get = sock_mmap_mtu_get,
 	.promisc_mode_set = sock_mmap_promisc_mode_set,
 	.promisc_mode_get = sock_mmap_promisc_mode_get,