Signed-off-by: Maxim Uvarov <maxim.uva...@linaro.org>
---
 .../linux-generic/include/odp_buffer_internal.h    |  10 +-
 platform/linux-generic/include/odp_internal.h      |   1 -
 .../include/odp_packet_io_ipc_internal.h           |  27 +-
 platform/linux-generic/odp_init.c                  |   5 +-
 platform/linux-generic/pktio/ipc.c                 | 488 +++++++++------------
 5 files changed, 236 insertions(+), 295 deletions(-)

diff --git a/platform/linux-generic/include/odp_buffer_internal.h 
b/platform/linux-generic/include/odp_buffer_internal.h
index 4e75908..d0eb597 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -64,6 +64,11 @@ struct odp_buffer_hdr_t {
        struct {
                void     *hdr;
                uint8_t  *data;
+#ifdef _ODP_PKTIO_IPC
+       /* An IPC-mapped process cannot follow pointers,
+        * so an offset has to be used instead */
+               uint64_t ipc_data_offset;
+#endif
                uint32_t  len;
        } seg[CONFIG_PACKET_MAX_SEGS];
 
@@ -101,11 +106,6 @@ struct odp_buffer_hdr_t {
                queue_entry_t   *target_qe;  /* ordered queue target */
                uint64_t         sync[SCHEDULE_ORDERED_LOCKS_PER_QUEUE];
        };
-#ifdef _ODP_PKTIO_IPC
-       /* ipc mapped process can not walk over pointers,
-        * offset has to be used */
-       uint64_t                 ipc_addr_offset[ODP_CONFIG_PACKET_MAX_SEGS];
-#endif
 
        /* Data or next header */
        uint8_t data[0];
diff --git a/platform/linux-generic/include/odp_internal.h 
b/platform/linux-generic/include/odp_internal.h
index 6a80d9d..b313b1f 100644
--- a/platform/linux-generic/include/odp_internal.h
+++ b/platform/linux-generic/include/odp_internal.h
@@ -50,7 +50,6 @@ struct odp_global_data_s {
        odp_cpumask_t control_cpus;
        odp_cpumask_t worker_cpus;
        int num_cpus_installed;
-       int ipc_ns;
 };
 
 enum init_stage {
diff --git a/platform/linux-generic/include/odp_packet_io_ipc_internal.h 
b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
index 851114d..7cd2948 100644
--- a/platform/linux-generic/include/odp_packet_io_ipc_internal.h
+++ b/platform/linux-generic/include/odp_packet_io_ipc_internal.h
@@ -26,22 +26,31 @@
  */
 struct pktio_info {
        struct {
-               /* number of buffer in remote pool */
-               int shm_pool_bufs_num;
-               /* size of remote pool */
-               size_t shm_pkt_pool_size;
+               /* number of buffers */
+               int num;
                /* size of packet/segment in remote pool */
-               uint32_t shm_pkt_size;
+               uint32_t block_size;
                /* offset from shared memory block start
-                * to pool_mdata_addr (odp-linux pool specific) */
-               size_t mdata_offset;
+                * to pool *base_addr in remote process.
+                * (odp-linux pool specific) */
+               size_t base_addr_offset;
                char pool_name[ODP_POOL_NAME_LEN];
+               /* 1 if master finished creation of all shared objects */
+               int init_done;
        } master;
        struct {
                /* offset from shared memory block start
-                * to pool_mdata_addr in remote process.
+                * to pool *base_addr in remote process.
                 * (odp-linux pool specific) */
-               size_t mdata_offset;
+               size_t base_addr_offset;
+               void *base_addr;
+               uint32_t block_size;
                char pool_name[ODP_POOL_NAME_LEN];
+               /* pid of the slave process written to shm and
+                * used by master to look up memory created by
+                * slave
+                */
+               int pid;
+               int init_done;
        } slave;
 } ODP_PACKED;
diff --git a/platform/linux-generic/odp_init.c 
b/platform/linux-generic/odp_init.c
index d40a83c..1b0d8f8 100644
--- a/platform/linux-generic/odp_init.c
+++ b/platform/linux-generic/odp_init.c
@@ -67,7 +67,7 @@ static int cleanup_files(const char *dirpath, int odp_pid)
 
 int odp_init_global(odp_instance_t *instance,
                    const odp_init_t *params,
-                   const odp_platform_init_t *platform_params)
+                   const odp_platform_init_t *platform_params ODP_UNUSED)
 {
        char *hpdir;
 
@@ -75,9 +75,6 @@ int odp_init_global(odp_instance_t *instance,
        odp_global_data.main_pid = getpid();
        cleanup_files(_ODP_TMPDIR, odp_global_data.main_pid);
 
-       if (platform_params)
-               odp_global_data.ipc_ns = platform_params->ipc_ns;
-
        enum init_stage stage = NO_INIT;
        odp_global_data.log_fn = odp_override_log;
        odp_global_data.abort_fn = odp_override_abort;
diff --git a/platform/linux-generic/pktio/ipc.c 
b/platform/linux-generic/pktio/ipc.c
index 0e99c6e..5e5e3b2 100644
--- a/platform/linux-generic/pktio/ipc.c
+++ b/platform/linux-generic/pktio/ipc.c
@@ -3,150 +3,84 @@
  *
  * SPDX-License-Identifier:     BSD-3-Clause
  */
-#ifdef _ODP_PKTIO_IPC
 #include <odp_packet_io_ipc_internal.h>
 #include <odp_debug_internal.h>
 #include <odp_packet_io_internal.h>
 #include <odp/api/system_info.h>
 #include <odp_shm_internal.h>
+#include <_ishm_internal.h>
 
 #include <sys/mman.h>
 #include <sys/stat.h>
 #include <fcntl.h>
 
+#define IPC_ODP_DEBUG_PRINT 0
+
+#define IPC_ODP_DBG(fmt, ...) \
+       do { \
+               if (IPC_ODP_DEBUG_PRINT == 1) \
+                       ODP_DBG(fmt, ##__VA_ARGS__);\
+       } while (0)
+
 /* MAC address for the "ipc" interface */
 static const char pktio_ipc_mac[] = {0x12, 0x12, 0x12, 0x12, 0x12, 0x12};
 
-static void *_ipc_map_remote_pool(const char *name, size_t size);
+static void *_ipc_map_remote_pool(const char *name, int pid);
 
 static const char *_ipc_odp_buffer_pool_shm_name(odp_pool_t pool_hdl)
 {
-       pool_entry_t *pool;
-       uint32_t pool_id;
+       pool_t *pool;
        odp_shm_t shm;
        odp_shm_info_t info;
 
-       pool_id = pool_handle_to_index(pool_hdl);
-       pool    = get_pool_entry(pool_id);
-       shm = pool->s.pool_shm;
+       pool    = pool_entry_from_hdl(pool_hdl);
+       shm = pool->shm;
 
        odp_shm_info(shm, &info);
 
        return info.name;
 }
 
-/**
-* Look up for shared memory object.
-*
-* @param name   name of shm object
-*
-* @return 0 on success, otherwise non-zero
-*/
-static int _ipc_shm_lookup(const char *name)
-{
-       int shm;
-       char shm_devname[SHM_DEVNAME_MAXLEN];
-
-       if (!odp_global_data.ipc_ns)
-               ODP_ABORT("ipc_ns not set\n");
-
-       snprintf(shm_devname, SHM_DEVNAME_MAXLEN,
-                SHM_DEVNAME_FORMAT,
-                odp_global_data.ipc_ns, name);
-
-       shm = shm_open(shm_devname, O_RDWR, S_IRUSR | S_IWUSR);
-       if (shm == -1) {
-               if (errno == ENOENT) {
-                       ODP_DBG("no file %s\n", shm_devname);
-                       return -1;
-               }
-               ODP_ABORT("shm_open for %s err %s\n",
-                         shm_devname, strerror(errno));
-       }
-       close(shm);
-       return 0;
-}
-
-static int _ipc_map_pktio_info(pktio_entry_t *pktio_entry,
-                              const char *dev,
-                              int *slave)
-{
-       struct pktio_info *pinfo;
-       char name[ODP_POOL_NAME_LEN + sizeof("_info")];
-       uint32_t flags;
-       odp_shm_t shm;
-
-       /* Create info about remote pktio */
-       snprintf(name, sizeof(name), "%s_info", dev);
-
-       flags = ODP_SHM_PROC | _ODP_SHM_O_EXCL;
-
-       shm = odp_shm_reserve(name, sizeof(struct pktio_info),
-                             ODP_CACHE_LINE_SIZE,
-                             flags);
-       if (ODP_SHM_INVALID != shm) {
-               pinfo = odp_shm_addr(shm);
-               pinfo->master.pool_name[0] = 0;
-               *slave = 0;
-       } else {
-               flags = _ODP_SHM_PROC_NOCREAT | _ODP_SHM_O_EXCL;
-               shm = odp_shm_reserve(name, sizeof(struct pktio_info),
-                                     ODP_CACHE_LINE_SIZE,
-                                     flags);
-               if (ODP_SHM_INVALID == shm)
-                       ODP_ABORT("can not connect to shm\n");
-
-               pinfo = odp_shm_addr(shm);
-               *slave = 1;
-       }
-
-       pktio_entry->s.ipc.pinfo = pinfo;
-       pktio_entry->s.ipc.pinfo_shm = shm;
-
-       return 0;
-}
-
 static int _ipc_master_start(pktio_entry_t *pktio_entry)
 {
        struct pktio_info *pinfo = pktio_entry->s.ipc.pinfo;
-       int ret;
        void *ipc_pool_base;
 
-       if (pinfo->slave.mdata_offset == 0)
+       if (pinfo->slave.init_done == 0)
                return -1;
 
-       ret = _ipc_shm_lookup(pinfo->slave.pool_name);
-       if (ret) {
-               ODP_DBG("no pool file %s\n", pinfo->slave.pool_name);
-               return -1;
-       }
-
        ipc_pool_base = _ipc_map_remote_pool(pinfo->slave.pool_name,
-                                            pinfo->master.shm_pkt_pool_size);
+                                            pinfo->slave.pid);
+       if (ipc_pool_base == NULL) {
+               ODP_DBG("no pool file %s for pid %d\n",
+                       pinfo->slave.pool_name, pinfo->slave.pid);
+               return -1;
+       }
+
+       pktio_entry->s.ipc.pool_base = ipc_pool_base;
        pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +
-                                            pinfo->slave.mdata_offset;
+                                            pinfo->slave.base_addr_offset;
 
        odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1);
 
-       ODP_DBG("%s started.\n",  pktio_entry->s.name);
+       IPC_ODP_DBG("%s started.\n",  pktio_entry->s.name);
        return 0;
 }
 
 static int _ipc_init_master(pktio_entry_t *pktio_entry,
                            const char *dev,
-                           odp_pool_t pool)
+                           odp_pool_t pool_hdl)
 {
        char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_m_prod")];
-       pool_entry_t *pool_entry;
-       uint32_t pool_id;
+       pool_t *pool;
        struct pktio_info *pinfo;
        const char *pool_name;
 
-       pool_id = pool_handle_to_index(pool);
-       pool_entry    = get_pool_entry(pool_id);
+       pool = pool_entry_from_hdl(pool_hdl);
+       (void)pool;
 
        if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_m_prod"))) {
-               ODP_DBG("too big ipc name\n");
+               ODP_ERR("too big ipc name\n");
                return -1;
        }
 
@@ -158,7 +92,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,
                        PKTIO_IPC_ENTRIES,
                        _RING_SHM_PROC | _RING_NO_LIST);
        if (!pktio_entry->s.ipc.tx.send) {
-               ODP_DBG("pid %d unable to create ipc ring %s name\n",
+               ODP_ERR("pid %d unable to create ipc ring %s name\n",
                        getpid(), ipc_shm_name);
                return -1;
        }
@@ -174,7 +108,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,
                        PKTIO_IPC_ENTRIES,
                        _RING_SHM_PROC | _RING_NO_LIST);
        if (!pktio_entry->s.ipc.tx.free) {
-               ODP_DBG("pid %d unable to create ipc ring %s name\n",
+               ODP_ERR("pid %d unable to create ipc ring %s name\n",
                        getpid(), ipc_shm_name);
                goto free_m_prod;
        }
@@ -187,7 +121,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,
                        PKTIO_IPC_ENTRIES,
                        _RING_SHM_PROC | _RING_NO_LIST);
        if (!pktio_entry->s.ipc.rx.recv) {
-               ODP_DBG("pid %d unable to create ipc ring %s name\n",
+               ODP_ERR("pid %d unable to create ipc ring %s name\n",
                        getpid(), ipc_shm_name);
                goto free_m_cons;
        }
@@ -200,7 +134,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,
                        PKTIO_IPC_ENTRIES,
                        _RING_SHM_PROC | _RING_NO_LIST);
        if (!pktio_entry->s.ipc.rx.free) {
-               ODP_DBG("pid %d unable to create ipc ring %s name\n",
+               ODP_ERR("pid %d unable to create ipc ring %s name\n",
                        getpid(), ipc_shm_name);
                goto free_s_prod;
        }
@@ -210,24 +144,23 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,
 
        /* Set up pool name for remote info */
        pinfo = pktio_entry->s.ipc.pinfo;
-       pool_name = _ipc_odp_buffer_pool_shm_name(pool);
+       pool_name = _ipc_odp_buffer_pool_shm_name(pool_hdl);
        if (strlen(pool_name) > ODP_POOL_NAME_LEN) {
-               ODP_DBG("pid %d ipc pool name %s is too big %d\n",
+               ODP_ERR("pid %d ipc pool name %s is too big %d\n",
                        getpid(), pool_name, strlen(pool_name));
                goto free_s_prod;
        }
 
        memcpy(pinfo->master.pool_name, pool_name, strlen(pool_name));
-       pinfo->master.shm_pkt_pool_size = pool_entry->s.pool_size;
-       pinfo->master.shm_pool_bufs_num = pool_entry->s.buf_num;
-       pinfo->master.shm_pkt_size = pool_entry->s.seg_size;
-       pinfo->master.mdata_offset =  pool_entry->s.pool_mdata_addr -
-                              pool_entry->s.pool_base_addr;
-       pinfo->slave.mdata_offset = 0;
+       pinfo->slave.base_addr_offset = 0;
+       pinfo->slave.base_addr = 0;
+       pinfo->slave.pid = 0;
+       pinfo->slave.init_done = 0;
 
-       pktio_entry->s.ipc.pool = pool;
+       pktio_entry->s.ipc.pool = pool_hdl;
 
        ODP_DBG("Pre init... DONE.\n");
+       pinfo->master.init_done = 1;
 
        _ipc_master_start(pktio_entry);
 
@@ -246,55 +179,45 @@ free_m_prod:
 }
 
 static void _ipc_export_pool(struct pktio_info *pinfo,
-                            odp_pool_t pool)
+                            odp_pool_t pool_hdl)
 {
-       pool_entry_t *pool_entry;
-
-       pool_entry = odp_pool_to_entry(pool);
-       if (pool_entry->s.blk_size != pinfo->master.shm_pkt_size)
-               ODP_ABORT("pktio for same name should have the same pool 
size\n");
-       if (pool_entry->s.buf_num != (unsigned)pinfo->master.shm_pool_bufs_num)
-               ODP_ABORT("pktio for same name should have the same pool 
size\n");
+       pool_t *pool = pool_entry_from_hdl(pool_hdl);
 
        snprintf(pinfo->slave.pool_name, ODP_POOL_NAME_LEN, "%s",
-                pool_entry->s.name);
-       pinfo->slave.mdata_offset = pool_entry->s.pool_mdata_addr -
-                                   pool_entry->s.pool_base_addr;
+                _ipc_odp_buffer_pool_shm_name(pool_hdl));
+       pinfo->slave.pid = odp_global_data.main_pid;
+       pinfo->slave.block_size = pool->block_size;
+       pinfo->slave.base_addr = pool->base_addr;
 }
 
-static void *_ipc_map_remote_pool(const char *name, size_t size)
+static void *_ipc_map_remote_pool(const char *name, int pid)
 {
        odp_shm_t shm;
        void *addr;
+       char rname[ODP_SHM_NAME_LEN];
 
-       ODP_DBG("Mapping remote pool %s, size %ld\n", name, size);
-       shm = odp_shm_reserve(name,
-                             size,
-                             ODP_CACHE_LINE_SIZE,
-                             _ODP_SHM_PROC_NOCREAT);
-       if (shm == ODP_SHM_INVALID)
-               ODP_ABORT("unable map %s\n", name);
+       snprintf(rname, ODP_SHM_NAME_LEN, "remote-%s", name);
+       shm = odp_shm_import(name, pid, rname);
+       if (shm == ODP_SHM_INVALID) {
+               ODP_ERR("unable map %s\n", name);
+               return NULL;
+       }
 
        addr = odp_shm_addr(shm);
-       ODP_DBG("MAP master: %p - %p size %ld, pool %s\n",
-               addr, (char *)addr + size, size, name);
+
+       IPC_ODP_DBG("Mapped remote pool %s to local %s\n", name, rname);
        return addr;
 }
 
-static void *_ipc_shm_map(char *name, size_t size)
+static void *_ipc_shm_map(char *name, int pid)
 {
        odp_shm_t shm;
-       int ret;
 
-       ret = _ipc_shm_lookup(name);
-       if (ret == -1)
+       shm = odp_shm_import(name, pid, name);
+       if (ODP_SHM_INVALID == shm) {
+               ODP_ERR("unable to map: %s\n", name);
                return NULL;
-
-       shm = odp_shm_reserve(name, size,
-                             ODP_CACHE_LINE_SIZE,
-                             _ODP_SHM_PROC_NOCREAT);
-       if (ODP_SHM_INVALID == shm)
-               ODP_ABORT("unable to map: %s\n", name);
+       }
 
        return odp_shm_addr(shm);
 }
@@ -313,15 +236,22 @@ static int _ipc_init_slave(const char *dev,
 static int _ipc_slave_start(pktio_entry_t *pktio_entry)
 {
        char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")];
-       size_t ring_size = PKTIO_IPC_ENTRIES * sizeof(void *) +
-                          sizeof(_ring_t);
        struct pktio_info *pinfo;
        void *ipc_pool_base;
        odp_shm_t shm;
-       const char *dev = pktio_entry->s.name;
+       char tail[ODP_POOL_NAME_LEN];
+       char dev[ODP_POOL_NAME_LEN];
+       int pid;
+
+       if (sscanf(pktio_entry->s.name, "ipc:%d:%s", &pid, tail) != 2) {
+               ODP_ERR("wrong pktio name\n");
+               return -1;
+       }
+
+       sprintf(dev, "ipc:%s", tail);
 
        snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev);
-       pktio_entry->s.ipc.rx.recv  = _ipc_shm_map(ipc_shm_name, ring_size);
+       pktio_entry->s.ipc.rx.recv  = _ipc_shm_map(ipc_shm_name, pid);
        if (!pktio_entry->s.ipc.rx.recv) {
                ODP_DBG("pid %d unable to find ipc ring %s name\n",
                        getpid(), dev);
@@ -333,9 +263,9 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry)
                _ring_free_count(pktio_entry->s.ipc.rx.recv));
 
        snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev);
