laforge has submitted this change. ( 
https://gerrit.osmocom.org/c/libosmocore/+/35068?usp=email )

Change subject: osmo_io: sendmsg/recvmsg support
......................................................................

osmo_io: sendmsg/recvmsg support

Add support for osmo_io operations resembling sendmsg() and recvmsg() socket
operations.  This is what will enable the implementation of higher-layer
functions like equivalents of sctp_recvmsg() and sctp_send() in
libosmo-netif and/or other users.

Change-Id: I89eb519b22d21011d61a7855b2364bc3c295df82
Related: OS#5751
---
M include/osmocom/core/osmo_io.h
M src/core/libosmocore.map
M src/core/osmo_io.c
M src/core/osmo_io_internal.h
M src/core/osmo_io_poll.c
M src/core/osmo_io_uring.c
6 files changed, 184 insertions(+), 20 deletions(-)

Approvals:
  pespin: Looks good to me, but someone else must approve
  Jenkins Builder: Verified
  laforge: Looks good to me, approved




diff --git a/include/osmocom/core/osmo_io.h b/include/osmocom/core/osmo_io.h
index 3c704be..dda9e6d 100644
--- a/include/osmocom/core/osmo_io.h
+++ b/include/osmocom/core/osmo_io.h
@@ -4,6 +4,8 @@

 #pragma once

+#include <sys/socket.h>
+
 #include <osmocom/core/linuxlist.h>
 #include <osmocom/core/logging.h>
 #include <osmocom/core/msgb.h>
@@ -21,8 +23,8 @@
        OSMO_IO_FD_MODE_READ_WRITE,
        /*! use recvfrom() / sendto() calls */
        OSMO_IO_FD_MODE_RECVFROM_SENDTO,
-       /*! emulate sctp_recvmsg() and sctp_send() */
-       OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND,
+       /*! emulate recvmsg() / sendmsg() */
+       OSMO_IO_FD_MODE_RECVMSG_SENDMSG,
 };

 enum osmo_io_backend {
@@ -65,12 +67,20 @@
                                  struct msgb *msg,
                                  const struct osmo_sockaddr *daddr);
        };
+
+       /* mode OSMO_IO_FD_MODE_RECVMSG_SENDMSG: */
+       struct {
+               void (*recvmsg_cb)(struct osmo_io_fd *iofd, int res,
+                                  struct msgb *msg, const struct msghdr *msgh);
+               void (*sendmsg_cb)(struct osmo_io_fd *iofd, int res, struct 
msgb *msg);
+       };
 };

 void osmo_iofd_init(void);

 struct osmo_io_fd *osmo_iofd_setup(const void *ctx, int fd, const char *name,
                  enum osmo_io_fd_mode mode, const struct osmo_io_ops *ioops, 
void *data);
+int osmo_iofd_set_cmsg_size(struct osmo_io_fd *iofd, size_t cmsg_size);
 int osmo_iofd_register(struct osmo_io_fd *iofd, int fd);
 int osmo_iofd_unregister(struct osmo_io_fd *iofd);
 unsigned int osmo_iofd_txqueue_len(struct osmo_io_fd *iofd);
@@ -83,6 +93,8 @@
 int osmo_iofd_write_msgb(struct osmo_io_fd *iofd, struct msgb *msg);
 int osmo_iofd_sendto_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int 
sendto_flags,
                          const struct osmo_sockaddr *dest);
+int osmo_iofd_sendmsg_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int 
sendmsg_flags,
+                          const struct msghdr *msgh);

 void osmo_iofd_set_alloc_info(struct osmo_io_fd *iofd, unsigned int size, 
unsigned int headroom);
 void osmo_iofd_set_txqueue_max_length(struct osmo_io_fd *iofd, unsigned int 
size);
diff --git a/src/core/libosmocore.map b/src/core/libosmocore.map
index b66e37d..a50a9ed 100644
--- a/src/core/libosmocore.map
+++ b/src/core/libosmocore.map
@@ -266,7 +266,10 @@
 osmo_iofd_ops;
 osmo_iofd_register;
 osmo_iofd_sendto_msgb;
+osmo_iofd_sctp_send_msgb;
+osmo_iofd_sendmsg_msgb;
 osmo_iofd_set_alloc_info;
+osmo_iofd_set_cmsg_size;
 osmo_iofd_set_data;
 osmo_iofd_set_ioops;
 osmo_iofd_set_priv_nr;
