From 894d0752b3d3248c27d37e6f02428f30b08d4eea Mon Sep 17 00:00:00 2001
From: Julien Rouhaud <julien.rouhaud@free.fr>
Date: Fri, 28 Jun 2019 13:21:58 +0200
Subject: [PATCH 3/4] Add parallel processing to reindexdb

---
 src/bin/scripts/Makefile           |   2 +-
 src/bin/scripts/parallel.c         |  52 ++-
 src/bin/scripts/parallel.h         |   5 +-
 src/bin/scripts/reindexdb.c        | 546 ++++++++++++++++++++++++++---
 src/bin/scripts/t/090_reindexdb.pl |  12 +-
 src/bin/scripts/vacuumdb.c         |   2 +-
 6 files changed, 558 insertions(+), 61 deletions(-)

diff --git a/src/bin/scripts/Makefile b/src/bin/scripts/Makefile
index 6979d8f9ff..bf2e82c594 100644
--- a/src/bin/scripts/Makefile
+++ b/src/bin/scripts/Makefile
@@ -29,7 +29,7 @@ dropdb: dropdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-
 dropuser: dropuser.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
 clusterdb: clusterdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
 vacuumdb: vacuumdb.o common.o parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
-reindexdb: reindexdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
+reindexdb: reindexdb.o common.o parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
 pg_isready: pg_isready.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
 
 install: all installdirs
diff --git a/src/bin/scripts/parallel.c b/src/bin/scripts/parallel.c
index 5e4505e9fe..bab49becaf 100644
--- a/src/bin/scripts/parallel.c
+++ b/src/bin/scripts/parallel.c
@@ -29,16 +29,21 @@
  * GetIdleSlot
  *		Return a connection slot that is ready to execute a command.
  *
- * We return the first slot we find that is marked isFree, if one is;
- * otherwise, we loop on select() until one socket becomes available.  When
- * this happens, we read the whole set and mark as free all sockets that become
- * available.
+ * If pending_only is false, we return the first slot we find that is marked
+ * isFree, if one is; otherwise, we loop on select() until one socket becomes
+ * available.  When this happens, we read the whole set and mark as free all
+ * sockets that become available.
+ * If pending_only is true, we filter out the slots that don't have any pending
+ * work to do, so we only return slots where slot is not NULL.  This is useful
+ * if the caller pushed a list of items to process (provided as the root
+ * SimpleStringListCell of a SimpleStringList), to make sure that all pushed
+ * work is completed before exiting the program.
  *
  * If an error occurs, NULL is returned.
  */
 ParallelSlot *
 GetIdleSlot(ParallelSlot slots[], int numslots,
-			const char *progname)
+			const char *progname, bool pending_only)
 {
 	int			i;
 	int			firstFree = -1;
@@ -47,7 +52,23 @@ GetIdleSlot(ParallelSlot slots[], int numslots,
 	for (i = 0; i < numslots; i++)
 	{
 		if (slots[i].isFree)
-			return slots + i;
+		{
+			/*
+			 * If the caller didn't ask to filter the slots with only the one
+			 * having pending items to process, we can return the first free
+			 * slot we find.
+			 */
+			if (!pending_only)
+				return slots + i;
+
+			/*
+			 * If the caller asked to filter slots having pending items to
+			 * process, check the current item and return the slot if it's not
+			 * empty
+			 */
+			if (slots[i].cell)
+				return slots + i;
+		}
 	}
 
 	/*
@@ -121,7 +142,23 @@ GetIdleSlot(ParallelSlot slots[], int numslots,
 					/* This connection has become idle */
 					slots[i].isFree = true;
 					if (firstFree < 0)
-						firstFree = i;
+					{
+						/*
+						 * If the caller didn't ask to filter the slots with
+						 * only the one having pending items to process, we can
+						 * mark as usable the first free slot we find as free.
+						 */
+						if (!pending_only)
+							firstFree = i;
+						/*
+						 * If the caller asked to filter slots having pending
+						 * items to process, we need to check the current item
+						 * and mark this slot as usable only if there's a
+						 * pending item
+						 */
+						else if (slots[i].cell)
+							firstFree = i;
+					}
 					break;
 				}
 			}