-       pktio_entry->s.ipc.rx.free = _ipc_shm_map(ipc_shm_name, ring_size);
+       pktio_entry->s.ipc.rx.free = _ipc_shm_map(ipc_shm_name, pid);
        if (!pktio_entry->s.ipc.rx.free) {
-               ODP_DBG("pid %d unable to find ipc ring %s name\n",
+               ODP_ERR("pid %d unable to find ipc ring %s name\n",
                        getpid(), dev);
                goto free_m_prod;
        }
@@ -344,9 +274,9 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry)
                _ring_free_count(pktio_entry->s.ipc.rx.free));
 
        snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev);
-       pktio_entry->s.ipc.tx.send = _ipc_shm_map(ipc_shm_name, ring_size);
+       pktio_entry->s.ipc.tx.send = _ipc_shm_map(ipc_shm_name, pid);
        if (!pktio_entry->s.ipc.tx.send) {
-               ODP_DBG("pid %d unable to find ipc ring %s name\n",
+               ODP_ERR("pid %d unable to find ipc ring %s name\n",
                        getpid(), dev);
                goto free_m_cons;
        }
@@ -355,9 +285,9 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry)
                _ring_free_count(pktio_entry->s.ipc.tx.send));
 
        snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev);
-       pktio_entry->s.ipc.tx.free = _ipc_shm_map(ipc_shm_name, ring_size);
+       pktio_entry->s.ipc.tx.free = _ipc_shm_map(ipc_shm_name, pid);
        if (!pktio_entry->s.ipc.tx.free) {
-               ODP_DBG("pid %d unable to find ipc ring %s name\n",
+               ODP_ERR("pid %d unable to find ipc ring %s name\n",
                        getpid(), dev);
                goto free_s_prod;
        }
@@ -368,14 +298,15 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry)
        /* Get info about remote pool */
        pinfo = pktio_entry->s.ipc.pinfo;
        ipc_pool_base = _ipc_map_remote_pool(pinfo->master.pool_name,
-                                            pinfo->master.shm_pkt_pool_size);
+                                            pid);
        pktio_entry->s.ipc.pool_mdata_base = (char *)ipc_pool_base +
