Hi Ilya,
Thanks for the feedback.
<snip>
> > +static struct dp_packet_afxdp *
> > +dp_packet_cast_afxdp(const struct dp_packet *d)
> > +{
> > + ovs_assert(d->source == DPBUF_AFXDP);
> > + return CONTAINER_OF(d, struct dp_packet_afxdp, packet);
> > +}
> > +
> > +static inline void
> > +prepare_fill_queue(struct xsk_socket_info *xsk_info)
> > +{
> > + struct umem_elem *elems[BATCH_SIZE];
> > + struct xsk_umem_info *umem;
> > + unsigned int idx_fq;
> > + int nb_free;
> > + int i, ret;
> > +
> > + umem = xsk_info->umem;
> > +
> > + nb_free = PROD_NUM_DESCS / 2;
> > + if (xsk_prod_nb_free(&umem->fq, nb_free) < nb_free) {
> > + return;
> > + }
>
>
> Why are you using 'PROD_NUM_DESCS / 2' here?
I didn't want to refill the fq too aggressively.
> IIUC, we're keeping fill queue half-loaded. Isn't it better to
> use BATCH_SIZE instead?
>
Yes, that also works.
>
> > +
> > + ret = umem_elem_pop_n(&umem->mpool, BATCH_SIZE, (void **)elems);
> > + if (OVS_UNLIKELY(ret)) {
> > + return;
> > + }
> > +
> > + if (!xsk_ring_prod__reserve(&umem->fq, BATCH_SIZE, &idx_fq)) {
> > + umem_elem_push_n(&umem->mpool, BATCH_SIZE, (void **)elems);
> > + COVERAGE_INC(afxdp_fq_full);
> > + return;
> > + }
> > +
> > + for (i = 0; i < BATCH_SIZE; i++) {
> > + uint64_t index;
> > + struct umem_elem *elem;
> > +
> > + elem = elems[i];
> > + index = (uint64_t)((char *)elem - (char *)umem->buffer);
> > + ovs_assert((index & FRAME_SHIFT_MASK) == 0);
> > + *xsk_ring_prod__fill_addr(&umem->fq, idx_fq) = index;
> > +
> > + idx_fq++;
> > + }
> > + xsk_ring_prod__submit(&umem->fq, BATCH_SIZE);
> > +}
> > +
> > +int
> > +netdev_afxdp_rxq_recv(struct netdev_rxq *rxq_, struct dp_packet_batch
> > *batch,
> > + int *qfill)
> > +{
> > + struct netdev_rxq_linux *rx = netdev_rxq_linux_cast(rxq_);
> > + struct netdev *netdev = rx->up.netdev;
> > + struct netdev_linux *dev = netdev_linux_cast(netdev);
> > + struct xsk_socket_info *xsk_info;
> > + struct xsk_umem_info *umem;
> > + uint32_t idx_rx = 0;
> > + int qid = rxq_->queue_id;
> > + unsigned int rcvd, i;
> > +
> > + xsk_info = dev->xsks[qid];
> > + if (!xsk_info || !xsk_info->xsk) {
> > + return 0;
>
> Need to return EAGAIN.
OK
>
> > + }
> > +
> > + prepare_fill_queue(xsk_info);
> > +
> > + umem = xsk_info->umem;
> > + rx->fd = xsk_socket__fd(xsk_info->xsk);
> > +
> > + rcvd = xsk_ring_cons__peek(&xsk_info->rx, BATCH_SIZE, &idx_rx);
> > + if (!rcvd) {
> > + return 0;
>
> Need to return EAGAIN.
OK
>
> > + }
> > +
> > + /* Setup a dp_packet batch from descriptors in RX queue */
> > + for (i = 0; i < rcvd; i++) {
> > + uint64_t addr = xsk_ring_cons__rx_desc(&xsk_info->rx,
> > idx_rx)->addr;
> > + uint32_t len = xsk_ring_cons__rx_desc(&xsk_info->rx, idx_rx)->len;
> > + char *pkt = xsk_umem__get_data(umem->buffer, addr);
> > + uint64_t index;
> > +
> > + struct dp_packet_afxdp *xpacket;
> > + struct dp_packet *packet;
> > +
> > + index = addr >> FRAME_SHIFT;
> > + xpacket = UMEM2XPKT(umem->xpool.array, index);
> > + packet = &xpacket->packet;
> > +
> > + /* Initialize the struct dp_packet */
> > + dp_packet_use_afxdp(packet, pkt, FRAME_SIZE - FRAME_HEADROOM);
> > + dp_packet_set_size(packet, len);
> > +
> > + /* Add packet into batch, increase batch->count */
> > + dp_packet_batch_add(batch, packet);
> > +
> > + idx_rx++;
> > + }
> > + /* Release the RX queue */
> > + xsk_ring_cons__release(&xsk_info->rx, rcvd);
> > +
> > + if (qfill) {
> > + /* TODO: return the number of remaining packets in the queue. */
> > + *qfill = 0;
> > + }
> > +
> > +#ifdef AFXDP_DEBUG
> > + log_xsk_stat(xsk_info);
> > +#endif
> > + return 0;
> > +}
> > +
> > +static inline int
> > +kick_tx(struct xsk_socket_info *xsk_info)
> > +{
> > + int ret;
> > +
> > + if (!xsk_info->outstanding_tx) {
> > + return 0;
> > + }
> > +
> > + /* This causes system call into kernel's xsk_sendmsg, and
> > + * xsk_generic_xmit (skb mode) or xsk_async_xmit (driver mode).
> > + */
> > + ret = sendto(xsk_socket__fd(xsk_info->xsk), NULL, 0, MSG_DONTWAIT,
> > + NULL, 0);
> > + if (OVS_UNLIKELY(ret < 0)) {
> > + if (errno == ENXIO || errno == ENOBUFS || errno == EOPNOTSUPP) {
> > + return errno;
> > + }
> > + }
> > + /* no error, or EBUSY or EAGAIN */
> > + return 0;
> > +}
> > +
> > +void
> > +free_afxdp_buf(struct dp_packet *p)
> > +{
> > + struct dp_packet_afxdp *xpacket;
> > + uintptr_t addr;
> > +
> > + xpacket = dp_packet_cast_afxdp(p);
> > + if (xpacket->mpool) {
> > + void *base = dp_packet_base(p);
> > +
> > + addr = (uintptr_t)base & (~FRAME_SHIFT_MASK);
> > + umem_elem_push(xpacket->mpool, (void *)addr);
> > + }
> > +}
> > +
> > +static void
> > +free_afxdp_buf_batch(struct dp_packet_batch *batch)
> > +{
> > + struct dp_packet_afxdp *xpacket = NULL;
> > + struct dp_packet *packet;
> > + void *elems[BATCH_SIZE];
> > + uintptr_t addr;
> > +
> > + DP_PACKET_BATCH_FOR_EACH (i, packet, batch) {
> > + xpacket = dp_packet_cast_afxdp(packet);
> > + if (xpacket->mpool) {
>
>
> The check above seems useless. Also, if any packet is
> skipped, we'll push a trash pointer to mpool.
>
Thanks, I will drop that check.
> If you're worrying about the value, you may just assert:
>
> ovs_assert(xpacket->mpool);
>
> > + void *base = dp_packet_base(packet);
> > +
> > + addr = (uintptr_t)base & (~FRAME_SHIFT_MASK);
> > + elems[i] = (void *)addr;
> > + }
> > + }
> > + umem_elem_push_n(xpacket->mpool, batch->count, elems);
> > + dp_packet_batch_init(batch);
> > +}
> > +
> > +static inline bool
> > +check_free_batch(struct dp_packet_batch *batch)
> > +{
> > + struct umem_pool *first_mpool = NULL;
> > + struct dp_packet_afxdp *xpacket;
> > + struct dp_packet *packet;
> > +
> > + DP_PACKET_BATCH_FOR_EACH (i, packet, batch) {
> > + if (packet->source != DPBUF_AFXDP) {
> > + return false;
> > + }
> > + xpacket = dp_packet_cast_afxdp(packet);
> > + if (i == 0) {
> > + first_mpool = xpacket->mpool;
> > + continue;
> > + }
> > + if (xpacket->mpool != first_mpool) {
> > + return false;
> > + }
> > + }
> > + /* All packets are DPBUF_AFXDP and from the same mpool */
> > + return true;
> > +}
> > +
> > +static inline void
> > +afxdp_complete_tx(struct xsk_socket_info *xsk_info)
> > +{
> > + struct umem_elem *elems_push[BATCH_SIZE];
> > + struct xsk_umem_info *umem;
> > + uint32_t idx_cq = 0;
> > + int tx_to_free = 0;
> > + int tx_done, j;
> > +
> > + umem = xsk_info->umem;
> > + tx_done = xsk_ring_cons__peek(&umem->cq, BATCH_SIZE, &idx_cq);
> > +
> > + /* Recycle back to umem pool */
> > + for (j = 0; j < tx_done; j++) {
> > + struct umem_elem *elem;
> > + uint64_t *addr;
> > +
> > + addr = (uint64_t *)xsk_ring_cons__comp_addr(&umem->cq, idx_cq++);
> > + if (*addr == 0) {
>
> 'addr' is an offset from 'umem->buffer'. Zero seems a valid value.
> Maybe it's better to use UINT64_MAX instead?
Thanks a lot! I shouldn't use zero; I will switch to UINT64_MAX.
>
> > + /* The elem has been pushed already */
> > + continue;
> > + }
> > + elem = ALIGNED_CAST(struct umem_elem *,
> > + (char *)umem->buffer + *addr);
> > + elems_push[tx_to_free] = elem;
> > + *addr = 0; /* Mark as pushed */
> > + tx_to_free++;
> > + }
> > +
> > + umem_elem_push_n(&umem->mpool, tx_to_free, (void **)elems_push);
> > +
> > + if (tx_done > 0) {
> > + xsk_ring_cons__release(&umem->cq, tx_done);
> > + xsk_info->outstanding_tx -= tx_done;
>
> We should probably subtract 'tx_to_free' instead and do this
> outside of the 'if'.
>
OK
--William
_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev