Here's v23.

I reworked a number of things.  First, I changed trivial stuff, like
grouping all the vacuuming options in a struct to avoid passing an
excessive number of arguments to functions: full, freeze, analyze_only,
and_analyze and verbose all live in a single struct now.  Also, the
stage_commands and stage_messages arrays were duplicated by your patch;
I moved them to a single file-level static struct.

I made prepare_command reset the string buffer and receive an optional
table name, so that it can append it to the generated command, and
append the semicolon as well.  Forcing the callers to reset the string
before calling, and having them add the table name and semicolon
afterwards was awkward and unnecessarily verbose.

You had a new in_abort() function in common.c, which seemed an
unnecessary layer; in its place I just exported the inAbort boolean flag
it was returning, and renamed it to CancelRequested.

I was then troubled by the fact that vacuum_one_database() was being
called in a loop by main() when multiple tables are vacuumed, but
vacuum_parallel() was doing the loop internally.  I found this
discrepancy confusing, so I renamed that new function to
vacuum_one_database_parallel and modified the original
vacuum_one_database to do the loop internally as well.  Now they are, in
essence, a mirror of each other, one doing the parallel stuff and one
doing it serially.  This seems to make more sense to me -- but see
below.

I also modified some underlying stuff, such as having GetIdleSlot return
a ParallelSlot pointer instead of an array index.  Since its caller
always has to dereference the array with the returned index anyway, it
makes more sense to return a pointer to the right element directly, so I
made it do that.  Also, that way, instead of returning NO_SLOT in case
of error it can just return NULL; no need for an extra sentinel value.

I also changed select_loop.  In your patch it had two implementations,
one WIN32 and another one for the rest.  It looks nicer to me to have
only one with small exceptions in the places that need it.  (I haven't
tested the WIN32 path.)  Also, instead of returning ERROR_IN_ABORT I
made it set a boolean flag in case of error, which seems cleaner.

I changed GetQueryResult as I described in a previous message.


There are two things that continue to bother me and I would like you,
dear patch author, to change them before committing this patch:

1. I don't like having vacuum_one_database() and a separate
vacuum_one_database_parallel().  I think we should merge them into one
function, which does either thing according to parameters.  There's
plenty in there that's duplicated.

2. In particular, the above means that run_parallel_vacuum can no longer
exist as it is.  Right now vacuum_one_database_parallel relies on
run_parallel_vacuum to do the actual job parallelization.  I would like
that looping to happen in the improved vacuum_one_database() function
instead.


Looking forward to v24,

-- 
Álvaro Herrera                http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/doc/src/sgml/ref/vacuumdb.sgml b/doc/src/sgml/ref/vacuumdb.sgml
index 3ecd999..211235a 100644
--- a/doc/src/sgml/ref/vacuumdb.sgml
+++ b/doc/src/sgml/ref/vacuumdb.sgml
@@ -204,6 +204,25 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
+      <term><option>-j <replaceable class="parameter">njobs</replaceable></option></term>
+      <term><option>--jobs=<replaceable class="parameter">njobs</replaceable></option></term>
+      <listitem>
+       <para>
+        Execute the vacuum commands on multiple concurrent connections.
+        At most <replaceable class="parameter">njobs</replaceable> tables
+        are vacuumed concurrently.  If more jobs are given than tables,
+        the number of jobs is reduced to the number of tables.
+       </para>
+       <para>
+        <application>vacuumdb</application> will open
+        <replaceable class="parameter">njobs</replaceable> connections to the
+        database, so make sure your <xref linkend="guc-max-connections">
+        setting is high enough to accommodate all connections.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>--analyze-in-stages</option></term>
       <listitem>
        <para>
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index d942a75..1bf7611 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -1160,7 +1160,7 @@ select_loop(int maxFd, fd_set *workerset)
 		i = select(maxFd + 1, workerset, NULL, NULL, NULL);
 
 		/*
-		 * If we Ctrl-C the master process , it's likely that we interrupt
+		 * If we Ctrl-C the master process, it's likely that we interrupt
 		 * select() here. The signal handler will set wantAbort == true and
 		 * the shutdown journey starts from here. Note that we'll come back
 		 * here later when we tell all workers to terminate and read their
diff --git a/src/bin/scripts/common.c b/src/bin/scripts/common.c
index 6bfe2e6..da142aa 100644
--- a/src/bin/scripts/common.c
+++ b/src/bin/scripts/common.c
@@ -19,10 +19,9 @@
 
 #include "common.h"
 
-static void SetCancelConn(PGconn *conn);
-static void ResetCancelConn(void);
 
 static PGcancel *volatile cancelConn = NULL;
+bool CancelRequested = false;
 
 #ifdef WIN32
 static CRITICAL_SECTION cancelConnLock;
@@ -291,7 +290,7 @@ yesno_prompt(const char *question)
  *
  * Set cancelConn to point to the current database connection.
  */
