Add a new bus driver to DPDK which receives memory (e.g. hugepage memory)
over a Unix domain socket connection and maps it into the DPDK process,
replacing the memory of the current NUMA socket as the default memory used
for future allocation requests.

Signed-off-by: Bruce Richardson <bruce.richard...@intel.com>
---
 drivers/bus/meson.build                 |   1 +
 drivers/bus/shared_mem/meson.build      |  11 +
 drivers/bus/shared_mem/shared_mem_bus.c | 323 ++++++++++++++++++++++++
 drivers/bus/shared_mem/shared_mem_bus.h |  75 ++++++
 drivers/bus/shared_mem/version.map      |  11 +
 5 files changed, 421 insertions(+)
 create mode 100644 drivers/bus/shared_mem/meson.build
 create mode 100644 drivers/bus/shared_mem/shared_mem_bus.c
 create mode 100644 drivers/bus/shared_mem/shared_mem_bus.h
 create mode 100644 drivers/bus/shared_mem/version.map

diff --git a/drivers/bus/meson.build b/drivers/bus/meson.build
index a78b4283bf..0e64959d1a 100644
--- a/drivers/bus/meson.build
+++ b/drivers/bus/meson.build
@@ -9,6 +9,7 @@ drivers = [
         'ifpga',
         'pci',
         'platform',
+        'shared_mem',
         'vdev',
         'vmbus',
 ]