diff --git a/src/core/osmo_io.c b/src/core/osmo_io.c
index 71249cf..5d25f66 100644
--- a/src/core/osmo_io.c
+++ b/src/core/osmo_io.c
@@ -1,8 +1,8 @@
 /*! \file osmo_io.c
  * New osmocom async I/O API.
  *
- * (C) 2022 by Harald Welte <lafo...@osmocom.org>
- * (C) 2022-2023 by sysmocom - s.f.m.c. GmbH <i...@sysmocom.de>
+ * (C) 2022-2024 by Harald Welte <lafo...@osmocom.org>
+ * (C) 2022-2024 by sysmocom - s.f.m.c. GmbH <i...@sysmocom.de>
  * Author: Daniel Willmann <dwillm...@sysmocom.de>
  *
  * All Rights Reserved.
@@ -105,8 +105,10 @@
  *  \param[in] iofd the osmo_io file structure
  *  \param[in] action the action this msg(hdr) is for (read, write, ..)
  *  \param[in] msg the msg buffer to use. Will allocate a new one if NULL
+ *  \param[in] cmsg_size size (in bytes) of iofd_msghdr.cmsg buffer. Can be 0 
if cmsg is not used.
  *  \returns the newly allocated msghdr or NULL in case of error */
-struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum 
iofd_msg_action action, struct msgb *msg)
+struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum 
iofd_msg_action action, struct msgb *msg,
+                                     size_t cmsg_size)
 {
        bool free_msg = false;
        struct iofd_msghdr *hdr;
@@ -120,7 +122,7 @@
                talloc_steal(iofd, msg);
        }

-       hdr = talloc_zero(iofd, struct iofd_msghdr);
+       hdr = talloc_zero_size(iofd, sizeof(struct iofd_msghdr) + cmsg_size);
        if (!hdr) {
                if (free_msg)
                        talloc_free(msg);
@@ -339,8 +341,10 @@
        case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
                iofd->io_ops.recvfrom_cb(iofd, rc, msg, &hdr->osa);
                break;
-       case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
-               /* TODO Implement */
+       case OSMO_IO_FD_MODE_RECVMSG_SENDMSG:
+               iofd->io_ops.recvmsg_cb(iofd, rc, msg, &hdr->hdr);
+               break;
+       default:
                OSMO_ASSERT(false);
                break;
        }
@@ -378,6 +382,9 @@
        case IOFD_ACT_SENDTO:
                iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa);
                break;
+       case IOFD_ACT_SENDMSG:
+               iofd->io_ops.sendmsg_cb(iofd, rc, msg);
+               break;
        default:
                OSMO_ASSERT(0);
        }
@@ -408,7 +415,7 @@
                return -EINVAL;
        }
 
-       struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, 
msg);
+       struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, 
msg, 0);
        if (!msghdr)
                return -ENOMEM;

@@ -450,7 +457,7 @@
                return -EINVAL;
        }

-       struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_SENDTO, 
msg);
+       struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_SENDTO, 
msg, 0);
        if (!msghdr)
                return -ENOMEM;

@@ -475,16 +482,97 @@
        return 0;
 }