-static void
+void
 SetCancelConn(PGconn *conn)
 {
 	PGcancel   *oldCancelConn;
@@ -321,7 +320,7 @@ SetCancelConn(PGconn *conn)
  *
  * Free the current cancel connection, if any, and set to NULL.
  */
-static void
+void
 ResetCancelConn(void)
 {
 	PGcancel   *oldCancelConn;
@@ -345,9 +344,8 @@ ResetCancelConn(void)
 
 #ifndef WIN32
 /*
- * Handle interrupt signals by canceling the current command,
- * if it's being executed through executeMaintenanceCommand(),
- * and thus has a cancelConn set.
+ * Handle interrupt signals by canceling the current command, if a cancelConn
+ * is set.
  */
 static void
 handle_sigint(SIGNAL_ARGS)
@@ -359,10 +357,15 @@ handle_sigint(SIGNAL_ARGS)
 	if (cancelConn != NULL)
 	{
 		if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
+		{
+			CancelRequested = true;
 			fprintf(stderr, _("Cancel request sent\n"));
+		}
 		else
 			fprintf(stderr, _("Could not send cancel request: %s"), errbuf);
 	}
+	else
+		CancelRequested = true;
 
 	errno = save_errno;			/* just in case the write changed it */
 }
@@ -392,10 +395,16 @@ consoleHandler(DWORD dwCtrlType)
 		if (cancelConn != NULL)
 		{
 			if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
+			{
 				fprintf(stderr, _("Cancel request sent\n"));
+				CancelRequested = true;
+			}
 			else
 				fprintf(stderr, _("Could not send cancel request: %s"), errbuf);
 		}
+		else
+			CancelRequested = true;
+
 		LeaveCriticalSection(&cancelConnLock);
 
 		return TRUE;
diff --git a/src/bin/scripts/common.h b/src/bin/scripts/common.h
index c0c1715..b5ce1ed 100644
--- a/src/bin/scripts/common.h
+++ b/src/bin/scripts/common.h
@@ -21,6 +21,8 @@ enum trivalue
 	TRI_YES
 };
 
