daniel has uploaded this change for review. ( https://gerrit.osmocom.org/c/libosmocore/+/30934 )
Change subject: Add osmo_io with initial poll backend ...................................................................... Add osmo_io with initial poll backend * make backend configurable for later * segmentation callback for chunked streams * logging target for osmo_io * support partial writes Change-Id: I50d73cf550d6ce8154bf827bf47408131cf5b0a0 Related: SYS#5094, OS#5751 --- M TODO-RELEASE M include/Makefile.am M include/osmocom/core/logging.h A include/osmocom/core/osmo_io.h M src/Makefile.am M src/logging.c A src/osmo_io.c A src/osmo_io_internal.h A src/osmo_io_poll.c 9 files changed, 851 insertions(+), 1 deletion(-) git pull ssh://gerrit.osmocom.org:29418/libosmocore refs/changes/34/30934/1 diff --git a/TODO-RELEASE b/TODO-RELEASE index 6ac8db8..ef629c7 100644 --- a/TODO-RELEASE +++ b/TODO-RELEASE @@ -9,3 +9,4 @@ #library what description / commit summary line libosmocore new API osmo_sockaddr_is_any() libosmocore ABI breakage OSMO_NUM_DLIB change affecting internal_cat[] +libosmocore new API osmo_io_* diff --git a/include/Makefile.am b/include/Makefile.am index a234014..b59009d 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -37,6 +37,7 @@ osmocom/core/log2.h \ osmocom/core/logging.h \ osmocom/core/loggingrb.h \ + osmocom/core/osmo_io.h \ osmocom/core/stats.h \ osmocom/core/macaddr.h \ osmocom/core/msgb.h \ diff --git a/include/osmocom/core/logging.h b/include/osmocom/core/logging.h index 5ae38c7..8e30a20 100644 --- a/include/osmocom/core/logging.h +++ b/include/osmocom/core/logging.h @@ -154,7 +154,8 @@ #define DLCSN1 -26 /*!< CSN.1 (Concrete Syntax Notation 1) codec */ #define DLM2PA -27 /*!< Osmocom M2PA (libosmo-sigtran) */ #define DLM2UA -28 /*!< Reserved for future Osmocom M2UA (libosmo-sigtran) */ -#define OSMO_NUM_DLIB 28 /*!< Number of logging sub-systems in libraries */ +#define DLIO -29 /*!< Osmocom IO sub-system */ +#define OSMO_NUM_DLIB 29 /*!< Number of logging sub-systems in libraries */ /* Colors that can be used in 
log_info_cat.color */ #define OSMO_LOGCOLOR_NORMAL NULL diff --git a/include/osmocom/core/osmo_io.h b/include/osmocom/core/osmo_io.h new file mode 100644 index 0000000..4fac3f1 --- /dev/null +++ b/include/osmocom/core/osmo_io.h @@ -0,0 +1,56 @@ +/*! \file osmo_io.h + * io(_uring) abstraction osmo fd compatibility + */ + +#pragma once + +#include <osmocom/core/linuxlist.h> +#include <osmocom/core/msgb.h> +#include <osmocom/core/socket.h> + +struct osmo_io_fd; + +enum osmo_io_fd_mode { + /*! use read() / write() calls */ + OSMO_IO_FD_MODE_READ_WRITE, + /*! use recvfrom() / sendto() calls */ + OSMO_IO_FD_MODE_RECVFROM_SENDTO, + /*! emulate sctp_recvmsg() and sctp_sendmsg() */ + OSMO_IO_FD_MODE_SCTP_RECVMSG_SENDMSG, +}; + +enum osmo_io_backend { + OSMO_IO_BACKEND_POLL, +}; + +struct osmo_io_ops { + /*! call-back function when something was read from fd */ + void (*read_cb)(struct osmo_io_fd *, int res, struct msgb *); + /*! call-back function when write has completed on fd */ + void (*write_cb)(struct osmo_io_fd *, int res, struct msgb *); + + /*! call-back function emulating sendto */ + void (*sendmsg_cb)(struct osmo_io_fd *, int res, struct msgb *); + /*! call-back function emulating recvfrom */ + void (*recvmsg_cb)(struct osmo_io_fd *, int res, struct msgb *, struct osmo_sockaddr *); + + /*! 
call-back function to segment the data returned by read_cb */ + int (*segmentation_cb)(struct msgb *, int data_len); +}; + +void osmo_io_init(); + +struct osmo_io_fd *osmo_iofd_setup(const void *ctx, unsigned int size, unsigned int headroom, + int fd, const char *name, enum osmo_io_fd_mode mode, + const struct osmo_io_ops *ioops, void *data, unsigned int priv_nr); +void osmo_iofd_close(struct osmo_io_fd *iofd); +int osmo_iofd_write_msgb(struct osmo_io_fd *iofd, struct msgb *msg); +void osmo_iofd_read_enable(struct osmo_io_fd *iofd); +void osmo_iofd_read_enable(struct osmo_io_fd *iofd); + +int osmo_iofd_sendto_msgb(struct osmo_io_fd *iofd, struct msgb *msg, + const struct osmo_sockaddr *dest); + +void *osmo_iofd_get_data(struct osmo_io_fd *iofd); +unsigned int osmo_iofd_get_priv(struct osmo_io_fd *iofd); +int osmo_iofd_get_fd(struct osmo_io_fd *iofd); diff --git a/src/Makefile.am b/src/Makefile.am index 2c73af6..ba0926e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -34,6 +34,7 @@ it_q.c \ probes.d \ base64.c \ + osmo_io.c osmo_io_poll.c \ $(NULL) if HAVE_SSSE3 diff --git a/src/logging.c b/src/logging.c index ce42e4c..196ca37 100644 --- a/src/logging.c +++ b/src/logging.c @@ -313,6 +313,12 @@ .enabled = 1, .loglevel = LOGL_NOTICE, .color = "\033[38;5;11m", }, + [INT2IDX(DLIO)] = { + .name = "DLIO", + .description = "libosmocore IO Subsystem", + .enabled = 1, .loglevel = LOGL_NOTICE, + .color = "\033[38;5;67m", + }, }; void assert_loginfo(const char *src) diff --git a/src/osmo_io.c b/src/osmo_io.c new file mode 100644 index 0000000..a896888 --- /dev/null +++ b/src/osmo_io.c @@ -0,0 +1,440 @@ +/*! \file osmo_io.c + * New osmocom async I/O API. + * + * (C) 2022 by Harald Welte <lafo...@osmocom.org> + * (C) 2022 by sysmocom - s.f.m.c. GmbH <i...@sysmocom.de> + * Author: Daniel Willmann <dwillm...@sysmocom.de> + * + * All Rights Reserved. 
+ * + * SPDX-License-Identifier: GPL-2.0+ + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + */ + + +#include <fcntl.h> +#include <stdio.h> +#include <stdlib.h> +#include <talloc.h> +#include <unistd.h> +#include <string.h> +#include <stdbool.h> +#include <errno.h> + +#include <osmocom/core/osmo_io.h> +#include <osmocom/core/linuxlist.h> +#include <osmocom/core/logging.h> +#include <osmocom/core/msgb.h> +#include <osmocom/core/socket.h> +#include <osmocom/core/talloc.h> +#include <osmocom/core/utils.h> + +#include "../config.h" +#include "osmo_io_internal.h" + +extern struct iofd_backend_ops iofd_poll_ops; +#define OSMO_IO_BACKEND_DEFAULT "POLL" + +static enum osmo_io_backend g_backend; + +/* Used by some tests, can't be static */ +struct iofd_backend_ops g_iofd_ops; + +/*! initialize osmo_io for the current thread */ +void osmo_iofd_init() +{ + switch (g_backend) { + case OSMO_IO_BACKEND_POLL: + break; + default: + OSMO_ASSERT(0); + break; + } +} + +/*! ensure main thread always has pre-initialized osmo_io */ +static __attribute__((constructor)) void on_dso_load_select(void) +{ + char *backend = getenv("LIBOSMO_IO_BACKEND"); + if (backend == NULL) + backend = OSMO_IO_BACKEND_DEFAULT; + + if (!strcmp("POLL", backend)) { + g_backend = OSMO_IO_BACKEND_POLL; + g_iofd_ops = iofd_poll_ops; + } else { + OSMO_ASSERT(0); + } + + osmo_iofd_init(); +} + +/*! Allocate the msghdr + * \param[in] iofd the osmo_io file structure + * \param[in] action the action this msg(hdr) is for (read, write, ..) 
+ * \param[in] msg the msg buffer to use. Will allocate a new one if NULL + * \returns the newly allocated msghdr or NULL in case of error */ +struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum iofd_msg_action action, struct msgb *msg) +{ + struct iofd_msghdr *hdr = talloc_zero(iofd, struct iofd_msghdr); + if (!hdr) + return NULL; + if (!msg) { + msg = iofd_msgb_alloc(iofd); + if (!msg) { + talloc_free(hdr); + return NULL; + } + } + + hdr->action = action; + hdr->iofd = iofd; + hdr->msg = msg; + + return hdr; +} + +/*! Free the msghdr + * \param[in] msghdr the msghdr to free + */ +void iofd_msghdr_free(struct iofd_msghdr *msghdr) +{ + talloc_free(msghdr); +} + +/*! convenience wrapper to call msgb_alloc with parameters from osmo_io_fd */ +struct msgb *iofd_msgb_alloc(struct osmo_io_fd *iofd) +{ + uint16_t headroom = iofd->msgb_alloc.headroom; + + OSMO_ASSERT(iofd->msgb_alloc.size < 0xffff - headroom); + return msgb_alloc_headroom_c(iofd->msgb_alloc.ctx, + iofd->msgb_alloc.size + headroom, headroom, iofd->name); +} + +/*! return the pending msgb in iofd or NULL if there is none*/ +struct msgb *iofd_msgb_pending(struct osmo_io_fd *iofd) +{ + struct msgb *msg = NULL; + + msg = iofd->pending; + iofd->pending = NULL; + + return msg; +} + +/*! Return the pending msgb or allocate and return a new one */ +struct msgb *iofd_msgb_pending_or_alloc(struct osmo_io_fd *iofd) +{ + struct msgb *msg = NULL; + + msg = iofd->pending; + iofd->pending = NULL; + + if (!msg) { + msg = iofd_msgb_alloc(iofd); + } + + return msg; +} + +/*! Enqueue a message to be sent + * + * Enqueues the message at the back of the queue provided there is enough space. 
+ * \param[in] iofd the file descriptor + * \param[in] msghdr the message to enqueue + * \returns 0 if the message was enqueued successfully, + * -ENOSPC if the queue already contains the maximum number of messages + */ +int iofd_txqueue_enqueue(struct osmo_io_fd *iofd, struct iofd_msghdr *msghdr) +{ + if (iofd->tx_queue.current_length >= iofd->tx_queue.max_length) + return -ENOSPC; + + llist_add_tail(&msghdr->list, &iofd->tx_queue.msg_queue); + iofd->tx_queue.current_length++; + + return 0; +} + +/*! Enqueue a message at the front + * + * Used to enqueue a msgb from a partial send again. This function will always + * enqueue the message, even if the maximum number of messages is reached. + * \param[in] iofd the file descriptor + * \param[in] msghdr the message to enqueue + */ +void iofd_txqueue_enqueue_front(struct osmo_io_fd *iofd, struct iofd_msghdr *msghdr) +{ + llist_add(&msghdr->list, &iofd->tx_queue.msg_queue); + iofd->tx_queue.current_length++; +} + +/*! Dequeue a message from the front + * + * \param[in] iofd the file descriptor + * \returns the msghdr from the front of the queue or NULL if the queue is empty + */ +struct iofd_msghdr *iofd_txqueue_dequeue(struct osmo_io_fd *iofd) +{ + struct llist_head *lh; + + if (iofd->tx_queue.current_length == 0) + return NULL; + + lh = iofd->tx_queue.msg_queue.next; + + OSMO_ASSERT(lh); + iofd->tx_queue.current_length--; + llist_del(lh); + + return llist_entry(lh, struct iofd_msghdr, list); +} + +enum iofd_seg_act iofd_handle_segmentation(struct osmo_io_fd *iofd, struct msgb *msg) +{ + int pending_len, msg_len; + struct msgb *msg_pending; + + msg_len = msgb_length(msg); + + if (!iofd->io_ops.segmentation_cb) + return IOFD_SEG_ACT_HANDLE_ONE; + + int len = iofd->io_ops.segmentation_cb(msg, msg_len); + + pending_len = msg_len - len; + /* No segmentation needed, return */ + if (pending_len == 0) { + return IOFD_SEG_ACT_HANDLE_ONE; + } else if (pending_len < 0) { + iofd->pending = msg; + return IOFD_SEG_ACT_DEFER; + } +
+ /* Copy the superfluous data over */ + msg_pending = iofd_msgb_alloc(iofd); + memcpy(msgb_data(msg_pending), msgb_data(msg)+ len, pending_len); + msgb_put(msg_pending, pending_len); + iofd->pending = msg_pending; + + /* Trim the original msgb to size */ + msgb_trim(msg, len); + return IOFD_SEG_ACT_HANDLE_MORE; +} + +/*! Restore message boundaries on read() and pass individual messages to the read callback + */ +void iofd_handle_segmented_read(struct osmo_io_fd *iofd, struct msgb *msg, int rc) { + int res; + + if (rc <= 0) { + iofd->io_ops.read_cb(iofd, rc, msg); + return; + } + + do { + res = iofd_handle_segmentation(iofd, msg); + if (res != IOFD_SEG_ACT_DEFER || rc < 0) + iofd->io_ops.read_cb(iofd, rc, msg); + if (res == IOFD_SEG_ACT_HANDLE_MORE) + msg = iofd_msgb_pending(iofd); + } while (res == IOFD_SEG_ACT_HANDLE_MORE); +} + +/* Public functions */ + +/*! Send a message through a connected socket + * + * Appends the message to the internal transmit queue. + * If the function returns success (0) it will take ownership of the msgb and + * internally call msgb_free() after the write request completes. + * In case of an error the msgb needs to be freed by the caller. + * \param[in] iofd file descriptor to write to + * \param[in] msg message buffer to write + * \returns 0 in case of success; a negative value in case of error + */ +int osmo_iofd_write_msgb(struct osmo_io_fd *iofd, struct msgb *msg) +{ + int rc; + struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg); + if (!msghdr) { + return -ENOMEM; + } + + + rc = iofd_txqueue_enqueue(iofd, msghdr); + if (rc < 0) { + iofd_msghdr_free(msghdr); + LOGP(DLIO, LOGL_ERROR, "iofd(%s) enqueueing message failed (%i). Rejecting msgb\n", iofd->name, rc); + return rc; + } + + g_iofd_ops.write_enable(iofd); + + return 0; +} + +/*! Send a message through an unconnected socket + * + * Appends the message to the internal transmit queue. 
+ * If the function returns success (0), it will take ownership of the msgb and + * internally call msgb_free() after the write request completes. + * In case of an error the msgb needs to be freed by the caller. + * \param[in] iofd file descriptor to write to + * \param[in] msg message buffer to send + * \param[in] dest destination address to send the message to + * \returns 0 in case of success; a negative value in case of error + */ +int osmo_iofd_sendto_msgb(struct osmo_io_fd *iofd, struct msgb *msg, const struct osmo_sockaddr *dest) +{ + int rc; + + OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_RECVFROM_SENDTO); + + struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_SENDTO, msg); + if (!msghdr) { + return -ENOMEM; + } + + msghdr->sa = *dest; + + rc = iofd_txqueue_enqueue(iofd, msghdr); + if (rc < 0) { + iofd_msghdr_free(msghdr); + LOGP(DLIO, LOGL_ERROR, "iofd(%s) enqueueing message failed (%i). Rejecting msgb\n", iofd->name, rc); + return rc; + } + + g_iofd_ops.write_enable(iofd); + + return 0; +} + +/*! Enable reading from this iofd + * + * \param[in] iofd the file descriptor + */ +void osmo_iofd_read_enable(struct osmo_io_fd *iofd) +{ + g_iofd_ops.read_enable(iofd); +} + +/*! Disable reading from this iofd + * + * The read_cb can still be called for messages that arrived at the socket + * before this function was called. + * \param[in] iofd the file descriptor + */ +void osmo_iofd_read_disable(struct osmo_io_fd *iofd) +{ + g_iofd_ops.read_disable(iofd); +} + +/*! 
Allocate and setup a new iofd + * \param[in] ctx the parent context from which to allocate + * \param[in] size the size of the msgb when receiving data + * \param[in] headroom the headroom of the msgb when receiving data + * \param[in] fd the underlying system file descriptor + * \param[in] name the name of the iofd + * \param[in] mode the mode of the iofd, whether it should use read()/write(), sendto()/recvfrom() + * \param[in] ioops structure with read/write/send/recv callbacks + * \param[in] data user data pointer accessible by the ioops callbacks + * \param[in] priv_nr user data storage accessible by the ioops callbacks + * \returns The newly allocated osmo_io_fd struct or NULL on failure + */ +struct osmo_io_fd *osmo_iofd_setup(const void *ctx, unsigned int size, unsigned int headroom, + int fd, const char *name, enum osmo_io_fd_mode mode, + const struct osmo_io_ops *ioops, void *data, unsigned int priv_nr) +{ + struct osmo_io_fd *iofd = talloc_zero(ctx, struct osmo_io_fd); + if (!iofd) + return NULL; + + iofd->fd = fd; + iofd->mode = mode; + + iofd->name = talloc_strdup(iofd, name); + + if (ioops) + iofd->io_ops = *ioops; + + iofd->pending = NULL; + + iofd->data = data; + iofd->priv_nr = priv_nr; + + iofd->msgb_alloc.ctx = ctx; + iofd->msgb_alloc.size = size; + iofd->msgb_alloc.headroom = headroom; + + iofd->tx_queue.max_length = 32; + INIT_LLIST_HEAD(&iofd->tx_queue.msg_queue); + + if (g_iofd_ops.setup) { + int rc = g_iofd_ops.setup(iofd); + if (rc < 0) + goto err; + } + LOGP(DLIO, LOGL_DEBUG, "iofd(%s) using backend %s\n", iofd->name, g_backend == OSMO_IO_BACKEND_POLL ? "poll" : "uring"); + + return iofd; +err: + talloc_free(iofd); + return NULL; +} + +/*! Close the iofd + * + * This function is safe to use in the read/write callbacks and will defer freeing it until safe to do so. 
+ * \param[in] iofd the file descriptor + */ +void osmo_iofd_close(struct osmo_io_fd* iofd) +{ + iofd->closing = true; + + // Free pending msgs in tx queue + msgb_queue_free(&iofd->tx_queue.msg_queue); + if (iofd->pending) + msgb_free(iofd->pending); + + iofd->pending = NULL; + + if (g_iofd_ops.close && g_iofd_ops.close(iofd)) + talloc_free(iofd); +} + +/*! Get the associated user-data from an iofd + * \param[in] iofd the file descriptor + * \returns the data that was previously set with \ref osmo_iofd_setup() + */ +void *osmo_iofd_get_data(struct osmo_io_fd *iofd) +{ + return iofd->data; +} + +/*! Get the private number from an iofd + * \param[in] iofd the file descriptor + * \returns the private number that was previously set with \ref osmo_iofd_setup() + */ +unsigned int osmo_iofd_get_priv(struct osmo_io_fd *iofd) +{ + return iofd->priv_nr; +} + +/*! Get the underlying file descriptor from an iofd + * \param[in] iofd the file descriptor + * \returns the underlying file descriptor number */ +int osmo_iofd_get_fd(struct osmo_io_fd *iofd) +{ + return iofd->fd; +} \ No newline at end of file diff --git a/src/osmo_io_internal.h b/src/osmo_io_internal.h new file mode 100644 index 0000000..f0fc791 --- /dev/null +++ b/src/osmo_io_internal.h @@ -0,0 +1,125 @@ +/*! \file osmo_io_internal.h */ + +#pragma once + +#include <unistd.h> +#include <stdbool.h> + +#include <osmocom/core/osmo_io.h> +#include <osmocom/core/linuxlist.h> +#include <osmocom/core/msgb.h> +#include <osmocom/core/select.h> +#include <osmocom/core/socket.h> + +#include "../config.h" + +struct iofd_backend_ops { + int (*setup)(struct osmo_io_fd *iofd); + int (*close)(struct osmo_io_fd *iofd); + void (*write_enable)(struct osmo_io_fd *iofd); + void (*write_disable)(struct osmo_io_fd *iofd); + void (*read_enable)(struct osmo_io_fd *iofd); + void (*read_disable)(struct osmo_io_fd *iofd); +}; + +struct osmo_io_fd { + /*! linked list for internal management */ + struct llist_head list; + /*! 
actual operating-system level file descriptor */ + int fd; + /*! type of read/write mode to use */ + enum osmo_io_fd_mode mode; + + /*! flags to guard closing/freeing of iofd */ + bool closing; + bool in_callback; + + /*! human-readable name to associate with fd */ + const char *name; + + /*! send/recv (msg) callback functions */ + struct osmo_io_ops io_ops; + /*! Pending msgb to keep partial data during segmentation */ + struct msgb *pending; + + /*! data pointer passed through to call-back function */ + void *data; + /*! private number, extending \a data */ + unsigned int priv_nr; + + struct { + /*! talloc context from which to allocate msgb when reading */ + const void *ctx; + /*! size of msgb to allocate (excluding headroom) */ + unsigned int size; + /*! headroom to allocate when allocating msgb's */ + unsigned int headroom; + } msgb_alloc; + + struct { + /*! maximum length of write queue */ + unsigned int max_length; + /*! current length of write queue */ + unsigned int current_length; + /*! actual linked list implementing the transmit queue */ + struct llist_head msg_queue; + } tx_queue; + + union { + struct { + struct osmo_fd ofd; + } poll; + struct { + bool read_enabled; + bool read_pending; + bool write_pending; + bool write_enabled; + /* TODO: index into array of registered fd's?
*/ + } uring; + } u; +}; + +enum iofd_msg_action { + IOFD_ACT_READ, + IOFD_ACT_WRITE, + IOFD_ACT_RECVFROM, + IOFD_ACT_SENDTO, + // TODO: SCTP_* +}; + + +/* serialized version of 'struct msghdr' employed by sendmsg/recvmsg */ +struct iofd_msghdr { + struct llist_head list; + enum iofd_msg_action action; + struct msghdr hdr; + struct osmo_sockaddr sa; + struct iovec iov[1]; + int flags; + + struct msgb *msg; + struct osmo_io_fd *iofd; +}; + +enum iofd_seg_act { + IOFD_SEG_ACT_HANDLE_ONE, + IOFD_SEG_ACT_HANDLE_MORE, + IOFD_SEG_ACT_DEFER, +}; + +struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum iofd_msg_action action, struct msgb *msg); +void iofd_msghdr_free(struct iofd_msghdr *msghdr); + +struct msgb *iofd_msgb_alloc(struct osmo_io_fd *iofd); +struct msgb *iofd_msgb_pending(struct osmo_io_fd *iofd); +struct msgb *iofd_msgb_pending_or_alloc(struct osmo_io_fd *iofd); + +enum iofd_seg_act iofd_handle_segmentation(struct osmo_io_fd *iofd, struct msgb *msg); +void iofd_handle_segmented_read(struct osmo_io_fd *iofd, struct msgb *msg, int rc); + +int iofd_txqueue_enqueue(struct osmo_io_fd *iofd, struct iofd_msghdr *msghdr); +void iofd_txqueue_enqueue_front(struct osmo_io_fd *iofd, struct iofd_msghdr *msghdr); +struct iofd_msghdr *iofd_txqueue_dequeue(struct osmo_io_fd *iofd); + +/* poll backend */ +int iofd_poll_ofd_cb_dispatch(struct osmo_fd *ofd, unsigned int what); diff --git a/src/osmo_io_poll.c b/src/osmo_io_poll.c new file mode 100644 index 0000000..448ffe0 --- /dev/null +++ b/src/osmo_io_poll.c @@ -0,0 +1,219 @@ +/*! \file osmo_io_poll.c + * New osmocom async I/O API. + * + * (C) 2022 by Harald Welte <lafo...@osmocom.org> + * (C) 2022 by sysmocom - s.f.m.c. GmbH <i...@sysmocom.de> + * Author: Daniel Willmann <dwillm...@sysmocom.de> + * + * All Rights Reserved. 
+ * + * SPDX-License-Identifier: GPL-2.0+ + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + */ + +#include <stdio.h> +#include <talloc.h> +#include <unistd.h> +#include <stdbool.h> +#include <errno.h> + +#include <osmocom/core/osmo_io.h> +#include <osmocom/core/linuxlist.h> +#include <osmocom/core/logging.h> +#include <osmocom/core/msgb.h> +#include <osmocom/core/select.h> +#include <osmocom/core/talloc.h> +#include <osmocom/core/utils.h> + +#include "../config.h" +#include "osmo_io_internal.h" + +static void iofd_poll_ofd_cb_read_write(struct osmo_fd *ofd, unsigned int what) +{ + struct osmo_io_fd *iofd = ofd->data; + struct iofd_msghdr *msghdr; + struct msgb *msg; + int rc; + + OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_READ_WRITE); + + if (what & OSMO_FD_READ) { + msg = iofd_msgb_pending_or_alloc(iofd); + if (!msg) { + LOGP(DLIO, LOGL_ERROR, "iofd(%s): Could not get msgb for reading\n", iofd->name); + OSMO_ASSERT(0); + } + + rc = read(ofd->fd, msg->tail, msgb_tailroom(msg)); + if (rc > 0) + msgb_put(msg, rc); + + iofd_handle_segmented_read(iofd, msg, rc); + } + + if (iofd->closing) { + return; + } + + if (what & OSMO_FD_WRITE) { + msghdr = iofd_txqueue_dequeue(iofd); + if (msghdr) { + msg = msghdr->msg; + rc = write(ofd->fd, msgb_data(msg), msgb_length(msg)); + if (rc < msgb_length(msg)) { + msgb_pull(msg, rc); + iofd_txqueue_enqueue_front(iofd, msghdr); + return; + } + iofd->io_ops.write_cb(iofd, rc, msg); + msgb_free(msg); + } else { + osmo_fd_write_disable(ofd); + } + } + + /* TODO: FD_EXCEPT handling? 
However: Rarely used in existing osmo-* */ + return; +} + +static void iofd_poll_ofd_cb_recvfrom_sendto(struct osmo_fd *ofd, unsigned int what) +{ + struct osmo_io_fd *iofd = ofd->data; + struct msgb *msg; + struct iofd_msghdr *msghdr; + int rc; + + OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_RECVFROM_SENDTO); + + if (what & OSMO_FD_READ) { + msg = iofd_msgb_pending_or_alloc(iofd); + if (!msg) { + LOGP(DLIO, LOGL_ERROR, "iofd(%s): Could not get msgb for reading\n", iofd->name); + OSMO_ASSERT(0); + } + struct osmo_sockaddr saddr; + struct sockaddr *sa = &saddr.u.sa; + socklen_t addrlen = sizeof(struct sockaddr); + + rc = recvfrom(ofd->fd, msgb_data(msg), msgb_tailroom(msg), 0, + sa, &addrlen); + if (rc > 0) + msgb_put(msg, rc); + + iofd_handle_segmentation(iofd, msg); + + iofd->io_ops.recvmsg_cb(iofd, rc, msg, &saddr); + + } + + if (iofd->closing) { + return; + } + + if (what & OSMO_FD_WRITE) { + msghdr = (struct iofd_msghdr *)msgb_dequeue_count(&iofd->tx_queue.msg_queue, &iofd->tx_queue.current_length); + if (msghdr) { + struct osmo_sockaddr *dest = &msghdr->sa; + msg = msghdr->msg; + + rc = sendto(ofd->fd, msgb_data(msg), msgb_length(msg), 0, + &dest->u.sa, sizeof(*dest)); + if (rc < msgb_length(msg)) { + msgb_pull(msg, rc); + iofd_txqueue_enqueue_front(iofd, msghdr); + return; + } + iofd->io_ops.sendmsg_cb(iofd, rc, msg); + + talloc_free(msghdr); + msgb_free(msg); + } else { + osmo_fd_write_disable(ofd); + } + } + + /* TODO: FD_EXCEPT handling? 
However: Rarely used in existing osmo-* */ + return; +} + +int iofd_poll_ofd_cb_dispatch(struct osmo_fd *ofd, unsigned int what) +{ + struct osmo_io_fd *iofd = ofd->data; + + iofd->in_callback = true; + + switch (iofd->mode) { + case OSMO_IO_FD_MODE_READ_WRITE: + iofd_poll_ofd_cb_read_write(ofd, what); + break; + case OSMO_IO_FD_MODE_RECVFROM_SENDTO: + iofd_poll_ofd_cb_recvfrom_sendto(ofd, what); + break; + default: + OSMO_ASSERT(0); + } + + iofd->in_callback = false; + + if (iofd->closing) { + talloc_free(iofd); + return 0; + } + + return 0; +} + +int iofd_poll_setup(struct osmo_io_fd *iofd) +{ + struct osmo_fd *ofd = &iofd->u.poll.ofd; + osmo_fd_setup(ofd, iofd->fd, 0, &iofd_poll_ofd_cb_dispatch, iofd, 0); + return osmo_fd_register(ofd); +} + +int iofd_poll_close(struct osmo_io_fd *iofd) +{ + osmo_fd_close(&iofd->u.poll.ofd); + + if (iofd->in_callback) + return 0; + else + return 1; +} + +void iofd_poll_read_enable(struct osmo_io_fd *iofd) +{ + osmo_fd_read_enable(&iofd->u.poll.ofd); +} + +void iofd_poll_read_disable(struct osmo_io_fd *iofd) +{ + osmo_fd_read_disable(&iofd->u.poll.ofd); +} + +void iofd_poll_write_enable(struct osmo_io_fd *iofd) +{ + osmo_fd_write_enable(&iofd->u.poll.ofd); +} + +void iofd_poll_write_disable(struct osmo_io_fd *iofd) +{ + osmo_fd_write_disable(&iofd->u.poll.ofd); +} + +struct iofd_backend_ops iofd_poll_ops = { + .setup = iofd_poll_setup, + .close = iofd_poll_close, + .write_enable = iofd_poll_write_enable, + .write_disable = iofd_poll_write_disable, + .read_enable = iofd_poll_read_enable, + .read_disable = iofd_poll_read_disable, +}; \ No newline at end of file -- To view, visit https://gerrit.osmocom.org/c/libosmocore/+/30934 To unsubscribe, or for help writing mail filters, visit https://gerrit.osmocom.org/settings Gerrit-Project: libosmocore Gerrit-Branch: master Gerrit-Change-Id: I50d73cf550d6ce8154bf827bf47408131cf5b0a0 Gerrit-Change-Number: 30934 Gerrit-PatchSet: 1 Gerrit-Owner: daniel <dwillm...@sysmocom.de> Gerrit-CC: 
laforge <lafo...@osmocom.org> Gerrit-MessageType: newchange