On Wed, Nov 17, 2021 at 08:04:41PM +0000, Jacob Champion wrote:
> On Wed, 2021-11-17 at 14:44 -0500, Jaime Casanova wrote:
> > I'm trying to add more parallelism by copying individual segments
> > of a relfilenode in different processes. Does anyone one see a big
> > problem in trying to do that? I'm asking because no one did it before,
> > that could not be a good sign.
> 
> I looked into speeding this up a while back, too. For the use case I
> was looking at -- Greenplum, which has huge numbers of relfilenodes --
> spinning disk I/O was absolutely the bottleneck and that is typically
> not easily parallelizable. (In fact I felt at the time that Andres'
> work on async I/O might be a better way forward, at least for some
> filesystems.)
> 
> But you mentioned that you were seeing disks that weren't saturated, so
> maybe some CPU optimization is still valuable? I am a little skeptical
> that more parallelism is the way to do that, but numbers trump my
> skepticism.
> 

Sorry for being unresponsive for so long. I added a new --jobs-per-disk
option. This is a simple patch I made for the customer; it ignores all
WIN32 parts because I don't know anything about that platform. I wanted
to complete that part, but it has been in the same state for two months now.

AFAIU, on WIN32 a separate struct is used to pass the parameters to the
function that will be called on the thread.

I also decided to create a new reap_subchild() function for use with
the new option.

With it, the customer went from copying 25 TB in 6 hours to 4h 45min, which
is an improvement of about 20%!


> > - why we read()/write() at all? is not a faster way of copying the file?
> >   i'm asking that because i don't actually know.
> 
> I have idly wondered if something based on splice() would be faster,
> but I haven't actually tried it.
> 

I tried it and got no better results.

> But there is now support for copy-on-write with the clone mode, isn't
> there? Or are you not able to take advantage of it?
> 

Sadly, that's not possible because those are different disks. And yes, I
know that's something pg_upgrade normally doesn't allow, but it is not
difficult to make it happen.

-- 
Jaime Casanova
Director de Servicios Profesionales
SystemGuards - Consultores de PostgreSQL
>From 0d04f79cb51d6be0ced9c6561cfca5bfe18c4bdd Mon Sep 17 00:00:00 2001
From: Jaime Casanova <jcasa...@systemguards.com.ec>
Date: Wed, 15 Dec 2021 12:14:44 -0500
Subject: [PATCH] Add --jobs-per-disk option to allow multiple processes per
 tablespace

This option is independent of the --jobs one. It will fork new processes
to copy the different segments of a relfilenode in parallel.
---
 src/bin/pg_upgrade/option.c      |  8 ++-
 src/bin/pg_upgrade/parallel.c    | 93 ++++++++++++++++++++++++++++++++
 src/bin/pg_upgrade/pg_upgrade.h  |  4 ++
 src/bin/pg_upgrade/relfilenode.c | 59 +++++++++++---------
 4 files changed, 139 insertions(+), 25 deletions(-)

diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
index 66fe16964e..46b1913a42 100644
--- a/src/bin/pg_upgrade/option.c
+++ b/src/bin/pg_upgrade/option.c
@@ -54,6 +54,7 @@ parseCommandLine(int argc, char *argv[])
 		{"link", no_argument, NULL, 'k'},
 		{"retain", no_argument, NULL, 'r'},
 		{"jobs", required_argument, NULL, 'j'},
+		{"jobs-per-disk", required_argument, NULL, 'J'},
 		{"socketdir", required_argument, NULL, 's'},
 		{"verbose", no_argument, NULL, 'v'},
 		{"clone", no_argument, NULL, 1},
@@ -103,7 +104,7 @@ parseCommandLine(int argc, char *argv[])
 	if (os_user_effective_id == 0)
 		pg_fatal("%s: cannot be run as root\n", os_info.progname);
 