+extern bool CancelRequested;
+
 typedef void (*help_handler) (const char *progname);
 
 extern void handle_help_version_opts(int argc, char *argv[],
@@ -49,4 +51,8 @@ extern bool yesno_prompt(const char *question);
 
 extern void setup_cancel_handler(void);
 
+extern void SetCancelConn(PGconn *conn);
+extern void ResetCancelConn(void);
+
 #endif   /* COMMON_H */
diff --git a/src/bin/scripts/vacuumdb.c b/src/bin/scripts/vacuumdb.c
index 957fdb6..89af9d5 100644
--- a/src/bin/scripts/vacuumdb.c
+++ b/src/bin/scripts/vacuumdb.c
@@ -11,24 +11,108 @@
  */
 
 #include "postgres_fe.h"
+
 #include "common.h"
 #include "dumputils.h"
 
 
-static void vacuum_one_database(const char *dbname, bool full, bool verbose,
-	bool and_analyze, bool analyze_only, bool analyze_in_stages, int stage, bool freeze,
-					const char *table, const char *host, const char *port,
+#define ERRCODE_UNDEFINED_TABLE  "42P01"
+
+/* Parallel vacuuming stuff */
+typedef struct ParallelSlot
+{
+	PGconn	   *connection;
+	pgsocket	sock;
+	bool		isFree;
+} ParallelSlot;
+
+/* vacuum options controlled by user flags */
+typedef struct vacuumingOptions
+{
+	bool	analyze_only;
+	bool	verbose;
+	bool	and_analyze;
+	bool	full;
+	bool	freeze;
+} vacuumingOptions;
+
+
+static void vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
+					bool analyze_in_stages, int stage,
+					SimpleStringList *tables,
+					const char *host, const char *port,
 					const char *username, enum trivalue prompt_password,
 					const char *progname, bool echo, bool quiet);
-static void vacuum_all_databases(bool full, bool verbose, bool and_analyze,
-					 bool analyze_only, bool analyze_in_stages, bool freeze,
+static void vacuum_one_database_parallel(const char *dbname, vacuumingOptions *vacopts,
+					 bool analyze_in_stages, int stage,
+					 SimpleStringList *tables,
+					 const char *host, const char *port,
+					 const char *username, enum trivalue prompt_password,
+					 int concurrentCons,
+					 const char *progname, bool echo, bool quiet);
+static void vacuum_all_databases(vacuumingOptions *vacopts,
+					 bool analyze_in_stages,
 					 const char *maintenance_db,
 					 const char *host, const char *port,
 					 const char *username, enum trivalue prompt_password,
+					 int concurrentCons,
 					 const char *progname, bool echo, bool quiet);
+static void vacuum_database_stage(const char *dbname, vacuumingOptions *vacopts,
+					  bool analyze_in_stages, int stage,
+					  SimpleStringList *tables,
+					  const char *host, const char *port, const char *username,
+					  enum trivalue prompt_password,
+					  int concurrentCons,
+					  const char *progname, bool echo, bool quiet);
 
 static void help(const char *progname);
 
+static void prepare_command(PQExpBuffer sql, PGconn *conn,
+				vacuumingOptions *vacopts, const char *table);
+static void run_parallel_vacuum(bool echo,
+					SimpleStringList *tables, vacuumingOptions *vacopts,
+					int concurrentCons, int analyze_stage,
+					ParallelSlot slots[],
+					const char *dbname, const char *progname);
+static ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots,
+			const char *dbname, const char *progname);
+
+static bool GetQueryResult(PGconn *conn, const char *dbname,
+			   const char *progname);
+
+static int select_loop(int maxFd, fd_set *workerset, bool *aborting);
+
+static void DisconnectDatabase(ParallelSlot *slot);
+
+
+/*
+ * Preparatory commands and corresponding user-visible message for the
+ * analyze-in-stages feature.  Note the ANALYZE command itself must be sent
+ * separately.
+ */
+static const struct
+{
+	const char *prepcmd;
+	const char *message;
+}
+staged_analyze[3] =
+{
+	{
+		"SET default_statistics_target=1; SET vacuum_cost_delay=0;",
+		gettext_noop("Generating minimal optimizer statistics (1 target)")
+	},
+	{
+		"SET default_statistics_target=10; RESET vacuum_cost_delay;",
+		gettext_noop("Generating medium optimizer statistics (10 targets)")
+	},
+	{
+		"RESET default_statistics_target;",
+		gettext_noop("Generating default (full) optimizer statistics")
+	}
+};
+
+#define ANALYZE_ALL_STAGES	-1
+
 
 int
 main(int argc, char *argv[])
@@ -49,6 +133,7 @@ main(int argc, char *argv[])
 		{"table", required_argument, NULL, 't'},
 		{"full", no_argument, NULL, 'f'},
 		{"verbose", no_argument, NULL, 'v'},
+		{"jobs", required_argument, NULL, 'j'},
 		{"maintenance-db", required_argument, NULL, 2},
 		{"analyze-in-stages", no_argument, NULL, 3},
 		{NULL, 0, NULL, 0}
@@ -57,7 +142,6 @@ main(int argc, char *argv[])
 	const char *progname;
 	int			optindex;
 	int			c;
-
 	const char *dbname = NULL;
 	const char *maintenance_db = NULL;
 	char	   *host = NULL;
@@ -66,21 +150,23 @@ main(int argc, char *argv[])
 	enum trivalue prompt_password = TRI_DEFAULT;
 	bool		echo = false;
 	bool		quiet = false;
-	bool		and_analyze = false;
-	bool		analyze_only = false;
+	vacuumingOptions vacopts;
 	bool		analyze_in_stages = false;
-	bool		freeze = false;
 	bool		alldb = false;
-	bool		full = false;
-	bool		verbose = false;
 	SimpleStringList tables = {NULL, NULL};
+	int			concurrentCons = 0;
+	int			tbl_count = 0;
+
+	/* initialize options to all false */
+	memset(&vacopts, 0, sizeof(vacopts));
 
 	progname = get_progname(argv[0]);
+
 	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
 
 	handle_help_version_opts(argc, argv, "vacuumdb", help);
 
-	while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fv", long_options, &optindex)) != -1)
+	while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fvj:", long_options, &optindex)) != -1)
 	{
 		switch (c)
 		{
@@ -109,31 +195,43 @@ main(int argc, char *argv[])
 				dbname = pg_strdup(optarg);
 				break;
 			case 'z':
-				and_analyze = true;
+				vacopts.and_analyze = true;
 				break;
 			case 'Z':
-				analyze_only = true;
+				vacopts.analyze_only = true;
 				break;
 			case 'F':
-				freeze = true;
+				vacopts.freeze = true;
 				break;
 			case 'a':
 				alldb = true;
 				break;
 			case 't':
+			{
 				simple_string_list_append(&tables, optarg);
+				tbl_count++;
 				break;
+			}
 			case 'f':
-				full = true;
+				vacopts.full = true;
 				break;
 			case 'v':
-				verbose = true;
+				vacopts.verbose = true;
+				break;
+			case 'j':
+				concurrentCons = atoi(optarg);
+				if (concurrentCons <= 0)
+				{
+					fprintf(stderr, _("%s: number of parallel \"jobs\" must be at least 1\n"),
+							progname);
+					exit(1);
+				}
 				break;
 			case 2:
 				maintenance_db = pg_strdup(optarg);
 				break;
 			case 3:
-				analyze_in_stages = analyze_only = true;
+				analyze_in_stages = vacopts.analyze_only = true;
 				break;
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
@@ -141,7 +239,6 @@ main(int argc, char *argv[])
 		}
 	}
 
-
 	/*
 	 * Non-option argument specifies database name as long as it wasn't
 	 * already specified with -d / --dbname
@@ -160,18 +257,18 @@ main(int argc, char *argv[])
 		exit(1);
 	}
 
-	if (analyze_only)
+	if (vacopts.analyze_only)
 	{
-		if (full)
+		if (vacopts.full)
 		{
-			fprintf(stderr, _("%s: cannot use the \"full\" option when performing only analyze\n"),
-					progname);
+			fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"),
+					progname, "full");
 			exit(1);
 		}
-		if (freeze)
+		if (vacopts.freeze)
 		{
-			fprintf(stderr, _("%s: cannot use the \"freeze\" option when performing only analyze\n"),
-					progname);
+			fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"),
+					progname, "freeze");
 			exit(1);
 		}
 		/* allow 'and_analyze' with 'analyze_only' */
@@ -179,6 +276,10 @@ main(int argc, char *argv[])
 
 	setup_cancel_handler();
 
+	/* Avoid opening extra connections. */
+	if (tbl_count && (concurrentCons > tbl_count))
+		concurrentCons = tbl_count;
+
 	if (alldb)
 	{
 		if (dbname)
@@ -194,9 +295,12 @@ main(int argc, char *argv[])
 			exit(1);
 		}
 
-		vacuum_all_databases(full, verbose, and_analyze, analyze_only, analyze_in_stages, freeze,
-							 maintenance_db, host, port, username,
-							 prompt_password, progname, echo, quiet);
+		vacuum_all_databases(&vacopts,
+							 analyze_in_stages,
+							 maintenance_db,
+							 host, port, username, prompt_password,
+							 concurrentCons,
+							 progname, echo, quiet);
 	}
 	else
 	{
@@ -210,35 +314,35 @@ main(int argc, char *argv[])
 				dbname = get_user_name_or_exit(progname);
 		}
 