diff --git a/drivers/bus/shared_mem/meson.build 
b/drivers/bus/shared_mem/meson.build
new file mode 100644
index 0000000000..1fa21f3a09
--- /dev/null
+++ b/drivers/bus/shared_mem/meson.build
@@ -0,0 +1,11 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 Intel Corporation
+
+if is_windows  # shared_mem_bus.c uses AF_UNIX sockets and mmap()
+    build = false
+    reason = 'not supported on Windows'
+endif
+
+sources = files('shared_mem_bus.c')
+require_iova_in_mbuf = false
+deps += ['mbuf', 'net']  # mbuf: rte_mbuf_pool_ops.h, net: rte_ether.h
diff --git a/drivers/bus/shared_mem/shared_mem_bus.c 
b/drivers/bus/shared_mem/shared_mem_bus.c
new file mode 100644
index 0000000000..e0369ed416
--- /dev/null
+++ b/drivers/bus/shared_mem/shared_mem_bus.c
@@ -0,0 +1,323 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <errno.h>
+#include <malloc.h>
+#include <inttypes.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/un.h>
+#include <sys/stat.h>
+#include <sys/mman.h>
+#include <sys/socket.h>
+
+#include <rte_log.h>
+#include <rte_lcore.h>
+#include <rte_errno.h>
+#include <rte_malloc.h>
+#include <rte_devargs.h>
+#include <rte_mbuf_pool_ops.h>
+
+#include <bus_driver.h>
+#include <dev_driver.h>
+#include "shared_mem_bus.h"
+
+/*
+ * Log type for this bus, registered with a default level of DEBUG.
+ * BUS_LOG() prefixes every message with a bus tag and the name of the
+ * calling function and appends a newline itself, so format strings passed
+ * to BUS_ERR/BUS_INFO/BUS_DEBUG do not need a trailing "\n".
+ */
+RTE_LOG_REGISTER_DEFAULT(shared_mem_bus_logtype, DEBUG);
+#define BUS_LOG(level, fmt, args...) rte_log(RTE_LOG_ ## level, \
+               shared_mem_bus_logtype, "## SHARED MEM BUS: %s(): " fmt "\n", 
__func__, ##args)
+#define BUS_ERR(fmt, args...)  BUS_LOG(ERR, fmt, ## args)
+#define BUS_INFO(fmt, args...)  BUS_LOG(INFO, fmt, ## args)
+#define BUS_DEBUG(fmt, args...)  BUS_LOG(DEBUG, fmt, ## args)
+
+/*
+ * Forward declarations of the bus callbacks, needed because they are
+ * referenced by the shared_mem_bus initialiser before they are defined.
+ */
+static int dev_scan(void);
+static int dev_probe(void);
+static struct rte_device *find_device(const struct rte_device *start, 
rte_dev_cmp_t cmp,
+                const void *data);
+static enum rte_iova_mode get_iommu_class(void);
+static int addr_parse(const char *, void *);
+
+/* One Unix socket connection over which a memory region is received. */
+struct socket_device {
+       struct rte_device rte_device;
+       TAILQ_ENTRY(socket_device) next; /* link in shared_mem_bus.socket_list */
+       int fd;             /* connected socket, opened in dev_scan() */
+       uintptr_t membase;  /* address of the mmap()ed region, set in dev_probe() */
+       uintptr_t memlen;   /* size in bytes of the mapped region, set in dev_probe() */
+};
+
+/** List of devices */
+TAILQ_HEAD(socket_list, socket_device);
+TAILQ_HEAD(device_list, rte_device);
+
+/* State of the shared memory bus: the generic bus plus its private lists. */
+struct shared_mem_bus {
+       struct rte_bus bus;
+       struct socket_list socket_list; /* connections created by dev_scan() */
+       struct shared_mem_drv *ethdrv;  /* set by shared_mem_register_driver(), NULL if none */
+       struct device_list device_list; /* NOTE(review): initialised but not otherwise used in this file */
+};
+
+/*
+ * The single instance of the bus. The callbacks are the static functions
+ * of this file; both lists start out empty.
+ */
+static struct shared_mem_bus shared_mem_bus = {
+       .bus = {
+               .scan = dev_scan,
+               .probe = dev_probe,
+               .find_device = find_device,
+               .get_iommu_class = get_iommu_class,
+               .parse = addr_parse,
+       },
+
+       .socket_list = TAILQ_HEAD_INITIALIZER(shared_mem_bus.socket_list),
+       .device_list = TAILQ_HEAD_INITIALIZER(shared_mem_bus.device_list),
+};
+
+/* Register the bus with EAL under the name "shared_mem". */
+RTE_REGISTER_BUS(shared_mem, shared_mem_bus.bus);
+
+/*
+ * Send a message on the socket of the first device of the bus.
+ *
+ * Returns the result of send(2), or -1 with errno set to ENOTCONN when the
+ * bus has no socket device (nothing was scanned), instead of dereferencing
+ * the empty list head.
+ */
+int
+rte_shm_bus_send_message(void *msg, size_t msglen)
+{
+       struct socket_device *sdev = TAILQ_FIRST(&shared_mem_bus.socket_list);
+
+       if (sdev == NULL) {
+               errno = ENOTCONN;
+               return -1;
+       }
+       return send(sdev->fd, msg, msglen, 0);
+}
+
+/*
+ * Receive a message from the socket of the first device of the bus.
+ *
+ * Returns the result of recv(2), or -1 with errno set to ENOTCONN when the
+ * bus has no socket device (nothing was scanned), instead of dereferencing
+ * the empty list head.
+ */
+int
+rte_shm_bus_recv_message(void *msg, size_t msglen)
+{
+       struct socket_device *sdev = TAILQ_FIRST(&shared_mem_bus.socket_list);
+
+       if (sdev == NULL) {
+               errno = ENOTCONN;
+               return -1;
+       }
+       return recv(sdev->fd, msg, msglen, 0);
+}
+
+/*
+ * Translate a pointer into an offset from the start of the mapped region
+ * that contains it.
+ *
+ * A pointer equal to the region base is valid and yields offset 0. The
+ * range test is done on the difference so that it cannot wrap around.
+ * Returns (uintptr_t)-1 when the pointer lies in no mapped region.
+ */
+uintptr_t
+rte_shm_bus_get_mem_offset(void *ptr)
+{
+       struct socket_device *dev;
+       uintptr_t pval = (uintptr_t)ptr;
+
+       TAILQ_FOREACH(dev, &shared_mem_bus.socket_list, next) {
+               if (pval >= dev->membase && pval - dev->membase < dev->memlen)
+                       return pval - dev->membase;
+       }
+       return (uintptr_t)-1;
+}
+
+/*
+ * Translate an offset into a pointer: the first device whose region is
+ * longer than the offset supplies the base address. Returns (void *)-1
+ * when no device qualifies.
+ */
+void *
+rte_shm_bus_get_mem_ptr(uintptr_t offset)
+{
+       struct socket_device *sdev;
+
+       for (sdev = TAILQ_FIRST(&shared_mem_bus.socket_list); sdev != NULL;
+                       sdev = TAILQ_NEXT(sdev, next)) {
+               if (offset >= sdev->memlen)
+                       continue;
+               return RTE_PTR_ADD(sdev->membase, offset);
+       }
+       return (void *)-1;
+}
+
+/*
+ * Bus scan callback.
+ *
+ * Does nothing unless the bus is in allowlist scan mode. For every devargs
+ * entry of this bus, the "sock:<path>" name is parsed, a SOCK_SEQPACKET
+ * Unix socket is connected to <path> and a socket_device is appended to the
+ * bus socket list.
+ *
+ * Returns 0 on success or a negative errno value on the first failure; the
+ * socket opened for the failing entry is closed before returning.
+ */
+static int
+dev_scan(void)
+{
+       if (shared_mem_bus.bus.conf.scan_mode != RTE_BUS_SCAN_ALLOWLIST)
+               return 0;
+
+       struct rte_devargs *devargs;
+       RTE_EAL_DEVARGS_FOREACH(shared_mem_bus.bus.name, devargs) {
+               struct sockaddr_un sun = {.sun_family = AF_UNIX};
+               size_t namelen = strlen(devargs->name);
+
+               /* validate the address first, before any resource is acquired */
+               if (namelen < 5 || namelen - 5 >= sizeof(sun.sun_path) ||
+                               addr_parse(devargs->name, sun.sun_path) != 0) {
+                       BUS_ERR("Error parsing device address");
+                       return -EINVAL;
+               }
+
+               int fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+               if (fd < 0) {
+                       int err = errno; /* logging may overwrite errno */
+                       BUS_ERR("Error creating socket");
+                       return -err;
+               }
+
+               if (connect(fd, (void *)&sun, sizeof(sun)) != 0) {
+                       int err = errno; /* logging and close() may overwrite errno */
+                       BUS_ERR("Error connecting to socket");
+                       close(fd);
+                       return -err;
+               }
+
+               /*
+                * Zeroed allocation: the unset rte_device fields are NULL and
+                * membase/memlen are 0 until dev_probe() maps the memory.
+                */
+               struct socket_device *sdev = calloc(1, sizeof(*sdev));
+               if (sdev == NULL) {
+                       BUS_ERR("Error with malloc");
+                       close(fd);
+                       return -ENOMEM;
+               }
+               BUS_INFO("Allocating dev for %s", devargs->name);
+               sdev->rte_device.name = devargs->name;
+               sdev->rte_device.numa_node = rte_socket_id();
+               sdev->rte_device.bus = &shared_mem_bus.bus;
+               sdev->fd = fd;
+               TAILQ_INSERT_TAIL(&shared_mem_bus.socket_list, sdev, next);
+       }
+
+       return 0;
+}
+
+/*
+ * Receive a file descriptor plus its description from a socket.
+ *
+ * The peer is expected to send one message whose payload is
+ * {size, iova, page size} and whose ancillary data is a single
+ * SCM_RIGHTS descriptor.
+ *
+ * @param from     connected socket to read from
+ * @param memsize  set to the size reported for the descriptor
+ * @param iova     set to the IOVA reported by the peer
+ * @param pg_size  set to the page size reported by the peer
+ * @return the received descriptor, or -1 on error (outputs are untouched)
+ */
+static int
+recv_fd(int from, uint64_t *memsize, rte_iova_t *iova, uint64_t *pg_size)
+{
+       int fd = -1;
+       struct {
+               uint64_t fd_size;
+               rte_iova_t iova;
+               uint64_t pg_size;
+       } data_message;
+
+       size_t cmsglen = CMSG_LEN(sizeof(fd));
+       struct cmsghdr *cmhdr = malloc(cmsglen);
+       if (cmhdr == NULL) {
+               BUS_ERR("Malloc error");
+               return -1;
+       }
+
+       struct iovec iov = {
+                       .iov_base = (void *)&data_message,
+                       .iov_len = sizeof(data_message)
+       };
+       struct msghdr msg = {
+                       .msg_iov = &iov,
+                       .msg_iovlen = 1,
+                       .msg_control = cmhdr,
+                       .msg_controllen = cmsglen,
+       };
+       ssize_t nbytes = recvmsg(from, &msg, 0);
+       if (nbytes < 0) {
+               BUS_ERR("recvmsg error %s", strerror(errno));
+               goto out;
+       }
+       /* errno is meaningless for a short read, so report it separately */
+       if ((size_t)nbytes != iov.iov_len) {
+               BUS_ERR("recvmsg error: unexpected message length");
+               goto out;
+       }
+       /* only accept exactly one descriptor passed via SCM_RIGHTS */
+       if (msg.msg_controllen != cmsglen || cmhdr->cmsg_len != cmsglen ||
+                       cmhdr->cmsg_level != SOL_SOCKET ||
+                       cmhdr->cmsg_type != SCM_RIGHTS) {
+               BUS_ERR("Error with fd on message received");
+               goto out;
+       }
+       memcpy(&fd, CMSG_DATA(cmhdr), sizeof(fd));
+
+       *memsize = data_message.fd_size;
+       *iova = data_message.iova;
+       *pg_size = data_message.pg_size;
+out:
+       /* the control buffer is released on every path */
+       free(cmhdr);
+       return fd;
+}
+
+/*
+ * Receive, map and register the memory offered on one socket device.
+ *
+ * On success the region is mapped, its address and length are stored in
+ * the device, the peer has been told the mapping address, the memory has
+ * been added to a new malloc heap which is swapped in for the current
+ * socket, and the registered driver (if any) has been probed.
+ *
+ * Returns 0 on success, -1 on error. Failures that occur before the memory
+ * has been added to the heap release the heap and the mapping again.
+ */
+static int
+probe_socket_device(struct socket_device *dev)
+{
+       char malloc_heap_name[32];
+       rte_iova_t *iovas = NULL;
+       uint64_t memsize = 0;
+       uint64_t pgsize = 0;
+       rte_iova_t iova = 0;
+       unsigned int nb_pages;
+       void *mem;
+       int ret;
+
+       int memfd = recv_fd(dev->fd, &memsize, &iova, &pgsize);
+       /* check memfd is valid, the size is non-zero and multiple of 2MB */
+       if (memfd < 0 || memsize == 0 || memsize % (1 << 21) != 0) {
+               BUS_ERR("Error getting memfd and size");
+               if (memfd >= 0)
+                       close(memfd);
+               return -1;
+       }
+       /* pgsize is supplied by the peer and is used as a divisor below */
+       if (pgsize == 0 || memsize % pgsize != 0 || memsize / pgsize > UINT32_MAX) {
+               BUS_ERR("Invalid page size %"PRIu64" for memsize %"PRIu64,
+                               pgsize, memsize);
+               close(memfd);
+               return -1;
+       }
+       BUS_DEBUG("Received fd %d with memsize %"PRIu64" and pgsize %"PRIu64,
+                       memfd, memsize, pgsize);
+
+       mem = mmap(NULL, memsize, PROT_READ|PROT_WRITE, MAP_SHARED, memfd, 0);
+       /* a mapping stays valid after its descriptor is closed */
+       close(memfd);
+       if (mem == MAP_FAILED) {
+               BUS_ERR("Error mmapping the received fd");
+               return -1;
+       }
+       BUS_DEBUG("%u MB of memory mapped at %p", (unsigned int)(memsize >> 20), mem);
+       dev->membase = (uintptr_t)mem;
+       dev->memlen = memsize;
+
+       struct eth_shared_mem_msg msg = {
+                       .type = MSG_TYPE_MMAP_BASE_ADDR,
+                       .offset = dev->membase,
+       };
+       /* report the address on the connection the memory came from */
+       if (send(dev->fd, &msg, sizeof(msg), 0) < 0) {
+               BUS_ERR("Error sending mmap base address: %s", strerror(errno));
+               goto err_unmap;
+       }
+
+       snprintf(malloc_heap_name, sizeof(malloc_heap_name),
+                       "socket_%d_ext", rte_socket_id());
+       if (rte_malloc_heap_create(malloc_heap_name) != 0) {
+               BUS_ERR("Error creating heap %s", malloc_heap_name);
+               goto err_unmap;
+       }
+
+       /* the region is IOVA-contiguous: one entry per page, pgsize apart */
+       nb_pages = (unsigned int)(memsize / pgsize);
+       iovas = calloc(nb_pages, sizeof(iovas[0]));
+       if (iovas == NULL) {
+               BUS_ERR("Error allocating IOVA table for %u pages", nb_pages);
+               goto err_heap;
+       }
+       for (unsigned int i = 0; i < nb_pages; i++)
+               iovas[i] = iova + (rte_iova_t)i * pgsize;
+
+       BUS_DEBUG("Attempting to add memory to heap: %s", malloc_heap_name);
+       ret = rte_malloc_heap_memory_add(malloc_heap_name, mem, memsize,
+                       iovas, nb_pages, pgsize);
+       free(iovas);
+       if (ret < 0) {
+               BUS_ERR("Error adding to malloc heap: %s", strerror(rte_errno));
+               goto err_heap;
+       }
+       BUS_DEBUG("Added memory to heap");
+       rte_malloc_heap_swap_socket(rte_socket_id(),
+                       rte_malloc_heap_get_socket(malloc_heap_name));
+       BUS_DEBUG("Swapped in memory as socket %d memory", rte_socket_id());
+
+       if (shared_mem_bus.ethdrv != NULL) {
+               struct rte_device *ethdev = malloc(sizeof(*ethdev));
+               if (ethdev == NULL) {
+                       BUS_ERR("Error allocating device for registered driver");
+                       return -1;
+               }
+               *ethdev = (struct rte_device){
+                       .name = "shared_mem_ethdev",
+                       .driver = &shared_mem_bus.ethdrv->driver,
+                       .bus = &shared_mem_bus.bus,
+                       .numa_node = SOCKET_ID_ANY,
+               };
+               if (shared_mem_bus.ethdrv->probe(shared_mem_bus.ethdrv, ethdev) != 0)
+                       BUS_ERR("Probe of registered driver failed");
+       }
+       return 0;
+
+err_heap:
+       rte_malloc_heap_destroy(malloc_heap_name);
+err_unmap:
+       munmap(mem, memsize);
+       dev->membase = 0;
+       dev->memlen = 0;
+       return -1;
+}
+
+/*
+ * Bus probe callback.
+ *
+ * With no socket devices this is a no-op. Otherwise "shared_mem" is made
+ * the platform default mempool ops and every socket device is set up in
+ * turn. Returns 0 on success, -1 as soon as one step fails.
+ */
+static int
+dev_probe(void)
+{
+       if (TAILQ_EMPTY(&shared_mem_bus.socket_list))
+               return 0;
+
+       if (rte_mbuf_set_platform_mempool_ops("shared_mem") != 0) {
+               BUS_ERR("Error setting default mempool ops");
+               return -1;
+       }
+       BUS_INFO("Set default mempool ops to 'shared_mem'");
+
+       struct socket_device *dev;
+       TAILQ_FOREACH(dev, &shared_mem_bus.socket_list, next) {
+               if (probe_socket_device(dev) != 0)
+                       return -1;
+       }
+       return 0;
+}
+/*
+ * Bus find_device callback: no lookup is performed, all arguments are
+ * ignored and NULL is always returned.
+ */
+static struct rte_device *
+find_device(const struct rte_device *start, rte_dev_cmp_t cmp,
+                        const void *data)
+{
+       (void)(start);
+       (void)(cmp);
+       (void)(data);
+
+       return NULL;
+}
+
+/*
+ * Bus get_iommu_class callback: without any socket device the bus has no
+ * preference (don't care); with at least one it asks for VA mode.
+ */
+static enum rte_iova_mode
+get_iommu_class(void)
+{
+       if (TAILQ_EMPTY(&shared_mem_bus.socket_list))
+               return RTE_IOVA_DC;
+
+       return RTE_IOVA_VA;
+}
+
+/*
+ * Bus parse callback: accept device names of the form "sock:<path>" where
+ * <path> names an existing socket file.
+ *
+ * @param name  device name to check
+ * @param addr  if not NULL, receives a copy of <path>; the caller must
+ *              supply a buffer large enough for it
+ * @return 0 when the name matches, -1 otherwise
+ */
+static int
+addr_parse(const char *name, void *addr)
+{
+       const char *path;
+       struct stat sb;
+       int is_socket;
+
+       if (strncmp(name, "sock:", 5) != 0) {
+               BUS_DEBUG("no sock: prefix on %s", name);
+               return -1;
+       }
+       path = name + 5;
+
+       is_socket = stat(path, &sb) >= 0 && (sb.st_mode & S_IFMT) == S_IFSOCK;
+       if (!is_socket) {
+               BUS_ERR("stat failed, or not a socket, %s", path);
+               return -1;
+       }
+
+       if (addr != NULL)
+               strcpy(addr, path);
+       BUS_DEBUG("Matched filename: %s", path);
+       return 0;
+}
+
+/*
+ * Record drv as the driver probed by this bus. A driver without a probe
+ * function is rejected with -1; otherwise 0 is returned. A previously
+ * registered driver is replaced.
+ */
+int
+shared_mem_register_driver(struct shared_mem_drv *drv)
+{
+       int ret = -1;
+
+       if (drv->probe != NULL) {
+               shared_mem_bus.ethdrv = drv;
+               ret = 0;
+       }
+       return ret;
+}
+
diff --git a/drivers/bus/shared_mem/shared_mem_bus.h 
b/drivers/bus/shared_mem/shared_mem_bus.h
new file mode 100644
index 0000000000..01a9a2a99a
--- /dev/null
+++ b/drivers/bus/shared_mem/shared_mem_bus.h
@@ -0,0 +1,75 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+
+#ifndef DRIVERS_BUS_SHARED_MEM_H_
+#define DRIVERS_BUS_SHARED_MEM_H_
+
+#include <stdint.h>
+#include <rte_common.h>
+#include <rte_ether.h>
+#include <dev_driver.h>
+
+/* Values for the type field of struct eth_shared_mem_msg. */
+enum shared_mem_msg_type {
+       MSG_TYPE_ACK = 0,
+       MSG_TYPE_MMAP_BASE_ADDR, /* sent by the bus probe, offset = local mmap address */
+       MSG_TYPE_MEMPOOL_OFFSET,
+       MSG_TYPE_RX_RING_OFFSET,
+       MSG_TYPE_TX_RING_OFFSET,
+       MSG_TYPE_START,
+       MSG_TYPE_GET_MAC,
+       MSG_TYPE_REPORT_MAC,
+};
+
+/* Message exchanged over the bus socket: a type tag, one value, optional trailing data. */
+struct eth_shared_mem_msg {
+       enum shared_mem_msg_type type;  /* type implicitly defines which union member is used */
+       union {
+               uintptr_t offset;    /* for many messages, just pass an offset */
+               struct rte_ether_addr ethaddr; /* allow passing mac address */
+               uintptr_t datalen;   /* for other messages, pass a data length (presumably that of data[] - TODO confirm) */
+       };
+       char data[];
+};
+
+struct shared_mem_drv;
+
+/**
+ * Initialisation function for the driver.
+ *
+ * Called from the bus probe with the registered driver and a newly
+ * allocated rte_device.
+ */
+typedef int (c_eth_probe_t)(struct shared_mem_drv *drv, struct rte_device 
*dev);
+
+/**
+ * Driver that can be registered with the shared memory bus through
+ * shared_mem_register_driver(), which rejects a NULL probe function.
+ */
+struct shared_mem_drv {
+       struct rte_driver driver;
+       c_eth_probe_t *probe;            /**< Device probe function. */
+};
+
+/**
+ * Helper for registering a driver with the shared memory bus.
+ *
+ * Defines a constructor that sets the driver name to the stringified nm
+ * and passes c_drv to shared_mem_register_driver().
+ */
+#define RTE_PMD_REGISTER_SHMEM_DRV(nm, c_drv) \
+RTE_INIT(shared_mem_initfn_ ##nm) \
+{\
+       (c_drv).driver.name = RTE_STR(nm);\
+       shared_mem_register_driver(&c_drv); \
+} \
+RTE_PMD_EXPORT_NAME(nm, __COUNTER__)
+
+/**
+ * Register the driver to be probed by the bus.
+ *
+ * @return 0 on success, -1 if the driver has no probe function.
+ */
+__rte_internal
+int
+shared_mem_register_driver(struct shared_mem_drv *drv);
+
+/**
+ * Send msglen bytes from msg on the socket of the first bus device.
+ *
+ * @return the result of send(2).
+ */
+__rte_internal
+int
+rte_shm_bus_send_message(void *msg, size_t msglen);
+
+/**
+ * Receive up to msglen bytes into msg from the socket of the first bus
+ * device.
+ *
+ * @return the result of recv(2).
+ */
+__rte_internal
+int
+rte_shm_bus_recv_message(void *msg, size_t msglen);
+
+/**
+ * Convert a pointer into an offset from the base of the mapped region
+ * containing it.
+ *
+ * @return the offset, or (uintptr_t)-1 if no mapped region matches.
+ */
+__rte_internal
+uintptr_t
+rte_shm_bus_get_mem_offset(void *ptr);
+
+/**
+ * Convert an offset into a pointer inside the first mapped region that is
+ * longer than the offset.
+ *
+ * @return the pointer, or (void *)-1 if no mapped region is large enough.
+ */
+__rte_internal
+void *
+rte_shm_bus_get_mem_ptr(uintptr_t offset);
+
+#endif /* DRIVERS_BUS_SHARED_MEM_H_ */
diff --git a/drivers/bus/shared_mem/version.map 
b/drivers/bus/shared_mem/version.map
new file mode 100644
index 0000000000..2af82689b1
--- /dev/null
+++ b/drivers/bus/shared_mem/version.map
@@ -0,0 +1,11 @@
+INTERNAL {
+       global:
+
+       shared_mem_register_driver;
+       rte_shm_bus_get_mem_offset;
+       rte_shm_bus_get_mem_ptr;
+       rte_shm_bus_recv_message;
+       rte_shm_bus_send_message;
+
+       local: *;
+};
-- 
2.39.2

Reply via email to