From de079a6e777c0c8381f80478f121b09c173d7909 Mon Sep 17 00:00:00 2001
From: Julien Rouhaud <julien.rouhaud@free.fr>
Date: Fri, 28 Jun 2019 13:11:29 +0200
Subject: [PATCH 1/2] Export vacuumdb's parallel infrastructure

---
 src/bin/scripts/Makefile           |   4 +-
 src/bin/scripts/common.c           |  79 ++++++
 src/bin/scripts/common.h           |   7 +
 src/bin/scripts/scripts_parallel.c | 264 ++++++++++++++++++++
 src/bin/scripts/scripts_parallel.h |  34 +++
 src/bin/scripts/vacuumdb.c         | 374 ++---------------------------
 6 files changed, 407 insertions(+), 355 deletions(-)
 create mode 100644 src/bin/scripts/scripts_parallel.c
 create mode 100644 src/bin/scripts/scripts_parallel.h

diff --git a/src/bin/scripts/Makefile b/src/bin/scripts/Makefile
index 9f352b5e2b..3cd793b134 100644
--- a/src/bin/scripts/Makefile
+++ b/src/bin/scripts/Makefile
@@ -28,7 +28,7 @@ createuser: createuser.o common.o $(WIN32RES) | submake-libpq submake-libpgport
 dropdb: dropdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
 dropuser: dropuser.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
 clusterdb: clusterdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
-vacuumdb: vacuumdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
+vacuumdb: vacuumdb.o common.o scripts_parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
 reindexdb: reindexdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
 pg_isready: pg_isready.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
 
@@ -50,7 +50,7 @@ uninstall:
 
 clean distclean maintainer-clean:
 	rm -f $(addsuffix $(X), $(PROGRAMS)) $(addsuffix .o, $(PROGRAMS))
-	rm -f common.o $(WIN32RES)
+	rm -f common.o scripts_parallel.o $(WIN32RES)
 	rm -rf tmp_check
 
 check:
diff --git a/src/bin/scripts/common.c b/src/bin/scripts/common.c
index 296029d809..8d7cb252b5 100644
--- a/src/bin/scripts/common.c
+++ b/src/bin/scripts/common.c
@@ -22,6 +22,8 @@
 #include "fe_utils/connect.h"
 #include "fe_utils/string_utils.h"
 
+#define ERRCODE_UNDEFINED_TABLE  "42P01"
+
 
 static PGcancel *volatile cancelConn = NULL;
 bool		CancelRequested = false;
@@ -178,6 +180,28 @@ connectMaintenanceDatabase(const char *maintenance_db,
 	return conn;
 }
 
+/*
+ * Disconnect the given connection, canceling any statement if one is active.
+ */
+void
+disconnectDatabase(PGconn *conn)
+{
+	char		errbuf[256];
+
+	if (PQtransactionStatus(conn) == PQTRANS_ACTIVE)
+	{
+		PGcancel   *cancel;
+
+		if ((cancel = PQgetCancel(conn)))
+		{
+			(void) PQcancel(cancel, errbuf, sizeof(errbuf));
+			PQfreeCancel(cancel);
+		}
+	}
+
+	PQfinish(conn);
+}
+
 /*
  * Run a query, return the results, exit program on failure.
  */
@@ -255,6 +279,61 @@ executeMaintenanceCommand(PGconn *conn, const char *query, bool echo)
 	return r;
 }
 
