From: zhaoyifan <[email protected]>

The current S3 remote implementation uses the `curl_easy*` API family to
interact with the backend, which operates serially. Let’s migrate to the
`curl_multi*` APIs to leverage libcurl’s multiplexing capabilities.

Now following `ListObjects`, `GetObject` are concurrently executed with a
sliding window limiting maximum concurrency.

Signed-off-by: Yifan Zhao <[email protected]>
---
 lib/remotes/s3.c | 400 +++++++++++++++++++++++++++++++++--------------
 1 file changed, 281 insertions(+), 119 deletions(-)

diff --git a/lib/remotes/s3.c b/lib/remotes/s3.c
index 0296ef4..3c900e0 100644
--- a/lib/remotes/s3.c
+++ b/lib/remotes/s3.c
@@ -25,6 +25,8 @@
 #define S3EROFS_URL_LEN                        8192
 #define S3EROFS_CANONICAL_QUERY_LEN    2048
 
+#define S3EROFS_MAX_GETOBJECT_CONCUR   16
+
 #define BASE64_ENCODE_LEN(len) (((len + 2) / 3) * 4)
 
 struct s3erofs_query_params {
@@ -101,8 +103,6 @@ static int s3erofs_prepare_url(struct s3erofs_curl_request 
*req,
        return 0;
 }
 
-static char *get_canonical_headers(const struct curl_slist *list) { return ""; 
}
-
 // See: 
https://docs.aws.amazon.com/AmazonS3/latest/API/RESTAuthentication.html#ConstructingTheAuthenticationHeader
 static char *s3erofs_sigv2_header(const struct curl_slist *headers,
                const char *method, const char *content_md5,
@@ -112,7 +112,6 @@ static char *s3erofs_sigv2_header(const struct curl_slist 
*headers,
        u8 hmac_signature[EVP_MAX_MD_SIZE];
        char *str, *output = NULL;
        unsigned int len, pos, output_len;
-       const char *canonical_headers = get_canonical_headers(headers);
        const char *prefix = "Authorization: AWS ";
 
        if (!method || !date || !ak || !sk)
@@ -126,7 +125,7 @@ static char *s3erofs_sigv2_header(const struct curl_slist 
*headers,
                canonical_query = "/";
 
        pos = asprintf(&str, "%s\n%s\n%s\n%s\n%s%s", method, content_md5,
-                      content_type, date, canonical_headers, canonical_query);
+                      content_type, date, "", canonical_query);
        if (pos < 0)
                return ERR_PTR(-ENOMEM);
 
@@ -164,6 +163,11 @@ struct s3erofs_curl_response {
        size_t size;
 };
 
+struct s3erofs_curl_getobject_resp {
+       struct erofs_vfile vf;
+       erofs_off_t pos, end;
+};
+
 static size_t s3erofs_request_write_memory_cb(void *contents, size_t size,
                                              size_t nmemb, void *userp)
 {
@@ -256,11 +260,48 @@ err_header:
        return ret;
 }
 
+static CURL *s3erofs_curl_easy_init()
+{
+       CURL *curl;
+
+       curl = curl_easy_init();
+       if (!curl)
+               return ERR_PTR(-ENOMEM);
+
+       if (curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L) != CURLE_OK)
+               goto out_cleanup;
+
+       if (curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 30L) != CURLE_OK)
+               goto out_cleanup;
+
+       if (curl_easy_setopt(curl, CURLOPT_USERAGENT,
+                            "s3erofs/" PACKAGE_VERSION) != CURLE_OK)
+               goto out_cleanup;
+
+       return curl;
+
+out_cleanup:
+       curl_easy_cleanup(curl);
+       return ERR_PTR(-EFAULT);
+}
+
 struct s3erofs_object_info {
        char *key;
        u64 size;
        time_t mtime;
        u32 mtime_ns;
+
+       bool downloaded;
+       struct erofs_diskbuf *diskbuf;
+};
+
+struct s3erofs_getobject_task {
+       CURL *easy_handle;
+       struct curl_slist *request_headers;
+       struct s3erofs_curl_request req;
+       struct s3erofs_curl_getobject_resp resp;
+       struct s3erofs_object_info *obj;
+       int index;
 };
 
 struct s3erofs_object_iterator {
@@ -273,8 +314,106 @@ struct s3erofs_object_iterator {
 
        char *next_marker;
        bool is_truncated;
+
+       /* fields for getobject */
+       CURLM *multi_handle;
+       struct s3erofs_getobject_task *getobj_tasks;
+       int getobj_idx;
 };
 
+static size_t s3erofs_remote_getobject_cb(void *contents, size_t size, size_t 
nmemb,
+                                         void *userp)
+{
+       struct s3erofs_curl_getobject_resp *resp = userp;
+       size_t realsize = size * nmemb;
+
+       if (resp->pos + realsize > resp->end ||
+           erofs_io_pwrite(&resp->vf, contents, resp->pos, realsize) != 
realsize)
+               return 0;
+
+       resp->pos += realsize;
+       return realsize;
+}
+
+static int s3erofs_add_getobject_task(struct s3erofs_object_iterator *it,
+                                    struct s3erofs_object_info *obj,
+                                    int task_index)
+{
+       struct s3erofs_query_params params = {0};
+       struct s3erofs_getobject_task *task;
+       int ret;
+
+       task = &it->getobj_tasks[task_index];
+       if (!task->easy_handle) {
+               task->easy_handle = s3erofs_curl_easy_init();
+               if (!task->easy_handle)
+                       return PTR_ERR(task->easy_handle);
+
+               if (curl_easy_setopt(task->easy_handle, CURLOPT_PRIVATE, task) 
!=
+                   CURLE_OK) {
+                       curl_easy_cleanup(task->easy_handle);
+                       return -EFAULT;
+               }
+       }
+
+       task->index = task_index;
+       task->obj = obj;
+
+       obj->diskbuf = calloc(1, sizeof(struct erofs_diskbuf));
+       if (!obj->diskbuf)
+               return -ENOMEM;
+       task->resp.vf.fd = erofs_diskbuf_reserve(obj->diskbuf, 0, 
&task->resp.pos);
+       if (task->resp.vf.fd < 0) {
+               ret = -EBADF;
+               goto err;
+       }
+       erofs_diskbuf_commit(obj->diskbuf, obj->size);
+       task->resp.end = task->resp.pos + obj->size;
+
+       task->req.method = "GET";
+       ret = s3erofs_prepare_url(&task->req, it->s3->endpoint, it->bucket, 
obj->key,
+                                 &params, it->s3->url_style);
+       if (ret < 0)
+               goto err;
+
+       curl_easy_setopt(task->easy_handle, CURLOPT_URL, task->req.url);
+       curl_easy_setopt(task->easy_handle, CURLOPT_WRITEFUNCTION,
+                        s3erofs_remote_getobject_cb);
+       curl_easy_setopt(task->easy_handle, CURLOPT_WRITEDATA, &task->resp);
+
+       /* Add authentication headers */
+       if (it->s3->access_key[0]) {
+               if (task->request_headers) {
+                       curl_slist_free_all(task->request_headers);
+                       task->request_headers = NULL;
+               }
+
+               ret = s3erofs_request_insert_auth(&task->request_headers, 
task->req.method,
+                                                 task->req.canonical_query,
+                                                 it->s3->access_key, 
it->s3->secret_key);
+               if (ret < 0) {
+                       erofs_err("failed to insert auth headers");
+                       goto err;
+               }
+               curl_easy_setopt(task->easy_handle, CURLOPT_HTTPHEADER, 
task->request_headers);
+       }
+
+       /* Add to multi handle */
+       ret = curl_multi_add_handle(it->multi_handle, task->easy_handle);
+       if (ret != CURLM_OK) {
+               erofs_err("failed to add getobject task to multi handle: %s",
+                         curl_multi_strerror(ret));
+               ret = -EIO;
+               goto err;
+       }
+
+       return 0;
+
+err:
+       free(obj->diskbuf);
+       return ret;
+}
+
 static int s3erofs_parse_list_objects_one(xmlNodePtr node,
                                          struct s3erofs_object_info *info)
 {
@@ -312,6 +451,8 @@ static int s3erofs_parse_list_objects_one(xmlNodePtr node,
                        xmlFree(str);
                }
        }
+
+       info->downloaded = false;
        return 0;
 }
 
@@ -319,7 +460,7 @@ static int s3erofs_parse_list_objects_result(const char 
*data, int len,
                                             struct s3erofs_object_iterator *it)
 {
        xmlNodePtr root = NULL, node, next;
-       int ret, i, contents_count;
+       int ret, i, j, contents_count;
        xmlDocPtr doc = NULL;
        xmlChar *str;
        void *tmp;
@@ -398,6 +539,7 @@ static int s3erofs_parse_list_objects_result(const char 
*data, int len,
                it->objects[0].key = NULL;
        }
        it->cur = 0;
+       it->getobj_idx = 0;
 
        ret = 0;
        for (i = 0, node = root->children; node; node = node->next) {
@@ -428,6 +570,26 @@ static int s3erofs_parse_list_objects_result(const char 
*data, int len,
                        ret = -ENOMEM;
        }
 
+       if (!ret && i && it->multi_handle) {
+               j = 0;
+               while (it->getobj_idx < i && j < S3EROFS_MAX_GETOBJECT_CONCUR) {
+                       if (it->objects[it->getobj_idx].size == 0) {
+                               it->getobj_idx++;
+                               continue;
+                       }
+
+                       ret = s3erofs_add_getobject_task(it, it->objects + 
it->getobj_idx,
+                                                        j++);
+                       if (ret < 0) {
+                               erofs_err("failed to add download task for 
object %s: %s",
+                                         it->objects[it->getobj_idx].key,
+                                         erofs_strerror(ret));
+                               goto out;
+                       }
+                       it->getobj_idx++;
+               }
+       }
+
        if (!ret)
                ret = i;
 out:
@@ -490,10 +652,11 @@ static int s3erofs_list_objects(struct 
s3erofs_object_iterator *it)
 
 static struct s3erofs_object_iterator *
 s3erofs_create_object_iterator(struct erofs_s3 *s3, const char *path,
-                              const char *delimiter)
+                              const char *delimiter, bool fillzero)
 {
        struct s3erofs_object_iterator *iter;
        char *prefix;
+       int ret = 0;
 
        iter = calloc(1, sizeof(struct s3erofs_object_iterator));
        if (!iter)
@@ -501,8 +664,10 @@ s3erofs_create_object_iterator(struct erofs_s3 *s3, const 
char *path,
        iter->s3 = s3;
        prefix = strchr(path, '/');
        if (prefix) {
-               if (++prefix - path > S3EROFS_PATH_MAX)
-                       return ERR_PTR(-EINVAL);
+               if (++prefix - path > S3EROFS_PATH_MAX) {
+                       ret = -EINVAL;
+                       goto err_iter;
+               }
                iter->bucket = strndup(path, prefix - path);
                iter->prefix = strdup(prefix);
        } else {
@@ -511,18 +676,60 @@ s3erofs_create_object_iterator(struct erofs_s3 *s3, const 
char *path,
        }
        iter->delimiter = delimiter;
        iter->is_truncated = true;
+       if (!fillzero) {
+               iter->getobj_idx = 0;
+               iter->getobj_tasks = calloc(S3EROFS_MAX_GETOBJECT_CONCUR,
+                                           sizeof(struct 
s3erofs_getobject_task));
+               if (!iter->getobj_tasks) {
+                       ret = -ENOMEM;
+                       goto err_prefix;
+               }
+               iter->multi_handle = curl_multi_init();
+               if (!iter->multi_handle) {
+                       ret = -EIO;
+                       free(iter->getobj_tasks);
+                       goto err_prefix;
+               }
+       }
        return iter;
+
+err_prefix:
+       if (iter->bucket)
+               free(iter->bucket);
+       if (iter->prefix)
+               free(iter->prefix);
+err_iter:
+       free(iter);
+       return ERR_PTR(ret);
 }
 
 static void s3erofs_destroy_object_iterator(struct s3erofs_object_iterator *it)
 {
+       struct s3erofs_getobject_task *task;
        int i;
 
+       if (it->getobj_tasks) {
+               for (i = 0; i < S3EROFS_MAX_GETOBJECT_CONCUR; i++) {
+                       task = &it->getobj_tasks[i];
+                       if (task->easy_handle)
+                               curl_easy_cleanup(task->easy_handle);
+                       if (task->request_headers)
+                               curl_slist_free_all(task->request_headers);
+               }
+               free(it->getobj_tasks);
+       }
+       if (it->multi_handle)
+               curl_multi_cleanup(it->multi_handle);
        if (it->next_marker)
                free(it->next_marker);
        if (it->objects) {
-               for (i = 0; it->objects[i].key; ++i)
+               for (i = 0; it->objects[i].key; ++i) {
                        free(it->objects[i].key);
+                       if (it->objects[i].diskbuf) {
+                               erofs_diskbuf_close(it->objects[i].diskbuf);
+                               free(it->objects[i].diskbuf);
+                       }
+               }
                free(it->objects);
        }
        free(it->prefix);
@@ -547,125 +754,79 @@ s3erofs_get_next_object(struct s3erofs_object_iterator 
*it)
        return NULL;
 }
 
-static int s3erofs_curl_easy_init(struct erofs_s3 *s3)
-{
-       CURL *curl;
-
-       curl = curl_easy_init();
-       if (!curl)
-               return -ENOMEM;
-
-       if (curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L) != CURLE_OK)
-               goto out_cleanup;
-
-       if (curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 30L) != CURLE_OK)
-               goto out_cleanup;
-
-       if (curl_easy_setopt(curl, CURLOPT_USERAGENT,
-                            "s3erofs/" PACKAGE_VERSION) != CURLE_OK)
-               goto out_cleanup;
-
-       s3->easy_curl = curl;
-       return 0;
-out_cleanup:
-       curl_easy_cleanup(curl);
-       return -EFAULT;
-}
-
-static void s3erofs_curl_easy_exit(struct erofs_s3 *s3)
+static void s3erofs_handover_object_data(struct erofs_inode *inode,
+                                        struct s3erofs_object_info *obj)
 {
-       if (!s3->easy_curl)
-               return;
-       curl_easy_cleanup(s3->easy_curl);
-       s3->easy_curl = NULL;
+       inode->datasource = EROFS_INODE_DATA_SOURCE_DISKBUF;
+       inode->i_diskbuf = obj->diskbuf;
+       obj->diskbuf = NULL;
 }
 
-struct s3erofs_curl_getobject_resp {
-       struct erofs_vfile *vf;
-       erofs_off_t pos, end;
-};
-
-static size_t s3erofs_remote_getobject_cb(void *contents, size_t size,
-                                         size_t nmemb, void *userp)
+static int s3erofs_remote_getobject(struct s3erofs_object_iterator *it,
+                                   struct s3erofs_object_info *obj,
+                                   struct erofs_inode *inode)
 {
-       struct s3erofs_curl_getobject_resp *resp = userp;
-       size_t realsize = size * nmemb;
+       int running_handles, msgs_in_queue;
+       CURLMsg *msg;
+       CURL *easy_handle;
+       struct s3erofs_getobject_task *task;
+       bool found = false;
+       int ret = 0;
 
-       if (resp->pos + realsize > resp->end ||
-           erofs_io_pwrite(resp->vf, contents, resp->pos, realsize) != 
realsize)
+       if (obj->downloaded) {
+               s3erofs_handover_object_data(inode, obj);
                return 0;
+       }
 
-       resp->pos += realsize;
-       return realsize;
-}
+       curl_multi_perform(it->multi_handle, &running_handles);
+       while (!found) {
+               ret = curl_multi_wait(it->multi_handle, NULL, 0, 1000, NULL);
+               if (ret != CURLM_OK) {
+                       erofs_err("curl_multi_wait() failed: %s",
+                                 curl_multi_strerror(ret));
+                       return -EIO;
+               }
 
-static int s3erofs_remote_getobject(struct erofs_importer *im,
-                                   struct erofs_s3 *s3,
-                                   struct erofs_inode *inode,
-                                   const char *bucket, const char *key)
-{
-       struct erofs_sb_info *sbi = inode->sbi;
-       struct s3erofs_curl_request req = {};
-       struct s3erofs_curl_getobject_resp resp;
-       struct s3erofs_query_params params;
-       struct erofs_vfile vf;
-       int ret;
+               curl_multi_perform(it->multi_handle, &running_handles);
 
-       params.num = 0;
-       req.method = "GET";
-       ret = s3erofs_prepare_url(&req, s3->endpoint, bucket, key,
-                                 &params, s3->url_style);
-       if (ret < 0)
-               return ret;
+               while ((msg = curl_multi_info_read(it->multi_handle, 
&msgs_in_queue))) {
+                       if (msg->msg != CURLMSG_DONE)
+                               continue;
 
-       if (curl_easy_setopt(s3->easy_curl, CURLOPT_WRITEFUNCTION,
-                            s3erofs_remote_getobject_cb) != CURLE_OK)
-               return -EIO;
+                       easy_handle = msg->easy_handle;
 
-       resp.pos = 0;
-       if (!cfg.c_compr_opts[0].alg && im->params->no_datainline) {
-               inode->datalayout = EROFS_INODE_FLAT_PLAIN;
-               inode->idata_size = 0;
-               ret = erofs_allocate_inode_bh_data(inode,
-                               DIV_ROUND_UP(inode->i_size, 1U << 
sbi->blkszbits));
-               if (ret)
-                       return ret;
-               resp.vf = &sbi->bdev;
-               resp.pos = erofs_pos(inode->sbi, inode->u.i_blkaddr);
-               inode->datasource = EROFS_INODE_DATA_SOURCE_NONE;
-       } else {
-               u64 off;
+                       task = NULL;
+                       curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &task);
+                       if (!task) {
+                               erofs_err("failed to get private data from curl 
handle");
+                               curl_multi_remove_handle(it->multi_handle, 
easy_handle);
+                               return -EIO;
+                       }
 
-               if (!inode->i_diskbuf) {
-                       inode->i_diskbuf = calloc(1, sizeof(*inode->i_diskbuf));
-                       if (!inode->i_diskbuf)
-                               return -ENOSPC;
-               } else {
-                       erofs_diskbuf_close(inode->i_diskbuf);
-               }
+                       ret = msg->data.result;
+                       if (ret != CURLE_OK) {
+                               erofs_err("getobject failed for object %s: %s",
+                                         task->obj->key, 
curl_easy_strerror(ret));
+                               curl_multi_remove_handle(it->multi_handle, 
easy_handle);
+                               return -EIO;
+                       }
 
-               vf = (struct erofs_vfile) {.fd =
-                       erofs_diskbuf_reserve(inode->i_diskbuf, 0, &off)};
-               if (vf.fd < 0)
-                       return -EBADF;
-               resp.pos = off;
-               resp.vf = &vf;
-               inode->datasource = EROFS_INODE_DATA_SOURCE_DISKBUF;
-       }
-       resp.end = resp.pos + inode->i_size;
+                       curl_multi_remove_handle(it->multi_handle, easy_handle);
+                       task->obj->downloaded = true;
+                       
+                       if (strcmp(task->obj->key, obj->key) == 0) {
+                               s3erofs_handover_object_data(inode, task->obj);
+                               found = true;
+                       }
 
-       ret = s3erofs_request_perform(s3, &req, &resp);
-       if (resp.vf == &vf) {
-               erofs_diskbuf_commit(inode->i_diskbuf, resp.end - resp.pos);
-               if (ret) {
-                       erofs_diskbuf_close(inode->i_diskbuf);
-                       inode->i_diskbuf = NULL;
-                       inode->datasource = EROFS_INODE_DATA_SOURCE_NONE;
+                       if (it->objects[it->getobj_idx].key) {
+                               s3erofs_add_getobject_task(
+                                       it, &it->objects[it->getobj_idx++], 
task->index);
+                       }
                }
        }
-       if (ret)
-               return ret;
-       return resp.pos != resp.end ? -EIO : 0;
+
+       return ret;
 }
 
 int s3erofs_build_trees(struct erofs_importer *im, struct erofs_s3 *s3,
@@ -685,13 +846,14 @@ int s3erofs_build_trees(struct erofs_importer *im, struct 
erofs_s3 *s3,
        st.st_uid = root->i_uid;
        st.st_gid = root->i_gid;
 
-       ret = s3erofs_curl_easy_init(s3);
-       if (ret) {
+       s3->easy_curl = s3erofs_curl_easy_init();
+       if (!s3->easy_curl) {
+               ret = PTR_ERR(s3->easy_curl);
                erofs_err("failed to initialize s3erofs: %s", 
erofs_strerror(ret));
                return ret;
        }
 
-       iter = s3erofs_create_object_iterator(s3, path, NULL);
+       iter = s3erofs_create_object_iterator(s3, path, NULL, fillzero);
        if (IS_ERR(iter)) {
                erofs_err("failed to create object iterator");
                ret = PTR_ERR(iter);
@@ -747,11 +909,10 @@ int s3erofs_build_trees(struct erofs_importer *im, struct 
erofs_s3 *s3,
                ret = __erofs_fill_inode(im, inode, &st, obj->key);
                if (!ret && S_ISREG(inode->i_mode)) {
                        inode->i_size = obj->size;
-                       if (fillzero)
+                       if (fillzero || !inode->i_size)
                                ret = erofs_write_zero_inode(inode);
                        else
-                               ret = s3erofs_remote_getobject(im, s3, inode,
-                                               iter->bucket, obj->key);
+                               ret = s3erofs_remote_getobject(iter, obj, 
inode);
                }
                if (ret)
                        goto err_iter;
@@ -760,6 +921,7 @@ int s3erofs_build_trees(struct erofs_importer *im, struct 
erofs_s3 *s3,
 err_iter:
        s3erofs_destroy_object_iterator(iter);
 err_global:
-       s3erofs_curl_easy_exit(s3);
+       curl_easy_cleanup(s3->easy_curl);
+       s3->easy_curl = NULL;
        return ret;
 }
-- 
2.46.0


Reply via email to