@@ -277,6 +314,7 @@ void
 init_slot(ParallelSlot *slot, PGconn *conn)
 {
 	slot->connection = conn;
+	slot->cell = NULL;
 	/* Initially assume connection is idle */
 	slot->isFree = true;
 }
diff --git a/src/bin/scripts/parallel.h b/src/bin/scripts/parallel.h
index be58e0bb96..f3ce696472 100644
--- a/src/bin/scripts/parallel.h
+++ b/src/bin/scripts/parallel.h
@@ -9,15 +9,18 @@
 #ifndef SCRIPTS_PARALLEL_H
 #define SCRIPTS_PARALLEL_H
 
+#include "fe_utils/simple_list.h"
+
 /* Parallel processing stuff */
 typedef struct ParallelSlot
 {
 	PGconn	   *connection;		/* One connection */
+	SimpleStringListCell *cell;	/* Next item to process if any */
 	bool		isFree;			/* Is it known to be idle? */
 } ParallelSlot;
 
 extern ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots,
-								 const char *progname);
+								 const char *progname, bool pending_only);
 
 extern bool ProcessQueryResult(PGconn *conn, PGresult *result,
 							   const char *progname);
diff --git a/src/bin/scripts/reindexdb.c b/src/bin/scripts/reindexdb.c
index 3528de21e0..f8a78a1b4a 100644
--- a/src/bin/scripts/reindexdb.c
+++ b/src/bin/scripts/reindexdb.c
@@ -12,8 +12,10 @@
 #include "postgres_fe.h"
 #include "common.h"
 #include "common/logging.h"
+#include "fe_utils/connect.h"
 #include "fe_utils/simple_list.h"
 #include "fe_utils/string_utils.h"
+#include "parallel.h"
 
 typedef enum ReindexType
 {
@@ -25,16 +27,29 @@ typedef enum ReindexType
 } ReindexType;
 
 
-static void reindex_one_database(const char *name, const char *dbname,
-								 ReindexType type, const char *host,
+static ReindexType get_parallel_object_list(PGconn *conn, ReindexType type,
+											SimpleStringList *user_list,
+											SimplePtrList * process_list,
+											const char *progname, bool echo);
+static void reindex_one_database(const char *dbname, ReindexType type,
+								 SimpleStringList *user_list, const char *host,
 								 const char *port, const char *username,
 								 enum trivalue prompt_password, const char *progname,
-								 bool echo, bool verbose, bool concurrently);
+								 bool echo, bool verbose, bool concurrently,
+								 int concurrentCons);
 static void reindex_all_databases(const char *maintenance_db,
 								  const char *host, const char *port,
 								  const char *username, enum trivalue prompt_password,
 								  const char *progname, bool echo,
-								  bool quiet, bool verbose, bool concurrently);
+								  bool quiet, bool verbose, bool concurrently,
+								  int concurrentCons);
+static void run_reindex_command(PGconn *conn, ReindexType type,
+								const char *name, const char *progname, bool echo,
+								bool verbose, bool concurrently, bool async);
+static void slot_process_item(ParallelSlot *slot, int *pending_conn,
+							  ReindexType process_type, const char *progname, bool echo,
+							  bool verbose, bool concurrently, bool parallel);
+
 static void help(const char *progname);
 
 int
@@ -54,6 +69,7 @@ main(int argc, char *argv[])
 		{"system", no_argument, NULL, 's'},
 		{"table", required_argument, NULL, 't'},
 		{"index", required_argument, NULL, 'i'},
+		{"jobs", required_argument, NULL, 'j'},
 		{"verbose", no_argument, NULL, 'v'},
 		{"concurrently", no_argument, NULL, 1},
 		{"maintenance-db", required_argument, NULL, 2},
