On Tue, Nov 29, 2022 at 12:02:44PM +0000, Simon Riggs wrote:
> The last important point for me is tests, in src/test/modules
> probably. It might be possible to reuse the final state of other
> modules' tests to test cleanup, or at least integrate a custodian test
> into each module.

Of course.  I found some existing tests for the test_decoding plugin that
appear to reliably generate the files we want the custodian to clean up, so
I added them there.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From d8342d121d39d04e995986b4244abf369b833730 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossa...@amazon.com>
Date: Wed, 5 Jan 2022 19:24:22 +0000
Subject: [PATCH v16 1/4] Introduce custodian.

The custodian process is a new auxiliary process that is intended
to help offload tasks that could otherwise delay startup and
checkpointing.  This commit simply adds the new process; it does
not yet do anything useful.
---
 src/backend/postmaster/Makefile         |   1 +
 src/backend/postmaster/auxprocess.c     |   8 +
 src/backend/postmaster/custodian.c      | 382 ++++++++++++++++++++++++
 src/backend/postmaster/meson.build      |   1 +
 src/backend/postmaster/postmaster.c     |  38 ++-
 src/backend/storage/ipc/ipci.c          |   3 +
 src/backend/storage/lmgr/proc.c         |   1 +
 src/backend/utils/activity/wait_event.c |   3 +
 src/backend/utils/init/miscinit.c       |   3 +
 src/include/miscadmin.h                 |   3 +
 src/include/postmaster/custodian.h      |  32 ++
 src/include/storage/proc.h              |  11 +-
 src/include/utils/wait_event.h          |   1 +
 13 files changed, 482 insertions(+), 5 deletions(-)
 create mode 100644 src/backend/postmaster/custodian.c
 create mode 100644 src/include/postmaster/custodian.h

diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile
index 3a794e54d6..e1e1d1123f 100644
--- a/src/backend/postmaster/Makefile
+++ b/src/backend/postmaster/Makefile
@@ -18,6 +18,7 @@ OBJS = \
 	bgworker.o \
 	bgwriter.o \
 	checkpointer.o \
+	custodian.o \
 	fork_process.o \
 	interrupt.o \
 	pgarch.o \
diff --git a/src/backend/postmaster/auxprocess.c b/src/backend/postmaster/auxprocess.c
index 7765d1c83d..c275271c95 100644
--- a/src/backend/postmaster/auxprocess.c
+++ b/src/backend/postmaster/auxprocess.c
@@ -20,6 +20,7 @@
 #include "pgstat.h"
 #include "postmaster/auxprocess.h"
 #include "postmaster/bgwriter.h"
+#include "postmaster/custodian.h"
 #include "postmaster/startup.h"
 #include "postmaster/walwriter.h"
 #include "replication/walreceiver.h"
@@ -74,6 +75,9 @@ AuxiliaryProcessMain(AuxProcType auxtype)
 		case CheckpointerProcess:
 			MyBackendType = B_CHECKPOINTER;
 			break;
+		case CustodianProcess:
+			MyBackendType = B_CUSTODIAN;
+			break;
 		case WalWriterProcess:
 			MyBackendType = B_WAL_WRITER;
 			break;
@@ -153,6 +157,10 @@ AuxiliaryProcessMain(AuxProcType auxtype)
 			CheckpointerMain();
 			proc_exit(1);
 