+/*
+ * getQuerySucess
+ *
+ * Process the given connection's result until it's depleted.  Return false if
+ * error was encountered.
+ * Note that this will block if the conn is busy.
+ */
+bool
+getQuerySuccess(PGconn *conn, const char *progname)
+{
+	bool		ok = true;
+	PGresult   *result;
+
+	SetCancelConn(conn);
+	while ((result = PQgetResult(conn)) != NULL)
+	{
+		if (!processQueryResult(conn, result, progname))
+			ok = false;
+	}
+	ResetCancelConn();
+	return ok;
+}
+
+/*
+ * processQueryResult
+ *
+ * Process (and delete) a query result.  Returns true if there's no error,
+ * false otherwise -- but errors about trying to vacuum a missing relation
+ * are reported and subsequently ignored.
+ */
+bool
+processQueryResult(PGconn *conn, PGresult *result, const char *progname)
+{
+	/*
+	 * If it's an error, report it.  Errors about a missing table are harmless
+	 * so we continue processing; but die for other errors.
+	 */
+	if (PQresultStatus(result) != PGRES_COMMAND_OK)
+	{
+		char	   *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
+
+		pg_log_error("processing of database \"%s\" failed: %s",
+					 PQdb(conn), PQerrorMessage(conn));
+
+		if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
+		{
+			PQclear(result);
+			return false;
+		}
+	}
+
+	PQclear(result);
+	return true;
+}
+
 
 /*
  * Split TABLE[(COLUMNS)] into TABLE and [(COLUMNS)] portions.  When you
diff --git a/src/bin/scripts/common.h b/src/bin/scripts/common.h
index 35d1a3e0d5..9f10e1da96 100644
--- a/src/bin/scripts/common.h
+++ b/src/bin/scripts/common.h
@@ -39,6 +39,8 @@ extern PGconn *connectMaintenanceDatabase(const char *maintenance_db,
 										  const char *pguser, enum trivalue prompt_password,
 										  const char *progname, bool echo);
 
+extern void disconnectDatabase(PGconn *conn);
+
 extern PGresult *executeQuery(PGconn *conn, const char *query,
 							  const char *progname, bool echo);
 
@@ -48,6 +50,11 @@ extern void executeCommand(PGconn *conn, const char *query,
 extern bool executeMaintenanceCommand(PGconn *conn, const char *query,
 									  bool echo);
 
+extern bool getQuerySuccess(PGconn *conn, const char *progname);
+
+extern bool processQueryResult(PGconn *conn, PGresult *result,
+							   const char *progname);
+
 extern void splitTableColumnsSpec(const char *spec, int encoding,
 								  char **table, const char **columns);
 
diff --git a/src/bin/scripts/scripts_parallel.c b/src/bin/scripts/scripts_parallel.c
new file mode 100644
index 0000000000..3999f6f6e4
--- /dev/null
+++ b/src/bin/scripts/scripts_parallel.c
@@ -0,0 +1,264 @@
+/*-------------------------------------------------------------------------
+ *
+ *	scripts_parallel.c
+ *		Parallel support for bin/scripts/
+ *
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/bin/scripts/scripts_parallel.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#ifdef HAVE_SYS_SELECT_H
+#include <sys/select.h>
+#endif
+
+#include "common.h"
+#include "common/logging.h"
+#include "scripts_parallel.h"
+
+static void init_slot(ParallelSlot *slot, PGconn *conn);
+static int	select_loop(int maxFd, fd_set *workerset, bool *aborting);
+
+static void
+init_slot(ParallelSlot *slot, PGconn *conn)
+{
+	slot->connection = conn;
+	/* Initially assume connection is idle */
+	slot->isFree = true;
+}
+
+/*
+ * Loop on select() until a descriptor from the given set becomes readable.
+ *
+ * If we get a cancel request while we're waiting, we forego all further
+ * processing and set the *aborting flag to true.  The return value must be
+ * ignored in this case.  Otherwise, *aborting is set to false.
+ */
+static int
+select_loop(int maxFd, fd_set *workerset, bool *aborting)
+{
+	int			i;
+	fd_set		saveSet = *workerset;
+
+	if (CancelRequested)
+	{
+		*aborting = true;
+		return -1;
+	}
+	else
+		*aborting = false;
+
+	for (;;)
+	{
+		/*
+		 * On Windows, we need to check once in a while for cancel requests;
+		 * on other platforms we rely on select() returning when interrupted.
+		 */
+		struct timeval *tvp;
+#ifdef WIN32
+		struct timeval tv = {0, 1000000};
+
+		tvp = &tv;
+#else
+		tvp = NULL;
+#endif
+
+		*workerset = saveSet;
+		i = select(maxFd + 1, workerset, NULL, NULL, tvp);
+
+#ifdef WIN32
+		if (i == SOCKET_ERROR)
+		{
+			i = -1;
+
+			if (WSAGetLastError() == WSAEINTR)
+				errno = EINTR;
+		}
+#endif
+
+		if (i < 0 && errno == EINTR)
+			continue;			/* ignore this */
+		if (i < 0 || CancelRequested)
+			*aborting = true;	/* but not this */
+		if (i == 0)
+			continue;			/* timeout (Win32 only) */
+		break;
+	}
+
+	return i;
+}
+
+/*
+ * get_idle_slot
+ *		Return a connection slot that is ready to execute a command.
+ *
+ * We return the first slot we find that is marked isFree, if one is;
+ * otherwise, we loop on select() until one socket becomes available.  When
+ * this happens, we read the whole set and mark as free all sockets that become
+ * available.
+ *
+ * If an error occurs, NULL is returned.
+ */
+ParallelSlot *
+ParallelSlotsGetIdle(ParallelSlot *slots, int numslots,
+					 const char *progname)
+{
+	int			i;
+	int			firstFree = -1;
+
+	/* Any connection already known free? */
+	for (i = 0; i < numslots; i++)
+	{
+		if (slots[i].isFree)
+		{
+			slots[i].isFree = false;
+			return slots + i;
+		}
+	}
+
+	/*
+	 * No free slot found, so wait until one of the connections has finished
+	 * its task and return the available slot.
+	 */
+	while (firstFree < 0)
+	{
+		fd_set		slotset;
+		int			maxFd = 0;
+		bool		aborting;
+
+		/* We must reconstruct the fd_set for each call to select_loop */
+		FD_ZERO(&slotset);
+
+		for (i = 0; i < numslots; i++)
+		{
+			int			sock = PQsocket(slots[i].connection);
+
+			/*
+			 * We don't really expect any connections to lose their sockets
+			 * after startup, but just in case, cope by ignoring them.
+			 */
+			if (sock < 0)
+				continue;
+
+			FD_SET(sock, &slotset);
+			if (sock > maxFd)
+				maxFd = sock;
+		}
+
+		SetCancelConn(slots->connection);
+		i = select_loop(maxFd, &slotset, &aborting);
+		ResetCancelConn();
+
+		if (aborting)
+		{
+			/*
+			 * We set the cancel-receiving connection to the one in the zeroth
+			 * slot above, so fetch the error from there.
+			 */
+			getQuerySuccess(slots->connection, progname);
+			return NULL;
+		}
+		Assert(i != 0);
+
+		for (i = 0; i < numslots; i++)
+		{
+			int			sock = PQsocket(slots[i].connection);
+
+			if (sock >= 0 && FD_ISSET(sock, &slotset))
+			{
+				/* select() says input is available, so consume it */
+				PQconsumeInput(slots[i].connection);
+			}
+
+			/* Collect result(s) as long as any are available */
+			while (!PQisBusy(slots[i].connection))
+			{
+				PGresult   *result = PQgetResult(slots[i].connection);
+
+				if (result != NULL)
+				{
+					/* Check and discard the command result */
+					if (!processQueryResult(slots[i].connection, result,
+											progname))
+						return NULL;
+				}
+				else
+				{
+					/* This connection has become idle */
+					slots[i].isFree = true;
+					if (firstFree < 0)
+						firstFree = i;
+					break;
+				}
+			}
+		}
+	}
+
+	slots[firstFree].isFree = false;
+	return slots + firstFree;
+}
+
+ParallelSlot *
+ParallelSlotsSetup(const char *dbname, const char *host, const char *port,
+				   const char *username, bool prompt_password, const char *progname,
+				   bool echo, PGconn *conn, int numslots)
+{
+	ParallelSlot *slots;
+	int			i;
+
+	slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * numslots);
+	init_slot(slots, conn);
+	if (numslots > 1)
+	{
+		for (i = 1; i < numslots; i++)
+		{
+			conn = connectDatabase(dbname, host, port, username, prompt_password,
+								   progname, echo, false, true);
+			init_slot(slots + i, conn);
+		}
+	}
+
+	return slots;
+}
+
+/*
+ * Iterate thhrough all connections in a given array of ParallelSlot and
+ * terminate all connections.
+ */
+void
+ParallelSlotsTerminate(ParallelSlot *slots, int numslots)
+{
+	int			i;
+
+	for (i = 0; i < numslots; i++)
+	{
+		PGconn	   *conn = slots[i].connection;
+
+		if (conn == NULL)
+			continue;
+
+		disconnectDatabase(conn);
+		conn = NULL;
+	}
+}
+
+/* Wait for all connections to finish, returning true if no error occured.. */
+bool
+ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots, const char *progname)
+{
+	int			i;
+
+	for (i = 0; i < numslots; i++)
+	{
+		if (!getQuerySuccess((slots + i)->connection, progname))
+			return false;
+	}
+
+	return true;
+}
diff --git a/src/bin/scripts/scripts_parallel.h b/src/bin/scripts/scripts_parallel.h
new file mode 100644
index 0000000000..b98dfc8936
--- /dev/null
+++ b/src/bin/scripts/scripts_parallel.h
@@ -0,0 +1,34 @@
+/*
+ *	scripts_parallel.h
+ *		Parallel support for bin/scripts/
+ *
+ *	Copyright (c) 2003-2019, PostgreSQL Global Development Group
+ *
+ *	src/bin/scripts/scripts_parallel.h
+ */
+#ifndef SCRIPTS_PARALLEL_H
+#define SCRIPTS_PARALLEL_H
+
+/* Parallel processing stuff */
+typedef struct ParallelSlot
+{
+	PGconn	   *connection;		/* One connection */
+	bool		isFree;			/* Is it known to be idle? */
+} ParallelSlot;
+
+extern ParallelSlot *ParallelSlotsGetIdle(ParallelSlot *slots, int numslots,
+										  const char *progname);
+
+extern ParallelSlot *ParallelSlotsSetup(const char *dbname, const char *host,
+										const char *port,
+										const char *username, bool prompt_password,
+										const char *progname, bool echo,
+										PGconn *conn, int numslots);
+
+extern void ParallelSlotsTerminate(ParallelSlot *slots, int numslots);
+
+extern bool ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots,
+										const char *progname);
+
+
+#endif							/* SCRIPTS_PARALLEL_H */
diff --git a/src/bin/scripts/vacuumdb.c b/src/bin/scripts/vacuumdb.c
index 3bcd14b4dc..3a5150d66e 100644
--- a/src/bin/scripts/vacuumdb.c
+++ b/src/bin/scripts/vacuumdb.c
@@ -12,10 +12,6 @@
 
 #include "postgres_fe.h"
 