@@ -79,6 +95,9 @@ main(int argc, char *argv[])
 	SimpleStringList indexes = {NULL, NULL};
 	SimpleStringList tables = {NULL, NULL};
 	SimpleStringList schemas = {NULL, NULL};
+	int			concurrentCons = 1;
+	int			tbl_count = 0,
+				nsp_count = 0;
 
 	pg_logging_init(argv[0]);
 	progname = get_progname(argv[0]);
@@ -87,7 +106,7 @@ main(int argc, char *argv[])
 	handle_help_version_opts(argc, argv, "reindexdb", help);
 
 	/* process command-line options */
-	while ((c = getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:v", long_options, &optindex)) != -1)
+	while ((c = getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:j:v", long_options, &optindex)) != -1)
 	{
 		switch (c)
 		{
@@ -114,6 +133,7 @@ main(int argc, char *argv[])
 				break;
 			case 'S':
 				simple_string_list_append(&schemas, optarg);
+				nsp_count++;
 				break;
 			case 'd':
 				dbname = pg_strdup(optarg);
@@ -126,10 +146,25 @@ main(int argc, char *argv[])
 				break;
 			case 't':
 				simple_string_list_append(&tables, optarg);
+				tbl_count++;
 				break;
 			case 'i':
 				simple_string_list_append(&indexes, optarg);
 				break;
+			case 'j':
+				concurrentCons = atoi(optarg);
+				if (concurrentCons <= 0)
+				{
+					pg_log_error("number of parallel jobs must be at least 1");
+					exit(1);
+				}
+				if (concurrentCons > FD_SETSIZE - 1)
+				{
+					pg_log_error("too many parallel jobs requested (maximum: %d)",
+								 FD_SETSIZE - 1);
+					exit(1);
+				}
+				break;
 			case 'v':
 				verbose = true;
 				break;
@@ -194,7 +229,8 @@ main(int argc, char *argv[])
 		}
 
 		reindex_all_databases(maintenance_db, host, port, username,
-							  prompt_password, progname, echo, quiet, verbose, concurrently);
+							  prompt_password, progname, echo, quiet, verbose,
+							  concurrently, 1);
 	}
 	else if (syscatalog)
 	{
@@ -214,6 +250,12 @@ main(int argc, char *argv[])
 			exit(1);
 		}
 
+		if (concurrentCons > 1)
+		{
+			pg_log_error("cannot use multiple jobs to reindex system catalogs");
+			exit(1);
+		}
+
 		if (dbname == NULL)
 		{
 			if (getenv("PGDATABASE"))
@@ -224,9 +266,9 @@ main(int argc, char *argv[])
 				dbname = get_user_name_or_exit(progname);
 		}
 
-		reindex_one_database(NULL, dbname, REINDEX_SYSTEM, host,
+		reindex_one_database(dbname, REINDEX_SYSTEM, NULL, host,
 							 port, username, prompt_password, progname,
-							 echo, verbose, concurrently);
+							 echo, verbose, concurrently, 1);
 	}
 	else
 	{
@@ -241,61 +283,57 @@ main(int argc, char *argv[])
 		}
 
 		if (schemas.head != NULL)
-		{
-			SimpleStringListCell *cell;
-
-			for (cell = schemas.head; cell; cell = cell->next)
-			{
-				reindex_one_database(cell->val, dbname, REINDEX_SCHEMA, host,
-									 port, username, prompt_password, progname,
-									 echo, verbose, concurrently);
-			}
-		}
+			reindex_one_database(dbname, REINDEX_SCHEMA, &schemas, host,
+								 port, username, prompt_password, progname,
+								 echo, verbose, concurrently,
+								 Min(concurrentCons, nsp_count));
 
 		if (indexes.head != NULL)
-		{
-			SimpleStringListCell *cell;
 
-			for (cell = indexes.head; cell; cell = cell->next)
-			{
-				reindex_one_database(cell->val, dbname, REINDEX_INDEX, host,
-									 port, username, prompt_password, progname,
-									 echo, verbose, concurrently);
-			}
-		}
-		if (tables.head != NULL)
-		{
-			SimpleStringListCell *cell;
+			/*
+			 * The number of threads will be checked in the function, as it's
+			 * depending on the number of underlying tables
+			 */
+			reindex_one_database(dbname, REINDEX_INDEX, &indexes, host,
+								 port, username, prompt_password, progname,
+								 echo, verbose, concurrently,
+								 concurrentCons);
 
-			for (cell = tables.head; cell; cell = cell->next)
-			{
-				reindex_one_database(cell->val, dbname, REINDEX_TABLE, host,
-									 port, username, prompt_password, progname,
-									 echo, verbose, concurrently);
-			}
-		}
+		if (tables.head != NULL)
+			reindex_one_database(dbname, REINDEX_TABLE, &tables, host,
+								 port, username, prompt_password, progname,
+								 echo, verbose, concurrently,
+								 Min(concurrentCons, tbl_count));
 
 		/*
 		 * reindex database only if neither index nor table nor schema is
 		 * specified
 		 */
 		if (indexes.head == NULL && tables.head == NULL && schemas.head == NULL)
-			reindex_one_database(NULL, dbname, REINDEX_DATABASE, host,
+			reindex_one_database(dbname, REINDEX_DATABASE, NULL, host,
 								 port, username, prompt_password, progname,
-								 echo, verbose, concurrently);
+								 echo, verbose, concurrently, concurrentCons);
 	}
 
 	exit(0);
 }
 
 static void