-                                            pinfo->master.mdata_offset;
-       pktio_entry->s.ipc.pkt_size = pinfo->master.shm_pkt_size;
+                                            pinfo->master.base_addr_offset;
+       pktio_entry->s.ipc.pkt_size = pinfo->master.block_size;
 
        _ipc_export_pool(pinfo, pktio_entry->s.ipc.pool);
 
        odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 1);
+       pinfo->slave.init_done = 1;
 
        ODP_DBG("%s started.\n",  pktio_entry->s.name);
        return 0;
@@ -401,7 +332,11 @@ static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED,
                          odp_pool_t pool)
 {
        int ret = -1;
-       int slave;
+       int pid ODP_UNUSED;
+       struct pktio_info *pinfo;
+       char name[ODP_POOL_NAME_LEN + sizeof("_info")];
+       char tail[ODP_POOL_NAME_LEN];
+       odp_shm_t shm;
 
        ODP_STATIC_ASSERT(ODP_POOL_NAME_LEN == _RING_NAMESIZE,
                          "mismatch pool and ring name arrays");
@@ -411,65 +346,59 @@ static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED,
 
        odp_atomic_init_u32(&pktio_entry->s.ipc.ready, 0);
 
-       _ipc_map_pktio_info(pktio_entry, dev, &slave);
-       pktio_entry->s.ipc.type = (slave == 0) ? PKTIO_TYPE_IPC_MASTER :
-                                                PKTIO_TYPE_IPC_SLAVE;
+       /* Shared info about remote pktio */
+       if (sscanf(dev, "ipc:%d:%s", &pid, tail) == 2) {
+               pktio_entry->s.ipc.type = PKTIO_TYPE_IPC_SLAVE;
 
-       if (pktio_entry->s.ipc.type == PKTIO_TYPE_IPC_MASTER) {
+               snprintf(name, sizeof(name), "ipc:%s_info", tail);
+               IPC_ODP_DBG("lookup for name %s for pid %d\n", name, pid);
+               shm = odp_shm_import(name, pid, name);
+               if (ODP_SHM_INVALID == shm)
+                       return -1;
+               pinfo = odp_shm_addr(shm);
+
+               if (!pinfo->master.init_done) {
+                       odp_shm_free(shm);
+                       return -1;
+               }
+               pktio_entry->s.ipc.pinfo = pinfo;
+               pktio_entry->s.ipc.pinfo_shm = shm;
+               ODP_DBG("process %d is slave\n", getpid());
+               ret = _ipc_init_slave(name, pktio_entry, pool);
+       } else {
+               pktio_entry->s.ipc.type = PKTIO_TYPE_IPC_MASTER;
+               snprintf(name, sizeof(name), "%s_info", dev);
+               shm = odp_shm_reserve(name, sizeof(struct pktio_info),
+                                     ODP_CACHE_LINE_SIZE,
+                                     _ODP_ISHM_EXPORT | _ODP_ISHM_LOCK);
+               if (ODP_SHM_INVALID == shm) {
+                       ODP_ERR("can not create shm %s\n", name);
+                       return -1;
+               }
+
+               pinfo = odp_shm_addr(shm);
+               pinfo->master.init_done = 0;
+               pinfo->master.pool_name[0] = 0;
+               pktio_entry->s.ipc.pinfo = pinfo;
+               pktio_entry->s.ipc.pinfo_shm = shm;
                ODP_DBG("process %d is master\n", getpid());
                ret = _ipc_init_master(pktio_entry, dev, pool);
-       } else {
-               ODP_DBG("process %d is slave\n", getpid());
-               ret = _ipc_init_slave(dev, pktio_entry, pool);
        }
 
        return ret;
 }
 
-static inline void *_ipc_buffer_map(odp_buffer_hdr_t *buf,
-                                   uint32_t offset,
-                                   uint32_t *seglen,
-                                   uint32_t limit)
+static void _ipc_free_ring_packets(pktio_entry_t *pktio_entry, _ring_t *r)
 {
-       int seg_index  = offset / buf->segsize;
-       int seg_offset = offset % buf->segsize;
-#ifdef _ODP_PKTIO_IPC
-       void *addr = (char *)buf - buf->ipc_addr_offset[seg_index];
-#else
-       /** buf_hdr.ipc_addr_offset defined only when ipc is
-        *  enabled. */
-       void *addr = NULL;
-
-       (void)seg_index;
-#endif
-       if (seglen) {
-               uint32_t buf_left = limit - offset;
-               *seglen = seg_offset + buf_left <= buf->segsize ?
-                       buf_left : buf->segsize - seg_offset;
-       }
-
-       return (void *)(seg_offset + (uint8_t *)addr);
-}
-
-static inline void *_ipc_packet_map(odp_packet_hdr_t *pkt_hdr,
-                                   uint32_t offset, uint32_t *seglen)
-{
-       if (offset > pkt_hdr->frame_len)
-               return NULL;
-
-       return _ipc_buffer_map(&pkt_hdr->buf_hdr,
-                         pkt_hdr->headroom + offset, seglen,
-                         pkt_hdr->headroom + pkt_hdr->frame_len);
-}
-
-static void _ipc_free_ring_packets(_ring_t *r)
-{
-       odp_packet_t r_p_pkts[PKTIO_IPC_ENTRIES];
+       uint64_t offsets[PKTIO_IPC_ENTRIES];
        int ret;
        void **rbuf_p;
        int i;
 
-       rbuf_p = (void *)&r_p_pkts;
+       if (!r)
+               return;
+
+       rbuf_p = (void *)&offsets;
 
        while (1) {
                ret = _ring_mc_dequeue_burst(r, rbuf_p,
@@ -477,8 +406,13 @@ static void _ipc_free_ring_packets(_ring_t *r)
                if (0 == ret)
                        break;
                for (i = 0; i < ret; i++) {
-                       if (r_p_pkts[i] != ODP_PACKET_INVALID)
-                               odp_packet_free(r_p_pkts[i]);
+                       odp_packet_hdr_t *phdr;
+                       odp_packet_t pkt;
+                       void *mbase = pktio_entry->s.ipc.pool_mdata_base;
+
+                       phdr = (void *)((uint8_t *)mbase + offsets[i]);
+                       pkt = (odp_packet_t)phdr->buf_hdr.handle.handle;
+                       odp_packet_free(pkt);
                }
        }
 }
@@ -490,22 +424,23 @@ static int ipc_pktio_recv_lockless(pktio_entry_t 
*pktio_entry,
        int i;
        _ring_t *r;
        _ring_t *r_p;
+       uint64_t offsets[PKTIO_IPC_ENTRIES];
+       void **ipcbufs_p = (void *)&offsets;
+       uint32_t ready;
+       int pkts_ring;
 
-       odp_packet_t remote_pkts[PKTIO_IPC_ENTRIES];
-       void **ipcbufs_p = (void *)&remote_pkts;
-       uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
-
+       ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
        if (odp_unlikely(!ready)) {
-               ODP_DBG("start pktio is missing before usage?\n");
+               IPC_ODP_DBG("start pktio is missing before usage?\n");
                return -1;
        }
 
-       _ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
+       _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);
 
        r = pktio_entry->s.ipc.rx.recv;
        pkts = _ring_mc_dequeue_burst(r, ipcbufs_p, len);
        if (odp_unlikely(pkts < 0))
-               ODP_ABORT("error to dequeue no packets\n");
+               ODP_ABORT("internal error dequeue\n");
 
        /* fast path */
        if (odp_likely(0 == pkts))
@@ -514,36 +449,21 @@ static int ipc_pktio_recv_lockless(pktio_entry_t 
*pktio_entry,
        for (i = 0; i < pkts; i++) {
                odp_pool_t pool;
                odp_packet_t pkt;
-               odp_packet_hdr_t phdr;
-               void *ptr;
-               odp_buffer_bits_t handle;
-               int idx; /* Remote packet has coded pool and index.
-                         * We need only index.*/
+               odp_packet_hdr_t *phdr;
                void *pkt_data;
-               void *remote_pkt_data;
+               uint64_t data_pool_off;
+               void *rmt_data_ptr;
 
-               if (remote_pkts[i] == ODP_PACKET_INVALID)
-                       continue;
+               phdr = (void *)((uint8_t *)pktio_entry->s.ipc.pool_mdata_base +
+                      offsets[i]);
 
-               handle.handle = _odp_packet_to_buffer(remote_pkts[i]);
-               idx = handle.index;
-
-               /* Link to packed data. To this line we have Zero-Copy between
-                * processes, to simplify use packet copy in that version which
-                * can be removed later with more advance buffer management
-                * (ref counters).
-                */
-               /* reverse odp_buf_to_hdr() */
-               ptr = (char *)pktio_entry->s.ipc.pool_mdata_base +
-                     (idx * ODP_CACHE_LINE_SIZE);
-               memcpy(&phdr, ptr, sizeof(odp_packet_hdr_t));
-
-               /* Allocate new packet. Select*/
                pool = pktio_entry->s.ipc.pool;
                if (odp_unlikely(pool == ODP_POOL_INVALID))
                        ODP_ABORT("invalid pool");
 
-               pkt = odp_packet_alloc(pool, phdr.frame_len);
+               data_pool_off = phdr->buf_hdr.seg[0].ipc_data_offset;
+
+               pkt = odp_packet_alloc(pool, phdr->frame_len);
                if (odp_unlikely(pkt == ODP_PACKET_INVALID)) {
                        /* Original pool might be smaller then
                        *  PKTIO_IPC_ENTRIES. If packet can not be
@@ -562,30 +482,40 @@ static int ipc_pktio_recv_lockless(pktio_entry_t 
*pktio_entry,
                                  (PKTIO_TYPE_IPC_SLAVE ==
                                        pktio_entry->s.ipc.type));
 
-               remote_pkt_data = _ipc_packet_map(ptr, 0, NULL);
-               if (odp_unlikely(!remote_pkt_data))
-                       ODP_ABORT("unable to map remote_pkt_data, ipc_slave 
%d\n",
-                                 (PKTIO_TYPE_IPC_SLAVE ==
-                                       pktio_entry->s.ipc.type));
-
                /* Copy packet data from shared pool to local pool. */
-               memcpy(pkt_data, remote_pkt_data, phdr.frame_len);
+               rmt_data_ptr = (uint8_t *)pktio_entry->s.ipc.pool_mdata_base +
+                              data_pool_off;
+               memcpy(pkt_data, rmt_data_ptr, phdr->frame_len);
 
                /* Copy packets L2, L3 parsed offsets and size */
-               copy_packet_cls_metadata(&phdr, odp_packet_hdr(pkt));
+               copy_packet_cls_metadata(phdr, odp_packet_hdr(pkt));
+
+               odp_packet_hdr(pkt)->frame_len = phdr->frame_len;
+               odp_packet_hdr(pkt)->headroom = phdr->headroom;
+               odp_packet_hdr(pkt)->tailroom = phdr->tailroom;
+
+               /* Take classification fields */
+               odp_packet_hdr(pkt)->p = phdr->p;
 
-               odp_packet_hdr(pkt)->frame_len = phdr.frame_len;
-               odp_packet_hdr(pkt)->headroom = phdr.headroom;
-               odp_packet_hdr(pkt)->tailroom = phdr.tailroom;
-               odp_packet_hdr(pkt)->input = pktio_entry->s.handle;
                pkt_table[i] = pkt;
        }
 
        /* Now tell other process that we no longer need that buffers.*/
        r_p = pktio_entry->s.ipc.rx.free;
-       pkts = _ring_mp_enqueue_burst(r_p, ipcbufs_p, i);
+
+repeat:
+       pkts_ring = _ring_mp_enqueue_burst(r_p, ipcbufs_p, pkts);
        if (odp_unlikely(pkts < 0))
                ODP_ABORT("ipc: odp_ring_mp_enqueue_bulk r_p fail\n");
+       if (odp_unlikely(pkts != pkts_ring)) {
+               IPC_ODP_DBG("odp_ring_full: %d, odp_ring_count %d,"
+                           " _ring_free_count %d\n",
+                           _ring_full(r_p), _ring_count(r_p),
+                           _ring_free_count(r_p));
+               ipcbufs_p = (void *)&offsets[pkts_ring - 1];
+               pkts = pkts - pkts_ring;
+               goto repeat;
+       }
 
        return pkts;
 }
@@ -614,26 +544,23 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,
        uint32_t ready = odp_atomic_load_u32(&pktio_entry->s.ipc.ready);
        odp_packet_t pkt_table_mapped[len]; /**< Ready to send packet has to be
                                              * in memory mapped pool. */
+       uint64_t offsets[len];
 
        if (odp_unlikely(!ready))
                return 0;
 
-       _ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
+       _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);
 
-       /* Prepare packets: calculate offset from address. */
+       /* Copy packets to the shared shm pool if they are from another pool */
        for (i = 0; i < len; i++) {
-               int j;
                odp_packet_t pkt =  pkt_table[i];
-               odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
+               pool_t *ipc_pool = pool_entry_from_hdl(pktio_entry->s.ipc.pool);
                odp_buffer_bits_t handle;
-               uint32_t cur_mapped_pool_id =
-                        pool_handle_to_index(pktio_entry->s.ipc.pool);
-               uint32_t pool_id;
+               uint32_t pkt_pool_id;
 
-               /* do copy if packet was allocated from not mapped pool */
                handle.handle = _odp_packet_to_buffer(pkt);
-               pool_id = handle.pool_id;
-               if (pool_id != cur_mapped_pool_id) {
+               pkt_pool_id = handle.pool_id;
+               if (pkt_pool_id != ipc_pool->pool_idx) {
                        odp_packet_t newpkt;
 
                        newpkt = odp_packet_copy(pkt, pktio_entry->s.ipc.pool);
@@ -645,24 +572,30 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,
                } else {
                        pkt_table_mapped[i] = pkt;
                }
+       }
 
-               /* buf_hdr.addr can not be used directly in remote process,
-                * convert it to offset
-                */
-               for (j = 0; j < ODP_BUFFER_MAX_SEG; j++) {
-#ifdef _ODP_PKTIO_IPC
-                       pkt_hdr->buf_hdr.ipc_addr_offset[j] = (char *)pkt_hdr -
-                               (char *)pkt_hdr->buf_hdr.addr[j];
-#else
-                       /** buf_hdr.ipc_addr_offset defined only when ipc is
-                        *  enabled. */
-                       (void)pkt_hdr;
-#endif
-               }
+       /* Set offset to phdr for outgoing packets */
+       for (i = 0; i < len; i++) {
+               uint64_t data_pool_off;
+               odp_packet_t pkt = pkt_table_mapped[i];
+               odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt);
+               odp_pool_t pool_hdl = odp_packet_pool(pkt);
+               pool_t *pool = pool_entry_from_hdl(pool_hdl);
+
+               offsets[i] = (uint8_t *)pkt_hdr -
+                            (uint8_t *)odp_shm_addr(pool->shm);
+               data_pool_off = (uint8_t *)pkt_hdr->buf_hdr.seg[0].data -
+                               (uint8_t *)odp_shm_addr(pool->shm);
+               pkt_hdr->buf_hdr.seg[0].ipc_data_offset = data_pool_off;
+               IPC_ODP_DBG("%d/%d send packet %llx, pool %llx,"
+                           "phdr = %p, offset %x\n",
+                           i, len,
+                           odp_packet_to_u64(pkt), odp_pool_to_u64(pool_hdl),
+                           pkt_hdr, pkt_hdr->buf_hdr.seg[0].ipc_data_offset);
        }
 
        /* Put packets to ring to be processed by other process. */
-       rbuf_p = (void *)&pkt_table_mapped[0];
+       rbuf_p = (void *)&offsets[0];
        r = pktio_entry->s.ipc.tx.send;
        ret = _ring_mp_enqueue_burst(r, rbuf_p, len);
        if (odp_unlikely(ret < 0)) {
@@ -673,6 +606,7 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,
                ODP_ERR("odp_ring_full: %d, odp_ring_count %d, _ring_free_count %d\n",
                        _ring_full(r), _ring_count(r),
                        _ring_free_count(r));
+               ODP_ABORT("Unexpected!\n");
        }
 
        return ret;
@@ -722,22 +656,25 @@ static int ipc_start(pktio_entry_t *pktio_entry)
 
 static int ipc_stop(pktio_entry_t *pktio_entry)
 {
-       unsigned tx_send, tx_free;
+       unsigned tx_send = 0, tx_free = 0;
 
        odp_atomic_store_u32(&pktio_entry->s.ipc.ready, 0);
 
-       _ipc_free_ring_packets(pktio_entry->s.ipc.tx.send);
+       if (pktio_entry->s.ipc.tx.send)
+               _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.send);
        /* other process can transfer packets from one ring to
         * other, use delay here to free that packets. */
        sleep(1);
-       _ipc_free_ring_packets(pktio_entry->s.ipc.tx.free);
+       if (pktio_entry->s.ipc.tx.free)
+               _ipc_free_ring_packets(pktio_entry, pktio_entry->s.ipc.tx.free);
 
-       tx_send = _ring_count(pktio_entry->s.ipc.tx.send);
-       tx_free = _ring_count(pktio_entry->s.ipc.tx.free);
+       if (pktio_entry->s.ipc.tx.send)
+               tx_send = _ring_count(pktio_entry->s.ipc.tx.send);
+       if (pktio_entry->s.ipc.tx.free)
+               tx_free = _ring_count(pktio_entry->s.ipc.tx.free);
        if (tx_send | tx_free) {
                ODP_DBG("IPC rings: tx send %d tx free %d\n",
-                       _ring_free_count(pktio_entry->s.ipc.tx.send),
-                       _ring_free_count(pktio_entry->s.ipc.tx.free));
+                       tx_send, tx_free);
        }
 
        return 0;
@@ -795,4 +732,3 @@ const pktio_if_ops_t ipc_pktio_ops = {
        .pktin_ts_from_ns = NULL,
        .config = NULL
 };
-#endif
-- 
2.7.1.250.gff4ea60

Reply via email to