-#ifdef HAVE_SYS_SELECT_H
-#include <sys/select.h>
-#endif
-
 #include "catalog/pg_class_d.h"
 
 #include "common.h"
@@ -23,17 +19,9 @@
 #include "fe_utils/connect.h"
 #include "fe_utils/simple_list.h"
 #include "fe_utils/string_utils.h"
+#include "scripts_parallel.h"
 
 
-#define ERRCODE_UNDEFINED_TABLE  "42P01"
-
-/* Parallel vacuuming stuff */
-typedef struct ParallelSlot
-{
-	PGconn	   *connection;		/* One connection */
-	bool		isFree;			/* Is it known to be idle? */
-} ParallelSlot;
-
 /* vacuum options controlled by user flags */
 typedef struct vacuumingOptions
 {
@@ -69,21 +57,7 @@ static void prepare_vacuum_command(PQExpBuffer sql, int serverVersion,
 								   vacuumingOptions *vacopts, const char *table);
 
 static void run_vacuum_command(PGconn *conn, const char *sql, bool echo,
-							   const char *table, const char *progname, bool async);
-
-static ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots,
-								 const char *progname);
-
-static bool ProcessQueryResult(PGconn *conn, PGresult *result,
-							   const char *progname);
-
-static bool GetQueryResult(PGconn *conn, const char *progname);
-
-static void DisconnectDatabase(ParallelSlot *slot);
-
-static int	select_loop(int maxFd, fd_set *workerset, bool *aborting);
-
-static void init_slot(ParallelSlot *slot, PGconn *conn);
+							   const char *table, const char *progname);
 
 static void help(const char *progname);
 