+/*! osmo_io equivalent of the sendmsg(2) socket API call
+ *
+ *  Appends the message to the internal transmit queue.
+ *  If the function returns success (0), it will take ownership of the msgb and
+ *  internally call msgb_free() after the write request completes.
+ *  In case of an error the msgb needs to be freed by the caller.
+ *  \param[in] iofd file descriptor to write to
+ *  \param[in] msg message buffer to send; is used to fill msgh->iov[]
+ *  \param[in] sendmsg_flags Flags to pass to the send call
+ *  \param[in] msgh 'struct msghdr' for name/control/flags. iov must be empty!
+ *  \returns 0 in case of success; a negative value in case of error
+ */
+int osmo_iofd_sendmsg_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int 
sendmsg_flags, const struct msghdr *msgh)
+{
+       int rc;
+       struct iofd_msghdr *msghdr;
+
+       OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_RECVMSG_SENDMSG);
+       if (OSMO_UNLIKELY(!iofd->io_ops.sendmsg_cb)) {
+               LOGPIO(iofd, LOGL_ERROR, "sendmsg_cb not set, Rejecting 
msgb\n");
+               return -EINVAL;
+       }
+
+       if (OSMO_UNLIKELY(msgh->msg_namelen > sizeof(msghdr->osa))) {
+               LOGPIO(iofd, LOGL_ERROR, "osmo_iofd_sendmsg msg_namelen (%u) > 
supported %zu bytes\n",
+                       msgh->msg_namelen, sizeof(msghdr->osa));
+               return -EINVAL;
+       }
+
+       if (OSMO_UNLIKELY(msgh->msg_iovlen)) {
+               LOGPIO(iofd, LOGL_ERROR, "osmo_iofd_sendmsg must have all in 
'struct msgb', not in 'msg_iov'\n");
+               return -EINVAL;
+       }
+
+       msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_SENDMSG, msg, 
msgh->msg_controllen);
+       if (!msghdr)
+               return -ENOMEM;
+
+       /* copy over optional address */
+       if (msgh->msg_name) {
+               memcpy(&msghdr->osa, msgh->msg_name, msgh->msg_namelen);
+               msghdr->hdr.msg_name = &msghdr->osa.u.sa;
+               msghdr->hdr.msg_namelen = msgh->msg_namelen;
+       }
+
+       /* build iov from msgb */
+       msghdr->iov[0].iov_base = msgb_data(msghdr->msg);
+       msghdr->iov[0].iov_len = msgb_length(msghdr->msg);
+       msghdr->hdr.msg_iov = &msghdr->iov[0];
+       msghdr->hdr.msg_iovlen = 1;
+
+       /* copy over the cmsg from the msghdr */
+       if (msgh->msg_control && msgh->msg_controllen) {
+               msghdr->hdr.msg_control = msghdr->cmsg;
+               msghdr->hdr.msg_controllen = msgh->msg_controllen;
+               memcpy(msghdr->cmsg, msgh->msg_control, msgh->msg_controllen);
+       }
+
+       /* hdr.msg_flags is ignored on send; the send call takes msghdr->flags */
+       msghdr->flags = msghdr->hdr.msg_flags = sendmsg_flags;
+
+       rc = iofd_txqueue_enqueue(iofd, msghdr);
+       if (rc < 0) {
+               iofd_msghdr_free(msghdr);
+               LOGPIO(iofd, LOGL_ERROR, "enqueueing message failed (%d). 
Rejecting msgb\n", rc);
+               return rc;
+       }
+
+       return 0;
+}
+
 static int check_mode_callback_compat(enum osmo_io_fd_mode mode, const struct 
osmo_io_ops *ops)
 {
        switch (mode) {
        case OSMO_IO_FD_MODE_READ_WRITE:
                if (ops->recvfrom_cb || ops->sendto_cb)
                        return false;
+               if (ops->recvmsg_cb || ops->sendmsg_cb)
+                       return false;
                break;
        case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
                if (ops->read_cb || ops->write_cb)
                        return false;
+               if (ops->recvmsg_cb || ops->sendmsg_cb)
+                       return false;
+               break;
+       case OSMO_IO_FD_MODE_RECVMSG_SENDMSG:
+               if (ops->recvfrom_cb || ops->sendto_cb)
+                       return false;
+               if (ops->read_cb || ops->write_cb)
+                       return false;
                break;
        default:
                break;
@@ -511,6 +599,7 @@
        switch (mode) {
        case OSMO_IO_FD_MODE_READ_WRITE:
        case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
+       case OSMO_IO_FD_MODE_RECVMSG_SENDMSG:
                break;
        default:
                return NULL;
@@ -547,6 +636,16 @@
        return iofd;
 }

+/*! Set the size of the control message buffer allocated when submitting 
recvmsg; \returns 0 on success, -EINVAL if mode is not RECVMSG_SENDMSG */
+int osmo_iofd_set_cmsg_size(struct osmo_io_fd *iofd, size_t cmsg_size)
+{
+       if (iofd->mode != OSMO_IO_FD_MODE_RECVMSG_SENDMSG)
+               return -EINVAL; /* rejected for every other mode */
+
+       iofd->cmsg_size = cmsg_size; /* only stored here; read by the recv paths */
+       return 0;
+}
+
 /*! Register the fd with the underlying backend.
  *
  *  \param[in] iofd the iofd file descriptor
@@ -567,7 +666,8 @@

        IOFD_FLAG_UNSET(iofd, IOFD_FLAG_CLOSED);
        if ((iofd->mode == OSMO_IO_FD_MODE_READ_WRITE && iofd->io_ops.read_cb) 
||
-           (iofd->mode == OSMO_IO_FD_MODE_RECVFROM_SENDTO && 
iofd->io_ops.recvfrom_cb)) {
+           (iofd->mode == OSMO_IO_FD_MODE_RECVFROM_SENDTO && 
iofd->io_ops.recvfrom_cb) ||
+           (iofd->mode == OSMO_IO_FD_MODE_RECVMSG_SENDMSG && 
iofd->io_ops.recvmsg_cb)) {
                osmo_iofd_ops.read_enable(iofd);
        }

@@ -767,7 +867,12 @@
                else
                        osmo_iofd_ops.read_disable(iofd);
                break;
-       case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
+       case OSMO_IO_FD_MODE_RECVMSG_SENDMSG:
+               if (iofd->io_ops.recvmsg_cb)
+                       osmo_iofd_ops.read_enable(iofd);
+               else
+                       osmo_iofd_ops.read_disable(iofd);
+               break;
        default:
                OSMO_ASSERT(0);
        }
@@ -780,7 +885,8 @@
  *  \param[in] iofd the file descriptor */
 void osmo_iofd_notify_connected(struct osmo_io_fd *iofd)
 {
-       OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_READ_WRITE);
+       OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_READ_WRITE ||
+                   iofd->mode == OSMO_IO_FD_MODE_RECVMSG_SENDMSG);
        OSMO_ASSERT(osmo_iofd_ops.notify_connected);
        osmo_iofd_ops.notify_connected(iofd);
 }
diff --git a/src/core/osmo_io_internal.h b/src/core/osmo_io_internal.h
index 9c86e05..af47a3d 100644
--- a/src/core/osmo_io_internal.h
+++ b/src/core/osmo_io_internal.h
@@ -4,6 +4,7 @@

 #include <unistd.h>
 #include <stdbool.h>
+#include <netinet/sctp.h>

 #include <osmocom/core/osmo_io.h>
 #include <osmocom/core/linuxlist.h>
@@ -72,6 +73,9 @@
        /*! private number, extending \a data */
        unsigned int priv_nr;

+       /*! size of iofd_msghdr.cmsg[] when allocated in recvmsg path */
+       size_t cmsg_size;
+
        struct {
                /*! talloc context from which to allocate msgb when reading */
                const void *ctx;
@@ -109,7 +113,8 @@
        IOFD_ACT_WRITE,
        IOFD_ACT_RECVFROM,
        IOFD_ACT_SENDTO,
-       // TODO: SCTP_*
+       IOFD_ACT_RECVMSG,
+       IOFD_ACT_SENDMSG,
 };


@@ -132,6 +137,9 @@
        struct msgb *msg;
        /*! I/O file descriptor on which we perform this I/O operation */
        struct osmo_io_fd *iofd;
+
+       /*! control message buffer for passing sctp_sndrcvinfo along */
+       char cmsg[0]; /* size is determined by iofd->cmsg_size on recvmsg, and 
by msgh->msg_controllen on sendmsg */
 };

 enum iofd_seg_act {
@@ -140,7 +148,7 @@
        IOFD_SEG_ACT_DEFER,
 };

-struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum 
iofd_msg_action action, struct msgb *msg);
+struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum 
iofd_msg_action action, struct msgb *msg, size_t cmsg_size);
 void iofd_msghdr_free(struct iofd_msghdr *msghdr);

 struct msgb *iofd_msgb_alloc(struct osmo_io_fd *iofd);
diff --git a/src/core/osmo_io_poll.c b/src/core/osmo_io_poll.c
index 8398a30..52e806d 100644
--- a/src/core/osmo_io_poll.c
+++ b/src/core/osmo_io_poll.c
@@ -49,6 +49,7 @@

        if (what & OSMO_FD_READ) {
                struct iofd_msghdr hdr;
+
                msg = iofd_msgb_pending_or_alloc(iofd);
                if (!msg) {
                        LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for 
reading\n");
@@ -64,6 +65,10 @@
                        .msg_name = &hdr.osa.u.sa,
                        .msg_namelen = sizeof(struct osmo_sockaddr),
                };
+               if (iofd->mode == OSMO_IO_FD_MODE_RECVMSG_SENDMSG) {
+                       hdr.hdr.msg_control = alloca(iofd->cmsg_size);
+                       hdr.hdr.msg_controllen = iofd->cmsg_size;
+               }

                rc = recvmsg(ofd->fd, &hdr.hdr, flags);
                if (rc > 0)
@@ -90,13 +95,15 @@
                        case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
                                iofd->io_ops.sendto_cb(iofd, 0, NULL, NULL);
                                break;
+                       case OSMO_IO_FD_MODE_RECVMSG_SENDMSG:
+                               iofd->io_ops.sendmsg_cb(iofd, 0, NULL);
+                               break;
                        default:
                                break;
                        }
                        if (osmo_iofd_txqueue_len(iofd) == 0)
                                iofd_poll_ops.write_disable(iofd);
                }
-
        }
 }

diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c
index 24d1e08..aa9df85 100644
--- a/src/core/osmo_io_uring.c
+++ b/src/core/osmo_io_uring.c
@@ -3,6 +3,7 @@
  *
  * (C) 2022-2023 by sysmocom s.f.m.c.
  * Author: Daniel Willmann <dan...@sysmocom.de>