-		if (tables.head != NULL)
-		{
-			SimpleStringListCell *cell;
-
-			for (cell = tables.head; cell; cell = cell->next)
-			{
-				vacuum_one_database(dbname, full, verbose, and_analyze,
-									analyze_only, analyze_in_stages, -1,
-									freeze, cell->val,
-									host, port, username, prompt_password,
-									progname, echo, quiet);
-			}
-		}
-		else
-			vacuum_one_database(dbname, full, verbose, and_analyze,
-								analyze_only, analyze_in_stages, -1,
-								freeze, NULL,
-								host, port, username, prompt_password,
-								progname, echo, quiet);
+		vacuum_database_stage(dbname, &vacopts,
+							  analyze_in_stages, ANALYZE_ALL_STAGES,
+							  &tables,
+							  host, port, username, prompt_password,
+							  concurrentCons,
+							  progname, echo, quiet);
 	}
 
 	exit(0);
 }
 
-
+/*
+ * Send a vacuum/analyze command to the server.
+ *
+ * Result status is checked only if 'async' is false.
+ */
 static void
-run_vacuum_command(PGconn *conn, const char *sql, bool echo, const char *dbname, const char *table, const char *progname)
+run_vacuum_command(PGconn *conn, const char *sql, bool echo,
+				   const char *dbname, const char *table,
+				   const char *progname, bool async)
 {
-	if (!executeMaintenanceCommand(conn, sql, echo))
+	if (async)
+	{
+		if (echo)
+			printf("%s\n", sql);
+
+		PQsendQuery(conn, sql);
+	}
+	else if (!executeMaintenanceCommand(conn, sql, echo))
 	{
 		if (table)
 			fprintf(stderr, _("%s: vacuuming of table \"%s\" in database \"%s\" failed: %s"),
@@ -251,172 +355,648 @@ run_vacuum_command(PGconn *conn, const char *sql, bool echo, const char *dbname,
 	}
 }
 
-
+/*
+ * vacuum_one_database
+ *
+ * Process tables in the given database.  If the 'tables' list is empty,
+ * process all tables in the database.  Note there is no parallelization here.
+ */
 static void
-vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyze,
-	bool analyze_only, bool analyze_in_stages, int stage, bool freeze, const char *table,
+vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
+					bool analyze_in_stages, int stage,
+					SimpleStringList *tables,
 					const char *host, const char *port,
 					const char *username, enum trivalue prompt_password,
 					const char *progname, bool echo, bool quiet)
 {
 	PQExpBufferData sql;
-
 	PGconn	   *conn;
-
-	initPQExpBuffer(&sql);
+	SimpleStringListCell *cell;
 
 	conn = connectDatabase(dbname, host, port, username, prompt_password,
 						   progname, false);
 
-	if (analyze_only)
+	initPQExpBuffer(&sql);
+
+	cell = tables ? tables->head : NULL;
+	do
 	{
-		appendPQExpBufferStr(&sql, "ANALYZE");
-		if (verbose)
-			appendPQExpBufferStr(&sql, " VERBOSE");
-	}
-	else
-	{
-		appendPQExpBufferStr(&sql, "VACUUM");
-		if (PQserverVersion(conn) >= 90000)
+		const char *tabname;
+
+		tabname = cell ? cell->val : NULL;
+		prepare_command(&sql, conn, vacopts, tabname);
+
+		if (analyze_in_stages)
 		{
-			const char *paren = " (";
-			const char *comma = ", ";
-			const char *sep = paren;
+			if (stage == ANALYZE_ALL_STAGES)
+			{
+				int		i;
 
-			if (full)
-			{
-				appendPQExpBuffer(&sql, "%sFULL", sep);
-				sep = comma;
-			}
-			if (freeze)
-			{
-				appendPQExpBuffer(&sql, "%sFREEZE", sep);
-				sep = comma;
-			}
-			if (verbose)
-			{
-				appendPQExpBuffer(&sql, "%sVERBOSE", sep);
-				sep = comma;
+				/* Run all stages. */
+				for (i = 0; i < 3; i++)
+				{
+					if (!quiet)
+					{
+						puts(gettext(staged_analyze[i].message));
+						fflush(stdout);
+					}
+					executeCommand(conn, staged_analyze[i].prepcmd, progname, echo);
+					run_vacuum_command(conn, sql.data, echo, dbname, tabname, progname, false);
+				}
 			}
-			if (and_analyze)
+			else
 			{
-				appendPQExpBuffer(&sql, "%sANALYZE", sep);
-				sep = comma;
+				/*
+				 * Otherwise, we got a stage from vacuum_all_databases(), so run
+				 * only that one.
+				 */
+				if (!quiet)
+				{
+					puts(gettext(staged_analyze[stage].message));
+					fflush(stdout);
+				}
+				executeCommand(conn, staged_analyze[stage].prepcmd, progname, echo);
+				run_vacuum_command(conn, sql.data, echo, dbname, tabname, progname, false);
 			}
-			if (sep != paren)
-				appendPQExpBufferStr(&sql, ")");
 		}
 		else
+			run_vacuum_command(conn, sql.data, echo, dbname, tabname, progname, false);
+
+		cell = cell ? cell->next : NULL;
+	} while (cell != NULL);
+
+	PQfinish(conn);
+	termPQExpBuffer(&sql);
+}
+
+static void
+init_slot(ParallelSlot *slot, PGconn *conn)
+{
+	slot->connection = conn;
+	slot->isFree = true;
+	slot->sock = PQsocket(conn);
+}
+
+/*
+ * vacuum_one_database_parallel
+ *
+ * Like vacuum_one_database, but drive multiple connections in parallel.
+ * Another significant difference is that if the table list is empty, rather
+ * than running unadorned VACUUM commands (which would vacuum all the tables in
+ * the database, as vacuum_one_database does) we need to query the catalogs to
+ * obtain the list of tables first.
+ */
+static void
+vacuum_one_database_parallel(const char *dbname, vacuumingOptions *vacopts,
+							 bool analyze_in_stages, int stage,
+							 SimpleStringList *tables,
+							 const char *host, const char *port,
+							 const char *username, enum trivalue prompt_password,
+							 int concurrentCons,
+							 const char *progname, bool echo, bool quiet)
+{
+	PGconn	   *conn;
+	ParallelSlot *slots;
+	SimpleStringList dbtables = {NULL, NULL};
+	int			i;
+
+	conn = connectDatabase(dbname, host, port, username,
+						   prompt_password, progname, false);
+
+	/*
+	 * If a table list is not provided then we need to vacuum the whole
+	 * database; prepare the list of tables.
+	 */
+	if (!tables || !tables->head)
+	{
+		PQExpBufferData buf;
+		PGresult *res;
+		int		ntups;
+		int		i;
+
+		initPQExpBuffer(&buf);
+
+		res = executeQuery(conn,
+				"SELECT c.relname, ns.nspname FROM pg_class c, pg_namespace ns\n"
+				" WHERE relkind IN ('r', 'm') AND c.relnamespace = ns.oid\n"
+				" ORDER BY c.relpages DESC",
+				progname, echo);
+
+		ntups = PQntuples(res);
+		for (i = 0; i < ntups; i++)
 		{
-			if (full)
-				appendPQExpBufferStr(&sql, " FULL");
-			if (freeze)
-				appendPQExpBufferStr(&sql, " FREEZE");
-			if (verbose)
-				appendPQExpBufferStr(&sql, " VERBOSE");
-			if (and_analyze)
-				appendPQExpBufferStr(&sql, " ANALYZE");
+			appendPQExpBuffer(&buf, "%s",
+							  fmtQualifiedId(PQserverVersion(conn),
+											 PQgetvalue(res, i, 1),
+											 PQgetvalue(res, i, 0)));
+
+			simple_string_list_append(&dbtables, buf.data);
+			resetPQExpBuffer(&buf);
 		}
+
+		termPQExpBuffer(&buf);
+		tables = &dbtables;
+
+		/*
+		 * If there are more connections than vacuumable relations, we don't
+		 * need to use them all.
+		 */
+		if (concurrentCons > ntups)
+			concurrentCons = ntups;
+	}
+
+	slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons);
+	init_slot(slots, conn);
+
+	for (i = 1; i < concurrentCons; i++)
+	{
+		conn = connectDatabase(dbname, host, port, username, prompt_password,
+							   progname, false);
+		init_slot(slots + i, conn);
 	}
-	if (table)
-		appendPQExpBuffer(&sql, " %s", table);
-	appendPQExpBufferStr(&sql, ";");
 
 	if (analyze_in_stages)
 	{
-		const char *stage_commands[] = {
-			"SET default_statistics_target=1; SET vacuum_cost_delay=0;",
-			"SET default_statistics_target=10; RESET vacuum_cost_delay;",
-			"RESET default_statistics_target;"
-		};
-		const char *stage_messages[] = {
-			gettext_noop("Generating minimal optimizer statistics (1 target)"),
-			gettext_noop("Generating medium optimizer statistics (10 targets)"),
-			gettext_noop("Generating default (full) optimizer statistics")
-		};
-
-		if (stage == -1)
+		if (stage == ANALYZE_ALL_STAGES)
 		{
-			int		i;
+			int			i;
 
-			/* Run all stages. */
 			for (i = 0; i < 3; i++)
 			{
 				if (!quiet)
 				{
-					puts(gettext(stage_messages[i]));
+					puts(gettext(staged_analyze[i].message));
 					fflush(stdout);
 				}
-				executeCommand(conn, stage_commands[i], progname, echo);
-				run_vacuum_command(conn, sql.data, echo, dbname, table, progname);
+
+				run_parallel_vacuum(echo, tables, vacopts,
+									concurrentCons, i, slots,
+									dbname, progname);
 			}
 		}
 		else
 		{
-			/* Otherwise, we got a stage from vacuum_all_databases(), so run
-			 * only that one. */
+			/*
+			 * Otherwise, we got a stage from vacuum_all_databases(), so run
+			 * only that one.
+			 */
 			if (!quiet)
 			{
-				puts(gettext(stage_messages[stage]));
+				puts(gettext(staged_analyze[stage].message));
 				fflush(stdout);
 			}
-			executeCommand(conn, stage_commands[stage], progname, echo);
-			run_vacuum_command(conn, sql.data, echo, dbname, table, progname);
-		}
 
+			run_parallel_vacuum(echo, tables, vacopts,
+								concurrentCons, stage, slots,
+								dbname, progname);
+		}
 	}
 	else
-		run_vacuum_command(conn, sql.data, echo, dbname, NULL, progname);
+		run_parallel_vacuum(echo, tables, vacopts,
+							concurrentCons, ANALYZE_ALL_STAGES, slots,
+							dbname, progname);
 
-	PQfinish(conn);
-	termPQExpBuffer(&sql);
+	for (i = 0; i < concurrentCons; i++)
+		DisconnectDatabase(&slots[i]);
+
+	pfree(slots);
 }
 