-	while ((option = getopt_long(argc, argv, "d:D:b:B:cj:kNo:O:p:P:rs:U:v",
+	while ((option = getopt_long(argc, argv, "d:D:b:B:cj:J:kNo:O:p:P:rs:U:v",
 								 long_options, &optindex)) != -1)
 	{
 		switch (option)
@@ -132,6 +133,10 @@ parseCommandLine(int argc, char *argv[])
 				user_opts.jobs = atoi(optarg);
 				break;
 
+			case 'J':
+				user_opts.jobs_per_disk = atoi(optarg);
+				break;
+
 			case 'k':
 				user_opts.transfer_mode = TRANSFER_MODE_LINK;
 				break;
@@ -291,6 +296,7 @@ usage(void)
 	printf(_("  -d, --old-datadir=DATADIR     old cluster data directory\n"));
 	printf(_("  -D, --new-datadir=DATADIR     new cluster data directory\n"));
 	printf(_("  -j, --jobs=NUM                number of simultaneous processes or threads to use\n"));
+	printf(_("  -J, --jobs-per-disk=NUM       number of simultaneous processes or threads to use per tablespace\n"));
 	printf(_("  -k, --link                    link instead of copying files to new cluster\n"));
 	printf(_("  -N, --no-sync                 do not wait for changes to be written safely to disk\n"));
 	printf(_("  -o, --old-options=OPTIONS     old cluster options to pass to the server\n"));
diff --git a/src/bin/pg_upgrade/parallel.c b/src/bin/pg_upgrade/parallel.c
index ee7364da3b..82f698a9ab 100644
--- a/src/bin/pg_upgrade/parallel.c
+++ b/src/bin/pg_upgrade/parallel.c
@@ -17,6 +17,9 @@
 #include "pg_upgrade.h"
 
 static int	parallel_jobs;
+static int	current_jobs = 0;
+
+static bool      reap_subchild(bool wait_for_child);
 
 #ifdef WIN32
 /*
@@ -277,6 +280,60 @@ win32_transfer_all_new_dbs(transfer_thread_arg *args)
 #endif
 
 
+
+/*
+ * parallel_process_relfile_segment()
+ *
+ * Transfer one segment file via process_relfile_segment().  It runs in a
+ * forked worker when user_opts.jobs and user_opts.jobs_per_disk both exceed
+ * 1, and inline otherwise; WIN32 has no worker yet, so it is always inline.
+ */
+void
+parallel_process_relfile_segment(FileNameMap *map, const char *type_suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file)
+{
+#ifndef WIN32
+	pid_t		child;
+#endif
+
+	if (user_opts.jobs <= 1 || user_opts.jobs_per_disk <= 1)
+	{
+		process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file);
+		return;
+	}
+
+	/* harvest any dead children */
+	while (reap_subchild(false) == true)
+		;
+
+	/* at the user_opts.jobs_per_disk limit?  wait for one worker to finish */
+	if (current_jobs >= user_opts.jobs_per_disk)
+		reap_subchild(true);
+
+#ifdef WIN32
+	/* No thread-based worker exists yet, so never skip the file: do it inline */
+	process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file);
+#else
+	/* set this before we start the job */
+	current_jobs++;
+
+	/* Ensure stdio state is quiesced before forking */
+	fflush(NULL);
+
+	child = fork();
+	if (child == 0)
+	{
+		process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file);
+		/* use _exit to skip atexit() functions */
+		_exit(0);
+	}
+	else if (child < 0)
+		/* fork failed */
+		pg_fatal("could not create worker process: %s\n", strerror(errno));
+#endif
+}
+
+
+
 /*
  *	collect status from a completed worker child
  */
@@ -345,3 +402,39 @@ reap_child(bool wait_for_child)
 
 	return true;
 }
+
+
+/*
+ *	Collect the status of a finished worker subchild and drop it from
+ *	current_jobs.  Returns true if one was accounted for, else false.
+ *	NOTE(review): the WIN32 build waits for nothing before decrementing.
+ */
+static bool
+reap_subchild(bool wait_for_child)
+{
+#ifndef WIN32
+	int			work_status;
+	pid_t		child;
+#else
+	int			thread_num;
+	DWORD		res;
+#endif
+
+	if (user_opts.jobs <= 1 || current_jobs == 0)
+		return false;
+
+#ifndef WIN32
+	child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
+	if (child == (pid_t) -1)
+		pg_fatal("waitpid() failed: %s\n", strerror(errno));
+	if (child == 0)
+		return false;			/* no children, or no dead children */
+	if (work_status != 0)
+		pg_fatal("child process exited abnormally: status %d\n", work_status);
+#endif
+
+	/* do this after job has been removed */
+	current_jobs--;
+
+	return true;
+}
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 22169f1002..adcb24ffea 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -282,6 +282,7 @@ typedef struct
 	bool		do_sync;		/* flush changes to disk */
 	transferMode transfer_mode; /* copy files or link them? */
 	int			jobs;			/* number of processes/threads to use */