+		case CustodianProcess:
+			CustodianMain();
+			proc_exit(1);
+
 		case WalWriterProcess:
 			WalWriterMain();
 			proc_exit(1);
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
new file mode 100644
index 0000000000..a94381bc21
--- /dev/null
+++ b/src/backend/postmaster/custodian.c
@@ -0,0 +1,382 @@
+/*-------------------------------------------------------------------------
+ *
+ * custodian.c
+ *
+ * The custodian process handles a variety of non-critical tasks that might
+ * otherwise delay startup, checkpointing, etc.  Offloaded tasks should not
+ * be synchronous (e.g., checkpointing shouldn't wait for the custodian to
+ * complete a task before proceeding).  However, tasks can be synchronously
+ * executed when necessary (e.g., single-user mode).  The custodian is not
+ * an essential process and can shut down quickly when requested.  The
+ * custodian only wakes up to perform its tasks when its latch is set.
+ *
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *   src/backend/postmaster/custodian.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "libpq/pqsignal.h"
+#include "pgstat.h"
+#include "postmaster/custodian.h"
+#include "postmaster/interrupt.h"
+#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
+#include "storage/fd.h"
+#include "storage/proc.h"
+#include "storage/procsignal.h"
+#include "storage/smgr.h"
+#include "utils/memutils.h"
+
+static void DoCustodianTasks(void);
+static CustodianTask CustodianGetNextTask(void);
+static void CustodianEnqueueTask(CustodianTask task);
+static const struct cust_task_funcs_entry *LookupCustodianFunctions(CustodianTask task);
+
+typedef struct
+{
+	slock_t		cust_lck;
+
+	CustodianTask task_queue_elems[NUM_CUSTODIAN_TASKS];
+	int			task_queue_head;
+} CustodianShmemStruct;
+
+static CustodianShmemStruct *CustodianShmem;
+
+typedef void (*CustodianTaskFunction) (void);
+typedef void (*CustodianTaskHandleArg) (Datum arg);
+
+struct cust_task_funcs_entry
+{
+	CustodianTask task;
+	CustodianTaskFunction task_func;		/* performs task */
+	CustodianTaskHandleArg handle_arg_func;	/* handles additional info in request */
+};
+
+/*
+ * Add new tasks here, above the terminating INVALID_CUSTODIAN_TASK entry.
+ *
+ * task_func is the logic that will be executed via DoCustodianTasks() when the
+ * matching task is requested via RequestCustodian().  handle_arg_func is an
+ * optional function for providing extra information for the next invocation of
+ * the task.  Typically, the extra information should be stored in shared
+ * memory for access from the custodian process.  handle_arg_func is invoked
+ * before enqueueing the task, and it will still be invoked regardless of
+ * whether the task is already enqueued.
+ */
+static const struct cust_task_funcs_entry cust_task_functions[] = {
+	{INVALID_CUSTODIAN_TASK, NULL, NULL}	/* must be last */
+};
+
+/*
+ * Main entry point for custodian process
+ *
+ * This is invoked from AuxiliaryProcessMain, which has already created the
+ * basic execution environment, but not enabled signals yet.
+ */
+void
+CustodianMain(void)
+{
+	sigjmp_buf	local_sigjmp_buf;
+	MemoryContext custodian_context;
+
+	/*
+	 * Properly accept or ignore signals that might be sent to us.
+	 */
+	pqsignal(SIGHUP, SignalHandlerForConfigReload);
+	pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+	pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
+	/* SIGQUIT handler was already set up by InitPostmasterChild */
+	pqsignal(SIGALRM, SIG_IGN);
+	pqsignal(SIGPIPE, SIG_IGN);
+	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+	pqsignal(SIGUSR2, SIG_IGN);
+
+	/*
+	 * Reset some signals that are accepted by postmaster but not here
+	 */
+	pqsignal(SIGCHLD, SIG_DFL);
+
+	/*
+	 * Create a memory context that we will do all our work in.  We do this so
+	 * that we can reset the context during error recovery and thereby avoid
+	 * possible memory leaks.
+	 */
+	custodian_context = AllocSetContextCreate(TopMemoryContext,
+											  "Custodian",
+											  ALLOCSET_DEFAULT_SIZES);
+	MemoryContextSwitchTo(custodian_context);
+
+	/*
+	 * If an exception is encountered, processing resumes here.  As with other
+	 * auxiliary processes, we cannot use PG_TRY because this is the bottom of
+	 * the exception stack.
+	 */
+	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+	{
+		/* Since not using PG_TRY, must reset error stack by hand */
+		error_context_stack = NULL;
+
+		/* Prevent interrupts while cleaning up */
+		HOLD_INTERRUPTS();
+
+		/* Report the error to the server log */
+		EmitErrorReport();
+
+		/*
+		 * These operations are really just a minimal subset of
+		 * AbortTransaction().  We don't have very many resources to worry
+		 * about.
+		 */
+		LWLockReleaseAll();
+		ConditionVariableCancelSleep();
+		AbortBufferIO();
+		UnlockBuffers();
+		ReleaseAuxProcessResources(false);
+		AtEOXact_Buffers(false);
+		AtEOXact_SMgr();
+		AtEOXact_Files(false);
+		AtEOXact_HashTables(false);
+
+		/*
+		 * Now return to normal top-level context and clear ErrorContext for
+		 * next time.
+		 */
+		MemoryContextSwitchTo(custodian_context);
+		FlushErrorState();
+
+		/* Flush any leaked data in the top-level context */
+		MemoryContextResetAndDeleteChildren(custodian_context);
+
+		/* Now we can allow interrupts again */
+		RESUME_INTERRUPTS();
+
+		/*
+		 * Sleep at least 1 second after any error.  A failed task is put
+		 * back on the queue and retried, and we don't want to be filling the
+		 * error logs as fast as we can.
+		 */
+		pg_usleep(1000000L);
+
+		/*
+		 * Close all open files after any error.  This is helpful on Windows,
+		 * where holding deleted files open causes various strange errors.
+		 * It's not clear we need it elsewhere, but shouldn't hurt.
+		 */
+		smgrcloseall();
+
+		/* Report wait end here, when there is no further possibility of wait */
+		pgstat_report_wait_end();
+	}
+
+	/* We can now handle ereport(ERROR) */
+	PG_exception_stack = &local_sigjmp_buf;
+
+	/*
+	 * Unblock signals (they were blocked when the postmaster forked us)
+	 */
+	PG_SETMASK(&UnBlockSig);
+
+	/*
+	 * Advertise our latch that backends can use to wake us up while we're
+	 * sleeping.
+	 */
+	ProcGlobal->custodianLatch = &MyProc->procLatch;
+
+	/*
+	 * Loop forever
+	 */
+	for (;;)
+	{
+		/* Clear any already-pending wakeups */
+		ResetLatch(MyLatch);
+
+		HandleMainLoopInterrupts();
+
+		DoCustodianTasks();
+
+		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
+						 WAIT_EVENT_CUSTODIAN_MAIN);
+	}
+
+	pg_unreachable();
+}
+
+/*
+ * DoCustodianTasks
+ *		Run queued custodian tasks until the queue is empty.  Under a
+ *		postmaster, a task that throws an ERROR is re-enqueued before rethrow.
+ */
+static void
+DoCustodianTasks(void)
+{
+	for (;;)
+	{
+		CustodianTask next = CustodianGetNextTask();
+		CustodianTaskFunction task_func;
+
+		if (next == INVALID_CUSTODIAN_TASK)
+			break;
+		task_func = LookupCustodianFunctions(next)->task_func;
+
+		PG_TRY();
+		{
+			task_func();
+		}
+		PG_CATCH();
+		{
+			if (IsPostmasterEnvironment)
+				CustodianEnqueueTask(next);
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
+	}
+}
+
+/* Amount of shared memory needed for the custodian's state. */
+Size
+CustodianShmemSize(void)
+{
+	return sizeof(*CustodianShmem);
+}
+
+/* Create or attach to the custodian's shared state; start with no tasks. */
+void
+CustodianShmemInit(void)
+{
+	bool		found;
+
+	CustodianShmem = ShmemInitStruct("Custodian Data",
+									 CustodianShmemSize(), &found);
+	if (found)
+		return;
+
+	memset(CustodianShmem, 0, CustodianShmemSize());
+	SpinLockInit(&CustodianShmem->cust_lck);
+	for (int slot = 0; slot < NUM_CUSTODIAN_TASKS; slot++)
+		CustodianShmem->task_queue_elems[slot] = INVALID_CUSTODIAN_TASK;
+}
+
+/*
+ * RequestCustodian
+ *		Ask for a custodian task to be run.
+ *
+ * In a standalone backend (no postmaster), the queue is processed right here
+ * in the calling process, so this does not return until the work is done.
+ * Under a postmaster, the task is queued (at most once) and the custodian's
+ * latch is set if it has been advertised; the caller does not wait.
+ *
+ * If the task defines a handle_arg_func, it is given arg before the task is
+ * queued -- even if the task is already pending -- so that it can stash any
+ * extra information (typically in shared memory) for the custodian to use.
+ */
+void
+RequestCustodian(CustodianTask requested, Datum arg)
+{
+	const struct cust_task_funcs_entry *entry = LookupCustodianFunctions(requested);
+
+	if (entry->handle_arg_func != NULL)
+		entry->handle_arg_func(arg);
+
+	CustodianEnqueueTask(requested);
+
+	if (IsPostmasterEnvironment)
+	{
+		if (ProcGlobal->custodianLatch != NULL)
+			SetLatch(ProcGlobal->custodianLatch);
+	}
+	else
+		DoCustodianTasks();
+}
+
+/*
+ * CustodianEnqueueTask
+ *		Add a task to the custodian's queue
+ *
+ * If the task is already in the queue, this function has no effect.
+ */
+static void
+CustodianEnqueueTask(CustodianTask task)
+{
+	Assert(task >= 0 && task < NUM_CUSTODIAN_TASKS);
+
+	SpinLockAcquire(&CustodianShmem->cust_lck);
+
+	for (int i = 0; i < NUM_CUSTODIAN_TASKS; i++)
+	{
+		int idx = (CustodianShmem->task_queue_head + i) % NUM_CUSTODIAN_TASKS;
+		CustodianTask *elem = &CustodianShmem->task_queue_elems[idx];
+
+		/*
+		 * If the task is already queued in this slot or the slot is empty,
+		 * enqueue the task here and return.
+		 */
+		if (*elem == INVALID_CUSTODIAN_TASK || *elem == task)
+		{
+			*elem = task;
+			SpinLockRelease(&CustodianShmem->cust_lck);
+			return;
+		}
+	}
+
+	/* Queue should never be full; never ERROR while holding a spinlock. */
+	SpinLockRelease(&CustodianShmem->cust_lck);
+	elog(ERROR, "could not enqueue custodian task %d", task);
+}
+
+/*
+ * CustodianGetNextTask
+ *		Pop the task at the head of the custodian's queue
+ *
+ * The head slot is cleared and the head advances one slot (wrapping)
+ * whether or not a task was found.  If the queue is empty,
+ * INVALID_CUSTODIAN_TASK is returned.
+ */
+static CustodianTask
+CustodianGetNextTask(void)
+{
+	int			head;
+	CustodianTask result;
+
+	SpinLockAcquire(&CustodianShmem->cust_lck);
+
+	head = CustodianShmem->task_queue_head;
+	result = CustodianShmem->task_queue_elems[head];
+	CustodianShmem->task_queue_elems[head] = INVALID_CUSTODIAN_TASK;
+
+	/* advance the head, wrapping around at the end of the array */
+	CustodianShmem->task_queue_head = (head + 1) % NUM_CUSTODIAN_TASKS;
+
+	SpinLockRelease(&CustodianShmem->cust_lck);
+
+	return result;
+}
+
+/*
+ * LookupCustodianFunctions
+ *		Given a custodian task, look up its function pointers; ERROR if absent.
+ */
+static const struct cust_task_funcs_entry *
+LookupCustodianFunctions(CustodianTask task)
+{
+	const struct cust_task_funcs_entry *entry;
+
+	Assert(task >= 0 && task < NUM_CUSTODIAN_TASKS);
+
+	/* the table is terminated by an INVALID_CUSTODIAN_TASK entry */
+	for (entry = cust_task_functions;
+		 entry->task != INVALID_CUSTODIAN_TASK; entry++)
+	{
+		if (entry->task == task)
+			return entry;
+	}
+
+	/* All tasks must have an entry. */
+	elog(ERROR, "could not lookup functions for custodian task %d", task);
+	pg_unreachable();
+}
diff --git a/src/backend/postmaster/meson.build b/src/backend/postmaster/meson.build
index 293a44ca29..ac72a8a07f 100644
--- a/src/backend/postmaster/meson.build
+++ b/src/backend/postmaster/meson.build
@@ -4,6 +4,7 @@ backend_sources += files(
   'bgworker.c',
   'bgwriter.c',
   'checkpointer.c',
+  'custodian.c',
   'fork_process.c',
   'interrupt.c',
   'pgarch.c',
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index a8a246921f..6a74423172 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -240,6 +240,7 @@ bool		send_abort_for_kill = false;
 static pid_t StartupPID = 0,
 			BgWriterPID = 0,
 			CheckpointerPID = 0,
+			CustodianPID = 0,
 			WalWriterPID = 0,
 			WalReceiverPID = 0,
 			AutoVacPID = 0,
@@ -537,6 +538,7 @@ static void ShmemBackendArrayRemove(Backend *bn);
 #define StartArchiver()			StartChildProcess(ArchiverProcess)
 #define StartBackgroundWriter() StartChildProcess(BgWriterProcess)
 #define StartCheckpointer()		StartChildProcess(CheckpointerProcess)
+#define StartCustodian()		StartChildProcess(CustodianProcess)
 #define StartWalWriter()		StartChildProcess(WalWriterProcess)
 #define StartWalReceiver()		StartChildProcess(WalReceiverProcess)
 
@@ -1808,13 +1810,16 @@ ServerLoop(void)
 		/*
 		 * If no background writer process is running, and we are not in a
 		 * state that prevents it, start one.  It doesn't matter if this
-		 * fails, we'll just try again later.  Likewise for the checkpointer.
+		 * fails, we'll just try again later.  Likewise for the checkpointer
+		 * and custodian.
 		 */
 		if (pmState == PM_RUN || pmState == PM_RECOVERY ||
 			pmState == PM_HOT_STANDBY || pmState == PM_STARTUP)
 		{
 			if (CheckpointerPID == 0)
 				CheckpointerPID = StartCheckpointer();
+			if (CustodianPID == 0)
+				CustodianPID = StartCustodian();
 			if (BgWriterPID == 0)
 				BgWriterPID = StartBackgroundWriter();
 		}
@@ -2728,6 +2733,8 @@ SIGHUP_handler(SIGNAL_ARGS)
 			signal_child(BgWriterPID, SIGHUP);
 		if (CheckpointerPID != 0)
 			signal_child(CheckpointerPID, SIGHUP);
+		if (CustodianPID != 0)
+			signal_child(CustodianPID, SIGHUP);
 		if (WalWriterPID != 0)
 			signal_child(WalWriterPID, SIGHUP);
 		if (WalReceiverPID != 0)
@@ -3025,6 +3032,8 @@ reaper(SIGNAL_ARGS)
 			 */
 			if (CheckpointerPID == 0)
 				CheckpointerPID = StartCheckpointer();
+			if (CustodianPID == 0)
+				CustodianPID = StartCustodian();
 			if (BgWriterPID == 0)
 				BgWriterPID = StartBackgroundWriter();
 			if (WalWriterPID == 0)
@@ -3118,6 +3127,20 @@ reaper(SIGNAL_ARGS)
 			continue;
 		}
 
+		/*
+		 * Was it the custodian?  Normal exit can be ignored; we'll start a
+		 * new one at the next iteration of the postmaster's main loop, if
+		 * necessary.  Any other exit condition is treated as a crash.
+		 */
+		if (pid == CustodianPID)
+		{
+			CustodianPID = 0;
+			if (!EXIT_STATUS_0(exitstatus))
+				HandleChildCrash(pid, exitstatus,
+								 _("custodian process"));
+			continue;
+		}
+
 		/*
 		 * Was it the wal writer?  Normal exit can be ignored; we'll start a
 		 * new one at the next iteration of the postmaster's main loop, if
@@ -3532,6 +3555,12 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
 	else if (CheckpointerPID != 0 && take_action)
 		sigquit_child(CheckpointerPID);
 
+	/* Take care of the custodian too */
+	if (pid == CustodianPID)
+		CustodianPID = 0;
+	else if (CustodianPID != 0 && take_action)
+		sigquit_child(CustodianPID);
+
 	/* Take care of the walwriter too */
 	if (pid == WalWriterPID)
 		WalWriterPID = 0;
@@ -3685,6 +3714,9 @@ PostmasterStateMachine(void)
 		/* and the bgwriter too */
 		if (BgWriterPID != 0)
 			signal_child(BgWriterPID, SIGTERM);
+		/* and the custodian too */
+		if (CustodianPID != 0)
+			signal_child(CustodianPID, SIGTERM);
 		/* and the walwriter too */
 		if (WalWriterPID != 0)
 			signal_child(WalWriterPID, SIGTERM);
@@ -3722,6 +3754,7 @@ PostmasterStateMachine(void)
 			BgWriterPID == 0 &&
 			(CheckpointerPID == 0 ||
 			 (!FatalError && Shutdown < ImmediateShutdown)) &&
+			CustodianPID == 0 &&
 			WalWriterPID == 0 &&
 			AutoVacPID == 0)
 		{
@@ -3815,6 +3848,7 @@ PostmasterStateMachine(void)
 			Assert(WalReceiverPID == 0);
 			Assert(BgWriterPID == 0);
 			Assert(CheckpointerPID == 0);
+			Assert(CustodianPID == 0);
 			Assert(WalWriterPID == 0);
 			Assert(AutoVacPID == 0);
 			/* syslogger is not considered here */
@@ -4027,6 +4061,8 @@ TerminateChildren(int signal)
 		signal_child(BgWriterPID, signal);
 	if (CheckpointerPID != 0)
 		signal_child(CheckpointerPID, signal);
+	if (CustodianPID != 0)
+		signal_child(CustodianPID, signal);
 	if (WalWriterPID != 0)
 		signal_child(WalWriterPID, signal);
 	if (WalReceiverPID != 0)
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index b204ecdbc3..cf80e65779 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -30,6 +30,7 @@
 #include "postmaster/autovacuum.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/bgwriter.h"
+#include "postmaster/custodian.h"
 #include "postmaster/postmaster.h"
 #include "replication/logicallauncher.h"
 #include "replication/origin.h"
@@ -130,6 +131,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, PMSignalShmemSize());
 	size = add_size(size, ProcSignalShmemSize());
 	size = add_size(size, CheckpointerShmemSize());
