On Wed, Nov 17, 2021 at 08:04:41PM +0000, Jacob Champion wrote:
> On Wed, 2021-11-17 at 14:44 -0500, Jaime Casanova wrote:
> > I'm trying to add more parallelism by copying individual segments
> > of a relfilenode in different processes. Does anyone one see a big
> > problem in trying to do that? I'm asking because no one did it before,
> > that could not be a good sign.
> 
> I looked into speeding this up a while back, too. For the use case I
> was looking at -- Greenplum, which has huge numbers of relfilenodes --
> spinning disk I/O was absolutely the bottleneck and that is typically
> not easily parallelizable. (In fact I felt at the time that Andres'
> work on async I/O might be a better way forward, at least for some
> filesystems.)
> 
> But you mentioned that you were seeing disks that weren't saturated, so
> maybe some CPU optimization is still valuable? I am a little skeptical
> that more parallelism is the way to do that, but numbers trump my
> skepticism.
> 
Sorry for being unresponsive for so long.

I did add a new --jobs-per-disk option. This is a simple patch I made
for the customer, and it ignores all the WIN32 parts because I don't
know anything about that side. I wanted to complete that part, but it
has been in the same state for two months now. AFAIU, the WIN32 code
uses a different struct for the parameters of the function that is
called in each thread. I also decided to create a new reap_subchild()
function to use with the new parameter.

With this patch, the customer went from copying 25 TB in 6 hours to
4h 45min, an improvement of about 20%.

> > - why we read()/write() at all? is not a faster way of copying the file?
> > i'm asking that because i don't actually know.
> 
> I have idly wondered if something based on splice() would be faster,
> but I haven't actually tried it.
> 

I tried it and got no better results.

> But there is now support for copy-on-write with the clone mode, isn't
> there? Or are you not able to take advantage of it?
> 

That's sadly not possible because the clusters are on different disks.
And yes, I know that's something pg_upgrade normally doesn't allow, but
it is not difficult to make it work.

-- 
Jaime Casanova
Director of Professional Services
SystemGuards - PostgreSQL Consultants
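For reference, a splice()-based copy of the kind Jacob mentioned moves each chunk from the source file into a pipe and from the pipe into the destination file, instead of through a user-space buffer as a read()/write() loop does. The following is a minimal Linux-only sketch under stated assumptions, not necessarily what was tested here: splice_copy_file() is a hypothetical name, the 64 kB chunk size is arbitrary, and errors are reported only by returning -1 with errno set rather than through pg_fatal().

```c
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Bytes moved per splice() call; an arbitrary choice for this sketch. */
#define SPLICE_CHUNK (64 * 1024)

/*
 * splice_copy_file(): hypothetical helper, Linux only.
 *
 * Copies src to dst by moving data file -> pipe -> file with splice(),
 * so the bytes are not copied through a user-space buffer.
 * Returns 0 on success, -1 on error (errno is set).
 */
static int
splice_copy_file(const char *src, const char *dst)
{
	int			in = -1;
	int			out = -1;
	int			pipefd[2] = {-1, -1};
	int			result = -1;
	int			saved_errno;

	in = open(src, O_RDONLY);
	if (in < 0)
		goto cleanup;
	out = open(dst, O_WRONLY | O_CREAT | O_TRUNC, 0600);
	if (out < 0)
		goto cleanup;
	if (pipe(pipefd) < 0)
		goto cleanup;

	for (;;)
	{
		/* Move up to one chunk from the source file into the pipe. */
		ssize_t		n = splice(in, NULL, pipefd[1], NULL, SPLICE_CHUNK, SPLICE_F_MOVE);

		if (n < 0)
			goto cleanup;
		if (n == 0)
			break;				/* end of source file */

		/* Drain everything just queued in the pipe into the destination. */
		while (n > 0)
		{
			ssize_t		m = splice(pipefd[0], NULL, out, NULL, (size_t) n, SPLICE_F_MOVE);

			if (m <= 0)
				goto cleanup;
			n -= m;
		}
	}
	result = 0;

cleanup:
	saved_errno = errno;
	if (in >= 0)
		close(in);
	if (out >= 0)
		close(out);
	if (pipefd[0] >= 0)
		close(pipefd[0]);
	if (pipefd[1] >= 0)
		close(pipefd[1]);
	errno = saved_errno;
	return result;
}
```

Whether such a copy beats read()/write() depends on the kernel and the filesystems involved; in the test reported above it did not.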
From 0d04f79cb51d6be0ced9c6561cfca5bfe18c4bdd Mon Sep 17 00:00:00 2001
From: Jaime Casanova <jcasa...@systemguards.com.ec>
Date: Wed, 15 Dec 2021 12:14:44 -0500
Subject: [PATCH] Add --jobs-per-disk option to allow multiple processes per tablespace

This option is independent of the --jobs one. It will fork new processes
to copy the different segments of a relfilenode in parallel.
---
 src/bin/pg_upgrade/option.c      |  8 ++-
 src/bin/pg_upgrade/parallel.c    | 93 ++++++++++++++++++++++++++++++++
 src/bin/pg_upgrade/pg_upgrade.h  |  4 ++
 src/bin/pg_upgrade/relfilenode.c | 59 +++++++++++---------
 4 files changed, 139 insertions(+), 25 deletions(-)

diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
index 66fe16964e..46b1913a42 100644
--- a/src/bin/pg_upgrade/option.c
+++ b/src/bin/pg_upgrade/option.c
@@ -54,6 +54,7 @@ parseCommandLine(int argc, char *argv[])
 		{"link", no_argument, NULL, 'k'},
 		{"retain", no_argument, NULL, 'r'},
 		{"jobs", required_argument, NULL, 'j'},
+		{"jobs-per-disk", required_argument, NULL, 'J'},
 		{"socketdir", required_argument, NULL, 's'},
 		{"verbose", no_argument, NULL, 'v'},
 		{"clone", no_argument, NULL, 1},
@@ -103,7 +104,7 @@ parseCommandLine(int argc, char *argv[])
 	if (os_user_effective_id == 0)
 		pg_fatal("%s: cannot be run as root\n", os_info.progname);
 
-	while ((option = getopt_long(argc, argv, "d:D:b:B:cj:kNo:O:p:P:rs:U:v",
+	while ((option = getopt_long(argc, argv, "d:D:b:B:cj:J:kNo:O:p:P:rs:U:v",
 								 long_options, &optindex)) != -1)
 	{
 		switch (option)
@@ -132,6 +133,10 @@ parseCommandLine(int argc, char *argv[])
 				user_opts.jobs = atoi(optarg);
 				break;
 
+			case 'J':
+				user_opts.jobs_per_disk = atoi(optarg);
+				break;
+
 			case 'k':
 				user_opts.transfer_mode = TRANSFER_MODE_LINK;
 				break;
@@ -291,6 +296,7 @@ usage(void)
 	printf(_("  -d, --old-datadir=DATADIR     old cluster data directory\n"));
 	printf(_("  -D, --new-datadir=DATADIR     new cluster data directory\n"));
 	printf(_("  -j, --jobs=NUM                number of simultaneous processes or threads to use\n"));
+	printf(_("  -J, --jobs-per-disk=NUM       number of simultaneous processes or threads to use per tablespace\n"));
 	printf(_("  -k, --link                    link instead of copying files to new cluster\n"));
 	printf(_("  -N, --no-sync                 do not wait for changes to be written safely to disk\n"));
 	printf(_("  -o, --old-options=OPTIONS     old cluster options to pass to the server\n"));
diff --git a/src/bin/pg_upgrade/parallel.c b/src/bin/pg_upgrade/parallel.c
index ee7364da3b..82f698a9ab 100644
--- a/src/bin/pg_upgrade/parallel.c
+++ b/src/bin/pg_upgrade/parallel.c
@@ -17,6 +17,9 @@
 #include "pg_upgrade.h"
 
 static int	parallel_jobs;
+static int	current_jobs = 0;
+
+static bool reap_subchild(bool wait_for_child);
 
 #ifdef WIN32
 /*
@@ -277,6 +280,60 @@ win32_transfer_all_new_dbs(transfer_thread_arg *args)
 
 #endif
 
+
+/*
+ * parallel_process_relfile_segment()
+ *
+ * Copy or link file from old cluster to new one.  If vm_must_add_frozenbit
+ * is true, visibility map forks are converted and rewritten, even in link
+ * mode.
+ */
+void
+parallel_process_relfile_segment(FileNameMap *map, const char *type_suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file)
+{
+#ifndef WIN32
+	pid_t		child;
+#else
+	HANDLE		child;
+	transfer_thread_arg *new_arg;
+#endif
+	if (user_opts.jobs <= 1 || user_opts.jobs_per_disk <= 1)
+		process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file);
+	else
+	{
+		/* parallel */
+
+		/* harvest any dead children */
+		while (reap_subchild(false) == true)
+			;
+
+		/* must we wait for a dead child?  use at most jobs_per_disk children per tablespace */
+		if (current_jobs >= user_opts.jobs_per_disk)
+			reap_subchild(true);
+
+		/* set this before we start the job */
+		current_jobs++;
+
+		/* Ensure stdio state is quiesced before forking */
+		fflush(NULL);
+
+#ifndef WIN32
+		child = fork();
+		if (child == 0)
+		{
+			process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file);
+			/* use _exit to skip atexit() functions */
+			_exit(0);
+		}
+		else if (child < 0)
+			/* fork failed */
+			pg_fatal("could not create worker process: %s\n", strerror(errno));
+#endif
+	}
+}
+
+
+
 /*
  * collect status from a completed worker child
  */
@@ -345,3 +402,39 @@ reap_child(bool wait_for_child)
 
 	return true;
 }
+
+
+
+
+/*
+ * collect status from a completed worker subchild
+ */
+static bool
+reap_subchild(bool wait_for_child)
+{
+#ifndef WIN32
+	int			work_status;
+	pid_t		child;
+#else
+	int			thread_num;
+	DWORD		res;
+#endif
+
+	if (user_opts.jobs <= 1 || current_jobs == 0)
+		return false;
+
+#ifndef WIN32
+	child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
+	if (child == (pid_t) -1)
+		pg_fatal("waitpid() failed: %s\n", strerror(errno));
+	if (child == 0)
+		return false;			/* no children, or no dead children */
+	if (work_status != 0)
+		pg_fatal("child process exited abnormally: status %d\n", work_status);
+#endif
+
+	/* do this after job has been removed */
+	current_jobs--;
+
+	return true;
+}
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 22169f1002..adcb24ffea 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -282,6 +282,7 @@ typedef struct
 	bool		do_sync;		/* flush changes to disk */
 	transferMode transfer_mode; /* copy files or link them? */
 	int			jobs;			/* number of processes/threads to use */
+	int			jobs_per_disk;	/* number of processes/threads to use per tablespace */
 	char	   *socketdir;		/* directory to use for Unix sockets */
 } UserOpts;
 
@@ -450,4 +451,7 @@ void		parallel_exec_prog(const char *log_file, const char *opt_log_file,
 void		parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
 										  char *old_pgdata, char *new_pgdata,
 										  char *old_tablespace);
+
+void		process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file);
+void		parallel_process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file);
 bool		reap_child(bool wait_for_child);
diff --git a/src/bin/pg_upgrade/relfilenode.c b/src/bin/pg_upgrade/relfilenode.c
index 5dbefbceaf..8a7c49efaa 100644
--- a/src/bin/pg_upgrade/relfilenode.c
+++ b/src/bin/pg_upgrade/relfilenode.c
@@ -17,6 +17,7 @@
 
 static void transfer_single_new_db(FileNameMap *maps, int size, char *old_tablespace);
 static void transfer_relfile(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit);
+void		process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file);
 
 
 /*
@@ -232,30 +233,40 @@ transfer_relfile(FileNameMap *map, const char *type_suffix, bool vm_must_add_fro
 		/* Copying files might take some time, so give feedback. */
 		pg_log(PG_STATUS, "%s", old_file);
 
-		if (vm_must_add_frozenbit && strcmp(type_suffix, "_vm") == 0)
+		parallel_process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file);
+	}
+}
+
+
+
+void
+process_relfile_segment(FileNameMap *map, const char *type_suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file)
+{
+
+	if (vm_must_add_frozenbit && strcmp(type_suffix, "_vm") == 0)
+	{
+		/* Need to rewrite visibility map format */
+		pg_log(PG_VERBOSE, "rewriting \"%s\" to \"%s\"\n",
+			   old_file, new_file);
+		rewriteVisibilityMap(old_file, new_file, map->nspname, map->relname);
+	}
+	else
+		switch (user_opts.transfer_mode)
 		{
-			/* Need to rewrite visibility map format */
-			pg_log(PG_VERBOSE, "rewriting \"%s\" to \"%s\"\n",
-				   old_file, new_file);
-			rewriteVisibilityMap(old_file, new_file, map->nspname, map->relname);
+			case TRANSFER_MODE_CLONE:
+				pg_log(PG_VERBOSE, "cloning \"%s\" to \"%s\"\n",
+					   old_file, new_file);
+				cloneFile(old_file, new_file, map->nspname, map->relname);
+				break;
+			case TRANSFER_MODE_COPY:
+				pg_log(PG_VERBOSE, "copying \"%s\" to \"%s\"\n",
+					   old_file, new_file);
+				copyFile(old_file, new_file, map->nspname, map->relname);
+				break;
+			case TRANSFER_MODE_LINK:
+				pg_log(PG_VERBOSE, "linking \"%s\" to \"%s\"\n",
+					   old_file, new_file);
+				linkFile(old_file, new_file, map->nspname, map->relname);
+				break;
 		}
-		else
-			switch (user_opts.transfer_mode)
-			{
-				case TRANSFER_MODE_CLONE:
-					pg_log(PG_VERBOSE, "cloning \"%s\" to \"%s\"\n",
-						   old_file, new_file);
-					cloneFile(old_file, new_file, map->nspname, map->relname);
-					break;
-				case TRANSFER_MODE_COPY:
-					pg_log(PG_VERBOSE, "copying \"%s\" to \"%s\"\n",
-						   old_file, new_file);
-					copyFile(old_file, new_file, map->nspname, map->relname);
-					break;
-				case TRANSFER_MODE_LINK:
-					pg_log(PG_VERBOSE, "linking \"%s\" to \"%s\"\n",
-						   old_file, new_file);
-					linkFile(old_file, new_file, map->nspname, map->relname);
-			}
-	}
 }
-- 
2.20.1
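The per-tablespace throttling in parallel_process_relfile_segment() and reap_subchild() follows the pattern pg_upgrade already uses with reap_child(): harvest finished children with WNOHANG, block in waitpid() once the limit is reached, then fork. The standalone sketch below re-creates that pattern outside pg_upgrade, under stated assumptions: run_segments(), reap_one(), demo_segment() and segment_fn are hypothetical names, and, like reap_subchild(), it assumes the calling process has no other children, since waitpid(-1, ...) collects any child. It also adds a final drain loop. In the patch as posted, reap_subchild() appears to be called only before each fork, so the last few segment copies of a tablespace may still be running when that tablespace's worker exits.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

/* Hypothetical per-segment worker: returns 0 on success. */
typedef int (*segment_fn) (int segno, void *arg);

static int	running = 0;		/* live children (cf. current_jobs) */
static int	failures = 0;		/* children that did not exit with 0 */

/* Reap one child; returns true if a child was collected. */
static bool
reap_one(bool block)
{
	int			status;
	pid_t		pid;

	if (running == 0)
		return false;
	do
		pid = waitpid(-1, &status, block ? 0 : WNOHANG);
	while (pid < 0 && errno == EINTR);
	if (pid < 0)
	{
		perror("waitpid");
		exit(1);
	}
	if (pid == 0)
		return false;			/* WNOHANG and nobody has exited yet */
	running--;
	if (!WIFEXITED(status) || WEXITSTATUS(status) != 0)
		failures++;
	return true;
}

/*
 * Run fn for segments 0..nsegments-1 with at most max_jobs children
 * alive at once.  Returns the number of failed segments.
 */
static int
run_segments(int nsegments, int max_jobs, segment_fn fn, void *arg)
{
	failures = 0;
	for (int segno = 0; segno < nsegments; segno++)
	{
		pid_t		pid;

		if (max_jobs <= 1)
		{
			/* serial path, like the patch's jobs_per_disk <= 1 case */
			if (fn(segno, arg) != 0)
				failures++;
			continue;
		}

		while (reap_one(false))
			;					/* harvest finished children, don't block */
		if (running >= max_jobs)
			reap_one(true);		/* at the limit: wait for a free slot */

		fflush(NULL);			/* avoid duplicating buffered stdio output */
		pid = fork();
		if (pid < 0)
		{
			perror("fork");
			exit(1);
		}
		if (pid == 0)
			_exit(fn(segno, arg) == 0 ? 0 : 1);
		running++;
	}

	/* Drain: do not return while any segment is still in flight. */
	while (reap_one(true))
		;
	return failures;
}

/* Demo worker: sleeps 10 ms; fails only for segno == *(int *) arg. */
static int
demo_segment(int segno, void *arg)
{
	struct timespec ts = {0, 10 * 1000 * 1000};

	nanosleep(&ts, NULL);
	return (arg != NULL && segno == *(int *) arg) ? 1 : 0;
}
```

For example, run_segments(10, 3, demo_segment, NULL) processes ten fake segments with at most three children alive at a time and returns 0; here max_jobs plays the role of --jobs-per-disk.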