+ * (C) 2023-2024 by Harald Welte <lafo...@osmocom.org>
  *
  * All Rights Reserved.
  *
@@ -35,6 +36,8 @@
 #include <stdbool.h>
 #include <errno.h>

+#include <netinet/in.h>
+#include <netinet/sctp.h>
 #include <sys/eventfd.h>
 #include <liburing.h>

@@ -114,7 +117,7 @@
                OSMO_ASSERT(0);
        }

-       msghdr = iofd_msghdr_alloc(iofd, action, msg);
+       msghdr = iofd_msghdr_alloc(iofd, action, msg, iofd->cmsg_size);
        if (!msghdr) {
                LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for 
reading\n");
                OSMO_ASSERT(0);
@@ -126,6 +129,10 @@
        switch (action) {
        case IOFD_ACT_READ:
                break;
+       case IOFD_ACT_RECVMSG:
+               msghdr->hdr.msg_control = msghdr->cmsg;
+               msghdr->hdr.msg_controllen = iofd->cmsg_size;
+               /* fall-through */
        case IOFD_ACT_RECVFROM:
                msghdr->hdr.msg_iov = &msghdr->iov[0];
                msghdr->hdr.msg_iovlen = 1;
@@ -146,6 +153,7 @@
        case IOFD_ACT_READ:
                io_uring_prep_readv(sqe, iofd->fd, msghdr->iov, 1, 0);
                break;
+       case IOFD_ACT_RECVMSG:
        case IOFD_ACT_RECVFROM:
                io_uring_prep_recvmsg(sqe, iofd->fd, &msghdr->hdr, 
msghdr->flags);
                break;
@@ -210,10 +218,12 @@
        switch (msghdr->action) {
        case IOFD_ACT_READ:
        case IOFD_ACT_RECVFROM:
+       case IOFD_ACT_RECVMSG:
                iofd_uring_handle_recv(msghdr, res);
                break;
        case IOFD_ACT_WRITE:
        case IOFD_ACT_SENDTO:
+       case IOFD_ACT_SENDMSG:
                iofd_uring_handle_tx(msghdr, res);
                break;
        default:
@@ -273,6 +283,7 @@
        switch (msghdr->action) {
        case IOFD_ACT_WRITE:
        case IOFD_ACT_SENDTO:
+       case IOFD_ACT_SENDMSG:
                io_uring_prep_sendmsg(sqe, msghdr->iofd->fd, &msghdr->hdr, 
msghdr->flags);
                break;
        default:
@@ -334,21 +345,20 @@
                        LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for 
writing\n");
                        OSMO_ASSERT(0);
                }
-               msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg);
+               msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg, 0);
                if (!msghdr) {
                        LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for 
writing\n");
                        OSMO_ASSERT(0);
                }

                msghdr->iov[0].iov_base = msgb_data(msg);
-               msghdr->iov[0].iov_len = msgb_tailroom(msg);
+               msghdr->iov[0].iov_len = msgb_length(msg);

                sqe = io_uring_get_sqe(&g_ring.ring);
                if (!sqe) {
                        LOGPIO(iofd, LOGL_ERROR, "Could not get 
io_uring_sqe\n");
                        OSMO_ASSERT(0);
                }
-               // Prep msgb/iov
                io_uring_prep_writev(sqe, iofd->fd, msghdr->iov, 1, 0);
                io_uring_sqe_set_data(sqe, msghdr);

@@ -376,6 +386,9 @@
        case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
                iofd_uring_submit_recv(iofd, IOFD_ACT_RECVFROM);
                break;
+       case OSMO_IO_FD_MODE_RECVMSG_SENDMSG:
+               iofd_uring_submit_recv(iofd, IOFD_ACT_RECVMSG);
+               break;
        default:
                OSMO_ASSERT(0);
        }

--
To view, visit https://gerrit.osmocom.org/c/libosmocore/+/35068?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.osmocom.org/settings

Gerrit-Project: libosmocore
Gerrit-Branch: master
Gerrit-Change-Id: I89eb519b22d21011d61a7855b2364bc3c295df82
Gerrit-Change-Number: 35068
Gerrit-PatchSet: 12
Gerrit-Owner: laforge <lafo...@osmocom.org>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: daniel <dwillm...@sysmocom.de>
Gerrit-Reviewer: laforge <lafo...@osmocom.org>
Gerrit-Reviewer: pespin <pes...@sysmocom.de>
Gerrit-CC: fixeria <vyanits...@sysmocom.de>
Gerrit-CC: jolly <andr...@eversberg.eu>
Gerrit-MessageType: merged

Reply via email to