+	size = add_size(size, CustodianShmemSize());
 	size = add_size(size, AutoVacuumShmemSize());
 	size = add_size(size, ReplicationSlotsShmemSize());
 	size = add_size(size, ReplicationOriginShmemSize());
@@ -278,6 +280,7 @@ CreateSharedMemoryAndSemaphores(void)
 	PMSignalShmemInit();
 	ProcSignalShmemInit();
 	CheckpointerShmemInit();
+	CustodianShmemInit();
 	AutoVacuumShmemInit();
 	ReplicationSlotsShmemInit();
 	ReplicationOriginShmemInit();
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index b1c35653fc..6a8485e865 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -180,6 +180,7 @@ InitProcGlobal(void)
 	ProcGlobal->startupBufferPinWaitBufId = -1;
 	ProcGlobal->walwriterLatch = NULL;
 	ProcGlobal->checkpointerLatch = NULL;
+	ProcGlobal->custodianLatch = NULL;
 	pg_atomic_init_u32(&ProcGlobal->procArrayGroupFirst, INVALID_PGPROCNO);
 	pg_atomic_init_u32(&ProcGlobal->clogGroupFirst, INVALID_PGPROCNO);
 
diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c
index b2abd75ddb..63fd242b1e 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -224,6 +224,9 @@ pgstat_get_wait_activity(WaitEventActivity w)
 		case WAIT_EVENT_CHECKPOINTER_MAIN:
 			event_name = "CheckpointerMain";
 			break;
