On Sat, Feb 07, 2026 at 08:08:58PM +0800, Brian Song wrote:
> This patch implements the CQE handler for FUSE-over-io_uring. Upon
> receiving a FUSE request via a Completion Queue Entry (CQE), the
> handler processes the request and submits the response back to the
> kernel via the FUSE_IO_URING_CMD_COMMIT_AND_FETCH command.
> 
> Additionally, the request processing logic shared between legacy and
> io_uring modes has been extracted into fuse_co_process_request_common().
> The execution flow now dispatches requests to the appropriate
> mode-specific logic based on the uring_started flag.
> 
> Suggested-by: Kevin Wolf <[email protected]>
> Suggested-by: Stefan Hajnoczi <[email protected]>
> Signed-off-by: Brian Song <[email protected]>
> ---
>  block/export/fuse.c | 400 +++++++++++++++++++++++++++++++++-----------
>  1 file changed, 301 insertions(+), 99 deletions(-)
> 
> diff --git a/block/export/fuse.c b/block/export/fuse.c
> index 867752555a..c117e081cd 100644
> --- a/block/export/fuse.c
> +++ b/block/export/fuse.c
> @@ -138,8 +138,8 @@ struct FuseQueue {
>       * FUSE_MIN_READ_BUFFER (from linux/fuse.h) bytes.
>       * This however is just the first part of the buffer; every read is given
>       * a vector of this buffer (which should be enough for all normal requests,
> -     * which we check via the static assertion in FUSE_IN_OP_STRUCT()) and the
> -     * spill-over buffer below.
> +     * which we check via the static assertion in FUSE_IN_OP_STRUCT_LEGACY())
> +     * and the spill-over buffer below.
>       * Therefore, the size of this buffer plus FUSE_SPILLOVER_BUF_SIZE must be
>       * FUSE_MIN_READ_BUFFER or more (checked via static assertion below).
>       */
> @@ -912,6 +912,7 @@ static void coroutine_fn co_read_from_fuse_fd(void *opaque)
>      }
> 
>      fuse_co_process_request(q, spillover_buf);
> +    qemu_vfree(spillover_buf);
> 
>  no_request:
>      fuse_dec_in_flight(exp);
> @@ -1684,100 +1685,75 @@ static int fuse_write_buf_response(int fd, uint32_t req_id,
>  }
> 
>  /*
> - * For use in fuse_co_process_request():
> + * For use in fuse_co_process_request_common():
>   * Returns a pointer to the parameter object for the given operation (inside of
> - * queue->request_buf, which is assumed to hold a fuse_in_header first).
> - * Verifies that the object is complete (queue->request_buf is large enough to
> - * hold it in one piece, and the request length includes the whole object).
> + * in_buf, which is assumed to hold a fuse_in_header first).
> + * Verifies that the object is complete (in_buf is large enough to hold it in
> + * one piece, and the request length includes the whole object).
> + * Only performs verification for legacy FUSE.
>   *
>   * Note that queue->request_buf may be overwritten after yielding, so the
>   * returned pointer must not be used across a function that may yield!
>   */
> -#define FUSE_IN_OP_STRUCT(op_name, queue) \
> +#define FUSE_IN_OP_STRUCT_LEGACY(op_name, queue) \
>      ({ \
>          const struct fuse_in_header *__in_hdr = \
>              (const struct fuse_in_header *)(queue)->request_buf; \
>          const struct fuse_##op_name##_in *__in = \
>              (const struct fuse_##op_name##_in *)(__in_hdr + 1); \
>          const size_t __param_len = sizeof(*__in_hdr) + sizeof(*__in); \
> -        uint32_t __req_len; \
>          \
> -        QEMU_BUILD_BUG_ON(sizeof((queue)->request_buf) < __param_len); \
> +        QEMU_BUILD_BUG_ON(sizeof((queue)->request_buf) < \
> +                          (sizeof(struct fuse_in_header) + \
> +                           sizeof(struct fuse_##op_name##_in))); \
>          \
> -        __req_len = __in_hdr->len; \
> +        uint32_t __req_len = __in_hdr->len; \
>          if (__req_len < __param_len) { \
>              warn_report("FUSE request truncated (%" PRIu32 " < %zu)", \
>                          __req_len, __param_len); \
>              ret = -EINVAL; \
> -            break; \
> +            __in = NULL; \
>          } \
>          __in; \
>      })
> 
>  /*
> - * For use in fuse_co_process_request():
> + * For use in fuse_co_process_request_common():
>   * Returns a pointer to the return object for the given operation (inside of
>   * out_buf, which is assumed to hold a fuse_out_header first).
> - * Verifies that out_buf is large enough to hold the whole object.
> + * Only performs verification for legacy FUSE.
> + * Note: Buffer size verification is done via static assertions in the caller
> + * (fuse_co_process_request) where out_buf is a local array.
>   *
> - * (out_buf should be a char[] array.)
> + * (out_buf should be a char[] array in the caller.)
>   */
> -#define FUSE_OUT_OP_STRUCT(op_name, out_buf) \
> +#define FUSE_OUT_OP_STRUCT_LEGACY(op_name, out_buf) \
>      ({ \
>          struct fuse_out_header *__out_hdr = \
>              (struct fuse_out_header *)(out_buf); \
>          struct fuse_##op_name##_out *__out = \
>              (struct fuse_##op_name##_out *)(__out_hdr + 1); \
>          \
> -        QEMU_BUILD_BUG_ON(sizeof(*__out_hdr) + sizeof(*__out) > \
> -                          sizeof(out_buf)); \
> -        \
>          __out; \
>      })
> 
>  /**
> - * Process a FUSE request, incl. writing the response.
> - *
> - * Note that yielding in any request-processing function can overwrite the
> - * contents of q->request_buf.  Anything that takes a buffer needs to take
> - * care that the content is copied before yielding.
> - *
> - * @spillover_buf can contain the tail of a write request too large to fit into
> - * q->request_buf.  This function takes ownership of it (i.e. will free it),
> - * which assumes that its contents will not be overwritten by concurrent
> - * requests (as opposed to q->request_buf).
> + * Shared helper for FUSE request processing. Handles both legacy and io_uring
> + * paths.
>   */
> -static void coroutine_fn
> -fuse_co_process_request(FuseQueue *q, void *spillover_buf)
> +static void coroutine_fn fuse_co_process_request_common(
> +    FuseExport *exp,
> +    uint32_t opcode,
> +    uint64_t req_id,
> +    void *in_buf,
> +    void *spillover_buf,
> +    void *out_buf,
> +    void (*send_response)(void *opaque, uint32_t req_id, int ret,
> +                          const void *buf, void *out_buf),
> +    void *opaque /* FuseQueue* or FuseUringEnt* */)
>  {
> -    FuseExport *exp = q->exp;
> -    uint32_t opcode;
> -    uint64_t req_id;
> -    /*
> -     * Return buffer.  Must be large enough to hold all return headers, but does
> -     * not include space for data returned by read requests.
> -     * (FUSE_IN_OP_STRUCT() verifies at compile time that out_buf is indeed
> -     * large enough.)
> -     */
> -    char out_buf[sizeof(struct fuse_out_header) +
> -                 MAX_CONST(sizeof(struct fuse_init_out),
> -                 MAX_CONST(sizeof(struct fuse_open_out),
> -                 MAX_CONST(sizeof(struct fuse_attr_out),
> -                 MAX_CONST(sizeof(struct fuse_write_out),
> -                           sizeof(struct fuse_lseek_out)))))];
> -    struct fuse_out_header *out_hdr = (struct fuse_out_header *)out_buf;
> -    /* For read requests: Data to be returned */
>      void *out_data_buffer = NULL;
> -    ssize_t ret;
> -
> -    /* Limit scope to ensure pointer is no longer used after yielding */
> -    {
> -        const struct fuse_in_header *in_hdr =
> -            (const struct fuse_in_header *)q->request_buf;
> -
> -        opcode = in_hdr->opcode;
> -        req_id = in_hdr->unique;
> -    }
> +    int ret = 0;
> 
>  #ifdef CONFIG_LINUX_IO_URING
>      /*
> @@ -1794,15 +1770,32 @@ fuse_co_process_request(FuseQueue *q, void *spillover_buf)
> 
>      switch (opcode) {
>      case FUSE_INIT: {
> -        const struct fuse_init_in *in = FUSE_IN_OP_STRUCT(init, q);
> -        ret = fuse_co_init(exp, FUSE_OUT_OP_STRUCT(init, out_buf),
> -                           in->max_readahead, in);
> +        FuseQueue *q = opaque;
> +        const struct fuse_init_in *in =
> +            FUSE_IN_OP_STRUCT_LEGACY(init, q);
> +        if (!in) {
> +            break;
> +        }
> +
> +        struct fuse_init_out *out =
> +            FUSE_OUT_OP_STRUCT_LEGACY(init, out_buf);
> +
> +        ret = fuse_co_init(exp, out, in->max_readahead, in);
>          break;
>      }
> 
> -    case FUSE_OPEN:
> -        ret = fuse_co_open(exp, FUSE_OUT_OP_STRUCT(open, out_buf));
> +    case FUSE_OPEN: {
> +        struct fuse_open_out *out;
> +
> +        if (exp->uring_started) {
> +            out = out_buf;
> +        } else {
> +            out = FUSE_OUT_OP_STRUCT_LEGACY(open, out_buf);
> +        }

It would be nice to avoid these repetitive code changes. How about
moving the if (exp->uring_started) logic inside FUSE_IN_OP_STRUCT() and
FUSE_OUT_OP_STRUCT()?

Also, is it really necessary to make FUSE_IN_OP_STRUCT() return NULL
instead of using the break statement on error?

> +
> +        ret = fuse_co_open(exp, out);
>          break;
> +    }
> 
>      case FUSE_RELEASE:
>          ret = 0;
> @@ -1812,37 +1805,105 @@ fuse_co_process_request(FuseQueue *q, void *spillover_buf)
>          ret = -ENOENT; /* There is no node but the root node */
>          break;
> 
> -    case FUSE_GETATTR:
> -        ret = fuse_co_getattr(exp, FUSE_OUT_OP_STRUCT(attr, out_buf));
> +    case FUSE_GETATTR: {
> +        struct fuse_attr_out *out;
> +
> +        if (exp->uring_started) {
> +            out = out_buf;
> +        } else {
> +            out = FUSE_OUT_OP_STRUCT_LEGACY(attr, out_buf);
> +        }
> +
> +        ret = fuse_co_getattr(exp, out);
>          break;
> +    }
> 
>      case FUSE_SETATTR: {
> -        const struct fuse_setattr_in *in = FUSE_IN_OP_STRUCT(setattr, q);
> -        ret = fuse_co_setattr(exp, FUSE_OUT_OP_STRUCT(attr, out_buf),
> -                              in->valid, in->size, in->mode, in->uid, in->gid);
> +        const struct fuse_setattr_in *in;
> +        struct fuse_attr_out *out;
> +
> +        if (exp->uring_started) {
> +            in = in_buf;
> +            out = out_buf;
> +        } else {
> +            FuseQueue *q = opaque;
> +            in = FUSE_IN_OP_STRUCT_LEGACY(setattr, q);
> +            if (!in) {
> +                break;
> +            }
> +
> +            out = FUSE_OUT_OP_STRUCT_LEGACY(attr, out_buf);
> +        }
> +
> +        ret = fuse_co_setattr(exp, out, in->valid, in->size, in->mode,
> +                              in->uid, in->gid);
>          break;
>      }
> 
>      case FUSE_READ: {
> -        const struct fuse_read_in *in = FUSE_IN_OP_STRUCT(read, q);
> +        const struct fuse_read_in *in;
> +
> +        if (exp->uring_started) {
> +            in = in_buf;
> +        } else {
> +            FuseQueue *q = opaque;
> +            in = FUSE_IN_OP_STRUCT_LEGACY(read, q);
> +            if (!in) {
> +                break;
> +            }
> +        }
> +
>          ret = fuse_co_read(exp, &out_data_buffer, in->offset, in->size);
>          break;
>      }
> 
>      case FUSE_WRITE: {
> -        const struct fuse_write_in *in = FUSE_IN_OP_STRUCT(write, q);
> -        uint32_t req_len;
> -
> -        req_len = ((const struct fuse_in_header *)q->request_buf)->len;
> -        if (unlikely(req_len < sizeof(struct fuse_in_header) + sizeof(*in) +
> -                               in->size)) {
> -            warn_report("FUSE WRITE truncated; received %zu bytes of %" PRIu32,
> -                        req_len - sizeof(struct fuse_in_header) - sizeof(*in),
> -                        in->size);
> -            ret = -EINVAL;
> -            break;
> -        }
> +        const struct fuse_write_in *in;
> +        struct fuse_write_out *out;
> +        const void *in_place_buf;
> +        const void *spill_buf;
> +
> +        if (exp->uring_started) {
> +            FuseUringEnt *ent = opaque;
> +
> +            in = in_buf;
> +            out = out_buf;
> +
> +            assert(in->size <= ent->req_header.ring_ent_in_out.payload_sz);
> 
> +            /*
> +             * In uring mode, the "out_buf" (ent->payload) actually holds the
> +             * input data for WRITE requests.
> +             */
> +            in_place_buf = NULL;
> +            spill_buf = out_buf;
> +        } else {
> +            FuseQueue *q = opaque;
> +            in = FUSE_IN_OP_STRUCT_LEGACY(write, q);
> +            if (!in) {
> +                break;
> +            }
> +
> +            out = FUSE_OUT_OP_STRUCT_LEGACY(write, out_buf);
> +
> +            /* Additional check for WRITE: verify the request includes data */
> +            uint32_t req_len =
> +                ((const struct fuse_in_header *)(q->request_buf))->len;
> +
> +            if (unlikely(req_len < sizeof(struct fuse_in_header) + sizeof(*in) +
> +                        in->size)) {
> +                warn_report("FUSE WRITE truncated; received %zu bytes of %"
> +                    PRIu32,
> +                    req_len - sizeof(struct fuse_in_header) - sizeof(*in),
> +                    in->size);
> +                ret = -EINVAL;
> +                break;
> +            }
> +
> +            /* Legacy buffer setup */
> +            in_place_buf = in + 1;
> +            spill_buf = spillover_buf;
> +        }
>          /*
>           * poll_fuse_fd() has checked that in_hdr->len matches the number of
>           * bytes read, which cannot exceed the max_write value we set
> @@ -1856,13 +1917,24 @@ fuse_co_process_request(FuseQueue *q, void *spillover_buf)
>           * fuse_co_write() takes care to copy its contents before potentially
>           * yielding.
>           */
> -        ret = fuse_co_write(exp, FUSE_OUT_OP_STRUCT(write, out_buf),
> -                            in->offset, in->size, in + 1, spillover_buf);
> +        ret = fuse_co_write(exp, out, in->offset, in->size,
> +                            in_place_buf, spill_buf);
>          break;
>      }
> 
>      case FUSE_FALLOCATE: {
> -        const struct fuse_fallocate_in *in = FUSE_IN_OP_STRUCT(fallocate, q);
> +        const struct fuse_fallocate_in *in;
> +
> +        if (exp->uring_started) {
> +            in = in_buf;
> +        } else {
> +            FuseQueue *q = opaque;
> +            in = FUSE_IN_OP_STRUCT_LEGACY(fallocate, q);
> +            if (!in) {
> +                break;
> +            }
> +        }
> +
>          ret = fuse_co_fallocate(exp, in->offset, in->length, in->mode);
>          break;
>      }
> @@ -1877,9 +1949,23 @@ fuse_co_process_request(FuseQueue *q, void *spillover_buf)
> 
>  #ifdef CONFIG_FUSE_LSEEK
>      case FUSE_LSEEK: {
> -        const struct fuse_lseek_in *in = FUSE_IN_OP_STRUCT(lseek, q);
> -        ret = fuse_co_lseek(exp, FUSE_OUT_OP_STRUCT(lseek, out_buf),
> -                            in->offset, in->whence);
> +        const struct fuse_lseek_in *in;
> +        struct fuse_lseek_out *out;
> +
> +        if (exp->uring_started) {
> +            in = in_buf;
> +            out = out_buf;
> +        } else {
> +            FuseQueue *q = opaque;
> +            in = FUSE_IN_OP_STRUCT_LEGACY(lseek, q);
> +            if (!in) {
> +                break;
> +            }
> +
> +            out = FUSE_OUT_OP_STRUCT_LEGACY(lseek, out_buf);
> +        }
> +
> +        ret = fuse_co_lseek(exp, out, in->offset, in->whence);
>          break;
>      }
>  #endif
> @@ -1888,20 +1974,12 @@ fuse_co_process_request(FuseQueue *q, void *spillover_buf)
>          ret = -ENOSYS;
>      }
> 
> -    /* Ignore errors from fuse_write*(), nothing we can do anyway */
> +    send_response(opaque, req_id, ret, out_data_buffer, out_buf);
> +
>      if (out_data_buffer) {
> -        assert(ret >= 0);
> -        fuse_write_buf_response(q->fuse_fd, req_id, out_hdr,
> -                                out_data_buffer, ret);
>          qemu_vfree(out_data_buffer);
> -    } else {
> -        fuse_write_response(q->fuse_fd, req_id, out_hdr,
> -                            ret < 0 ? ret : 0,
> -                            ret < 0 ? 0 : ret);
>      }
> 
> -    qemu_vfree(spillover_buf);
> -
>  #ifdef CONFIG_LINUX_IO_URING
>      if (unlikely(opcode == FUSE_INIT) && uring_initially_enabled) {
>          if (exp->is_uring && !exp->uring_started) {
> @@ -1910,7 +1988,8 @@ fuse_co_process_request(FuseQueue *q, void *spillover_buf)
>               * If io_uring mode was requested for this export but it has not
>               * been started yet, start it now.
>               */
> -            struct fuse_init_out *out = FUSE_OUT_OP_STRUCT(init, out_buf);
> +            struct fuse_init_out *out =
> +                FUSE_OUT_OP_STRUCT_LEGACY(init, out_buf);
>              fuse_uring_start(exp, out);
>          } else if (ret == -EOPNOTSUPP) {
>              /*
> @@ -1923,12 +2002,135 @@ fuse_co_process_request(FuseQueue *q, void *spillover_buf)
>      }
>  #endif
>  }
> +/* Helper to send response for legacy */
> +static void send_response_legacy(void *opaque, uint32_t req_id, int ret,
> +                                 const void *buf, void *out_buf)
> +{
> +    FuseQueue *q = (FuseQueue *)opaque;
> +    struct fuse_out_header *out_hdr = (struct fuse_out_header *)out_buf;
> +    if (buf) {
> +        assert(ret >= 0);
> +        fuse_write_buf_response(q->fuse_fd, req_id, out_hdr, buf, ret);
> +    } else {
> +        fuse_write_response(q->fuse_fd, req_id, out_hdr,
> +                            ret < 0 ? ret : 0,
> +                            ret < 0 ? 0 : ret);
> +    }
> +}
> +
> +static void coroutine_fn
> +fuse_co_process_request(FuseQueue *q, void *spillover_buf)
> +{
> +    FuseExport *exp = q->exp;
> +    uint32_t opcode;
> +    uint64_t req_id;
> +
> +    /*
> +     * Return buffer.  Must be large enough to hold all return headers, but does
> +     * not include space for data returned by read requests.
> +     */
> +    char out_buf[sizeof(struct fuse_out_header) +
> +        MAX_CONST(sizeof(struct fuse_init_out),
> +        MAX_CONST(sizeof(struct fuse_open_out),
> +        MAX_CONST(sizeof(struct fuse_attr_out),
> +        MAX_CONST(sizeof(struct fuse_write_out),
> +                  sizeof(struct fuse_lseek_out)))))] = {0};
> +
> +    /* Verify that out_buf is large enough for all output structures */
> +    QEMU_BUILD_BUG_ON(sizeof(struct fuse_out_header) +
> +        sizeof(struct fuse_init_out) > sizeof(out_buf));
> +    QEMU_BUILD_BUG_ON(sizeof(struct fuse_out_header) +
> +        sizeof(struct fuse_open_out) > sizeof(out_buf));
> +    QEMU_BUILD_BUG_ON(sizeof(struct fuse_out_header) +
> +        sizeof(struct fuse_attr_out) > sizeof(out_buf));
> +    QEMU_BUILD_BUG_ON(sizeof(struct fuse_out_header) +
> +        sizeof(struct fuse_write_out) > sizeof(out_buf));
> +#ifdef CONFIG_FUSE_LSEEK
> +    QEMU_BUILD_BUG_ON(sizeof(struct fuse_out_header) +
> +        sizeof(struct fuse_lseek_out) > sizeof(out_buf));
> +#endif
> +
> +    /* Limit scope to ensure pointer is no longer used after yielding */
> +    {
> +        const struct fuse_in_header *in_hdr =
> +            (const struct fuse_in_header *)q->request_buf;
> +
> +        opcode = in_hdr->opcode;
> +        req_id = in_hdr->unique;
> +    }
> +
> +    fuse_co_process_request_common(exp, opcode, req_id, NULL, spillover_buf,
> +                                   out_buf, send_response_legacy, q);
> +}
> 
>  #ifdef CONFIG_LINUX_IO_URING
> +static void fuse_uring_prep_sqe_commit(struct io_uring_sqe *sqe, void *opaque)
> +{
> +    FuseUringEnt *ent = opaque;
> +    struct fuse_uring_cmd_req *req = (void *)&sqe->cmd[0];
> +
> +    ent->last_cmd = FUSE_IO_URING_CMD_COMMIT_AND_FETCH;
> +
> +    fuse_uring_sqe_prepare(sqe, ent->rq->q, ent->last_cmd);
> +    fuse_uring_sqe_set_req_data(req, ent->rq->rqid, ent->req_commit_id);
> +}
> +
> +static void
> +fuse_uring_send_response(FuseUringEnt *ent, uint32_t req_id, int ret,
> +                         const void *out_data_buffer)
> +{
> +    FuseExport *exp = ent->rq->q->exp;
> +
> +    struct fuse_uring_req_header *rrh = &ent->req_header;
> +    struct fuse_out_header *out_header = (struct fuse_out_header *)&rrh->in_out;
> +    struct fuse_uring_ent_in_out *ent_in_out =
> +        (struct fuse_uring_ent_in_out *)&rrh->ring_ent_in_out;
> +
> +    /* FUSE_READ */
> +    if (out_data_buffer && ret > 0) {
> +        memcpy(ent->req_payload, out_data_buffer, ret);
> +    }
> +
> +    out_header->error  = ret < 0 ? ret : 0;
> +    out_header->unique = req_id;
> +    ent_in_out->payload_sz = ret > 0 ? ret : 0;
> +
> +    /* Commit and fetch a uring entry */
> +    blk_exp_ref(&exp->common);
> +    aio_add_sqe(fuse_uring_prep_sqe_commit, ent, &ent->fuse_cqe_handler);
> +}
> +
> +/* Helper to send response for uring */
> +static void send_response_uring(void *opaque, uint32_t req_id, int ret,
> +                                const void *out_data_buffer, void *payload)
> +{
> +    FuseUringEnt *ent = (FuseUringEnt *)opaque;
> +
> +    fuse_uring_send_response(ent, req_id, ret, out_data_buffer);
> +}
> +
>  static void coroutine_fn fuse_uring_co_process_request(FuseUringEnt *ent)
>  {
> -    /* TODO */
> -    (void)ent;
> +    FuseExport *exp = ent->rq->q->exp;
> +    struct fuse_uring_req_header *rrh = &ent->req_header;
> +    struct fuse_uring_ent_in_out *ent_in_out =
> +        (struct fuse_uring_ent_in_out *)&rrh->ring_ent_in_out;
> +    struct fuse_in_header *in_hdr =
> +        (struct fuse_in_header *)&rrh->in_out;
> +    uint32_t opcode = in_hdr->opcode;
> +    uint64_t req_id = in_hdr->unique;
> +
> +    ent->req_commit_id = ent_in_out->commit_id;
> +
> +    if (unlikely(ent->req_commit_id == 0)) {
> +        error_report("If this happens kernel will not find the response - "
> +            "it will be stuck forever - better to abort immediately.");
> +        fuse_export_halt(exp);
> +        return;
> +    }
> +
> +    fuse_co_process_request_common(exp, opcode, req_id, &rrh->op_in,
> +        NULL, ent->req_payload, send_response_uring, ent);
>  }
>  #endif /* CONFIG_LINUX_IO_URING */
> 
> --
> 2.43.0
> 

Attachment: signature.asc
Description: PGP signature

Reply via email to