On Fri, Jul 28, 2023 at 06:17:52PM +0100, Richard W.M. Jones wrote:
> See the comment at the top of plugins/curl/pool.c for general
> information about how this works.
>
> This makes a very large difference to performance over the previous
> implementation. Note: for the tests below I also applied the next
> commit, which changes the behaviour of the connections parameter.
>
> Using this test case:
>
> $ uri=https://cloud-images.ubuntu.com/lunar/current/lunar-server-cloudimg-amd64.img
> $ nbdkit -r -U - curl $uri ipresolve=v4 --run 'nbdcopy -p $uri null'
>
> The times are as follows:
>
> multi, connections=64 21.5s
> multi, connections=32 30.2s
> multi, connections=16 56.0s
> before this commit 166s
Awesome performance improvements! As painful as this series has been
for you to write and debug, it is showing its worth.
> ---
> plugins/curl/curldefs.h | 35 ++--
> plugins/curl/config.c | 246 ---------------------------
> plugins/curl/curl.c | 366 +++++++++++++++++++++++++++++++++++-----
> plugins/curl/pool.c | 346 ++++++++++++++++++++++++++++---------
> 4 files changed, 616 insertions(+), 377 deletions(-)
Finally taking time to review this, even though it is already in-tree.
> @@ -98,8 +88,30 @@ struct curl_handle {
> const char *read_buf;
> uint32_t read_count;
>
> + /* This field is used by curl_get_size. */
> + bool accept_range;
> +
> /* Used by scripts.c */
> struct curl_slist *headers_copy;
> +
> + /* Used by pool.c */
> + struct command *cmd;
> +};
> +
> +/* Asynchronous commands that can be sent to the pool thread. */
> +enum command_type { EASY_HANDLE, STOP };
> +struct command {
> + /* These fields are set by the caller. */
> + enum command_type type; /* command */
> + struct curl_handle *ch; /* for EASY_HANDLE, the easy handle */
> +
> + /* This field is set to a unique value by send_command_and_wait. */
> + uint64_t id; /* serial number */
> +
> + /* These fields are used to signal back that the command finished. */
> + pthread_mutex_t mutex; /* completion mutex */
> + pthread_cond_t cond; /* completion condition */
> + CURLcode status; /* status code (CURLE_OK = succeeded) */
> };
Makes sense. The two types are mutually recursive (curl_handle
includes a struct command *; command includes a struct curl_handle *);
hopefully you have proper locking when altering multiple objects to
adjust how they point to one another.
> +++ b/plugins/curl/config.c
> +++ b/plugins/curl/curl.c
>
> +/* Get the file size. */
> +static int get_content_length_accept_range (struct curl_handle *ch);
> +static bool try_fallback_GET_method (struct curl_handle *ch);
> +static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
> +static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
> +
> +static int64_t
> +curl_get_size (void *handle)
> +{
> + struct curl_handle *ch;
> + CURLcode r;
> + long code;
> +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
> + curl_off_t o;
> +#else
> + double d;
> +#endif
> + int64_t exportsize;
> +
> + /* Get a curl easy handle. */
> + ch = allocate_handle ();
> + if (ch == NULL) goto err;
> +
> + /* Prepare to read the headers. */
> + if (get_content_length_accept_range (ch) == -1)
> + goto err;
> +
> + /* Send the command to the worker thread and wait. */
> + struct command cmd = {
> + .type = EASY_HANDLE,
> + .ch = ch,
> + };
> +
> + r = send_command_and_wait (&cmd);
> + update_times (ch->c);
> + if (r != CURLE_OK) {
> + display_curl_error (ch, r,
> + "problem doing HEAD request to fetch size of URL
> [%s]",
> + url);
> +
> + /* Get the HTTP status code, if available. */
> + r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code);
> + if (r == CURLE_OK)
> + nbdkit_debug ("HTTP status code: %ld", code);
> + else
> + code = -1;
> +
> + /* See comment on try_fallback_GET_method below. */
> + if (code != 403 || !try_fallback_GET_method (ch))
> + goto err;
> + }
> +
> + /* Get the content length.
> + *
> + * Note there is some subtlety here: For web servers using chunked
> + * encoding, either the Content-Length header will not be present,
> + * or if present it should be ignored. (For such servers the only
> + * way to find out the true length would be to read all of the
> + * content, which we don't want to do).
> + *
> + * Curl itself resolves this for us. It will ignore the
> + * Content-Length header if chunked encoding is used, returning the
> + * length as -1 which we check below (see also
> + * curl:lib/http.c:Curl_http_size).
> + */
> +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
> + r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &o);
> + if (r != CURLE_OK) {
> + display_curl_error (ch, r,
> + "could not get length of remote file [%s]", url);
> + goto err;
> + }
> +
> + if (o == -1) {
> + nbdkit_error ("could not get length of remote file [%s], "
> + "is the URL correct?", url);
> + goto err;
> + }
> +
> + exportsize = o;
> +#else
> + r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d);
> + if (r != CURLE_OK) {
> + display_curl_error (ch, r,
> + "could not get length of remote file [%s]", url);
> + goto err;
> + }
> +
> + if (d == -1) {
> + nbdkit_error ("could not get length of remote file [%s], "
> + "is the URL correct?", url);
> + goto err;
> + }
> +
> + exportsize = d;
Does curl guarantee that the double d will contain a value assignable
to int64_t without overflow/truncation? (Converting a double outside
the int64_t range to int64_t is undefined behavior in C.) A double has
only a 53-bit mantissa, so file sizes above 2^53 bytes (8 PiB) cannot
all be represented exactly, but I doubt someone is exposing such large
files over HTTP.
> +#endif
> + nbdkit_debug ("content length: %" PRIi64, exportsize);
> +
> + /* If this is HTTP, check that byte ranges are supported. */
> + if (ascii_strncasecmp (url, "http://", strlen ("http://")) == 0 ||
> + ascii_strncasecmp (url, "https://", strlen ("https://")) == 0) {
> + if (!ch->accept_range) {
> + nbdkit_error ("server does not support 'range' (byte range) requests");
> + goto err;
> + }
> +
> + nbdkit_debug ("accept range supported (for HTTP/HTTPS)");
> + }
> +
> + free_handle (ch);
> + return exportsize;
> +
> + err:
> + if (ch)
> + free_handle (ch);
> + return -1;
> +}
> +
> +/* Get the file size and also whether the remote HTTP server
> + * supports byte ranges.
> + */
> +static int
> +get_content_length_accept_range (struct curl_handle *ch)
> +{
> + /* We must run the scripts if necessary and set headers in the
> + * handle.
> + */
> + if (do_scripts (ch) == -1)
> + return -1;
> +
> + /* Set this flag in the handle to false. The callback should set it
> + * to true if byte ranges are supported, which we check below.
> + */
> + ch->accept_range = false;
> +
> + /* No Body, not nobody! This forces a HEAD request. */
> + curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L);
> + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
> + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
> + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, NULL);
> + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, NULL);
> + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, NULL);
> + curl_easy_setopt (ch->c, CURLOPT_READDATA, NULL);
> + return 0;
> +}
> +
> +/* S3 servers can return 403 Forbidden for HEAD but still respond
> + * to GET, so we give it a second chance in that case.
> + * https://github.com/kubevirt/containerized-data-importer/issues/2737
> + *
> + * This function issues a GET request with a writefunction that always
> + * returns an error, thus effectively getting the headers but
> + * abandoning the transfer as soon as possible after.
> + */
> +static bool
> +try_fallback_GET_method (struct curl_handle *ch)
> +{
> + CURLcode r;
> +
> + nbdkit_debug ("attempting to fetch headers using GET method");
> +
> + curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L);
> + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
> + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
> + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb);
> + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
> +
> + struct command cmd = {
> + .type = EASY_HANDLE,
> + .ch = ch,
> + };
> +
> + r = send_command_and_wait (&cmd);
> + update_times (ch->c);
> +
> + /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too
> + * (eg if the remote has zero length). Other errors might happen
> + * but we ignore them since it is a fallback path.
> + */
> + return r == CURLE_OK || r == CURLE_WRITE_ERROR;
> +}
> +
> +static size_t
> +header_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
> +{
> + struct curl_handle *ch = opaque;
> + size_t realsize = size * nmemb;
> + const char *header = ptr;
> + const char *end = header + realsize;
> + const char *accept_ranges = "accept-ranges:";
> + const char *bytes = "bytes";
> +
> + if (realsize >= strlen (accept_ranges) &&
> + ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) ==
> 0) {
> + const char *p = strchr (header, ':') + 1;
> +
> + /* Skip whitespace between the header name and value. */
> + while (p < end && *p && ascii_isspace (*p))
Technically, '*p && ascii_isspace (*p)' can be shortened to
'ascii_isspace (*p)', since the NUL byte is not an ASCII space
character. I don't know whether the compiler is smart enough to make
that optimization on your behalf.
> + p++;
> +
> + if (end - p >= strlen (bytes)
> + && strncmp (p, bytes, strlen (bytes)) == 0) {
> + /* Check that there is nothing but whitespace after the value. */
> + p += strlen (bytes);
> + while (p < end && *p && ascii_isspace (*p))
Another spot of the same.
> + p++;
> +
> + if (p == end || !*p)
> + ch->accept_range = true;
> + }
> + }
> +
> + return realsize;
> +}
> +
> +static size_t
> +error_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
> +{
> +#ifdef CURL_WRITEFUNC_ERROR
> + return CURL_WRITEFUNC_ERROR;
> +#else
> + return 0; /* in older curl, any size < requested will also be an error */
> +#endif
> +}
> +
> /* Read data from the remote server. */
> +static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
> +
> static int
> curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
> {
> CURLcode r;
> + struct curl_handle *ch;
> char range[128];
>
> - GET_HANDLE_FOR_CURRENT_SCOPE (ch);
> - if (ch == NULL)
> - return -1;
> + /* Get a curl easy handle. */
> + ch = allocate_handle ();
> + if (ch == NULL) goto err;
>
> /* Run the scripts if necessary and set headers in the handle. */
> - if (do_scripts (ch) == -1) return -1;
> + if (do_scripts (ch) == -1) goto err;
>
> /* Tell the write_cb where we want the data to be written. write_cb
> * will update this if the data comes in multiple sections.
> */
> + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb);
> + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
> ch->write_buf = buf;
> ch->write_count = count;
>
> @@ -183,11 +388,16 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
> offset, offset + count);
> curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
>
> - /* The assumption here is that curl will look after timeouts. */
> - r = curl_easy_perform (ch->c);
> + /* Send the command to the worker thread and wait. */
> + struct command cmd = {
> + .type = EASY_HANDLE,
> + .ch = ch,
> + };
> +
> + r = send_command_and_wait (&cmd);
> if (r != CURLE_OK) {
> - display_curl_error (ch, r, "pread: curl_easy_perform");
> - return -1;
> + display_curl_error (ch, r, "pread");
> + goto err;
> }
> update_times (ch->c);
>
> @@ -198,26 +408,67 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
> /* As far as I understand the cURL API, this should never happen. */
> assert (ch->write_count == 0);
>
> + free_handle (ch);
> return 0;
> +
> + err:
> + if (ch)
> + free_handle (ch);
> + return -1;
> +}
> +
> +/* NB: The terminology used by libcurl is confusing!
> + *
> + * WRITEFUNCTION / write_cb is used when reading from the remote server
> + * READFUNCTION / read_cb is used when writing to the remote server.
> + *
> + * We use the same terminology as libcurl here.
> + */
> +static size_t
> +write_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
> +{
> + struct curl_handle *ch = opaque;
> + size_t orig_realsize = size * nmemb;
> + size_t realsize = orig_realsize;
Do we have to worry about overflow when compiling on 32-bit machines?
Asked differently, should we be using off_t instead of size_t in any
of this code? Thankfully, for now, we know NBD .pread and .pwrite
requests are capped at 64M, so I think you're okay (we aren't ever
going to ask curl for gigabytes in one request), but maybe a comment
or assert() is worth it?
> +
> + assert (ch->write_buf);
> +
> + /* Don't read more than the requested amount of data, even if the
> + * server or libcurl sends more.
> + */
> + if (realsize > ch->write_count)
> + realsize = ch->write_count;
> +
> + memcpy (ch->write_buf, ptr, realsize);
> +
> + ch->write_count -= realsize;
> + ch->write_buf += realsize;
> +
> + return orig_realsize;
[1]
> }
>
> /* Write data to the remote server. */
> +static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
> +
> static int
> curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
> {
> CURLcode r;
> + struct curl_handle *ch;
> char range[128];
>
> - GET_HANDLE_FOR_CURRENT_SCOPE (ch);
> - if (ch == NULL)
> - return -1;
> + /* Get a curl easy handle. */
> + ch = allocate_handle ();
> + if (ch == NULL) goto err;
>
> /* Run the scripts if necessary and set headers in the handle. */
> - if (do_scripts (ch) == -1) return -1;
> + if (do_scripts (ch) == -1) goto err;
>
> /* Tell the read_cb where we want the data to be read from. read_cb
> * will update this if the data comes in multiple sections.
> */
> + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb);
> + curl_easy_setopt (ch->c, CURLOPT_READDATA, ch);
> ch->read_buf = buf;
> ch->read_count = count;
>
> @@ -228,11 +479,16 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
> offset, offset + count);
> curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
>
> - /* The assumption here is that curl will look after timeouts. */
> - r = curl_easy_perform (ch->c);
> + /* Send the command to the worker thread and wait. */
> + struct command cmd = {
> + .type = EASY_HANDLE,
> + .ch = ch,
> + };
> +
> + r = send_command_and_wait (&cmd);
> if (r != CURLE_OK) {
> - display_curl_error (ch, r, "pwrite: curl_easy_perform");
> - return -1;
> + display_curl_error (ch, r, "pwrite");
> + goto err;
> }
> update_times (ch->c);
>
> @@ -243,7 +499,31 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
> /* As far as I understand the cURL API, this should never happen. */
> assert (ch->read_count == 0);
>
> + free_handle (ch);
> return 0;
> +
> + err:
> + if (ch)
> + free_handle (ch);
> + return -1;
> +}
> +
> +static size_t
> +read_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
> +{
> + struct curl_handle *ch = opaque;
> + size_t realsize = size * nmemb;
> +
> + assert (ch->read_buf);
> + if (realsize > ch->read_count)
> + realsize = ch->read_count;
> +
> + memcpy (ptr, ch->read_buf, realsize);
> +
> + ch->read_count -= realsize;
> + ch->read_buf += realsize;
> +
> + return realsize;
Why does write_cb in [1] above return orig_realsize, but read_cb
returns the potentially modified realsize?
> }
>
> static struct nbdkit_plugin plugin = {
> diff --git a/plugins/curl/pool.c b/plugins/curl/pool.c
> index eb2d330e1..2974cda3f 100644
> --- a/plugins/curl/pool.c
> +++ b/plugins/curl/pool.c
> @@ -30,11 +30,29 @@
> * SUCH DAMAGE.
> */
>
> -/* Curl handle pool.
> +/* Worker thread which processes the curl multi interface.
> *
> - * To get a libcurl handle, call get_handle(). When you hold the
> - * handle, it is yours exclusively to use. After you have finished
> - * with the handle, put it back into the pool by calling put_handle().
> + * The main nbdkit threads (see curl.c) create curl easy handles
> + * initialized with the work they want to carry out. Note there is
> + * one easy handle per task (eg. per pread/pwrite request). The easy
> + * handles are not reused.
> + *
> + * The commands + optional easy handle are submitted to the worker
> + * thread over a self-pipe (it's easy to use a pipe here because the
> + * way curl multi works is it can listen on an extra fd, but not on
> + * anything else like a pthread condition). The curl multi performs
> + * the work of the outstanding easy handles.
> + *
> + * When an easy handle finishes work or errors, we retire the command
> + * by signalling back to the waiting nbdkit thread using a pthread
> + * condition.
> + *
> + * In my experiments, we're almost always I/O bound so I haven't seen
> + * any strong need to use more than one curl multi / worker thread,
> + * although it would be possible to add more in future.
> + *
> + * See also this extremely useful thread:
> + * https://curl.se/mail/lib-2019-03/0100.html
Very useful comment (and link).
> */
>
> #include <config.h>
> @@ -45,9 +63,19 @@
> #include <stdint.h>
> #include <inttypes.h>
> #include <string.h>
> +#include <unistd.h>
> #include <assert.h>
> #include <pthread.h>
>
> +#ifdef HAVE_STDATOMIC_H
> +#include <stdatomic.h>
> +#else
> +/* Some old platforms lack atomic types, but 32 bit ints are usually
> + * "atomic enough".
> + */
> +#define _Atomic /**/
> +#endif
> +
> #include <curl/curl.h>
>
> #include <nbdkit-plugin.h>
> @@ -62,115 +90,281 @@ NBDKIT_DLL_PUBLIC int curl_debug_pool = 0;
>
> unsigned connections = 4;
>
> -/* This lock protects access to the curl_handles vector below. */
> -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
> +/* Pipe used to notify background thread that a command is pending in
> + * the queue. A pointer to the 'struct command' is sent over the
> + * pipe.
> + */
> +static int self_pipe[2] = { -1, -1 };
>
> -/* List of curl handles. This is allocated dynamically as more
> - * handles are requested. Currently it does not shrink. It may grow
> - * up to 'connections' in length.
> +/* The curl multi handle. */
> +static CURLM *multi;
> +
> +/* List of running easy handles. We only need to maintain this so we
> + * can remove them from the multi handle when cleaning up.
> */
> DEFINE_VECTOR_TYPE (curl_handle_list, struct curl_handle *);
> static curl_handle_list curl_handles = empty_vector;
>
> -/* The condition is used when the curl handles vector is full and
> - * we're waiting for a thread to put_handle.
> - */
> -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
> -static size_t in_use = 0, waiting = 0;
> +static const char *
> +command_type_to_string (enum command_type type)
> +{
> + switch (type) {
> + case EASY_HANDLE: return "EASY_HANDLE";
> + case STOP: return "STOP";
> + default: abort ();
> + }
> +}
>
> int
> pool_get_ready (void)
> {
> + multi = curl_multi_init ();
> + if (multi == NULL) {
> + nbdkit_error ("curl_multi_init failed: %m");
> + return -1;
> + }
> +
> return 0;
> }
>
> +/* Start and stop the background thread. */
> +static pthread_t thread;
> +static bool thread_running;
> +static void *pool_worker (void *);
> +
> int
> pool_after_fork (void)
> {
> + int err;
> +
> + if (pipe (self_pipe) == -1) {
> + nbdkit_error ("pipe: %m");
> + return -1;
> + }
> +
> + /* Start the pool background thread where all the curl work is done. */
> + err = pthread_create (&thread, NULL, pool_worker, NULL);
> + if (err != 0) {
> + errno = err;
> + nbdkit_error ("pthread_create: %m");
> + return -1;
> + }
> + thread_running = true;
> +
> return 0;
> }
>
> -/* Close and free all handles in the pool. */
> +/* Unload the background thread. */
> void
> pool_unload (void)
> {
> - size_t i;
> + if (thread_running) {
> + /* Stop the background thread. */
> + struct command cmd = { .type = STOP };
> + send_command_and_wait (&cmd);
> + pthread_join (thread, NULL);
> + thread_running = false;
> + }
>
> - if (curl_debug_pool)
> - nbdkit_debug ("unload_pool: number of curl handles allocated: %zu",
> - curl_handles.len);
> + if (self_pipe[0] >= 0) {
> + close (self_pipe[0]);
> + self_pipe[0] = -1;
> + }
> + if (self_pipe[1] >= 0) {
> + close (self_pipe[1]);
> + self_pipe[1] = -1;
> + }
>
> - for (i = 0; i < curl_handles.len; ++i)
> - free_handle (curl_handles.ptr[i]);
> - curl_handle_list_reset (&curl_handles);
> + if (multi) {
> + size_t i;
> +
> + /* Remove and free any easy handles in the multi. */
> + for (i = 0; i < curl_handles.len; ++i) {
> + curl_multi_remove_handle (multi, curl_handles.ptr[i]->c);
> + free_handle (curl_handles.ptr[i]);
> + }
> +
> + curl_multi_cleanup (multi);
> + multi = NULL;
> + }
> }
>
> -/* Get a handle from the pool.
> - *
> - * It is owned exclusively by the caller until they call put_handle.
> +/* Command queue. */
> +static _Atomic uint64_t id; /* next command ID */
> +
> +/* Send command to the background thread and wait for completion.
> + * This is only called by one of the nbdkit threads.
> */
> -struct curl_handle *
> -get_handle (void)
> +CURLcode
> +send_command_and_wait (struct command *cmd)
> {
> - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
> - size_t i;
> - struct curl_handle *ch;
> -
> - again:
> - /* Look for a handle which is not in_use. */
> - for (i = 0; i < curl_handles.len; ++i) {
> - ch = curl_handles.ptr[i];
> - if (!ch->in_use) {
> - ch->in_use = true;
> - in_use++;
> + cmd->id = id++;
> +
> + /* CURLcode is 0 (CURLE_OK) or > 0, so use -1 as a sentinel to
> + * indicate that the command has not yet been completed and status
> + * set.
> + */
> + cmd->status = -1;
> +
> + /* This will be used to signal command completion back to us. */
> + pthread_mutex_init (&cmd->mutex, NULL);
> + pthread_cond_init (&cmd->cond, NULL);
> +
> + /* Send the command to the background thread. */
> + if (write (self_pipe[1], &cmd, sizeof cmd) != sizeof cmd)
> + abort ();
> +
> + /* Wait for the command to be completed by the background thread. */
> + {
> + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
> + while (cmd->status == -1) /* for -1, see above */
> + pthread_cond_wait (&cmd->cond, &cmd->mutex);
> + }
> +
> + pthread_mutex_destroy (&cmd->mutex);
> + pthread_cond_destroy (&cmd->cond);
> +
> + /* Note the main thread must call nbdkit_error on error! */
> + return cmd->status;
> +}
> +
> +/* The background thread. */
> +static void check_for_finished_handles (void);
> +static void retire_command (struct command *cmd, CURLcode code);
> +static void do_easy_handle (struct command *cmd);
> +
> +static void *
> +pool_worker (void *vp)
> +{
> + bool stop = false;
> +
> + if (curl_debug_pool)
> + nbdkit_debug ("curl: background thread started");
> +
> + while (!stop) {
> + struct command *cmd = NULL;
> + struct curl_waitfd extra_fds[1] =
> + { { .fd = self_pipe[0], .events = CURL_WAIT_POLLIN } };
> + CURLMcode mc;
> + int numfds, running_handles, repeats = 0;
> +
> + do {
> + /* Process the multi handle. */
> + mc = curl_multi_perform (multi, &running_handles);
> + if (mc != CURLM_OK) {
> + nbdkit_error ("curl_multi_perform: %s", curl_multi_strerror (mc));
Since nbdkit_error() stores its string in thread-local storage, is
there anything that ever extracts this error over to the nbdkit thread
that issued the original request to the worker thread?...
> + abort (); /* XXX We don't expect this to happen */
...Then again, if we abort, it doesn't matter.
> + }
> +
> + check_for_finished_handles ();
> +
> + mc = curl_multi_wait (multi, extra_fds, 1, 1000000, &numfds);
> + if (mc != CURLM_OK) {
> + nbdkit_error ("curl_multi_wait: %s", curl_multi_strerror (mc));
> + abort (); /* XXX We don't expect this to happen */
> + }
> +
> if (curl_debug_pool)
> - nbdkit_debug ("get_handle: %zu", ch->i);
> - return ch;
> - }
> - }
> + nbdkit_debug ("curl_multi_wait returned: running_handles=%d
> numfds=%d",
> + running_handles, numfds);
> +
> + if (numfds == 0) {
> + repeats++;
> + if (repeats > 1)
> + nbdkit_nanosleep (1, 0);
> + }
> + else {
> + repeats = 0;
> + if (extra_fds[0].revents == CURL_WAIT_POLLIN) {
> + /* There's a command waiting. */
> + if (read (self_pipe[0], &cmd, sizeof cmd) != sizeof cmd)
> + abort ();
> + }
> + }
> + } while (!cmd);
>
> - /* If more connections are allowed, then allocate a new handle. */
> - if (curl_handles.len < connections) {
> - ch = allocate_handle ();
> - if (ch == NULL)
> - return NULL;
> - if (curl_handle_list_append (&curl_handles, ch) == -1) {
> - free_handle (ch);
> - return NULL;
> - }
> - ch->i = curl_handles.len - 1;
> - ch->in_use = true;
> - in_use++;
> if (curl_debug_pool)
> - nbdkit_debug ("get_handle: %zu", ch->i);
> - return ch;
> - }
> + nbdkit_debug ("curl: dispatching %s command %" PRIu64,
> + command_type_to_string (cmd->type), cmd->id);
> +
> + switch (cmd->type) {
> + case STOP:
> + stop = true;
> + retire_command (cmd, CURLE_OK);
> + break;
>
> - /* Otherwise we have run out of connections so we must wait until
> - * another thread calls put_handle.
> - */
> - assert (in_use == connections);
> - waiting++;
> - while (in_use == connections)
> - pthread_cond_wait (&cond, &lock);
> - waiting--;
> + case EASY_HANDLE:
> + do_easy_handle (cmd);
> + break;
> + }
> + } /* while (!stop) */
>
> - goto again;
> + if (curl_debug_pool)
> + nbdkit_debug ("curl: background thread stopped");
> +
> + return NULL;
> }
>
> -/* Return the handle to the pool. */
> -void
> -put_handle (struct curl_handle *ch)
> +/* This checks if any easy handles in the multi have
> + * finished and retires the associated commands.
> + */
> +static void
> +check_for_finished_handles (void)
> {
> - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
> + CURLMsg *msg;
> + int msgs_in_queue;
> +
> + while ((msg = curl_multi_info_read (multi, &msgs_in_queue)) != NULL) {
> + size_t i;
> + struct curl_handle *ch = NULL;
> +
> + if (msg->msg == CURLMSG_DONE) {
> + /* Find this curl_handle. */
> + for (i = 0; i < curl_handles.len; ++i) {
> + if (curl_handles.ptr[i]->c == msg->easy_handle) {
> + ch = curl_handles.ptr[i];
> + curl_handle_list_remove (&curl_handles, i);
> + }
> + }
> + if (ch == NULL) abort ();
> + curl_multi_remove_handle (multi, ch->c);
> +
> + retire_command (ch->cmd, msg->data.result);
> + }
> + }
> +}
>
> +/* Retire a command. status is a CURLcode. */
> +static void
> +retire_command (struct command *cmd, CURLcode status)
> +{
> if (curl_debug_pool)
> - nbdkit_debug ("put_handle: %zu", ch->i);
> + nbdkit_debug ("curl: retiring %s command %" PRIu64,
> + command_type_to_string (cmd->type), cmd->id);
> +
> + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
> + cmd->status = status;
> + pthread_cond_signal (&cmd->cond);
> +}
> +
> +static void
> +do_easy_handle (struct command *cmd)
> +{
> + CURLMcode mc;
> +
> + cmd->ch->cmd = cmd;
> +
> + /* Add the handle to the multi. */
> + mc = curl_multi_add_handle (multi, cmd->ch->c);
> + if (mc != CURLM_OK) {
> + nbdkit_error ("curl_multi_add_handle: %s", curl_multi_strerror (mc));
> + goto err;
> + }
>
> - ch->in_use = false;
> - in_use--;
> + if (curl_handle_list_append (&curl_handles, cmd->ch) == -1)
> + goto err;
> + return;
>
> - /* Signal the next thread which is waiting. */
> - if (waiting > 0)
> - pthread_cond_signal (&cond);
> + err:
> + retire_command (cmd, CURLE_OUT_OF_MEMORY);
> }
> --
> 2.41.0
Overall looks nice, and I learned more about curl in the process.
--
Eric Blake, Principal Software Engineer
Red Hat, Inc.
Virtualization: qemu.org | libguestfs.org
_______________________________________________
Libguestfs mailing list
[email protected]
https://listman.redhat.com/mailman/listinfo/libguestfs