+		case WAIT_EVENT_CUSTODIAN_MAIN:
+			event_name = "CustodianMain";
+			break;
 		case WAIT_EVENT_LOGICAL_APPLY_MAIN:
 			event_name = "LogicalApplyMain";
 			break;
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index eb1046450b..f19f4c3075 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -278,6 +278,9 @@ GetBackendTypeDesc(BackendType backendType)
 		case B_CHECKPOINTER:
 			backendDesc = "checkpointer";
 			break;
+		case B_CUSTODIAN:
+			backendDesc = "custodian";
+			break;
 		case B_LOGGER:
 			backendDesc = "logger";
 			break;
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 795182fa51..59a95dd7c0 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -323,6 +323,7 @@ typedef enum BackendType
 	B_BG_WORKER,
 	B_BG_WRITER,
 	B_CHECKPOINTER,
+	B_CUSTODIAN,
 	B_LOGGER,
 	B_STANDALONE_BACKEND,
 	B_STARTUP,
@@ -429,6 +430,7 @@ typedef enum
 	BgWriterProcess,
 	ArchiverProcess,
 	CheckpointerProcess,
+	CustodianProcess,
 	WalWriterProcess,
 	WalReceiverProcess,
 
@@ -441,6 +443,7 @@ extern PGDLLIMPORT AuxProcType MyAuxProcType;
 #define AmBackgroundWriterProcess() (MyAuxProcType == BgWriterProcess)
 #define AmArchiverProcess()			(MyAuxProcType == ArchiverProcess)
 #define AmCheckpointerProcess()		(MyAuxProcType == CheckpointerProcess)