+	int			jobs_per_disk;			/* number of processes/threads to use per tablespace */
 	char	   *socketdir;		/* directory to use for Unix sockets */
 } UserOpts;
 
@@ -450,4 +451,7 @@ void		parallel_exec_prog(const char *log_file, const char *opt_log_file,
 void		parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
 										  char *old_pgdata, char *new_pgdata,
 										  char *old_tablespace);
+
+void 		process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file);
+void 		parallel_process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file);
 bool		reap_child(bool wait_for_child);
diff --git a/src/bin/pg_upgrade/relfilenode.c b/src/bin/pg_upgrade/relfilenode.c
index 5dbefbceaf..8a7c49efaa 100644
--- a/src/bin/pg_upgrade/relfilenode.c
+++ b/src/bin/pg_upgrade/relfilenode.c
@@ -17,6 +17,7 @@
 
 static void transfer_single_new_db(FileNameMap *maps, int size, char *old_tablespace);
 static void transfer_relfile(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit);
+void process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file);
 
 
 /*
@@ -232,30 +233,40 @@ transfer_relfile(FileNameMap *map, const char *type_suffix, bool vm_must_add_fro
 		/* Copying files might take some time, so give feedback. */
 		pg_log(PG_STATUS, "%s", old_file);
 
-		if (vm_must_add_frozenbit && strcmp(type_suffix, "_vm") == 0)
+		parallel_process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file);
+	}
+}
+
+/*
+ * Rewrite a "_vm" fork if vm_must_add_frozenbit, else clone/copy/link the file.
+ */
+void
+process_relfile_segment(FileNameMap *map, const char *type_suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file)
+{
+	if (vm_must_add_frozenbit && strcmp(type_suffix, "_vm") == 0)
+	{
+		/* Need to rewrite visibility map format */
+		pg_log(PG_VERBOSE, "rewriting \"%s\" to \"%s\"\n",
+			   old_file, new_file);
+		rewriteVisibilityMap(old_file, new_file, map->nspname, map->relname);
+	}
+	else
+		switch (user_opts.transfer_mode)
 		{
-			/* Need to rewrite visibility map format */
-			pg_log(PG_VERBOSE, "rewriting \"%s\" to \"%s\"\n",
-				   old_file, new_file);
-			rewriteVisibilityMap(old_file, new_file, map->nspname, map->relname);
+			case TRANSFER_MODE_CLONE:
+				pg_log(PG_VERBOSE, "cloning \"%s\" to \"%s\"\n",
+					   old_file, new_file);
+				cloneFile(old_file, new_file, map->nspname, map->relname);
+				break;
+			case TRANSFER_MODE_COPY:
+				pg_log(PG_VERBOSE, "copying \"%s\" to \"%s\"\n",
+					   old_file, new_file);
+				copyFile(old_file, new_file, map->nspname, map->relname);
+				break;
+			case TRANSFER_MODE_LINK:
+				pg_log(PG_VERBOSE, "linking \"%s\" to \"%s\"\n",
+					   old_file, new_file);
+				linkFile(old_file, new_file, map->nspname, map->relname);
+				break;
 		}
-		else
-			switch (user_opts.transfer_mode)
-			{
-				case TRANSFER_MODE_CLONE:
-					pg_log(PG_VERBOSE, "cloning \"%s\" to \"%s\"\n",
-						   old_file, new_file);
-					cloneFile(old_file, new_file, map->nspname, map->relname);
-					break;
-				case TRANSFER_MODE_COPY:
-					pg_log(PG_VERBOSE, "copying \"%s\" to \"%s\"\n",
-						   old_file, new_file);
-					copyFile(old_file, new_file, map->nspname, map->relname);
-					break;
-				case TRANSFER_MODE_LINK:
-					pg_log(PG_VERBOSE, "linking \"%s\" to \"%s\"\n",
-						   old_file, new_file);
-					linkFile(old_file, new_file, map->nspname, map->relname);
-			}
-	}
 }
-- 
2.20.1

Reply via email to