Am 28.10.2025 um 17:33 hat Hanna Czenczek geschrieben:
> If we wake a coroutine from a different context, we must ensure that it
> will yield exactly once (now or later), awaiting that wake.
> 
> curl’s current .ret == -EINPROGRESS loop may lead to the coroutine not
> yielding if the request finishes before the loop gets run.  To fix it,
> drop the loop and just yield exactly once, unless the request is served
> from the cache or failed before it was submitted – that last part makes
> it a bit complicated, as the result of curl_find_buf() now needs to be a
> tristate.
> 
> (Can be reproduced with multiqueue by adding a usleep(100000) before the
> `while (acb.ret == -EINPROGRESS)` loop.)
> 
> Also, add a comment why aio_co_wake() is safe regardless of whether the
> coroutine and curl_multi_check_completion() run in the same context.
> 
> Cc: [email protected]
> Signed-off-by: Hanna Czenczek <[email protected]>
> ---
>  block/curl.c | 55 +++++++++++++++++++++++++++++++++++-----------------
>  1 file changed, 37 insertions(+), 18 deletions(-)
> 
> diff --git a/block/curl.c b/block/curl.c
> index 68cf83ce55..65996a8866 100644
> --- a/block/curl.c
> +++ b/block/curl.c
> @@ -124,6 +124,16 @@ typedef struct BDRVCURLState {
>      char *proxypassword;
>  } BDRVCURLState;
>  
> +/** Possible result states of curl_find_buf() */
> +typedef enum {
> +    /* No buffer found, need to create new request */
> +    CURL_NO_BUF_FOUND,
> +    /* Buffer found, request filled and done */
> +    CURL_REQUEST_FILLED,
> +    /* Ongoing request found, need to yield */
> +    CURL_REQUEST_ONGOING,
> +} CURLFindBufResult;
> +
>  static void curl_clean_state(CURLState *s);
>  static void curl_multi_do(void *arg);
>  
> @@ -258,8 +268,8 @@ read_end:
>  }
>  
>  /* Called with s->mutex held.  */
> -static bool curl_find_buf(BDRVCURLState *s, uint64_t start, uint64_t len,
> -                          CURLAIOCB *acb)
> +static CURLFindBufResult curl_find_buf(BDRVCURLState *s, uint64_t start,
> +                                       uint64_t len, CURLAIOCB *acb)
>  {
>      int i;
>      uint64_t end = start + len;
> @@ -289,7 +299,7 @@ static bool curl_find_buf(BDRVCURLState *s, uint64_t start, uint64_t len,
>                  qemu_iovec_memset(acb->qiov, clamped_len, 0, len - clamped_len);
>              }
>              acb->ret = 0;
> -            return true;
> +            return CURL_REQUEST_FILLED;
>          }
>  
>          // Wait for unfinished chunks
> @@ -307,13 +317,13 @@ static bool curl_find_buf(BDRVCURLState *s, uint64_t start, uint64_t len,
>              for (j=0; j<CURL_NUM_ACB; j++) {
>                  if (!state->acb[j]) {
>                      state->acb[j] = acb;
> -                    return true;
> +                    return CURL_REQUEST_ONGOING;
>                  }
>              }
>          }
>      }
>  
> -    return false;
> +    return CURL_NO_BUF_FOUND;
>  }
>  
>  /* Called with s->mutex held.  */
> @@ -378,6 +388,16 @@ static void curl_multi_check_completion(BDRVCURLState *s)
>                  acb->ret = error ? -EIO : 0;
>                  state->acb[i] = NULL;
>                  qemu_mutex_unlock(&s->mutex);
> +                /*
> +                 * Current AioContext is the BDS context, which may or may not
> +                 * be the request (coroutine) context.
> +                 * - If it is, the coroutine must have yielded or the FD handler
> +                 *   (curl_multi_do()/curl_multi_timeout_do()) could not have
> +                 *   been called and we would not be here
> +                 * - If it is not, it doesn't matter whether it has already
> +                 *   yielded or not; it will be scheduled once it does yield
> +                 * So aio_co_wake() is safe to call.
> +                 */
>                  aio_co_wake(acb->co);
>                  qemu_mutex_lock(&s->mutex);
>              }
> @@ -868,7 +888,8 @@ out_noclean:
>      return -EINVAL;
>  }
>  
> -static void coroutine_fn curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb)
> +/* Return whether a request was submitted that requires yielding */
> +static bool coroutine_fn curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb)
>  {
>      CURLState *state;
>      int running;
> @@ -877,13 +898,15 @@ static void coroutine_fn curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb)
>  
>      uint64_t start = acb->offset;
>      uint64_t end;
> +    CURLFindBufResult find_buf_res;
>  
> -    qemu_mutex_lock(&s->mutex);
> +    QEMU_LOCK_GUARD(&s->mutex);
>  
>      // In case we have the requested data already (e.g. read-ahead),
>      // we can just call the callback and be done.
> -    if (curl_find_buf(s, start, acb->bytes, acb)) {
> -        goto out;
> +    find_buf_res = curl_find_buf(s, start, acb->bytes, acb);
> +    if (find_buf_res != CURL_NO_BUF_FOUND) {
> +        return find_buf_res == CURL_REQUEST_ONGOING;
>      }
>  
>      // No cache found, so let's start a new request
> @@ -898,7 +921,7 @@ static void coroutine_fn curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb)
>      if (curl_init_state(s, state) < 0) {
>          curl_clean_state(state);
>          acb->ret = -EIO;
> -        goto out;
> +        return false;
>      }
>  
>      acb->start = 0;
> @@ -913,7 +936,7 @@ static void coroutine_fn curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb)
>      if (state->buf_len && state->orig_buf == NULL) {
>          curl_clean_state(state);
>          acb->ret = -ENOMEM;
> -        goto out;
> +        return false;
>      }
>      state->acb[0] = acb;
>  
> @@ -925,14 +948,12 @@ static void coroutine_fn curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb)
>          acb->ret = -EIO;
>  
>          curl_clean_state(state);
> -        goto out;
> +        return false;
>      }
>  
>      /* Tell curl it needs to kick things off */
>      curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
> -
> -out:
> -    qemu_mutex_unlock(&s->mutex);
> +    return true;
>  }
>  
>  static int coroutine_fn curl_co_preadv(BlockDriverState *bs,
> @@ -941,14 +962,12 @@ static int coroutine_fn curl_co_preadv(BlockDriverState *bs,
>  {
>      CURLAIOCB acb = {
>          .co = qemu_coroutine_self(),
> -        .ret = -EINPROGRESS,
>          .qiov = qiov,
>          .offset = offset,
>          .bytes = bytes
>      };

Let's leave -EINPROGRESS here even if no other code checks for this
value any more. It can be helpful for debugging when you can distinguish
"completed successfully" from "still running".

>  
> -    curl_setup_preadv(bs, &acb);
> -    while (acb.ret == -EINPROGRESS) {
> +    if (curl_setup_preadv(bs, &acb)) {
>          qemu_coroutine_yield();
>      }
>      return acb.ret;

That whole pattern of returning true and false, or even a new enum,
everywhere to signal whether we are waiting for something felt strange to
me. It took me a while, but I think I now know what I expected instead:
why don't these places just yield immediately, rather than requiring the
outer layer to understand what happened in the functions it called?

Kevin


Reply via email to