+#define AmCustodianProcess()		(MyAuxProcType == CustodianProcess)
 #define AmWalWriterProcess()		(MyAuxProcType == WalWriterProcess)
 #define AmWalReceiverProcess()		(MyAuxProcType == WalReceiverProcess)
 
diff --git a/src/include/postmaster/custodian.h b/src/include/postmaster/custodian.h
new file mode 100644
index 0000000000..73d0bc5f02
--- /dev/null
+++ b/src/include/postmaster/custodian.h
@@ -0,0 +1,32 @@
+/*-------------------------------------------------------------------------
+ *
+ * custodian.h
+ *   Exports from postmaster/custodian.c.
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * src/include/postmaster/custodian.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _CUSTODIAN_H
+#define _CUSTODIAN_H
+
+/*
+ * If you add a new task here, also add its function pointers to
+ * cust_task_functions in custodian.c, or requesting the task will ERROR.
+ */
+typedef enum CustodianTask
+{
+	FAKE_TASK,						/* placeholder until we have a real task */
+
+	NUM_CUSTODIAN_TASKS,			/* new tasks go above */
+	INVALID_CUSTODIAN_TASK
+} CustodianTask;
+
+extern void CustodianMain(void) pg_attribute_noreturn();
+extern Size CustodianShmemSize(void);
+extern void CustodianShmemInit(void);
+extern void RequestCustodian(CustodianTask task, Datum arg);
+
+#endif						/* _CUSTODIAN_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index aa13e1d66e..8f0e696663 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -400,6 +400,8 @@ typedef struct PROC_HDR
 	Latch	   *walwriterLatch;
 	/* Checkpointer process's latch */
 	Latch	   *checkpointerLatch;
+	/* Custodian process's latch */
+	Latch	   *custodianLatch;
 	/* Current shared estimate of appropriate spins_per_delay value */
 	int			spins_per_delay;
 	/* Buffer id of the buffer that Startup process waits for pin on, or -1 */
@@ -417,11 +419,12 @@ extern PGDLLIMPORT PGPROC *PreparedXactProcs;
  * We set aside some extra PGPROC structures for auxiliary processes,
  * ie things that aren't full-fledged backends but need shmem access.
  *
- * Background writer, checkpointer, WAL writer and archiver run during normal
- * operation.  Startup process and WAL receiver also consume 2 slots, but WAL
- * writer is launched only after startup has exited, so we only need 5 slots.
+ * Background writer, checkpointer, custodian, WAL writer and archiver run
+ * during normal operation.  Startup process and WAL receiver also consume 2
+ * slots, but WAL writer is launched only after startup has exited, so we only
+ * need 6 slots.
  */
-#define NUM_AUXILIARY_PROCS		5
+#define NUM_AUXILIARY_PROCS		6
 
 /* configurable options */
 extern PGDLLIMPORT int DeadlockTimeout;
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index 0b2100be4a..48602c8a16 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -40,6 +40,7 @@ typedef enum
 	WAIT_EVENT_BGWRITER_HIBERNATE,
 	WAIT_EVENT_BGWRITER_MAIN,
 	WAIT_EVENT_CHECKPOINTER_MAIN,
+	WAIT_EVENT_CUSTODIAN_MAIN,
 	WAIT_EVENT_LOGICAL_APPLY_MAIN,
 	WAIT_EVENT_LOGICAL_LAUNCHER_MAIN,
 	WAIT_EVENT_RECOVERY_WAL_STREAM,
-- 
2.25.1

>From 71407bf47926c707401278d6274db7641549d975 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossa...@amazon.com>
Date: Sun, 5 Dec 2021 22:02:40 -0800
Subject: [PATCH v16 2/4] Move removal of old serialized snapshots to
 custodian.

This was only done during checkpoints because it was a convenient
place to put it.  However, if there are many snapshots to remove,
it can significantly extend checkpoint time.  To avoid this, move
this work to the newly-introduced custodian process.
---
 contrib/test_decoding/expected/spill.out    | 21 +++++++++++++++++++++
 contrib/test_decoding/sql/spill.sql         | 17 +++++++++++++++++
 src/backend/access/transam/xlog.c           |  6 ++++--
 src/backend/postmaster/custodian.c          |  2 ++
 src/backend/replication/logical/snapbuild.c |  9 ++++-----
 src/include/postmaster/custodian.h          |  2 +-
 src/include/replication/snapbuild.h         |  2 +-
 7 files changed, 50 insertions(+), 9 deletions(-)

diff --git a/contrib/test_decoding/expected/spill.out b/contrib/test_decoding/expected/spill.out
index 10734bdb6a..75acbd5d5c 100644
--- a/contrib/test_decoding/expected/spill.out
+++ b/contrib/test_decoding/expected/spill.out
@@ -248,6 +248,27 @@ GROUP BY 1 ORDER BY 1;
 (2 rows)
 
 DROP TABLE spill_test;
+-- make sure custodian cleans up files
+CHECKPOINT;
+DO $$
+DECLARE
+    snaps_removed bool;
+    loops int := 0;
+BEGIN
+    LOOP
+        snaps_removed := count(*) = 0 FROM pg_ls_logicalsnapdir();
+        IF snaps_removed OR loops > 120 * 100 THEN EXIT; END IF;
+        PERFORM pg_sleep(0.01);
+        loops := loops + 1;
+    END LOOP;
+END
+$$;
+SELECT count(*) = 0 FROM pg_ls_logicalsnapdir();
+ ?column? 
+----------
+ t
+(1 row)
+
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
 --------------------------
diff --git a/contrib/test_decoding/sql/spill.sql b/contrib/test_decoding/sql/spill.sql
index e638cacd3f..94d522f548 100644
--- a/contrib/test_decoding/sql/spill.sql
+++ b/contrib/test_decoding/sql/spill.sql
@@ -176,4 +176,21 @@ GROUP BY 1 ORDER BY 1;
 
 DROP TABLE spill_test;
 
+-- make sure custodian cleans up files
+CHECKPOINT;
+DO $$
+DECLARE
+    snaps_removed bool;
+    loops int := 0;
+BEGIN
+    LOOP
+        snaps_removed := count(*) = 0 FROM pg_ls_logicalsnapdir();
+        IF snaps_removed OR loops > 120 * 100 THEN EXIT; END IF;
+        PERFORM pg_sleep(0.01);
+        loops := loops + 1;
+    END LOOP;
+END
+$$;
+SELECT count(*) = 0 FROM pg_ls_logicalsnapdir();
+
 SELECT pg_drop_replication_slot('regression_slot');
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a31fbbff78..c153c32a77 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -76,12 +76,12 @@
 #include "port/atomics.h"
 #include "port/pg_iovec.h"
 #include "postmaster/bgwriter.h"