-
 static void
-vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_only,
-			 bool analyze_in_stages, bool freeze, const char *maintenance_db,
-					 const char *host, const char *port,
-					 const char *username, enum trivalue prompt_password,
+vacuum_all_databases(vacuumingOptions *vacopts,
+					 bool analyze_in_stages,
+					 const char *maintenance_db, const char *host,
+					 const char *port, const char *username,
+					 enum trivalue prompt_password,
+					 int concurrentCons,
 					 const char *progname, bool echo, bool quiet)
 {
 	PGconn	   *conn;
 	PGresult   *result;
 	int			stage;
+	int			i;
 
 	conn = connectMaintenanceDatabase(maintenance_db, host, port,
 									  username, prompt_password, progname);
-	result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", progname, echo);
+	result = executeQuery(conn,
+						  "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;",
+						  progname, echo);
 	PQfinish(conn);
 
-	/* If analyzing in stages, then run through all stages.  Otherwise just
-	 * run once, passing -1 as the stage. */
-	for (stage = (analyze_in_stages ? 0 : -1);
-		 stage < (analyze_in_stages ? 3 : 0);
-		 stage++)
+	if (analyze_in_stages)
 	{
-		int			i;
+		for (stage = 0; stage < 3; stage++)
+		{
+			for (i = 0; i < PQntuples(result); i++)
+			{
+				const char *dbname;
 
+				dbname = PQgetvalue(result, i, 0);
+				vacuum_database_stage(dbname, vacopts,
+									  analyze_in_stages, stage,
+									  NULL,
+									  host, port, username, prompt_password,
+									  concurrentCons,
+									  progname, echo, quiet);
+			}
+		}
+	}
+	else
+	{
 		for (i = 0; i < PQntuples(result); i++)
 		{
-			char	   *dbname = PQgetvalue(result, i, 0);
+			const char *dbname;
 
-			if (!quiet)
-			{
-				printf(_("%s: vacuuming database \"%s\"\n"), progname, dbname);
-				fflush(stdout);
-			}
-
-			vacuum_one_database(dbname, full, verbose, and_analyze, analyze_only,
-								analyze_in_stages, stage,
-							freeze, NULL, host, port, username, prompt_password,
-								progname, echo, quiet);
+			dbname = PQgetvalue(result, i, 0);
+			vacuum_database_stage(dbname, vacopts,
+								  analyze_in_stages, ANALYZE_ALL_STAGES,
+								  NULL,
+								  host, port, username, prompt_password,
+								  concurrentCons,
+								  progname, echo, quiet);
 		}
 	}
 
 	PQclear(result);
 }
 