@@ -625,17 +599,9 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
 	 */
 	if (concurrentCons <= 0)
 		concurrentCons = 1;
-	slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons);
-	init_slot(slots, conn);
-	if (parallel)
-	{
-		for (i = 1; i < concurrentCons; i++)
-		{
-			conn = connectDatabase(dbname, host, port, username, prompt_password,
-								   progname, echo, false, true);
-			init_slot(slots + i, conn);
-		}
-	}
+
+	slots = ParallelSlotsSetup(dbname, host, port, username, prompt_password,
+							   progname, echo, conn, concurrentCons);
 
 	/*
 	 * Prepare all the connections to run the appropriate analyze stage, if
@@ -666,29 +632,12 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
 			goto finish;
 		}
 
-		/*
-		 * Get the connection slot to use.  If in parallel mode, here we wait
-		 * for one connection to become available if none already is.  In
-		 * non-parallel mode we simply use the only slot we have, which we
-		 * know to be free.
-		 */
-		if (parallel)
+		free_slot = ParallelSlotsGetIdle(slots, concurrentCons, progname);
+		if (!free_slot)
 		{
-			/*
-			 * Get a free slot, waiting until one becomes free if none
-			 * currently is.
-			 */
-			free_slot = GetIdleSlot(slots, concurrentCons, progname);
-			if (!free_slot)
-			{
-				failed = true;
-				goto finish;
-			}
-
-			free_slot->isFree = false;
+			failed = true;
+			goto finish;
 		}