+#include "postmaster/custodian.h"
 #include "postmaster/startup.h"
 #include "postmaster/walwriter.h"
 #include "replication/logical.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
-#include "replication/snapbuild.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
@@ -7001,10 +7001,12 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 {
 	CheckPointRelationMap();
 	CheckPointReplicationSlots();
-	CheckPointSnapBuild();
 	CheckPointLogicalRewriteHeap();
 	CheckPointReplicationOrigin();
 
+	/* tasks offloaded to custodian */
+	RequestCustodian(CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS, (Datum) 0);
+
 	/* Write out all dirty data in SLRUs and the main buffer pool */
 	TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);
 	CheckpointStats.ckpt_write_t = GetCurrentTimestamp();
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
index a94381bc21..d0fd955d4b 100644
--- a/src/backend/postmaster/custodian.c
+++ b/src/backend/postmaster/custodian.c
@@ -25,6 +25,7 @@
 #include "pgstat.h"
 #include "postmaster/custodian.h"
 #include "postmaster/interrupt.h"
+#include "replication/snapbuild.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
 #include "storage/fd.h"
@@ -70,6 +71,7 @@ struct cust_task_funcs_entry
  * whether the task is already enqueued.
  */
 static const struct cust_task_funcs_entry cust_task_functions[] = {
+	{CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS, RemoveOldSerializedSnapshots, NULL},
 	{INVALID_CUSTODIAN_TASK, NULL, NULL}	/* must be last */
 };
 
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index beddcbcdea..e7c4f69b42 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -2036,14 +2036,13 @@ SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path)
 
 /*
  * Remove all serialized snapshots that are not required anymore because no
- * slot can need them. This doesn't actually have to run during a checkpoint,
- * but it's a convenient point to schedule this.
+ * slot can need them.
  *
- * NB: We run this during checkpoints even if logical decoding is disabled so
- * we cleanup old slots at some point after it got disabled.
+ * NB: We run this even if logical decoding is disabled so we cleanup old slots
+ * at some point after it got disabled.
  */
 void
-CheckPointSnapBuild(void)
+RemoveOldSerializedSnapshots(void)
 {
 	XLogRecPtr	cutoff;
 	XLogRecPtr	redo;
diff --git a/src/include/postmaster/custodian.h b/src/include/postmaster/custodian.h
index 73d0bc5f02..ab6d4283b9 100644
--- a/src/include/postmaster/custodian.h
+++ b/src/include/postmaster/custodian.h
@@ -18,7 +18,7 @@
  */
 typedef enum CustodianTask
 {
-	FAKE_TASK,						/* placeholder until we have a real task */
+	CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS,
 
 	NUM_CUSTODIAN_TASKS,			/* new tasks go above */
 	INVALID_CUSTODIAN_TASK
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index 2a697e57c3..9eba403e0c 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -57,7 +57,7 @@ struct ReorderBuffer;
 struct xl_heap_new_cid;
 struct xl_running_xacts;
 
-extern void CheckPointSnapBuild(void);
+extern void RemoveOldSerializedSnapshots(void);
 
 extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *reorder,
 										  TransactionId xmin_horizon, XLogRecPtr start_lsn,
-- 
2.25.1

>From f3c7ff4ee56a66bd94d43bfabcf866f57d9eb829 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossa...@amazon.com>
Date: Sun, 12 Dec 2021 22:07:11 -0800
Subject: [PATCH v16 3/4] Move removal of old logical rewrite mapping files to
 custodian.

If there are many such files to remove, checkpoints can take much
longer.  To avoid this, move this work to the newly-introduced
custodian process.

Since the mapping files include 32-bit transaction IDs, there is a
risk of wraparound if the files are not cleaned up fast enough.
Removing these files in checkpoints offered decent wraparound
protection simply due to the relatively high frequency of
checkpointing.  With this change, servers should still clean up
mapping files fairly frequently, but in theory the
wraparound risk might worsen for some (e.g., if the custodian is
spending a lot of time on a different task).  Given this is an
existing problem, this change makes no effort to handle the
wraparound risk, and it is left as a future exercise.
---
 contrib/test_decoding/expected/rewrite.out | 21 ++++++
 contrib/test_decoding/sql/rewrite.sql      | 17 +++++
 src/backend/access/heap/rewriteheap.c      | 78 +++++++++++++++++++---
 src/backend/postmaster/custodian.c         | 43 ++++++++++++
 src/include/access/rewriteheap.h           |  1 +
 src/include/postmaster/custodian.h         |  4 ++
 6 files changed, 154 insertions(+), 10 deletions(-)

diff --git a/contrib/test_decoding/expected/rewrite.out b/contrib/test_decoding/expected/rewrite.out
index b30999c436..00b505ef67 100644
--- a/contrib/test_decoding/expected/rewrite.out
+++ b/contrib/test_decoding/expected/rewrite.out
@@ -152,6 +152,27 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
  COMMIT
 (6 rows)
 
+-- make sure custodian cleans up files
+CHECKPOINT;
+DO $$
+DECLARE
+	mappings_removed bool;
+	loops int := 0;
+BEGIN
+	LOOP
+		mappings_removed := count(*) = 0 FROM pg_ls_logicalmapdir();
+		IF mappings_removed OR loops > 120 * 100 THEN EXIT; END IF;
+		PERFORM pg_sleep(0.01);
+		loops := loops + 1;
+	END LOOP;
+END
+$$;
+SELECT count(*) = 0 FROM pg_ls_logicalmapdir();
+ ?column? 
+----------
+ t
+(1 row)
+
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
 --------------------------
diff --git a/contrib/test_decoding/sql/rewrite.sql b/contrib/test_decoding/sql/rewrite.sql
index 62dead3a9b..767eccbed4 100644
--- a/contrib/test_decoding/sql/rewrite.sql
+++ b/contrib/test_decoding/sql/rewrite.sql
@@ -100,6 +100,23 @@ VACUUM FULL pg_proc; VACUUM FULL pg_description; VACUUM FULL pg_shdescription; V
 INSERT INTO replication_example(somedata, testcolumn1, testcolumn3) VALUES (9, 7, 1);
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
+-- make sure custodian cleans up files
+CHECKPOINT;
+DO $$
+DECLARE
+	mappings_removed bool;
+	loops int := 0;
+BEGIN
+	LOOP
+		mappings_removed := count(*) = 0 FROM pg_ls_logicalmapdir();
+		IF mappings_removed OR loops > 120 * 100 THEN EXIT; END IF;
+		PERFORM pg_sleep(0.01);
+		loops := loops + 1;
+	END LOOP;
+END
+$$;
+SELECT count(*) = 0 FROM pg_ls_logicalmapdir();
+
 SELECT pg_drop_replication_slot('regression_slot');
 DROP TABLE IF EXISTS replication_example;
 DROP FUNCTION iamalongfunction();
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 2fe9e48e50..ff4cd8cef9 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -116,6 +116,7 @@
 #include "lib/ilist.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/custodian.h"
 #include "replication/logical.h"
 #include "replication/slot.h"
 #include "storage/bufmgr.h"
@@ -123,6 +124,7 @@
 #include "storage/procarray.h"
 #include "storage/smgr.h"
 #include "utils/memutils.h"
+#include "utils/pg_lsn.h"
 #include "utils/rel.h"
 
 /*
@@ -1179,7 +1181,8 @@ heap_xlog_logical_rewrite(XLogReaderState *r)
  * Perform a checkpoint for logical rewrite mappings
  *
  * This serves two tasks:
- * 1) Remove all mappings not needed anymore based on the logical restart LSN
+ * 1) Alert the custodian to remove all mappings not needed anymore based on the
+ *    logical restart LSN
  * 2) Flush all remaining mappings to disk, so that replay after a checkpoint
  *	  only has to deal with the parts of a mapping that have been written out
  *	  after the checkpoint started.
@@ -1207,6 +1210,9 @@ CheckPointLogicalRewriteHeap(void)
 	if (cutoff != InvalidXLogRecPtr && redo < cutoff)
 		cutoff = redo;
 
+	/* let the custodian know what it can remove */
+	RequestCustodian(CUSTODIAN_REMOVE_REWRITE_MAPPINGS, LSNGetDatum(cutoff));
+
 	mappings_dir = AllocateDir("pg_logical/mappings");
 	while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
 	{
@@ -1239,15 +1245,7 @@ CheckPointLogicalRewriteHeap(void)
 
 		lsn = ((uint64) hi) << 32 | lo;
 
-		if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
-		{
-			elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
-			if (unlink(path) < 0)
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not remove file \"%s\": %m", path)));
-		}
-		else
+		if (lsn >= cutoff && cutoff != InvalidXLogRecPtr)
 		{
 			/* on some operating systems fsyncing a file requires O_RDWR */
 			int			fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
@@ -1285,3 +1283,63 @@ CheckPointLogicalRewriteHeap(void)
 	/* persist directory entries to disk */
 	fsync_fname("pg_logical/mappings", true);
 }
