Use atomic to handle position in ring buffer.
Rx part is quite straigth forward.

For the Tx part, it means that packet are not filled in order
within the ring. But each send() only triggers one call to sendto().
Thus not all written packet may be sent by this call. However packet
pending in the ring should be flushed by the thread which reserved
the slot in the ring buffer in the first place.

Signed-off-by: Nicolas Morey-Chaisemartin <nmo...@kalray.eu>
---
 platform/linux-generic/include/odp_packet_socket.h |   2 +-
 platform/linux-generic/pktio/socket_mmap.c         | 126 ++++++++++++---------
 2 files changed, 72 insertions(+), 56 deletions(-)

diff --git a/platform/linux-generic/include/odp_packet_socket.h 
b/platform/linux-generic/include/odp_packet_socket.h
index a5e0eb3..25a5e44 100644
--- a/platform/linux-generic/include/odp_packet_socket.h
+++ b/platform/linux-generic/include/odp_packet_socket.h
@@ -49,7 +49,7 @@ typedef struct {
 /** packet mmap ring */
 struct ring {
        struct iovec *rd;
-       unsigned frame_num;
+       odp_atomic_u32_t frame_num;
        int rd_num;
 
        int sock;
diff --git a/platform/linux-generic/pktio/socket_mmap.c 
b/platform/linux-generic/pktio/socket_mmap.c
index ba773a3..7d077cf 100644
--- a/platform/linux-generic/pktio/socket_mmap.c
+++ b/platform/linux-generic/pktio/socket_mmap.c
@@ -108,13 +108,28 @@ static inline void mmap_tx_user_ready(struct tpacket2_hdr 
*hdr)
        __sync_synchronize();
 }
 
+static inline int pkt_mmap_get_rx(struct ring *ring)
+{
+       unsigned frame_num, next_frame_num;
+
+       do {
+               frame_num = odp_atomic_load_u32(&ring->frame_num);
+               if (!mmap_rx_kernel_ready(ring->rd[frame_num].iov_base))
+                               return -1;
+               next_frame_num = (frame_num + 1) % ring->rd_num;
+       } while (!_odp_atomic_u32_cmp_xchg_strong_mm(&ring->frame_num,
+                                                    &frame_num, next_frame_num,
+                                                    _ODP_MEMMODEL_ACQ,
+                                                    _ODP_MEMMODEL_RLX));
+       return frame_num;
+}
 static inline unsigned pkt_mmap_v2_rx(int sock, struct ring *ring,
                                      odp_packet_t pkt_table[], unsigned len,
                                      odp_pool_t pool,
                                      unsigned char if_mac[])
 {
        union frame_map ppd;
-       unsigned frame_num, next_frame_num;
+       int frame_num;
        uint8_t *pkt_buf;
        int pkt_len;
        struct ethhdr *eth_hdr;
@@ -122,91 +137,92 @@ static inline unsigned pkt_mmap_v2_rx(int sock, struct 
ring *ring,
 
        (void)sock;
 
-       frame_num = ring->frame_num;
 
        while (i < len) {
-               if (mmap_rx_kernel_ready(ring->rd[frame_num].iov_base)) {
-                       ppd.raw = ring->rd[frame_num].iov_base;
-
-                       next_frame_num = (frame_num + 1) % ring->rd_num;
-
-                       pkt_buf = (uint8_t *)ppd.raw + ppd.v2->tp_h.tp_mac;
-                       pkt_len = ppd.v2->tp_h.tp_snaplen;
-
-                       /* Don't receive packets sent by ourselves */
-                       eth_hdr = (struct ethhdr *)pkt_buf;
-                       if (odp_unlikely(ethaddrs_equal(if_mac,
-                                                       eth_hdr->h_source))) {
-                               mmap_rx_user_ready(ppd.raw); /* drop */
-                               frame_num = next_frame_num;
-                               continue;
-                       }
+               frame_num = pkt_mmap_get_rx(ring);
+               if (frame_num < 0)
+                       break;
 
-                       pkt_table[i] = odp_packet_alloc(pool, pkt_len);
-                       if (odp_unlikely(pkt_table[i] == ODP_PACKET_INVALID))
-                               break;
+               ppd.raw = ring->rd[frame_num].iov_base;
 
-                       if (odp_packet_copydata_in(pkt_table[i], 0,
-                                                  pkt_len, pkt_buf) != 0) {
-                               odp_packet_free(pkt_table[i]);
-                               break;
-                       }
+               pkt_buf = (uint8_t *)ppd.raw + ppd.v2->tp_h.tp_mac;
+               pkt_len = ppd.v2->tp_h.tp_snaplen;
 
-                       mmap_rx_user_ready(ppd.raw);
+               /* Don't receive packets sent by ourselves */
+               eth_hdr = (struct ethhdr *)pkt_buf;
+               if (odp_unlikely(ethaddrs_equal(if_mac,
+                                               eth_hdr->h_source))) {
+                       mmap_rx_user_ready(ppd.raw); /* drop */
+                       continue;
+               }
 
-                       /* Parse and set packet header data */
-                       _odp_packet_reset_parse(pkt_table[i]);
+               pkt_table[i] = odp_packet_alloc(pool, pkt_len);
+               if (odp_unlikely(pkt_table[i] == ODP_PACKET_INVALID))
+                       break;
 
-                       frame_num = next_frame_num;
-                       i++;
-               } else {
+               if (odp_packet_copydata_in(pkt_table[i], 0,
+                                          pkt_len, pkt_buf) != 0) {
+                       odp_packet_free(pkt_table[i]);
                        break;
                }
-       }
 
-       ring->frame_num = frame_num;
+               mmap_rx_user_ready(ppd.raw);
+
+               /* Parse and set packet header data */
+               _odp_packet_reset_parse(pkt_table[i]);
+               i++;
+       }
 
        return i;
 }
 
+static inline int pkt_mmap_v2_get_rx(struct ring *ring)
+{
+       unsigned frame_num, next_frame_num;
+
+       do {
+               frame_num = odp_atomic_load_u32(&ring->frame_num);
+               if (!mmap_tx_kernel_ready(ring->rd[frame_num].iov_base))
+                       return -1;
+               next_frame_num = (frame_num + 1) % ring->rd_num;
+       } while (!_odp_atomic_u32_cmp_xchg_strong_mm(&ring->frame_num,
+                                                    &frame_num, next_frame_num,
+                                                    _ODP_MEMMODEL_ACQ,
+                                                    _ODP_MEMMODEL_RLX));
+       return frame_num;
+}
 static inline unsigned pkt_mmap_v2_tx(int sock, struct ring *ring,
                                      odp_packet_t pkt_table[], unsigned len)
 {
        union frame_map ppd;
        uint32_t pkt_len;
-       unsigned frame_num, next_frame_num;
+       int frame_num;
        int ret;
        unsigned i = 0;
 
-       frame_num = ring->frame_num;
 
        while (i < len) {
-               if (mmap_tx_kernel_ready(ring->rd[frame_num].iov_base)) {
-                       ppd.raw = ring->rd[frame_num].iov_base;
+               frame_num = pkt_mmap_v2_get_rx(ring);
+               if (frame_num < 0)
+                       break;
 
-                       next_frame_num = (frame_num + 1) % ring->rd_num;
+               ppd.raw = ring->rd[frame_num].iov_base;
 
-                       pkt_len = odp_packet_len(pkt_table[i]);
-                       ppd.v2->tp_h.tp_snaplen = pkt_len;
-                       ppd.v2->tp_h.tp_len = pkt_len;
+               pkt_len = odp_packet_len(pkt_table[i]);
+               ppd.v2->tp_h.tp_snaplen = pkt_len;
+               ppd.v2->tp_h.tp_len = pkt_len;
 
-                       odp_packet_copydata_out(pkt_table[i], 0, pkt_len,
-                                               (uint8_t *)ppd.raw +
-                                               TPACKET2_HDRLEN -
-                                               sizeof(struct sockaddr_ll));
+               odp_packet_copydata_out(pkt_table[i], 0, pkt_len,
+                                       (uint8_t *)ppd.raw +
+                                       TPACKET2_HDRLEN -
+                                       sizeof(struct sockaddr_ll));
 
-                       mmap_tx_user_ready(ppd.raw);
+               mmap_tx_user_ready(ppd.raw);
 
-                       odp_packet_free(pkt_table[i]);
-                       frame_num = next_frame_num;
-                       i++;
-               } else {
-                       break;
-               }
+               odp_packet_free(pkt_table[i]);
+               i++;
        }
 
-       ring->frame_num = frame_num;
-
        ret = sendto(sock, NULL, 0, MSG_DONTWAIT, NULL, 0);
        if (ret == -1) {
                if (errno != EAGAIN) {
-- 
2.6.1.3.g8d02103


_______________________________________________
lng-odp mailing list
lng-odp@lists.linaro.org
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to