-		else
-			free_slot = slots;
 
 		prepare_vacuum_command(&sql, PQserverVersion(free_slot->connection),
 							   vacopts, tabname);
@@ -696,32 +645,19 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
 		/*
 		 * Execute the vacuum.  If not in parallel mode, this terminates the
 		 * program in case of an error.  (The parallel case handles query
-		 * errors in ProcessQueryResult through GetIdleSlot.)
+		 * errors in ProcessQueryResult through ParallelSlotsGetIdle.)
 		 */
 		run_vacuum_command(free_slot->connection, sql.data,
-						   echo, tabname, progname, parallel);
+						   echo, tabname, progname);
 
 		cell = cell->next;
 	} while (cell != NULL);
 
-	if (parallel)
-	{
-		int			j;
-
-		/* wait for all connections to finish */
-		for (j = 0; j < concurrentCons; j++)
-		{
-			if (!GetQueryResult((slots + j)->connection, progname))
-			{
-				failed = true;
-				goto finish;
-			}
-		}
-	}
+	if (!ParallelSlotsWaitCompletion(slots, concurrentCons, progname))
+		failed = true;
 
 finish:
-	for (i = 0; i < concurrentCons; i++)
-		DisconnectDatabase(slots + i);
+	ParallelSlotsTerminate(slots, concurrentCons);
 	pfree(slots);
 
 	termPQExpBuffer(&sql);