+
+/*
+ * Remove all mappings not needed anymore based on the cutoff LSN saved by the
+ * checkpointer; using it instead of ReplicationSlotsComputeLogicalRestartLSN()
+ * avoids removing files a concurrent CheckPointLogicalRewriteHeap() is trying
+ * to flush.  unlink() failures are only LOGged, so a single unremovable file
+ * cannot keep the rest of the directory from being cleaned up.
+ */
+void
+RemoveOldLogicalRewriteMappings(void)
+{
+	XLogRecPtr	cutoff;
+	DIR		   *mappings_dir;
+	struct dirent *mapping_de;
+	char		path[MAXPGPATH + 20];
+
+	cutoff = CustodianGetLogicalRewriteCutoff();
+
+	mappings_dir = AllocateDir("pg_logical/mappings");
+	while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
+	{
+		Oid			dboid;
+		Oid			relid;
+		XLogRecPtr	lsn;
+		TransactionId rewrite_xid;
+		TransactionId create_xid;
+		uint32		hi,
+					lo;
+		PGFileType	de_type;
+
+		if (strcmp(mapping_de->d_name, ".") == 0 ||
+			strcmp(mapping_de->d_name, "..") == 0)
+			continue;
+
+		snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
+		de_type = get_dirent_type(path, mapping_de, false, DEBUG1);
+
+		if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
+			continue;
+
+		/* Skip over files that cannot be ours. */
+		if (strncmp(mapping_de->d_name, "map-", 4) != 0)
+			continue;
+
+		if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
+				   &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
+			elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
+
+		lsn = ((uint64) hi) << 32 | lo;
+		if (lsn >= cutoff && cutoff != InvalidXLogRecPtr)
+			continue;			/* at or past the cutoff: keep it */
+
+		elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
+		if (unlink(path) < 0)
+			ereport(LOG,
+					(errcode_for_file_access(),
+					 errmsg("could not remove file \"%s\": %m", path)));
+	}
+	FreeDir(mappings_dir);
+}
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
index d0fd955d4b..c4d0a22451 100644
--- a/src/backend/postmaster/custodian.c
+++ b/src/backend/postmaster/custodian.c
@@ -21,6 +21,7 @@
  */
 #include "postgres.h"
 
+#include "access/rewriteheap.h"
 #include "libpq/pqsignal.h"
 #include "pgstat.h"
 #include "postmaster/custodian.h"
@@ -33,11 +34,13 @@
 #include "storage/procsignal.h"
 #include "storage/smgr.h"
 #include "utils/memutils.h"
+#include "utils/pg_lsn.h"
 
 static void DoCustodianTasks(void);
 static CustodianTask CustodianGetNextTask(void);
 static void CustodianEnqueueTask(CustodianTask task);
 static const struct cust_task_funcs_entry *LookupCustodianFunctions(CustodianTask task);
+static void CustodianSetLogicalRewriteCutoff(Datum arg);
 
 typedef struct
 {
@@ -45,6 +48,8 @@ typedef struct
 
 	CustodianTask task_queue_elems[NUM_CUSTODIAN_TASKS];
 	int			task_queue_head;
+
+	XLogRecPtr  logical_rewrite_mappings_cutoff;    /* can remove older mappings */
 } CustodianShmemStruct;
 
 static CustodianShmemStruct *CustodianShmem;
@@ -72,6 +77,7 @@ struct cust_task_funcs_entry
  */
 static const struct cust_task_funcs_entry cust_task_functions[] = {
 	{CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS, RemoveOldSerializedSnapshots, NULL},
+	{CUSTODIAN_REMOVE_REWRITE_MAPPINGS, RemoveOldLogicalRewriteMappings, CustodianSetLogicalRewriteCutoff},
 	{INVALID_CUSTODIAN_TASK, NULL, NULL}	/* must be last */
 };
 