-reindex_one_database(const char *name, const char *dbname, ReindexType type,
-					 const char *host, const char *port, const char *username,
+reindex_one_database(const char *dbname, ReindexType type,
+					 SimpleStringList *user_list, const char *host,
+					 const char *port, const char *username,
 					 enum trivalue prompt_password, const char *progname, bool echo,
-					 bool verbose, bool concurrently)
+					 bool verbose, bool concurrently, int concurrentCons)
 {
-	PQExpBufferData sql;
 	PGconn	   *conn;
+	bool		parallel = concurrentCons > 1;
+	ReindexType process_type = type;
+	SimplePtrList process_list = {0, NULL, NULL};
+	SimplePtrListCell *cell;
+	ParallelSlot *slots;
+	int			i,
+				pending_conn;
+	bool		failed = false;
 
 	conn = connectDatabase(dbname, host, port, username, prompt_password,
 						   progname, echo, false, false);
@@ -308,6 +346,173 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
 		exit(1);
 	}
 
+	if (!parallel)
+	{
+		if (user_list)
+		{
+			/*
+			 * In non parallel mode, if the user provided a list, just use it
+			 * as-is
+			 */
+			simple_ptr_list_append(&process_list, user_list);
+		}
+		else
+		{
+			/*
+			 * Otherwise, create a dummy list with an empty string, as user
+			 * requires an element.
+			 */
+			SimpleStringList *dummy = palloc0(sizeof(SimpleStringList));
+
+			simple_string_list_append(dummy, "");
+			simple_ptr_list_append(&process_list, dummy);
+		}
+	}
+	else
+	{
+		/*
+		 * Database-wide parallel reindex requires special processing.  If
+		 * multiple jobs were asked, we have to reindex system catalogs first,
+		 * as they can't be processed in parallel.
+		 */
+		if (type == REINDEX_DATABASE)
+		{
+			run_reindex_command(conn, REINDEX_SYSTEM, NULL, progname, echo, verbose,
+								concurrently, false);
+		}
+
+		/* Get the list of objects to process */
+		process_type = get_parallel_object_list(conn, type, user_list,
+												&process_list, progname, echo);
+	}
+
+	/* Lower down the number of required connections if needed. */
+	concurrentCons = Min(concurrentCons, process_list.size);
+
+	/* If no object was found, we're done. */
+	if (concurrentCons <= 0)
+		return;
+	if (concurrentCons == 1)
+		parallel = false;
+
+	slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons);
+	init_slot(slots, conn);
+	if (parallel)
+	{
+		for (i = 1; i < concurrentCons; i++)
+		{
+			conn = connectDatabase(dbname, host, port, username, prompt_password,
+								   progname, echo, false, true);
+			init_slot(slots + i, conn);
+		}
+	}
+
+	pending_conn = 0;
+	cell = process_list.head;
+	do
+	{
+		ParallelSlot *free_slot = NULL;
+		SimpleStringList *cur = (SimpleStringList *) cell->val;
+
+		if (CancelRequested)
+		{
+			failed = true;
+			goto finish;
+		}
+
+		/*
+		 * Get the connection slot to use.  If in parallel mode, here we wait
+		 * for one connection to become available if none already is. In
+		 * non-parallel mode we simply use the only slot we have, which we
+		 * know to be free.
+		 */
+		if (parallel)
+		{
+			/*
+			 * Get a free slot, waiting until one becomes free if none
+			 * currently is.
+			 */
+			free_slot = GetIdleSlot(slots, concurrentCons, progname, false);
+			if (!free_slot)
+			{
+				failed = true;
+				goto finish;
+			}
+
+			free_slot->isFree = false;
+		}
+		else
+			free_slot = slots;
+
+		/*
+		 * If the idle slot found as done processing its object list, we can
+		 * pop an item from the global processing list and affect it to this
+		 * slot
+		 */
+		if (free_slot->cell == NULL)
+		{
+			free_slot->cell = cur->head;
+			cell = cell->next;
+			pending_conn++;
+		}
+
+		/* Consume an item for this slot's list */
+		slot_process_item(free_slot, &pending_conn, process_type, progname,
+						  echo, verbose, concurrently, parallel);
+	} while (cell != NULL);
+
+	/*
+	 * We now have walked through all the global processing list.  We still
+	 * have to make sure that all slots are done processing their local lists.
+	 */
+	while (pending_conn > 0)
+	{
+		ParallelSlot *free_slot = NULL;
+
+		free_slot = GetIdleSlot(slots, concurrentCons, progname, true);
+		if (!free_slot)
+		{
+			failed = true;
+			goto finish;
+		}
+
+		slot_process_item(free_slot, &pending_conn, process_type, progname,
+						  echo, verbose, concurrently, parallel);
+
+	}
+
+	if (parallel)
+	{
+		int			j;
+
+		/* wait for all connections to finish */
+		for (j = 0; j < concurrentCons; j++)
+		{
+			if (!GetQueryResult((slots + j)->connection, progname))
+			{
+				failed = true;
+				goto finish;
+			}
+		}
+	}
+
+finish:
+	for (i = 0; i < concurrentCons; i++)
+		DisconnectDatabase(slots + i);
+	pfree(slots);
+
+	if (failed)
+		exit(1);
+}
+
+static void
+run_reindex_command(PGconn *conn, ReindexType type, const char *name,
+					const char *progname, bool echo, bool verbose,
+					bool concurrently, bool async)
+{
+	PQExpBufferData sql;
+	bool		status;
+
 	/* build the REINDEX query */
 	initPQExpBuffer(&sql);
 
@@ -358,7 +563,17 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
 	/* finish the query */
 	appendPQExpBufferChar(&sql, ';');
 
-	if (!executeMaintenanceCommand(conn, sql.data, echo))
+	if (async)
+	{
+		if (echo)
+			printf("%s\n", sql.data);
+
+		status = PQsendQuery(conn, sql.data) == 1;
+	}
+	else
+		status = executeMaintenanceCommand(conn, sql.data, echo);
+
+	if (!status)
 	{
 		switch (type)
 		{
@@ -383,20 +598,249 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
 							 name, PQdb(conn), PQerrorMessage(conn));
 				break;
 		}
-		PQfinish(conn);
-		exit(1);
+		if (!async)
+		{
+			PQfinish(conn);
+			exit(1);
+		}
 	}
 