@@ -914,27 +850,21 @@ prepare_vacuum_command(PQExpBuffer sql, int serverVersion,
 }
 
 /*
- * Send a vacuum/analyze command to the server.  In async mode, return after
- * sending the command; else, wait for it to finish.
+ * Send a vacuum/analyze command to the server, returning after sending the
+ * command.
  *
- * Any errors during command execution are reported to stderr.  If async is
- * false, this function exits the program after reporting the error.
+ * Any errors during command execution are reported to stderr.
  */
 static void
 run_vacuum_command(PGconn *conn, const char *sql, bool echo,
-				   const char *table, const char *progname, bool async)
+				   const char *table, const char *progname)
 {
 	bool		status;
 
-	if (async)
-	{
-		if (echo)
-			printf("%s\n", sql);
+	if (echo)
+		printf("%s\n", sql);
 
-		status = PQsendQuery(conn, sql) == 1;
-	}
-	else
-		status = executeMaintenanceCommand(conn, sql, echo);
+	status = PQsendQuery(conn, sql) == 1;
 
 	if (!status)
 	{
@@ -944,271 +874,9 @@ run_vacuum_command(PGconn *conn, const char *sql, bool echo,
 		else
 			pg_log_error("vacuuming of database \"%s\" failed: %s",
 						 PQdb(conn), PQerrorMessage(conn));
-
-		if (!async)
-		{
-			PQfinish(conn);
-			exit(1);
-		}
 	}
 }
 
-/*
- * GetIdleSlot
- *		Return a connection slot that is ready to execute a command.
- *
- * We return the first slot we find that is marked isFree, if one is;
- * otherwise, we loop on select() until one socket becomes available.  When
- * this happens, we read the whole set and mark as free all sockets that become
- * available.
- *
- * If an error occurs, NULL is returned.
- */
-static ParallelSlot *
-GetIdleSlot(ParallelSlot slots[], int numslots,
-			const char *progname)
-{
-	int			i;
-	int			firstFree = -1;
-
-	/* Any connection already known free? */
-	for (i = 0; i < numslots; i++)
-	{
-		if (slots[i].isFree)
-			return slots + i;
-	}
-
-	/*
-	 * No free slot found, so wait until one of the connections has finished
-	 * its task and return the available slot.
-	 */
-	while (firstFree < 0)
-	{
-		fd_set		slotset;
-		int			maxFd = 0;
-		bool		aborting;
-
-		/* We must reconstruct the fd_set for each call to select_loop */
-		FD_ZERO(&slotset);
-
-		for (i = 0; i < numslots; i++)
-		{
-			int			sock = PQsocket(slots[i].connection);
-
-			/*
-			 * We don't really expect any connections to lose their sockets
-			 * after startup, but just in case, cope by ignoring them.
-			 */
-			if (sock < 0)
-				continue;
-
-			FD_SET(sock, &slotset);
-			if (sock > maxFd)
-				maxFd = sock;
-		}
-
-		SetCancelConn(slots->connection);
-		i = select_loop(maxFd, &slotset, &aborting);
-		ResetCancelConn();
-
-		if (aborting)
-		{
-			/*
-			 * We set the cancel-receiving connection to the one in the zeroth
-			 * slot above, so fetch the error from there.
-			 */
-			GetQueryResult(slots->connection, progname);
-			return NULL;
-		}
-		Assert(i != 0);
-
-		for (i = 0; i < numslots; i++)
-		{
-			int			sock = PQsocket(slots[i].connection);
-
-			if (sock >= 0 && FD_ISSET(sock, &slotset))
-			{
-				/* select() says input is available, so consume it */
-				PQconsumeInput(slots[i].connection);
-			}
-
-			/* Collect result(s) as long as any are available */
-			while (!PQisBusy(slots[i].connection))
-			{
-				PGresult   *result = PQgetResult(slots[i].connection);
-
-				if (result != NULL)
-				{
-					/* Check and discard the command result */
-					if (!ProcessQueryResult(slots[i].connection, result,
-											progname))
-						return NULL;
-				}
-				else
-				{
-					/* This connection has become idle */
-					slots[i].isFree = true;
-					if (firstFree < 0)
-						firstFree = i;
-					break;
-				}
-			}
-		}
-	}
-
-	return slots + firstFree;
-}
-
-/*
- * ProcessQueryResult
- *
- * Process (and delete) a query result.  Returns true if there's no error,
- * false otherwise -- but errors about trying to vacuum a missing relation
- * are reported and subsequently ignored.
- */
-static bool
-ProcessQueryResult(PGconn *conn, PGresult *result, const char *progname)
-{
-	/*
-	 * If it's an error, report it.  Errors about a missing table are harmless
-	 * so we continue processing; but die for other errors.
-	 */
-	if (PQresultStatus(result) != PGRES_COMMAND_OK)
-	{
-		char	   *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
-
-		pg_log_error("vacuuming of database \"%s\" failed: %s",
-					 PQdb(conn), PQerrorMessage(conn));
-
-		if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
-		{
-			PQclear(result);
-			return false;
-		}
-	}
-
-	PQclear(result);
-	return true;
-}
-
-/*
- * GetQueryResult
- *
- * Pump the conn till it's dry of results; return false if any are errors.
- * Note that this will block if the conn is busy.
- */
-static bool
-GetQueryResult(PGconn *conn, const char *progname)
-{
-	bool		ok = true;
-	PGresult   *result;
-
-	SetCancelConn(conn);
-	while ((result = PQgetResult(conn)) != NULL)
-	{
-		if (!ProcessQueryResult(conn, result, progname))
-			ok = false;
-	}
-	ResetCancelConn();
-	return ok;
-}
-
-/*
- * DisconnectDatabase
- *		Disconnect the connection associated with the given slot
- */
-static void
-DisconnectDatabase(ParallelSlot *slot)
-{
-	char		errbuf[256];
-
-	if (!slot->connection)
-		return;
-
-	if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE)
-	{
-		PGcancel   *cancel;
-
-		if ((cancel = PQgetCancel(slot->connection)))
-		{
-			(void) PQcancel(cancel, errbuf, sizeof(errbuf));
-			PQfreeCancel(cancel);
-		}
-	}
-
-	PQfinish(slot->connection);
-	slot->connection = NULL;
-}
-
-/*
- * Loop on select() until a descriptor from the given set becomes readable.
- *
- * If we get a cancel request while we're waiting, we forego all further
- * processing and set the *aborting flag to true.  The return value must be
- * ignored in this case.  Otherwise, *aborting is set to false.
- */
-static int
-select_loop(int maxFd, fd_set *workerset, bool *aborting)
-{
-	int			i;
-	fd_set		saveSet = *workerset;
-
-	if (CancelRequested)
-	{
-		*aborting = true;
-		return -1;
-	}
-	else
-		*aborting = false;
-
-	for (;;)
-	{
-		/*
-		 * On Windows, we need to check once in a while for cancel requests;
-		 * on other platforms we rely on select() returning when interrupted.
-		 */
-		struct timeval *tvp;
-#ifdef WIN32
-		struct timeval tv = {0, 1000000};
-
-		tvp = &tv;
-#else
-		tvp = NULL;
-#endif
-
-		*workerset = saveSet;
-		i = select(maxFd + 1, workerset, NULL, NULL, tvp);
-
-#ifdef WIN32
-		if (i == SOCKET_ERROR)
-		{
-			i = -1;
-
-			if (WSAGetLastError() == WSAEINTR)
-				errno = EINTR;
-		}
-#endif
-
-		if (i < 0 && errno == EINTR)
-			continue;			/* ignore this */
-		if (i < 0 || CancelRequested)
-			*aborting = true;	/* but not this */
-		if (i == 0)
-			continue;			/* timeout (Win32 only) */
-		break;
-	}
-
-	return i;
-}
-
-static void
-init_slot(ParallelSlot *slot, PGconn *conn)
-{
-	slot->connection = conn;
-	/* Initially assume connection is idle */
-	slot->isFree = true;
-}
-
 static void
 help(const char *progname)
 {
-- 
2.20.1