@@ -382,3 +388,40 @@ LookupCustodianFunctions(CustodianTask task)
 	elog(ERROR, "could not lookup functions for custodian task %d", task);
 	pg_unreachable();
 }
+
+/*
+ * Records the given cutoff LSN in the custodian's shared memory.
+ *
+ * Overwriting a cutoff that has not been consumed yet is harmless: the next
+ * RemoveOldLogicalRewriteMappings() pass simply works with the newer value.
+ */
+static void
+CustodianSetLogicalRewriteCutoff(Datum arg)
+{
+	XLogRecPtr	new_cutoff = DatumGetLSN(arg);
+
+	/* without USE_FLOAT8_BYVAL the Datum points at palloc'd memory */
+#ifndef USE_FLOAT8_BYVAL
+	pfree(DatumGetPointer(arg));
+#endif
+
+	SpinLockAcquire(&CustodianShmem->cust_lck);
+	CustodianShmem->logical_rewrite_mappings_cutoff = new_cutoff;
+	SpinLockRelease(&CustodianShmem->cust_lck);
+}
+
+/*
+ * Returns the cutoff LSN last stored by CustodianSetLogicalRewriteCutoff();
+ * RemoveOldLogicalRewriteMappings() uses it to pick which files to delete.
+ */
+XLogRecPtr
+CustodianGetLogicalRewriteCutoff(void)
+{
+	XLogRecPtr	result;
+
+	/* read under the setter's spinlock so the 64-bit value is never torn */
+	SpinLockAcquire(&CustodianShmem->cust_lck);
+	result = CustodianShmem->logical_rewrite_mappings_cutoff;
+	SpinLockRelease(&CustodianShmem->cust_lck);
+	return result;
+}
diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h
index 5cc04756a5..bc875330d7 100644
--- a/src/include/access/rewriteheap.h
+++ b/src/include/access/rewriteheap.h
@@ -53,5 +53,6 @@ typedef struct LogicalRewriteMappingData
  */
 #define LOGICAL_REWRITE_FORMAT "map-%x-%x-%X_%X-%x-%x"
 extern void CheckPointLogicalRewriteHeap(void);
+extern void RemoveOldLogicalRewriteMappings(void);
 
 #endif							/* REWRITE_HEAP_H */
diff --git a/src/include/postmaster/custodian.h b/src/include/postmaster/custodian.h
index ab6d4283b9..00280c203b 100644
--- a/src/include/postmaster/custodian.h
+++ b/src/include/postmaster/custodian.h
@@ -12,6 +12,8 @@
 #ifndef _CUSTODIAN_H
 #define _CUSTODIAN_H
 
+#include "access/xlogdefs.h"
+
 /*
  * If you add a new task here, be sure to add its corresponding function
  * pointers to cust_task_functions in custodian.c.
@@ -19,6 +21,7 @@
 typedef enum CustodianTask
 {
 	CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS,
+	CUSTODIAN_REMOVE_REWRITE_MAPPINGS,
 
 	NUM_CUSTODIAN_TASKS,			/* new tasks go above */
 	INVALID_CUSTODIAN_TASK
@@ -28,5 +31,6 @@ extern void CustodianMain(void) pg_attribute_noreturn();
 extern Size CustodianShmemSize(void);
 extern void CustodianShmemInit(void);
 extern void RequestCustodian(CustodianTask task, Datum arg);
+extern XLogRecPtr CustodianGetLogicalRewriteCutoff(void);
 
 #endif						/* _CUSTODIAN_H */
-- 
2.25.1

>From 6282487d43a83590edd749c77618839a1291e36e Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Mon, 28 Nov 2022 15:15:37 -0800
Subject: [PATCH v16 4/4] Do not delay shutdown due to long-running custodian
 tasks.

These tasks are not essential enough to delay shutdown and can be
retried the next time the server is running.
---
 src/backend/access/heap/rewriteheap.c       | 9 +++++++++
 src/backend/postmaster/custodian.c          | 8 ++++++++
 src/backend/replication/logical/snapbuild.c | 9 +++++++++
 3 files changed, 26 insertions(+)

diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index ff4cd8cef9..a098060d76 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -117,6 +117,7 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/custodian.h"
+#include "postmaster/interrupt.h"
 #include "replication/logical.h"
 #include "replication/slot.h"
 #include "storage/bufmgr.h"
@@ -1313,6 +1314,14 @@ RemoveOldLogicalRewriteMappings(void)
 					lo;
 		PGFileType	de_type;
 
+		/*
+		 * This task is not essential enough to delay shutdown, so bail out if
+		 * there's a pending shutdown request.  We'll try again the next time
+		 * the server is running.
+		 */
+		if (ShutdownRequestPending)
+			break;
+
 		if (strcmp(mapping_de->d_name, ".") == 0 ||
 			strcmp(mapping_de->d_name, "..") == 0)
 			continue;
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
index c4d0a22451..394b7047af 100644
--- a/src/backend/postmaster/custodian.c
+++ b/src/backend/postmaster/custodian.c
@@ -231,6 +231,14 @@ DoCustodianTasks(void)
 	{
 		CustodianTaskFunction func = (LookupCustodianFunctions(task))->task_func;
 
+		/*
+		 * Custodian tasks are not essential enough to delay shutdown, so bail
+		 * out if there's a pending shutdown request.  Tasks should be
+		 * requested again and retried the next time the server is running.
+		 */
+		if (ShutdownRequestPending)
+			break;
+
 		PG_TRY();
 		{
 			(*func) ();
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index e7c4f69b42..939ad4c4ab 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -126,6 +126,7 @@
 #include "common/file_utils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/interrupt.h"
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
@@ -2072,6 +2073,14 @@ RemoveOldSerializedSnapshots(void)
 		XLogRecPtr	lsn;
 		PGFileType	de_type;
 
+		/*
+		 * This task is not essential enough to delay shutdown, so bail out if
+		 * there's a pending shutdown request.  We'll try again the next time
+		 * the server is running.
+		 */
+		if (ShutdownRequestPending)
+			break;
+
 		if (strcmp(snap_de->d_name, ".") == 0 ||
 			strcmp(snap_de->d_name, "..") == 0)
 			continue;
-- 
2.25.1

Reply via email to