-	PQfinish(conn);
 	termPQExpBuffer(&sql);
 }
 
+/*
+ * Prepare the list of objects to process by querying the catalogs.
+ *
+ * This function will fill the given process_list list with SimpleStringList
+ * objects, filtered by the list of objects that the script was provided with
+ * if any.
+ * Each SimpleStringList describes objects that can be processed by
+ * multiple connections.  This is required as multiple indexes belonging to the
+ * same table cannot be processed in parallel.
+ */
+static ReindexType
+get_parallel_object_list(PGconn *conn, ReindexType type,
+						 SimpleStringList *user_list,
+						 SimplePtrList * process_list, const char *progname,
+						 bool echo)
+{
+	ReindexType process_type = type;
+	PQExpBufferData buf;
+	PQExpBufferData catalog_query;
+	int			i;
+
+	Assert(process_list->size == 0);
+
+	if (type == REINDEX_DATABASE)
+	{
+		PGresult   *res;
+		SimpleStringList *tables;
+		int			ntups;
+
+		Assert(user_list == NULL);
+
+		process_type = REINDEX_TABLE;
+
+		initPQExpBuffer(&catalog_query);
+
+		/*
+		 * This query is run using a safe search_path, so there's no need to
+		 * fully qualify everything.
+		 */
+		appendPQExpBuffer(&catalog_query,
+						  "SELECT c.relname, ns.nspname\n"
+						  " FROM pg_catalog.pg_class c\n"
+						  " JOIN pg_catalog.pg_namespace ns"
+						  " ON c.relnamespace = ns.oid\n"
+						  " WHERE ns.nspname != 'pg_catalog'\n"
+						  "   AND c.relkind IN ("
+						  CppAsString2(RELKIND_RELATION) ", "
+						  CppAsString2(RELKIND_MATVIEW) ")\n"
+						  " ORDER BY c.relpages DESC;");
+
+		res = executeQuery(conn, catalog_query.data, progname, echo);
+		termPQExpBuffer(&catalog_query);
+
+		/*
+		 * If no rows are returned, there are no matching tables, so we are
+		 * done.
+		 */
+		ntups = PQntuples(res);
+		if (ntups == 0)
+		{
+			PQclear(res);
+			PQfinish(conn);
+			return process_type;
+		}
+
+		/* Build qualified identifiers for each table */
+		initPQExpBuffer(&buf);
+		for (i = 0; i < ntups; i++)
+		{
+			tables = pg_malloc0(sizeof(SimpleStringList));
+
+			appendPQExpBufferStr(&buf,
+								 fmtQualifiedId(PQgetvalue(res, i, 1),
+												PQgetvalue(res, i, 0)));
+
+			simple_string_list_append(tables, buf.data);
+			simple_ptr_list_append(process_list, tables);
+			resetPQExpBuffer(&buf);
+		}
+		termPQExpBuffer(&buf);
+		PQclear(res);
+	}
+	else if (type == REINDEX_INDEX)
+	{
+		SimpleStringList *indexes = NULL;
+		SimpleStringListCell *cell;
+		PGresult   *res;
+		int			ntups;
+		char	   *prev_rel = NULL;
+
+		Assert(user_list != NULL);
+
+		initPQExpBuffer(&catalog_query);
+
+		/*
+		 * Since we execute the constructed query with the default search_path
+		 * (which could be unsafe), everything in this query MUST be fully
+		 * qualified. Note that if the user provided catalog indexes, those
+		 * would be processed in parallel with other indexes, with all the
+		 * locking issues that would be implied.
+		 */
+		appendPQExpBuffer(&catalog_query,
+						  "SELECT ic.relname, ns.nspname, tc.oid\n"
+						  " FROM pg_catalog.pg_index i\n"
+						  " JOIN pg_catalog.pg_class ic"
+						  " ON i.indexrelid OPERATOR(pg_catalog.=) ic.oid\n"
+						  " JOIN pg_catalog.pg_namespace ns"
+						  " ON ic.relnamespace OPERATOR(pg_catalog.=) ns.oid\n"
+						  " JOIN pg_catalog.pg_class tc"
+						  " ON i.indrelid OPERATOR(pg_catalog.=) tc.oid\n"
+						  " WHERE i.indexrelid OPERATOR(pg_catalog.=) ANY (array[\n");
+
+		for (cell = user_list->head; cell; cell = cell->next)
+		{
+			if (cell != user_list->head)
+				appendPQExpBuffer(&catalog_query, ", ");
+
+			appendStringLiteralConn(&catalog_query, cell->val, conn);
+			appendPQExpBuffer(&catalog_query, "::pg_catalog.regclass");
+		}
+
+		/*
+		 * We try to get the biggest indexes first so they're processed
+		 * earlier.  We require that all indexes belonging to the same table
+		 * are contiguous, so we can only order by the underlying table size.
+		 * We also need to order by the underlying table oid to make sure that
+		 * indexes belonging to different tables of the same size are still
+		 * correctly ordered.
+		 */
+		appendPQExpBuffer(&catalog_query,
+						  "\n])\n"
+						  " ORDER BY tc.relpages DESC, tc.oid;");
+
+		executeCommand(conn, "RESET search_path;", progname, echo);
+		res = executeQuery(conn, catalog_query.data, progname, echo);
+		termPQExpBuffer(&catalog_query);
+		PQclear(executeQuery(conn, ALWAYS_SECURE_SEARCH_PATH_SQL,
+							 progname, echo));
+
+		/*
+		 * If no rows are returned, there are no matching tables, so we are
+		 * done.
+		 */
+		ntups = PQntuples(res);
+		if (ntups == 0)
+		{
+			PQclear(res);
+			PQfinish(conn);
+			return process_type;
+		}
+
+		indexes = pg_malloc0(sizeof(SimpleStringList));
+
+		/* Build a list of qualified index name, aggregated per table */
+		for (i = 0; i < ntups; i++)
+		{
+			if (!prev_rel)
+				prev_rel = pg_strdup(PQgetvalue(res, i, 2));
+
+			if (strcmp(PQgetvalue(res, i, 2), prev_rel) != 0)
+			{
+				simple_ptr_list_append(process_list, indexes);
+
+				indexes = pg_malloc0(sizeof(SimpleStringList));
+				pg_free(prev_rel);
+				prev_rel = pg_strdup(PQgetvalue(res, i, 2));
+			}
+
+			simple_string_list_append(indexes,
+									  fmtQualifiedId(PQgetvalue(res, i, 1),
+													 PQgetvalue(res, i, 0)));
+		}
+		simple_ptr_list_append(process_list, indexes);
+
+		PQclear(res);
+	}
+
+	/*
+	 * Any other object list is safe to dispatch a-is. Note that if the user
+	 * provided catalog tables, those would be processed in parallel with
+	 * other tables, with all the locking issues that would be implied.
+	 */
+	else
+	{
+		SimpleStringList *res;
+		SimpleStringListCell *cell;
+
+		Assert(user_list != NULL);
+
+		for (cell = user_list->head; cell; cell = cell->next)
+		{
+			res = pg_malloc0(sizeof(SimpleStringList));
+
+			simple_string_list_append(res, cell->val);
+			simple_ptr_list_append(process_list, res);
+		}
+	}
+
+	return process_type;
+}
+
+static void
+slot_process_item(ParallelSlot *slot, int *pending_conn,
+				  ReindexType process_type, const char *progname, bool echo,
+				  bool verbose, bool concurrently, bool parallel)
+{
+	Assert(slot->cell);
+	Assert(*pending_conn > 0);
+
+	/*
+	 * Process one item of current slot local list and advance it.  If not in
+	 * parallel mode, this terminates the program in case of an error.  (The
+	 * parallel case handles query errors in ProcessQueryResult through
+	 * GetIdleSlot.)
+	 */
+	run_reindex_command(slot->connection, process_type,
+						slot->cell->val,
+						progname, echo, verbose, concurrently,
+						parallel);
+
+	slot->cell = slot->cell->next;
+
+	/* Check if that was the last item of this list */
+	if (!slot->cell)
+		(*pending_conn)--;
+}
+
 static void
 reindex_all_databases(const char *maintenance_db,
 					  const char *host, const char *port,
 					  const char *username, enum trivalue prompt_password,
 					  const char *progname, bool echo, bool quiet, bool verbose,
-					  bool concurrently)
+					  bool concurrently, int concurrentCons)
 {
 	PGconn	   *conn;
 	PGresult   *result;
@@ -423,9 +867,10 @@ reindex_all_databases(const char *maintenance_db,
 		appendPQExpBuffer(&connstr, "dbname=");
 		appendConnStrVal(&connstr, dbname);
 
-		reindex_one_database(NULL, connstr.data, REINDEX_DATABASE, host,
+		reindex_one_database(connstr.data, REINDEX_DATABASE, NULL, host,
 							 port, username, prompt_password,
-							 progname, echo, verbose, concurrently);
+							 progname, echo, verbose, concurrently,
+							 concurrentCons);
 	}
 	termPQExpBuffer(&connstr);
 
@@ -444,6 +889,7 @@ help(const char *progname)
 	printf(_("  -d, --dbname=DBNAME       database to reindex\n"));
 	printf(_("  -e, --echo                show the commands being sent to the server\n"));
 	printf(_("  -i, --index=INDEX         recreate specific index(es) only\n"));
+	printf(_("  -j, --jobs=NUM            use this many concurrent connections to reindex\n"));
 	printf(_("  -q, --quiet               don't write any messages\n"));
 	printf(_("  -s, --system              reindex system catalogs\n"));
 	printf(_("  -S, --schema=SCHEMA       reindex specific schema(s) only\n"));
diff --git a/src/bin/scripts/t/090_reindexdb.pl b/src/bin/scripts/t/090_reindexdb.pl
index 1af8ab70ad..a14853b3fc 100644
--- a/src/bin/scripts/t/090_reindexdb.pl
+++ b/src/bin/scripts/t/090_reindexdb.pl
@@ -3,7 +3,7 @@ use warnings;
 
 use PostgresNode;
 use TestLib;
-use Test::More tests => 34;
+use Test::More tests => 39;
 
 program_help_ok('reindexdb');
 program_version_ok('reindexdb');
@@ -77,3 +77,13 @@ $node->command_ok(
 $node->command_ok(
 	[qw(reindexdb --echo --system dbname=template1)],
 	'reindexdb system with connection string');
+
+# parallel processing
+$node->command_fails([qw(vreindexdb -j2 -s)],
+   'vreindexdb cannot process systam catalogs in parallel');
+$node->issues_sql_like([qw(vreindexdb -j2)],
+   qr/statement: REINDEX SYSTEM postgres/,
+   'Global and parallel reindex will issue a REINDEX SYSTEM');
+$node->issues_sql_like([qw(vreindexdb -j2)],
+   qr/statement: REINDEX TABLE public.test1/,
+   'Global and parallel reindex will issue per-table REINDEX');
diff --git a/src/bin/scripts/vacuumdb.c b/src/bin/scripts/vacuumdb.c
index 80c9341a5b..ad4ba0915c 100644
--- a/src/bin/scripts/vacuumdb.c
+++ b/src/bin/scripts/vacuumdb.c
@@ -652,7 +652,7 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
 			 * Get a free slot, waiting until one becomes free if none
 			 * currently is.
 			 */
-			free_slot = GetIdleSlot(slots, concurrentCons, progname);
+			free_slot = GetIdleSlot(slots, concurrentCons, progname, false);
 			if (!free_slot)
 			{
 				failed = true;
-- 
2.20.1