+static void
+vacuum_database_stage(const char *dbname, vacuumingOptions *vacopts,
+					  bool analyze_in_stages, int stage,
+					  SimpleStringList *tables,
+					  const char *host, const char *port, const char *username,
+					  enum trivalue prompt_password,
+					  int concurrentCons,
+					  const char *progname, bool echo, bool quiet)
+{
+	if (!quiet)
+	{
+		printf(_("%s: vacuuming database \"%s\"\n"), progname, dbname);
+		fflush(stdout);
+	}
+
+	if (concurrentCons > 1)
+		vacuum_one_database_parallel(dbname, vacopts,
+									 analyze_in_stages, stage,
+									 tables,
+									 host, port, username, prompt_password,
+									 concurrentCons,
+									 progname, echo, quiet);
+	else
+		vacuum_one_database(dbname, vacopts,
+							analyze_in_stages, stage,
+							tables,
+							host, port, username, prompt_password,
+							progname, echo, quiet);
+}
+
+/*
+ * run_parallel_vacuum
+ *
+ * This function does the actual work of sending the vacuum jobs to the
+ * server concurrently.
+ */
+static void
+run_parallel_vacuum(bool echo, SimpleStringList *tables,
+					vacuumingOptions *vacopts, int concurrentCons,
+					int analyze_stage, ParallelSlot slots[],
+					const char *dbname, const char *progname)
+{
+	PQExpBufferData sql;
+	SimpleStringListCell *cell;
+	int			i;
+
+	initPQExpBuffer(&sql);
+
+	if (analyze_stage >= 0)
+	{
+		for (i = 0; i < concurrentCons; i++)
+		{
+			executeCommand((slots + i)->connection,
+						   staged_analyze[analyze_stage].prepcmd,
+						   progname, echo);
+		}
+	}
+
+	for (cell = tables->head; cell; cell = cell->next)
+	{
+		ParallelSlot *free_slot;
+
+		if (CancelRequested)
+			goto fail;
+
+		/*
+		 * Get a free slot, waiting until one becomes free if none currently
+		 * is.
+		 */
+		free_slot = GetIdleSlot(slots, concurrentCons, dbname, progname);
+		if (!free_slot)
+			goto fail;
+
+		free_slot->isFree = false;
+
+		prepare_command(&sql, free_slot->connection, vacopts, cell->val);
+		run_vacuum_command(free_slot->connection, sql.data,
+						   echo, dbname, cell->val, progname, true);
+	}
+
+	for (i = 0; i < concurrentCons; i++)
+	{
+		/* wait for all connections to return their results */
+		if (!GetQueryResult((slots + i)->connection, dbname, progname))
+			goto fail;
+
+		(slots + i)->isFree = true;	/* XXX what's the point? */
+	}
+
+	if (false)
+	{
+fail:
+		for (i = 0; i < concurrentCons; i++)
+			DisconnectDatabase(slots + i);
+		exit(1);
+	}
+
+	termPQExpBuffer(&sql);
+}
+
+/*
+ * GetIdleSlot
+ *		Return a connection slot that is ready to execute a command.
+ *
+ * We return the first slot we find that is marked isFree, if one is;
+ * otherwise, we loop on select() until one socket becomes available.  When
+ * this happens, we read the whole set and mark as free all sockets that become
+ * available.
+ *
+ * If an error occurs, NULL is returned.
+ */
+static ParallelSlot *
+GetIdleSlot(ParallelSlot slots[], int numslots, const char *dbname,
+			const char *progname)
+{
+	int		i;
+	int		firstFree = -1;
+	fd_set	slotset;
+	pgsocket maxFd;
+
+	for (i = 0; i < numslots; i++)
+		if ((slots + i)->isFree)
+			return slots + i;
+
+	FD_ZERO(&slotset);
+
+	maxFd = slots->sock;
+	for (i = 0; i < numslots; i++)
+	{
+		FD_SET((slots + i)->sock, &slotset);
+		if ((slots + i)->sock > maxFd)
+			maxFd = (slots + i)->sock;
+	}
+
+	/*
+	 * No free slot found, so wait until one of the connections has finished
+	 * its task and return the available slot.
+	 */
+	for (firstFree = -1; firstFree < 0; )
+	{
+		bool	aborting;
+
+		SetCancelConn(slots->connection);
+		i = select_loop(maxFd, &slotset, &aborting);
+		ResetCancelConn();
+
+		if (aborting)
+		{
+			/*
+			 * We set the cancel-receiving connection to the one in the zeroth
+			 * slot above, so fetch the error from there.
+			 */
+			GetQueryResult(slots->connection, dbname, progname);
+			return NULL;
+		}
+		Assert(i != 0);
+
+		for (i = 0; i < numslots; i++)
+		{
+			if (!FD_ISSET((slots + i)->sock, &slotset))
+				continue;
+
+			PQconsumeInput((slots + i)->connection);
+			if (PQisBusy((slots + i)->connection))
+				continue;
+
+			(slots + i)->isFree = true;
+
+			if (!GetQueryResult((slots + i)->connection, dbname, progname))
+				return NULL;
+
+			if (firstFree < 0)
+				firstFree = i;
+		}
+	}
+
+	return slots + firstFree;
+}
+
+/*
+ * GetQueryResult
+ *
+ * Process the query result.  Returns true if there's no error, false
+ * otherwise -- but errors about trying to vacuum a missing relation are
+ * reported and subsequently ignored.
+ */
+static bool
+GetQueryResult(PGconn *conn, const char *dbname, const char *progname)
+{
+	PGresult    *result;
+
+	SetCancelConn(conn);
+	while ((result = PQgetResult(conn)) != NULL)
+	{
+		/*
+		 * If errors are found, report them.  Errors about a missing table are
+		 * harmless so we continue processing; but die for other errors.
+		 */
+		if (PQresultStatus(result) != PGRES_COMMAND_OK)
+		{
+			char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
+
+			fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
+					progname, dbname, PQerrorMessage(conn));
+
+			if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
+			{
+				PQclear(result);
+				return false;
+			}
+		}
+
+		PQclear(result);
+	}
+	ResetCancelConn();
+
+	return true;
+}
+
+/*
+ * Loop on select() until a descriptor from the given set becomes readable.
+ *
+ * If we get a cancel request while we're waiting, we forego all further
+ * processing and set the *aborting flag to true.  The return value must be
+ * ignored in this case.  Otherwise, *aborting is set to false.
+ */
+static int
+select_loop(int maxFd, fd_set *workerset, bool *aborting)
+{
+	int			i;
+	fd_set		saveSet = *workerset;
+
+	if (CancelRequested)
+	{
+		*aborting = true;
+		return -1;
+	}
+	else
+		*aborting = false;
+
+	for (;;)
+	{
+		/*
+		 * On Windows, we need to check once in a while for cancel requests; on
+		 * other platforms we rely on select() returning when interrupted.
+		 */
+		struct timeval *tvp;
+#ifdef WIN32
+		struct timeval tv = {0, 1000000};
+
+		tvp = &tv;
+#else
+		tvp = NULL;
+#endif
+
+		*workerset = saveSet;
+		i = select(maxFd + 1, workerset, NULL, NULL, tvp);
+
+#ifdef WIN32
+		if (i == SOCKET_ERROR)
+		{
+			i = -1;
+
+			/* TranslateSocketError() is backend-only; map the case we need */
+			if (WSAGetLastError() == WSAEINTR)
+				errno = EINTR;
+		}
+#endif
+
+		if (i < 0 && errno == EINTR)
+			continue;			/* ignore this */
+		if (i < 0 || CancelRequested)
+			*aborting = true;	/* but not this */
+		if (i == 0)
+			continue;			/* timeout (Win32 only) */
+		break;
+	}
+
+	return i;
+}
+
+/*
+ * DisconnectDatabase
+ *		Disconnect the connection associated with the given slot
+ */
+static void
+DisconnectDatabase(ParallelSlot *slot)
+{
+	char		errbuf[256];
+
+	if (!slot->connection)
+		return;
+
+	if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE)
+	{
+		PGcancel   *cancel;
+
+		if ((cancel = PQgetCancel(slot->connection)))
+		{
+			PQcancel(cancel, errbuf, sizeof(errbuf));
+			PQfreeCancel(cancel);
+		}
+	}
+
+	PQfinish(slot->connection);
+	slot->connection = NULL;
+}
+
+/*
+ * Construct a vacuum/analyze command to run based on the given options, in the
+ * given string buffer, which may contain previous garbage.
+ *
+ * An optional table name can be passed; it must already be properly
+ * quoted.  The command is semicolon-terminated.
+ */
+static void
+prepare_command(PQExpBuffer sql, PGconn *conn, vacuumingOptions *vacopts,
+				const char *table)
+{
+	resetPQExpBuffer(sql);
+
+	if (vacopts->analyze_only)
+	{
+		appendPQExpBufferStr(sql, "ANALYZE");
+		if (vacopts->verbose)
+			appendPQExpBufferStr(sql, " VERBOSE");
+	}
+	else
+	{
+		appendPQExpBufferStr(sql, "VACUUM");
+		if (PQserverVersion(conn) >= 90000)
+		{
+			const char *paren = " (";
+			const char *comma = ", ";
+			const char *sep = paren;
+
+			if (vacopts->full)
+			{
+				appendPQExpBuffer(sql, "%sFULL", sep);
+				sep = comma;
+			}
+			if (vacopts->freeze)
+			{
+				appendPQExpBuffer(sql, "%sFREEZE", sep);
+				sep = comma;
+			}
+			if (vacopts->verbose)
+			{
+				appendPQExpBuffer(sql, "%sVERBOSE", sep);
+				sep = comma;
+			}
+			if (vacopts->and_analyze)
+			{
+				appendPQExpBuffer(sql, "%sANALYZE", sep);
+				sep = comma;
+			}
+			if (sep != paren)
+				appendPQExpBufferStr(sql, ")");
+		}
+		else
+		{
+			if (vacopts->full)
+				appendPQExpBufferStr(sql, " FULL");
+			if (vacopts->freeze)
+				appendPQExpBufferStr(sql, " FREEZE");
+			if (vacopts->verbose)
+				appendPQExpBufferStr(sql, " VERBOSE");
+			if (vacopts->and_analyze)
+				appendPQExpBufferStr(sql, " ANALYZE");
+		}
+	}
+
+	if (table)
+		appendPQExpBuffer(sql, " %s;", table);
+}
 
 static void
 help(const char *progname)
@@ -436,6 +1016,7 @@ help(const char *progname)
 	printf(_("  -V, --version                   output version information, then exit\n"));
 	printf(_("  -z, --analyze                   update optimizer statistics\n"));
 	printf(_("  -Z, --analyze-only              only update optimizer statistics\n"));
+	printf(_("  -j, --jobs=NUM                  use this many concurrent connections to vacuum\n"));
 	printf(_("      --analyze-in-stages         only update optimizer statistics, in multiple\n"
 		   "                                  stages for faster results\n"));
 	printf(_("  -?, --help                      show this help, then exit\n"));
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
