On Sun, Nov 06, 2022 at 02:38:42PM -0800, Nathan Bossart wrote:
> rebased

another rebase for cfbot

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From b2c36a6d0d8ca5cde374b1c8b34aafaabbd7f6c2 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossa...@amazon.com>
Date: Wed, 5 Jan 2022 19:24:22 +0000
Subject: [PATCH v13 1/6] Introduce custodian.

The custodian process is a new auxiliary process that is intended
to help offload tasks that could otherwise delay startup and
checkpointing.  This commit simply adds the new process; it does
not yet do anything useful.
---
 src/backend/postmaster/Makefile         |   1 +
 src/backend/postmaster/auxprocess.c     |   8 +
 src/backend/postmaster/custodian.c      | 383 ++++++++++++++++++++++++
 src/backend/postmaster/meson.build      |   1 +
 src/backend/postmaster/postmaster.c     |  38 ++-
 src/backend/storage/ipc/ipci.c          |   3 +
 src/backend/storage/lmgr/proc.c         |   1 +
 src/backend/utils/activity/wait_event.c |   3 +
 src/backend/utils/init/miscinit.c       |   3 +
 src/include/miscadmin.h                 |   3 +
 src/include/postmaster/custodian.h      |  32 ++
 src/include/storage/proc.h              |  11 +-
 src/include/utils/wait_event.h          |   1 +
 13 files changed, 483 insertions(+), 5 deletions(-)
 create mode 100644 src/backend/postmaster/custodian.c
 create mode 100644 src/include/postmaster/custodian.h

diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile
index 3a794e54d6..e1e1d1123f 100644
--- a/src/backend/postmaster/Makefile
+++ b/src/backend/postmaster/Makefile
@@ -18,6 +18,7 @@ OBJS = \
 	bgworker.o \
 	bgwriter.o \
 	checkpointer.o \
+	custodian.o \
 	fork_process.o \
 	interrupt.o \
 	pgarch.o \
diff --git a/src/backend/postmaster/auxprocess.c b/src/backend/postmaster/auxprocess.c
index 7765d1c83d..c275271c95 100644
--- a/src/backend/postmaster/auxprocess.c
+++ b/src/backend/postmaster/auxprocess.c
@@ -20,6 +20,7 @@
 #include "pgstat.h"
 #include "postmaster/auxprocess.h"
 #include "postmaster/bgwriter.h"
+#include "postmaster/custodian.h"
 #include "postmaster/startup.h"
 #include "postmaster/walwriter.h"
 #include "replication/walreceiver.h"
@@ -74,6 +75,9 @@ AuxiliaryProcessMain(AuxProcType auxtype)
 		case CheckpointerProcess:
 			MyBackendType = B_CHECKPOINTER;
 			break;
+		case CustodianProcess:
+			MyBackendType = B_CUSTODIAN;
+			break;
 		case WalWriterProcess:
 			MyBackendType = B_WAL_WRITER;
 			break;
@@ -153,6 +157,10 @@ AuxiliaryProcessMain(AuxProcType auxtype)
 			CheckpointerMain();
 			proc_exit(1);
 
+		case CustodianProcess:
+			CustodianMain();
+			proc_exit(1);
+
 		case WalWriterProcess:
 			WalWriterMain();
 			proc_exit(1);
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
new file mode 100644
index 0000000000..e90f5d0d1f
--- /dev/null
+++ b/src/backend/postmaster/custodian.c
@@ -0,0 +1,383 @@
+/*-------------------------------------------------------------------------
+ *
+ * custodian.c
+ *
+ * The custodian process handles a variety of non-critical tasks that might
+ * otherwise delay startup, checkpointing, etc.  Offloaded tasks should not
+ * be synchronous (e.g., checkpointing shouldn't wait for the custodian to
+ * complete a task before proceeding).  However, tasks can be synchronously
+ * executed when necessary (e.g., single-user mode).  The custodian is not
+ * an essential process and can shut down quickly when requested.  The
+ * custodian only wakes up to perform its tasks when its latch is set.
+ *
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *   src/backend/postmaster/custodian.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "libpq/pqsignal.h"
+#include "pgstat.h"
+#include "postmaster/custodian.h"
+#include "postmaster/interrupt.h"
+#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
+#include "storage/fd.h"
+#include "storage/proc.h"
+#include "storage/procsignal.h"
+#include "storage/smgr.h"
+#include "utils/memutils.h"
+
+static void DoCustodianTasks(bool retry);
+static CustodianTask CustodianGetNextTask(void);
+static void CustodianEnqueueTask(CustodianTask task);
+static const struct cust_task_funcs_entry *LookupCustodianFunctions(CustodianTask task);
+
+typedef struct
+{
+	slock_t		cust_lck;		/* protects the task queue fields below */
+
+	CustodianTask task_queue_elems[NUM_CUSTODIAN_TASKS];	/* queue slots, indexed modulo array size */
+	int			task_queue_head;	/* index of the next slot to dequeue */
+} CustodianShmemStruct;
+
+static CustodianShmemStruct *CustodianShmem;
+
+typedef void (*CustodianTaskFunction) (void);
+typedef void (*CustodianTaskHandleArg) (Datum arg);
+
+struct cust_task_funcs_entry
+{
+	CustodianTask task;						/* task this entry describes */
+	CustodianTaskFunction task_func;		/* performs task */
+	CustodianTaskHandleArg handle_arg_func;	/* handles additional info in request */
+};
+
+/*
+ * Add new tasks here.
+ *
+ * task_func is the logic that will be executed via DoCustodianTasks() when the
+ * matching task is requested via RequestCustodian().  handle_arg_func is an
+ * optional function for providing extra information for the next invocation of
+ * the task.  Typically, the extra information should be stored in shared
+ * memory for access from the custodian process.  handle_arg_func is invoked
+ * before enqueueing the task, and it will still be invoked regardless of
+ * whether the task is already enqueued.
+ */
+static const struct cust_task_funcs_entry cust_task_functions[] = {
+	{INVALID_CUSTODIAN_TASK, NULL, NULL}	/* must be last */
+};
+
+/*
+ * Main entry point for custodian process
+ *
+ * This is invoked from AuxiliaryProcessMain, which has already created the
+ * basic execution environment, but not enabled signals yet.
+ */
+void
+CustodianMain(void)
+{
+	sigjmp_buf	local_sigjmp_buf;
+	MemoryContext custodian_context;
+
+	/*
+	 * Properly accept or ignore signals that might be sent to us.
+	 */
+	pqsignal(SIGHUP, SignalHandlerForConfigReload);
+	pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+	pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
+	/* SIGQUIT handler was already set up by InitPostmasterChild */
+	pqsignal(SIGALRM, SIG_IGN);
+	pqsignal(SIGPIPE, SIG_IGN);
+	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+	pqsignal(SIGUSR2, SIG_IGN);
+
+	/*
+	 * Reset some signals that are accepted by postmaster but not here
+	 */
+	pqsignal(SIGCHLD, SIG_DFL);
+
+	/*
+	 * Create a memory context that we will do all our work in.  We do this so
+	 * that we can reset the context during error recovery and thereby avoid
+	 * possible memory leaks.
+	 */
+	custodian_context = AllocSetContextCreate(TopMemoryContext,
+											  "Custodian",
+											  ALLOCSET_DEFAULT_SIZES);
+	MemoryContextSwitchTo(custodian_context);
+
+	/*
+	 * If an exception is encountered, processing resumes here.  As with other
+	 * auxiliary processes, we cannot use PG_TRY because this is the bottom of
+	 * the exception stack.
+	 */
+	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+	{
+		/* Since not using PG_TRY, must reset error stack by hand */
+		error_context_stack = NULL;
+
+		/* Prevent interrupts while cleaning up */
+		HOLD_INTERRUPTS();
+
+		/* Report the error to the server log */
+		EmitErrorReport();
+
+		/*
+		 * These operations are really just a minimal subset of
+		 * AbortTransaction().  We don't have very many resources to worry
+		 * about.
+		 */
+		LWLockReleaseAll();
+		ConditionVariableCancelSleep();
+		AbortBufferIO();
+		UnlockBuffers();
+		ReleaseAuxProcessResources(false);
+		AtEOXact_Buffers(false);
+		AtEOXact_SMgr();
+		AtEOXact_Files(false);
+		AtEOXact_HashTables(false);
+
+		/*
+		 * Now return to normal top-level context and clear ErrorContext for
+		 * next time.
+		 */
+		MemoryContextSwitchTo(custodian_context);
+		FlushErrorState();
+
+		/* Flush any leaked data in the top-level context */
+		MemoryContextResetAndDeleteChildren(custodian_context);
+
+		/* Now we can allow interrupts again */
+		RESUME_INTERRUPTS();
+
+		/*
+		 * Sleep at least 1 second after any error.  A write error is likely
+		 * to be repeated, and we don't want to be filling the error logs as
+		 * fast as we can.
+		 */
+		pg_usleep(1000000L);
+
+		/*
+		 * Close all open files after any error.  This is helpful on Windows,
+		 * where holding deleted files open causes various strange errors.
+		 * It's not clear we need it elsewhere, but shouldn't hurt.
+		 */
+		smgrcloseall();
+
+		/* Report wait end here, when there is no further possibility of wait */
+		pgstat_report_wait_end();
+	}
+
+	/* We can now handle ereport(ERROR) */
+	PG_exception_stack = &local_sigjmp_buf;
+
+	/*
+	 * Unblock signals (they were blocked when the postmaster forked us)
+	 */
+	PG_SETMASK(&UnBlockSig);
+
+	/*
+	 * Advertise our latch that backends can use to wake us up while we're
+	 * sleeping.
+	 */
+	ProcGlobal->custodianLatch = &MyProc->procLatch;
+
+	/*
+	 * Loop forever
+	 */
+	for (;;)
+	{
+		/* Clear any already-pending wakeups */
+		ResetLatch(MyLatch);
+
+		HandleMainLoopInterrupts();
+
+		DoCustodianTasks(true);	/* true: re-enqueue a task that fails */
+
+		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
+						 WAIT_EVENT_CUSTODIAN_MAIN);
+	}
+
+	pg_unreachable();
+}
+
+/*
+ * DoCustodianTasks
+ *		Drain the custodian's queue, running each task as it is dequeued.
+ *
+ * When a task throws, it is put back on the queue first if retry is true; the
+ * error is then propagated to the caller either way.
+ */
+static void
+DoCustodianTasks(bool retry)
+{
+	CustodianTask	cur = CustodianGetNextTask();
+
+	for (; cur != INVALID_CUSTODIAN_TASK; cur = CustodianGetNextTask())
+	{
+		const struct cust_task_funcs_entry *funcs = LookupCustodianFunctions(cur);
+
+		PG_TRY();
+		{
+			funcs->task_func();
+		}
+		PG_CATCH();
+		{
+			if (retry)
+				CustodianEnqueueTask(cur);
+
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
+	}
+}
+
+/* Report how much shared memory the custodian's state needs. */
+Size
+CustodianShmemSize(void)
+{
+	return sizeof(CustodianShmemStruct);
+}
+
+/* Attach to the custodian's shared state, initializing it if we are first. */
+void
+CustodianShmemInit(void)
+{
+	bool		found;
+	Size		shmem_size = CustodianShmemSize();
+
+	CustodianShmem = (CustodianShmemStruct *)
+		ShmemInitStruct("Custodian Data", shmem_size, &found);
+	if (found)
+		return;
+	memset(CustodianShmem, 0, shmem_size);
+	SpinLockInit(&CustodianShmem->cust_lck);
+	for (int slot = 0; slot < NUM_CUSTODIAN_TASKS; slot++)
+		CustodianShmem->task_queue_elems[slot] = INVALID_CUSTODIAN_TASK;
+}
+
+/*
+ * RequestCustodian
+ *		Ask for a custodian task to be run.
+ *
+ * The task is added to the custodian's queue unless it is already there.
+ * With immediate set, the calling process then works through the queue
+ * itself and does not return until that is done, which is mostly useful for
+ * single-user mode.  Otherwise the custodian's latch, if advertised, is set
+ * and this function returns without waiting for the task to complete.
+ *
+ * arg carries extra information needed by the task.  If the task has a
+ * handle_arg_func, it is called with arg before the task is enqueued, and it
+ * is called even when the task is already in the queue.  Typically that
+ * function should store the information in shared memory for later use by
+ * the custodian.
+ */
+void
+RequestCustodian(CustodianTask requested, bool immediate, Datum arg)
+{
+	const struct cust_task_funcs_entry *funcs = LookupCustodianFunctions(requested);
+
+	/* Let the task record any per-request details before it is queued. */
+	if (funcs->handle_arg_func != NULL)
+		funcs->handle_arg_func(arg);
+
+	CustodianEnqueueTask(requested);
+
+	if (!immediate && ProcGlobal->custodianLatch != NULL)
+		SetLatch(ProcGlobal->custodianLatch);
+	else if (immediate)
+		DoCustodianTasks(false);
+}
+
+/*
+ * CustodianEnqueueTask
+ *		Add a task to the custodian's queue
+ *
+ * If the task is already in the queue, this function has no effect.  An
+ * ERROR is raised, after releasing the spinlock, if no slot is available.
+ */
+static void
+CustodianEnqueueTask(CustodianTask task)
+{
+	bool		enqueued = false;
+
+	Assert(task >= 0 && task < NUM_CUSTODIAN_TASKS);
+
+	SpinLockAcquire(&CustodianShmem->cust_lck);
+
+	for (int i = 0; i < NUM_CUSTODIAN_TASKS && !enqueued; i++)
+	{
+		int idx = (CustodianShmem->task_queue_head + i) % NUM_CUSTODIAN_TASKS;
+		CustodianTask *elem = &CustodianShmem->task_queue_elems[idx];
+
+		/* Use the first slot that is empty or already holds this task. */
+		if (*elem == INVALID_CUSTODIAN_TASK || *elem == task)
+		{
+			*elem = task;
+			enqueued = true;
+		}
+	}
+	SpinLockRelease(&CustodianShmem->cust_lck);
+
+	/* Queue full (should never happen); elog() only after unlocking. */
+	if (!enqueued)
+		elog(ERROR, "could not enqueue custodian task %d", task);
+}
+
+/*
+ * CustodianGetNextTask
+ *		Pop the task at the head of the custodian's queue
+ *
+ * The head slot is emptied and the head index advances (with wraparound).
+ * INVALID_CUSTODIAN_TASK is returned if the head slot held no task.
+ */
+static CustodianTask
+CustodianGetNextTask(void)
+{
+	CustodianTask popped;
+	int			head;
+
+	SpinLockAcquire(&CustodianShmem->cust_lck);
+
+	/* Take whatever is in the head slot and leave the slot empty. */
+	head = CustodianShmem->task_queue_head;
+	popped = CustodianShmem->task_queue_elems[head];
+	CustodianShmem->task_queue_elems[head] = INVALID_CUSTODIAN_TASK;
+
+	/* Advance the head, wrapping at the end of the array. */
+	CustodianShmem->task_queue_head = (head + 1) % NUM_CUSTODIAN_TASKS;
+
+	SpinLockRelease(&CustodianShmem->cust_lck);
+	return popped;
+}
+
+/*
+ * LookupCustodianFunctions
+ *		Find the function-pointer entry registered for a custodian task;
+ *		an ERROR is raised if cust_task_functions has no such entry.
+ */
+static const struct cust_task_funcs_entry *
+LookupCustodianFunctions(CustodianTask task)
+{
+	Assert(task >= 0 && task < NUM_CUSTODIAN_TASKS);
+
+	/* Scan the table up to its INVALID_CUSTODIAN_TASK terminator. */
+	for (int i = 0; cust_task_functions[i].task != INVALID_CUSTODIAN_TASK; i++)
+	{
+		const struct cust_task_funcs_entry *candidate = &cust_task_functions[i];
+
+		if (candidate->task == task)
+			return candidate;
+	}
+
+	/* All tasks must have an entry. */
+	elog(ERROR, "could not lookup functions for custodian task %d", task);
+	pg_unreachable();
+}
diff --git a/src/backend/postmaster/meson.build b/src/backend/postmaster/meson.build
index 293a44ca29..ac72a8a07f 100644
--- a/src/backend/postmaster/meson.build
+++ b/src/backend/postmaster/meson.build
@@ -4,6 +4,7 @@ backend_sources += files(
   'bgworker.c',
   'bgwriter.c',
   'checkpointer.c',
+  'custodian.c',
   'fork_process.c',
   'interrupt.c',
   'pgarch.c',
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index c83cc8cc6c..00d18ee761 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -240,6 +240,7 @@ bool		send_abort_for_kill = false;
 static pid_t StartupPID = 0,
 			BgWriterPID = 0,
 			CheckpointerPID = 0,
+			CustodianPID = 0,
 			WalWriterPID = 0,
 			WalReceiverPID = 0,
 			AutoVacPID = 0,
@@ -537,6 +538,7 @@ static void ShmemBackendArrayRemove(Backend *bn);
 #define StartArchiver()			StartChildProcess(ArchiverProcess)
 #define StartBackgroundWriter() StartChildProcess(BgWriterProcess)
 #define StartCheckpointer()		StartChildProcess(CheckpointerProcess)
+#define StartCustodian()		StartChildProcess(CustodianProcess)
 #define StartWalWriter()		StartChildProcess(WalWriterProcess)
 #define StartWalReceiver()		StartChildProcess(WalReceiverProcess)
 
@@ -1808,13 +1810,16 @@ ServerLoop(void)
 		/*
 		 * If no background writer process is running, and we are not in a
 		 * state that prevents it, start one.  It doesn't matter if this
-		 * fails, we'll just try again later.  Likewise for the checkpointer.
+		 * fails, we'll just try again later.  Likewise for the checkpointer
+		 * and custodian.
 		 */
 		if (pmState == PM_RUN || pmState == PM_RECOVERY ||
 			pmState == PM_HOT_STANDBY || pmState == PM_STARTUP)
 		{
 			if (CheckpointerPID == 0)
 				CheckpointerPID = StartCheckpointer();
+			if (CustodianPID == 0)
+				CustodianPID = StartCustodian();
 			if (BgWriterPID == 0)
 				BgWriterPID = StartBackgroundWriter();
 		}
@@ -2728,6 +2733,8 @@ SIGHUP_handler(SIGNAL_ARGS)
 			signal_child(BgWriterPID, SIGHUP);
 		if (CheckpointerPID != 0)
 			signal_child(CheckpointerPID, SIGHUP);
+		if (CustodianPID != 0)
+			signal_child(CustodianPID, SIGHUP);
 		if (WalWriterPID != 0)
 			signal_child(WalWriterPID, SIGHUP);
 		if (WalReceiverPID != 0)
@@ -3025,6 +3032,8 @@ reaper(SIGNAL_ARGS)
 			 */
 			if (CheckpointerPID == 0)
 				CheckpointerPID = StartCheckpointer();
+			if (CustodianPID == 0)
+				CustodianPID = StartCustodian();
 			if (BgWriterPID == 0)
 				BgWriterPID = StartBackgroundWriter();
 			if (WalWriterPID == 0)
@@ -3118,6 +3127,20 @@ reaper(SIGNAL_ARGS)
 			continue;
 		}
 
+		/*
+		 * Was it the custodian?  Normal exit can be ignored; we'll start a
+		 * new one at the next iteration of the postmaster's main loop, if
+		 * necessary.  Any other exit condition is treated as a crash.
+		 */
+		if (pid == CustodianPID)
+		{
+			CustodianPID = 0;
+			if (!EXIT_STATUS_0(exitstatus))
+				HandleChildCrash(pid, exitstatus,
+								 _("custodian process"));
+			continue;
+		}
+
 		/*
 		 * Was it the wal writer?  Normal exit can be ignored; we'll start a
 		 * new one at the next iteration of the postmaster's main loop, if
@@ -3532,6 +3555,12 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
 	else if (CheckpointerPID != 0 && take_action)
 		sigquit_child(CheckpointerPID);
 
+	/* Take care of the custodian too */
+	if (pid == CustodianPID)
+		CustodianPID = 0;
+	else if (CustodianPID != 0 && take_action)
+		sigquit_child(CustodianPID);
+
 	/* Take care of the walwriter too */
 	if (pid == WalWriterPID)
 		WalWriterPID = 0;
@@ -3685,6 +3714,9 @@ PostmasterStateMachine(void)
 		/* and the bgwriter too */
 		if (BgWriterPID != 0)
 			signal_child(BgWriterPID, SIGTERM);
+		/* and the custodian too */
+		if (CustodianPID != 0)
+			signal_child(CustodianPID, SIGTERM);
 		/* and the walwriter too */
 		if (WalWriterPID != 0)
 			signal_child(WalWriterPID, SIGTERM);
@@ -3722,6 +3754,7 @@ PostmasterStateMachine(void)
 			BgWriterPID == 0 &&
 			(CheckpointerPID == 0 ||
 			 (!FatalError && Shutdown < ImmediateShutdown)) &&
+			CustodianPID == 0 &&
 			WalWriterPID == 0 &&
 			AutoVacPID == 0)
 		{
@@ -3815,6 +3848,7 @@ PostmasterStateMachine(void)
 			Assert(WalReceiverPID == 0);
 			Assert(BgWriterPID == 0);
 			Assert(CheckpointerPID == 0);
+			Assert(CustodianPID == 0);
 			Assert(WalWriterPID == 0);
 			Assert(AutoVacPID == 0);
 			/* syslogger is not considered here */
@@ -4027,6 +4061,8 @@ TerminateChildren(int signal)
 		signal_child(BgWriterPID, signal);
 	if (CheckpointerPID != 0)
 		signal_child(CheckpointerPID, signal);
+	if (CustodianPID != 0)
+		signal_child(CustodianPID, signal);
 	if (WalWriterPID != 0)
 		signal_child(WalWriterPID, signal);
 	if (WalReceiverPID != 0)
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index b204ecdbc3..cf80e65779 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -30,6 +30,7 @@
 #include "postmaster/autovacuum.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/bgwriter.h"
+#include "postmaster/custodian.h"
 #include "postmaster/postmaster.h"
 #include "replication/logicallauncher.h"
 #include "replication/origin.h"
@@ -130,6 +131,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, PMSignalShmemSize());
 	size = add_size(size, ProcSignalShmemSize());
 	size = add_size(size, CheckpointerShmemSize());
+	size = add_size(size, CustodianShmemSize());
 	size = add_size(size, AutoVacuumShmemSize());
 	size = add_size(size, ReplicationSlotsShmemSize());
 	size = add_size(size, ReplicationOriginShmemSize());
@@ -278,6 +280,7 @@ CreateSharedMemoryAndSemaphores(void)
 	PMSignalShmemInit();
 	ProcSignalShmemInit();
 	CheckpointerShmemInit();
+	CustodianShmemInit();
 	AutoVacuumShmemInit();
 	ReplicationSlotsShmemInit();
 	ReplicationOriginShmemInit();
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index b1c35653fc..6a8485e865 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -180,6 +180,7 @@ InitProcGlobal(void)
 	ProcGlobal->startupBufferPinWaitBufId = -1;
 	ProcGlobal->walwriterLatch = NULL;
 	ProcGlobal->checkpointerLatch = NULL;
+	ProcGlobal->custodianLatch = NULL;
 	pg_atomic_init_u32(&ProcGlobal->procArrayGroupFirst, INVALID_PGPROCNO);
 	pg_atomic_init_u32(&ProcGlobal->clogGroupFirst, INVALID_PGPROCNO);
 
diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c
index b2abd75ddb..63fd242b1e 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -224,6 +224,9 @@ pgstat_get_wait_activity(WaitEventActivity w)
 		case WAIT_EVENT_CHECKPOINTER_MAIN:
 			event_name = "CheckpointerMain";
 			break;
+		case WAIT_EVENT_CUSTODIAN_MAIN:
+			event_name = "CustodianMain";
+			break;
 		case WAIT_EVENT_LOGICAL_APPLY_MAIN:
 			event_name = "LogicalApplyMain";
 			break;
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index eb1046450b..f19f4c3075 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -278,6 +278,9 @@ GetBackendTypeDesc(BackendType backendType)
 		case B_CHECKPOINTER:
 			backendDesc = "checkpointer";
 			break;
+		case B_CUSTODIAN:
+			backendDesc = "custodian";
+			break;
 		case B_LOGGER:
 			backendDesc = "logger";
 			break;
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 795182fa51..59a95dd7c0 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -323,6 +323,7 @@ typedef enum BackendType
 	B_BG_WORKER,
 	B_BG_WRITER,
 	B_CHECKPOINTER,
+	B_CUSTODIAN,
 	B_LOGGER,
 	B_STANDALONE_BACKEND,
 	B_STARTUP,
@@ -429,6 +430,7 @@ typedef enum
 	BgWriterProcess,
 	ArchiverProcess,
 	CheckpointerProcess,
+	CustodianProcess,
 	WalWriterProcess,
 	WalReceiverProcess,
 
@@ -441,6 +443,7 @@ extern PGDLLIMPORT AuxProcType MyAuxProcType;
 #define AmBackgroundWriterProcess() (MyAuxProcType == BgWriterProcess)
 #define AmArchiverProcess()			(MyAuxProcType == ArchiverProcess)
 #define AmCheckpointerProcess()		(MyAuxProcType == CheckpointerProcess)
+#define AmCustodianProcess()		(MyAuxProcType == CustodianProcess)
 #define AmWalWriterProcess()		(MyAuxProcType == WalWriterProcess)
 #define AmWalReceiverProcess()		(MyAuxProcType == WalReceiverProcess)
 
diff --git a/src/include/postmaster/custodian.h b/src/include/postmaster/custodian.h
new file mode 100644
index 0000000000..170ca61a21
--- /dev/null
+++ b/src/include/postmaster/custodian.h
@@ -0,0 +1,32 @@
+/*-------------------------------------------------------------------------
+ *
+ * custodian.h
+ *   Exports from postmaster/custodian.c.
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * src/include/postmaster/custodian.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _CUSTODIAN_H
+#define _CUSTODIAN_H
+
+/*
+ * If you add a new task here, be sure to add its corresponding function
+ * pointers to cust_task_functions in custodian.c.
+ */
+typedef enum CustodianTask
+{
+	FAKE_TASK,						/* placeholder until we have a real task */
+
+	NUM_CUSTODIAN_TASKS,			/* new tasks go above */
+	INVALID_CUSTODIAN_TASK			/* marks "no task"; never a real task */
+} CustodianTask;
+
+extern void CustodianMain(void) pg_attribute_noreturn();
+extern Size CustodianShmemSize(void);
+extern void CustodianShmemInit(void);
+extern void RequestCustodian(CustodianTask task, bool immediate, Datum arg);
+
+#endif						/* _CUSTODIAN_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index aa13e1d66e..8f0e696663 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -400,6 +400,8 @@ typedef struct PROC_HDR
 	Latch	   *walwriterLatch;
 	/* Checkpointer process's latch */
 	Latch	   *checkpointerLatch;
+	/* Custodian process's latch */
+	Latch	   *custodianLatch;
 	/* Current shared estimate of appropriate spins_per_delay value */
 	int			spins_per_delay;
 	/* Buffer id of the buffer that Startup process waits for pin on, or -1 */
@@ -417,11 +419,12 @@ extern PGDLLIMPORT PGPROC *PreparedXactProcs;
  * We set aside some extra PGPROC structures for auxiliary processes,
  * ie things that aren't full-fledged backends but need shmem access.
  *
- * Background writer, checkpointer, WAL writer and archiver run during normal
- * operation.  Startup process and WAL receiver also consume 2 slots, but WAL
- * writer is launched only after startup has exited, so we only need 5 slots.
+ * Background writer, checkpointer, custodian, WAL writer and archiver run
+ * during normal operation.  Startup process and WAL receiver also consume 2
+ * slots, but WAL writer is launched only after startup has exited, so we only
+ * need 6 slots.
  */
-#define NUM_AUXILIARY_PROCS		5
+#define NUM_AUXILIARY_PROCS		6
 
 /* configurable options */
 extern PGDLLIMPORT int DeadlockTimeout;
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index 0b2100be4a..48602c8a16 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -40,6 +40,7 @@ typedef enum
 	WAIT_EVENT_BGWRITER_HIBERNATE,
 	WAIT_EVENT_BGWRITER_MAIN,
 	WAIT_EVENT_CHECKPOINTER_MAIN,
+	WAIT_EVENT_CUSTODIAN_MAIN,
 	WAIT_EVENT_LOGICAL_APPLY_MAIN,
 	WAIT_EVENT_LOGICAL_LAUNCHER_MAIN,
 	WAIT_EVENT_RECOVERY_WAL_STREAM,
-- 
2.25.1

>From 020e50cf89b7e87c2073c2684b63c1c4e844e6b3 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossa...@amazon.com>
Date: Sun, 5 Dec 2021 19:38:20 -0800
Subject: [PATCH v13 2/6] Also remove pgsql_tmp directories during startup.

Presently, the server only removes the contents of the temporary
directories during startup, not the directory itself.  This changes
that to prepare for future commits that will move temporary file
cleanup to a separate auxiliary process.
---
 src/backend/postmaster/postmaster.c         |  2 +-
 src/backend/storage/file/fd.c               | 20 ++++++++++----------
 src/include/storage/fd.h                    |  4 ++--
 src/test/recovery/t/022_crash_temp_files.pl |  6 ++++--
 4 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 00d18ee761..bda8b7f532 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1113,7 +1113,7 @@ PostmasterMain(int argc, char *argv[])
 	 * safe to do so now, because we verified earlier that there are no
 	 * conflicting Postgres processes in this data directory.
 	 */
-	RemovePgTempFilesInDir(PG_TEMP_FILES_DIR, true, false);
+	RemovePgTempDir(PG_TEMP_FILES_DIR, true, false);
 #endif
 
 	/*
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 4151cafec5..0e1398d07c 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -3081,7 +3081,7 @@ RemovePgTempFiles(void)
 	 * First process temp files in pg_default ($PGDATA/base)
 	 */
 	snprintf(temp_path, sizeof(temp_path), "base/%s", PG_TEMP_FILES_DIR);
-	RemovePgTempFilesInDir(temp_path, true, false);
+	RemovePgTempDir(temp_path, true, false);
 	RemovePgTempRelationFiles("base");
 
 	/*
@@ -3097,7 +3097,7 @@ RemovePgTempFiles(void)
 
 		snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s/%s",
 				 spc_de->d_name, TABLESPACE_VERSION_DIRECTORY, PG_TEMP_FILES_DIR);
-		RemovePgTempFilesInDir(temp_path, true, false);
+		RemovePgTempDir(temp_path, true, false);
 
 		snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s",
 				 spc_de->d_name, TABLESPACE_VERSION_DIRECTORY);
@@ -3130,7 +3130,7 @@ RemovePgTempFiles(void)
  * them separate.)
  */
 void
-RemovePgTempFilesInDir(const char *tmpdirname, bool missing_ok, bool unlink_all)
+RemovePgTempDir(const char *tmpdirname, bool missing_ok, bool unlink_all)
 {
 	DIR		   *temp_dir;
 	struct dirent *temp_de;
@@ -3162,13 +3162,7 @@ RemovePgTempFilesInDir(const char *tmpdirname, bool missing_ok, bool unlink_all)
 			else if (type == PGFILETYPE_DIR)
 			{
 				/* recursively remove contents, then directory itself */
-				RemovePgTempFilesInDir(rm_path, false, true);
-
-				if (rmdir(rm_path) < 0)
-					ereport(LOG,
-							(errcode_for_file_access(),
-							 errmsg("could not remove directory \"%s\": %m",
-									rm_path)));
+				RemovePgTempDir(rm_path, false, true);
 			}
 			else
 			{
@@ -3186,6 +3180,12 @@ RemovePgTempFilesInDir(const char *tmpdirname, bool missing_ok, bool unlink_all)
 	}
 
 	FreeDir(temp_dir);
+
+	if (rmdir(tmpdirname) < 0)
+		ereport(LOG,
+				(errcode_for_file_access(),
+				 errmsg("could not remove directory \"%s\": %m",
+						tmpdirname)));
 }
 
 /* Process one tablespace directory, look for per-DB subdirectories */
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index c0a212487d..790b9a9a14 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -167,8 +167,8 @@ extern void AtEOXact_Files(bool isCommit);
 extern void AtEOSubXact_Files(bool isCommit, SubTransactionId mySubid,
 							  SubTransactionId parentSubid);
 extern void RemovePgTempFiles(void);
-extern void RemovePgTempFilesInDir(const char *tmpdirname, bool missing_ok,
-								   bool unlink_all);
+extern void RemovePgTempDir(const char *tmpdirname, bool missing_ok,
+							bool unlink_all);
 extern bool looks_like_temp_rel_name(const char *name);
 
 extern int	pg_fsync(int fd);
diff --git a/src/test/recovery/t/022_crash_temp_files.pl b/src/test/recovery/t/022_crash_temp_files.pl
index 53a55c7a8a..8ed8afeadd 100644
--- a/src/test/recovery/t/022_crash_temp_files.pl
+++ b/src/test/recovery/t/022_crash_temp_files.pl
@@ -152,7 +152,8 @@ $node->poll_query_until('postgres', undef, '');
 
 # Check for temporary files
 is( $node->safe_psql(
-		'postgres', 'SELECT COUNT(1) FROM pg_ls_dir($$base/pgsql_tmp$$)'),
+		'postgres',
+		'SELECT COUNT(1) FROM pg_ls_dir($$base$$) WHERE pg_ls_dir = \'pgsql_tmp\''),
 	qq(0),
 	'no temporary files');
 
@@ -268,7 +269,8 @@ $node->restart();
 
 # Check the temporary files -- should be gone
 is( $node->safe_psql(
-		'postgres', 'SELECT COUNT(1) FROM pg_ls_dir($$base/pgsql_tmp$$)'),
+		'postgres',
+		'SELECT COUNT(1) FROM pg_ls_dir($$base$$) WHERE pg_ls_dir = \'pgsql_tmp\''),
 	qq(0),
 	'temporary file was removed');
 
-- 
2.25.1

>From 38c0101d73a36281a1a5b96c13ee9a39fdd9346e Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossa...@amazon.com>
Date: Sun, 5 Dec 2021 21:16:44 -0800
Subject: [PATCH v13 3/6] Split pgsql_tmp cleanup into two stages.

First, pgsql_tmp directories will be moved to a staging directory
and renamed to prepare them for removal.  Then, all files in these
directories are removed before removing the directories themselves.
This change is being made in preparation for a follow-up change to
offload most temporary file cleanup to the new custodian process.

Note that temporary relation files cannot be cleaned up via the
aforementioned strategy and will not be offloaded to the custodian.

This change also modifies several ereport(LOG, ...) calls within
the temporary file cleanup code to ERROR instead.  While temporary
file cleanup is typically not urgent enough to prevent startup,
excessive lenience might mask bugs.
---
 src/backend/postmaster/postmaster.c |   4 +
 src/backend/storage/file/fd.c       | 215 +++++++++++++++++++++++-----
 src/include/storage/fd.h            |   1 +
 3 files changed, 182 insertions(+), 38 deletions(-)

diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index bda8b7f532..3840da94ce 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1386,6 +1386,7 @@ PostmasterMain(int argc, char *argv[])
 	 * Remove old temporary files.  At this point there can be no other
 	 * Postgres processes running in this directory, so this should be safe.
 	 */
+	StagePgTempFilesForRemoval();
 	RemovePgTempFiles();
 
 	/*
@@ -3920,7 +3921,10 @@ PostmasterStateMachine(void)
 
 		/* remove leftover temporary files after a crash */
 		if (remove_temp_files_after_crash)
+		{
+			StagePgTempFilesForRemoval();
 			RemovePgTempFiles();
+		}
 
 		/* allow background workers to immediately restart */
 		ResetBackgroundWorkerCrashTimes();
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 0e1398d07c..9610850d45 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -77,6 +77,7 @@
 #include <sys/param.h>
 #include <sys/resource.h>		/* for getrlimit */
 #include <sys/stat.h>
+#include <sys/time.h>
 #include <sys/types.h>
 #ifndef WIN32
 #include <sys/mman.h>
@@ -88,6 +89,7 @@
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "catalog/pg_tablespace.h"
+#include "common/int.h"
 #include "common/file_perm.h"
 #include "common/file_utils.h"
 #include "common/pg_prng.h"
@@ -109,6 +111,8 @@
 #define PG_FLUSH_DATA_WORKS 1
 #endif
 
+#define PG_TEMP_TO_REMOVE_DIR (PG_TEMP_FILES_DIR "_staged_for_removal")
+
 /*
  * We must leave some file descriptors free for system(), the dynamic loader,
  * and other code that tries to open files without consulting fd.c.  This
@@ -335,6 +339,8 @@ static void BeforeShmemExit_Files(int code, Datum arg);
 static void CleanupTempFiles(bool isCommit, bool isProcExit);
 static void RemovePgTempRelationFiles(const char *tsdirname);
 static void RemovePgTempRelationFilesInDbspace(const char *dbspacedirname);
+static void StagePgTempDirForRemoval(const char *tmp_dir);
+static void RemoveStagedPgTempDirs(const char *spc_dir);
 
 static void walkdir(const char *path,
 					void (*action) (const char *fname, bool isdir, int elevel),
@@ -3049,29 +3055,24 @@ CleanupTempFiles(bool isCommit, bool isProcExit)
 		FreeDesc(&allocatedDescs[0]);
 }
 
-
 /*
- * Remove temporary and temporary relation files left over from a prior
- * postmaster session
+ * Stage temporary files left over from a prior postmaster session for removal.
  *
- * This should be called during postmaster startup.  It will forcibly
- * remove any leftover files created by OpenTemporaryFile and any leftover
- * temporary relation files created by mdcreate.
+ * This function also removes any leftover temporary relation files.  Unlike
+ * temporary files stored in pgsql_tmp directories, temporary relation files do
+ * not live in their own directory, so there isn't a tremendously beneficial way
+ * to stage them for removal at a later time.
  *
- * During post-backend-crash restart cycle, this routine is called when
- * remove_temp_files_after_crash GUC is enabled. Multiple crashes while
- * queries are using temp files could result in useless storage usage that can
- * only be reclaimed by a service restart. The argument against enabling it is
- * that someone might want to examine the temporary files for debugging
- * purposes. This does however mean that OpenTemporaryFile had better allow for
- * collision with an existing temp file name.
+ * RemovePgTempFiles() should be called at some point after this function in
+ * order to remove the staged temporary directories.
  *
- * NOTE: this function and its subroutines generally report syscall failures
- * with ereport(LOG) and keep going.  Removing temp files is not so critical
- * that we should fail to start the database when we can't do it.
+ * In EXEC_BACKEND case there is a pgsql_tmp directory at the top level of
+ * DataDir as well.  However, that is *not* cleaned here because doing so would
+ * create a race condition.  It's done separately, earlier in postmaster
+ * startup.
  */
 void
-RemovePgTempFiles(void)
+StagePgTempFilesForRemoval(void)
 {
 	char		temp_path[MAXPGPATH + 10 + sizeof(TABLESPACE_VERSION_DIRECTORY) + sizeof(PG_TEMP_FILES_DIR)];
 	DIR		   *spc_dir;
@@ -3081,7 +3082,8 @@ RemovePgTempFiles(void)
 	 * First process temp files in pg_default ($PGDATA/base)
 	 */
 	snprintf(temp_path, sizeof(temp_path), "base/%s", PG_TEMP_FILES_DIR);
-	RemovePgTempDir(temp_path, true, false);
+	StagePgTempDirForRemoval(temp_path);
+
 	RemovePgTempRelationFiles("base");
 
 	/*
@@ -3089,7 +3091,7 @@ RemovePgTempFiles(void)
 	 */
 	spc_dir = AllocateDir("pg_tblspc");
 
-	while ((spc_de = ReadDirExtended(spc_dir, "pg_tblspc", LOG)) != NULL)
+	while ((spc_de = ReadDir(spc_dir, "pg_tblspc")) != NULL)
 	{
 		if (strcmp(spc_de->d_name, ".") == 0 ||
 			strcmp(spc_de->d_name, "..") == 0)
@@ -3097,7 +3099,7 @@ RemovePgTempFiles(void)
 
 		snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s/%s",
 				 spc_de->d_name, TABLESPACE_VERSION_DIRECTORY, PG_TEMP_FILES_DIR);
-		RemovePgTempDir(temp_path, true, false);
+		StagePgTempDirForRemoval(temp_path);
 
 		snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s",
 				 spc_de->d_name, TABLESPACE_VERSION_DIRECTORY);
@@ -3105,21 +3107,160 @@ RemovePgTempFiles(void)
 	}
 
 	FreeDir(spc_dir);
+}
+
+/*
+ * Remove temporary files that have been previously staged for removal by
+ * StagePgTempFilesForRemoval().
+ */
+void
+RemovePgTempFiles(void)
+{
+	char		temp_path[MAXPGPATH + 10 + sizeof(TABLESPACE_VERSION_DIRECTORY) + sizeof(PG_TEMP_FILES_DIR)];
+	DIR		   *spc_dir;
+	struct dirent *spc_de;
+
+	/*
+	 * First process temp files in pg_default ($PGDATA/base)
+	 */
+	RemoveStagedPgTempDirs("base");
 
 	/*
-	 * In EXEC_BACKEND case there is a pgsql_tmp directory at the top level of
-	 * DataDir as well.  However, that is *not* cleaned here because doing so
-	 * would create a race condition.  It's done separately, earlier in
-	 * postmaster startup.
+	 * Cycle through temp directories for all non-default tablespaces.
 	 */
+	spc_dir = AllocateDir("pg_tblspc");
+
+	while ((spc_de = ReadDir(spc_dir, "pg_tblspc")) != NULL)
+	{
+		if (strcmp(spc_de->d_name, ".") == 0 ||
+			strcmp(spc_de->d_name, "..") == 0)
+			continue;
+
+		snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s",
+				 spc_de->d_name, TABLESPACE_VERSION_DIRECTORY);
+		RemoveStagedPgTempDirs(temp_path);
+	}
+
+	FreeDir(spc_dir);
 }
 
 /*
- * Process one pgsql_tmp directory for RemovePgTempFiles.
+ * StagePgTempDirForRemoval
+ *
+ * This function moves the given directory to a staging directory and renames
+ * it in preparation for removal by a later call to RemoveStagedPgTempDirs().
+ * The current timestamp is appended to the end of the new directory name in
+ * case previously staged pgsql_tmp directories have not yet been removed.
+ */
+static void
+StagePgTempDirForRemoval(const char *tmp_dir)
+{
+	struct stat st;
+	char		stage_path[MAXPGPATH * 2];
+	char		parent_path[MAXPGPATH * 2];
+	char		to_remove_path[MAXPGPATH * 2];
+	struct timeval tv;
+	uint64		epoch;
+
+	/*
+	 * If tmp_dir doesn't exist, there is nothing to stage.
+	 */
+	if (stat(tmp_dir, &st) != 0)
+	{
+		if (errno != ENOENT)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m", tmp_dir)));
+		return;
+	}
+	else if (!S_ISDIR(st.st_mode))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("\"%s\" is not a directory", tmp_dir)));
+
+	strlcpy(parent_path, tmp_dir, MAXPGPATH * 2);
+	get_parent_directory(parent_path);
+
+	/*
+	 * get_parent_directory() returns an empty string if the input argument is
+	 * just a file name (see comments in path.c), so handle that as being the
+	 * current directory.
+	 */
+	if (strlen(parent_path) == 0)
+		strlcpy(parent_path, ".", MAXPGPATH * 2);
+
+	/*
+	 * Make sure the pgsql_tmp_staged_for_removal directory exists.
+	 */
+	snprintf(to_remove_path, sizeof(to_remove_path), "%s/%s", parent_path,
+			 PG_TEMP_TO_REMOVE_DIR);
+	if (MakePGDirectory(to_remove_path) != 0 && errno != EEXIST)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not create directory \"%s\": %m",
+						to_remove_path)));
+
+	/*
+	 * Pick a sufficiently unique name for the stage directory.  We append the
+	 * current time, in microseconds since the epoch, to the end of the name.
+	 */
+	gettimeofday(&tv, NULL);
+	if (pg_mul_u64_overflow((uint64) 1000000, (uint64) tv.tv_sec, &epoch) ||
+		pg_add_u64_overflow(epoch, (uint64) tv.tv_usec, &epoch))
+		elog(ERROR, "could not stage temporary file directory for removal");
+
+	snprintf(stage_path, sizeof(stage_path), "%s/%s." UINT64_FORMAT,
+			 to_remove_path, PG_TEMP_FILES_DIR, epoch);
+
+	/*
+	 * Move the temporary directory into the staging area under its new name.
+	 */
+	if (rename(tmp_dir, stage_path) != 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not rename directory \"%s\" to \"%s\": %m",
+						tmp_dir, stage_path)));
+}
+
+/*
+ * RemoveStagedPgTempDirs
+ *
+ * This function removes all pgsql_tmp directories that have been staged for
+ * removal by StagePgTempDirForRemoval() in the given tablespace directory.
+ */
+static void
+RemoveStagedPgTempDirs(const char *spc_dir)
+{
+	char		stage_path[MAXPGPATH * 2];
+	char		temp_path[MAXPGPATH * 2];
+	DIR		   *dir;
+	struct dirent *de;
+
+	snprintf(stage_path, sizeof(stage_path), "%s/%s", spc_dir,
+			 PG_TEMP_TO_REMOVE_DIR);
+
+	dir = AllocateDir(stage_path);
+	if (dir == NULL && errno == ENOENT)
+		return;					/* staging directory is absent: nothing to do */
+
+	while ((de = ReadDir(dir, stage_path)) != NULL)
+	{
+		if (strncmp(de->d_name, PG_TEMP_FILES_DIR,
+					strlen(PG_TEMP_FILES_DIR)) != 0)
+			continue;			/* name lacks the PG_TEMP_FILES_DIR prefix */
+
+		snprintf(temp_path, sizeof(temp_path), "%s/%s", stage_path, de->d_name);
+		RemovePgTempDir(temp_path, true, false);
+	}
+	FreeDir(dir);
+}
+
+/*
+ * Process one pgsql_tmp directory for RemoveStagedPgTempDirs.
  *
  * If missing_ok is true, it's all right for the named directory to not exist.
- * Any other problem results in a LOG message.  (missing_ok should be true at
- * the top level, since pgsql_tmp directories are not created until needed.)
+ * Any other problem results in an ERROR.  (missing_ok should be true at the
+ * top level, since pgsql_tmp directories are not created until needed.)
  *
  * At the top level, this should be called with unlink_all = false, so that
  * only files matching the temporary name prefix will be unlinked.  When
@@ -3141,7 +3282,7 @@ RemovePgTempDir(const char *tmpdirname, bool missing_ok, bool unlink_all)
 	if (temp_dir == NULL && errno == ENOENT && missing_ok)
 		return;
 
-	while ((temp_de = ReadDirExtended(temp_dir, tmpdirname, LOG)) != NULL)
+	while ((temp_de = ReadDir(temp_dir, tmpdirname)) != NULL)
 	{
 		if (strcmp(temp_de->d_name, ".") == 0 ||
 			strcmp(temp_de->d_name, "..") == 0)
@@ -3155,11 +3296,9 @@ RemovePgTempDir(const char *tmpdirname, bool missing_ok, bool unlink_all)
 					PG_TEMP_FILE_PREFIX,
 					strlen(PG_TEMP_FILE_PREFIX)) == 0)
 		{
-			PGFileType	type = get_dirent_type(rm_path, temp_de, false, LOG);
+			PGFileType	type = get_dirent_type(rm_path, temp_de, false, ERROR);
 
-			if (type == PGFILETYPE_ERROR)
-				continue;
-			else if (type == PGFILETYPE_DIR)
+			if (type == PGFILETYPE_DIR)
 			{
 				/* recursively remove contents, then directory itself */
 				RemovePgTempDir(rm_path, false, true);
@@ -3167,14 +3306,14 @@ RemovePgTempDir(const char *tmpdirname, bool missing_ok, bool unlink_all)
 			else
 			{
 				if (unlink(rm_path) < 0)
-					ereport(LOG,
+					ereport(ERROR,
 							(errcode_for_file_access(),
 							 errmsg("could not remove file \"%s\": %m",
 									rm_path)));
 			}
 		}
 		else
-			ereport(LOG,
+			ereport(ERROR,
 					(errmsg("unexpected file found in temporary-files directory: \"%s\"",
 							rm_path)));
 	}
@@ -3182,7 +3321,7 @@ RemovePgTempDir(const char *tmpdirname, bool missing_ok, bool unlink_all)
 	FreeDir(temp_dir);
 
 	if (rmdir(tmpdirname) < 0)
-		ereport(LOG,
+		ereport(ERROR,
 				(errcode_for_file_access(),
 				 errmsg("could not remove directory \"%s\": %m",
 						tmpdirname)));
@@ -3198,7 +3337,7 @@ RemovePgTempRelationFiles(const char *tsdirname)
 
 	ts_dir = AllocateDir(tsdirname);
 
-	while ((de = ReadDirExtended(ts_dir, tsdirname, LOG)) != NULL)
+	while ((de = ReadDir(ts_dir, tsdirname)) != NULL)
 	{
 		/*
 		 * We're only interested in the per-database directories, which have
@@ -3226,7 +3365,7 @@ RemovePgTempRelationFilesInDbspace(const char *dbspacedirname)
 
 	dbspace_dir = AllocateDir(dbspacedirname);
 
-	while ((de = ReadDirExtended(dbspace_dir, dbspacedirname, LOG)) != NULL)
+	while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
 	{
 		if (!looks_like_temp_rel_name(de->d_name))
 			continue;
@@ -3235,7 +3374,7 @@ RemovePgTempRelationFilesInDbspace(const char *dbspacedirname)
 				 dbspacedirname, de->d_name);
 
 		if (unlink(rm_path) < 0)
-			ereport(LOG,
+			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not remove file \"%s\": %m",
 							rm_path)));
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 790b9a9a14..cf27e90aea 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -166,6 +166,7 @@ extern Oid	GetNextTempTableSpace(void);
 extern void AtEOXact_Files(bool isCommit);
 extern void AtEOSubXact_Files(bool isCommit, SubTransactionId mySubid,
 							  SubTransactionId parentSubid);
+extern void StagePgTempFilesForRemoval(void);
 extern void RemovePgTempFiles(void);
 extern void RemovePgTempDir(const char *tmpdirname, bool missing_ok,
 							bool unlink_all);
-- 
2.25.1

>From 01f01a2304e8bee652c429a601fa25783bfe967f Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossa...@amazon.com>
Date: Sun, 5 Dec 2021 21:42:52 -0800
Subject: [PATCH v13 4/6] Move pgsql_tmp file removal to custodian process.

With this change, startup (and restart after a crash) simply
renames the pgsql_tmp directories, and the custodian process
actually removes all the files in the staged directories as well as
the staged directories themselves.  This should help avoid long
startup delays due to many leftover temporary files.
---
 src/backend/postmaster/custodian.c  |  1 +
 src/backend/postmaster/postmaster.c | 24 +++++++++++++++++++-----
 src/backend/storage/file/fd.c       | 13 +++++++------
 src/include/postmaster/custodian.h  |  2 +-
 4 files changed, 28 insertions(+), 12 deletions(-)

diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
index e90f5d0d1f..fe1f48844e 100644
--- a/src/backend/postmaster/custodian.c
+++ b/src/backend/postmaster/custodian.c
@@ -70,6 +70,7 @@ struct cust_task_funcs_entry
  * whether the task is already enqueued.
  */
 static const struct cust_task_funcs_entry cust_task_functions[] = {
+	{CUSTODIAN_REMOVE_TEMP_FILES, RemovePgTempFiles, NULL},
 	{INVALID_CUSTODIAN_TASK, NULL, NULL}	/* must be last */
 };
 
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 3840da94ce..86000935fd 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -109,6 +109,7 @@
 #include "postmaster/autovacuum.h"
 #include "postmaster/auxprocess.h"
 #include "postmaster/bgworker_internals.h"
+#include "postmaster/custodian.h"
 #include "postmaster/fork_process.h"
 #include "postmaster/interrupt.h"
 #include "postmaster/pgarch.h"
@@ -1385,9 +1386,12 @@ PostmasterMain(int argc, char *argv[])
 	/*
 	 * Remove old temporary files.  At this point there can be no other
 	 * Postgres processes running in this directory, so this should be safe.
+	 *
+	 * Note that this just stages the pgsql_tmp directories for deletion.  The
+	 * custodian process is responsible for actually removing the files.
 	 */
 	StagePgTempFilesForRemoval();
-	RemovePgTempFiles();
+	RequestCustodian(CUSTODIAN_REMOVE_TEMP_FILES, false, (Datum) 0);
 
 	/*
 	 * Initialize the autovacuum subsystem (again, no process start yet)
@@ -3919,12 +3923,14 @@ PostmasterStateMachine(void)
 		ereport(LOG,
 				(errmsg("all server processes terminated; reinitializing")));
 
-		/* remove leftover temporary files after a crash */
+		/*
+		 * Remove leftover temporary files after a crash.
+		 *
+		 * Note that this just stages the pgsql_tmp directories for deletion.
+		 * The custodian process is responsible for actually removing the files.
+		 */
 		if (remove_temp_files_after_crash)
-		{
 			StagePgTempFilesForRemoval();
-			RemovePgTempFiles();
-		}
 
 		/* allow background workers to immediately restart */
 		ResetBackgroundWorkerCrashTimes();
@@ -3937,6 +3943,14 @@ PostmasterStateMachine(void)
 		/* re-create shared memory and semaphores */
 		CreateSharedMemoryAndSemaphores();
 
+		/*
+		 * Now that shared memory is initialized, notify the custodian to clean
+		 * up the staged pgsql_tmp directories.  We do this even if
+		 * remove_temp_files_after_crash is false so that any previously staged
+		 * directories are eventually cleaned up.
+		 */
+		RequestCustodian(CUSTODIAN_REMOVE_TEMP_FILES, false, (Datum) 0);
+
 		StartupPID = StartupDataBase();
 		Assert(StartupPID != 0);
 		StartupStatus = STARTUP_RUNNING;
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 9610850d45..625355c56a 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -96,6 +96,7 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "portability/mem.h"
+#include "postmaster/custodian.h"
 #include "postmaster/startup.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -1564,9 +1565,9 @@ PathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode)
  *
  * Directories created within the top-level temporary directory should begin
  * with PG_TEMP_FILE_PREFIX, so that they can be identified as temporary and
- * deleted at startup by RemovePgTempFiles().  Further subdirectories below
- * that do not need any particular prefix.
-*/
+ * deleted by RemovePgTempFiles().  Further subdirectories below that do not
+ * need any particular prefix.
+ */
 void
 PathNameCreateTemporaryDir(const char *basedir, const char *directory)
 {
@@ -1764,9 +1765,9 @@ OpenTemporaryFileInTablespace(Oid tblspcOid, bool rejectError)
  *
  * If the file is inside the top-level temporary directory, its name should
  * begin with PG_TEMP_FILE_PREFIX so that it can be identified as temporary
- * and deleted at startup by RemovePgTempFiles().  Alternatively, it can be
- * inside a directory created with PathNameCreateTemporaryDir(), in which case
- * the prefix isn't needed.
+ * and deleted by RemovePgTempFiles().  Alternatively, it can be inside a
+ * directory created with PathNameCreateTemporaryDir(), in which case the prefix
+ * isn't needed.
  */
 File
 PathNameCreateTemporaryFile(const char *path, bool error_on_failure)
diff --git a/src/include/postmaster/custodian.h b/src/include/postmaster/custodian.h
index 170ca61a21..80890ceadd 100644
--- a/src/include/postmaster/custodian.h
+++ b/src/include/postmaster/custodian.h
@@ -18,7 +18,7 @@
  */
 typedef enum CustodianTask
 {
-	FAKE_TASK,						/* placeholder until we have a real task */
+	CUSTODIAN_REMOVE_TEMP_FILES,
 
 	NUM_CUSTODIAN_TASKS,			/* new tasks go above */
 	INVALID_CUSTODIAN_TASK
-- 
2.25.1

>From 674f54c720bfd7d56de04370e5a015fe929c6b65 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossa...@amazon.com>
Date: Sun, 5 Dec 2021 22:02:40 -0800
Subject: [PATCH v13 5/6] Move removal of old serialized snapshots to
 custodian.

This was only done during checkpoints because it was a convenient
place to put it.  However, if there are many snapshots to remove,
it can significantly extend checkpoint time.  To avoid this, move
this work to the newly-introduced custodian process.
---
 src/backend/access/transam/xlog.c           | 8 ++++++--
 src/backend/postmaster/custodian.c          | 2 ++
 src/backend/replication/logical/snapbuild.c | 9 ++++-----
 src/include/postmaster/custodian.h          | 1 +
 src/include/replication/snapbuild.h         | 2 +-
 5 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a31fbbff78..4991c10f86 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -76,12 +76,12 @@
 #include "port/atomics.h"
 #include "port/pg_iovec.h"
 #include "postmaster/bgwriter.h"
+#include "postmaster/custodian.h"
 #include "postmaster/startup.h"
 #include "postmaster/walwriter.h"
 #include "replication/logical.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
-#include "replication/snapbuild.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
@@ -7001,10 +7001,14 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 {
 	CheckPointRelationMap();
 	CheckPointReplicationSlots();
-	CheckPointSnapBuild();
 	CheckPointLogicalRewriteHeap();
 	CheckPointReplicationOrigin();
 
+	/* tasks offloaded to custodian */
+	RequestCustodian(CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS,
+					 !IsUnderPostmaster,
+					 (Datum) 0);
+
 	/* Write out all dirty data in SLRUs and the main buffer pool */
 	TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);
 	CheckpointStats.ckpt_write_t = GetCurrentTimestamp();
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
index fe1f48844e..855a756ca0 100644
--- a/src/backend/postmaster/custodian.c
+++ b/src/backend/postmaster/custodian.c
@@ -25,6 +25,7 @@
 #include "pgstat.h"
 #include "postmaster/custodian.h"
 #include "postmaster/interrupt.h"
+#include "replication/snapbuild.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
 #include "storage/fd.h"
@@ -71,6 +72,7 @@ struct cust_task_funcs_entry
  */
 static const struct cust_task_funcs_entry cust_task_functions[] = {
 	{CUSTODIAN_REMOVE_TEMP_FILES, RemovePgTempFiles, NULL},
+	{CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS, RemoveOldSerializedSnapshots, NULL},
 	{INVALID_CUSTODIAN_TASK, NULL, NULL}	/* must be last */
 };
 
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index a1fd1d92d6..f957b9aa49 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -2037,14 +2037,13 @@ SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path)
 
 /*
  * Remove all serialized snapshots that are not required anymore because no
- * slot can need them. This doesn't actually have to run during a checkpoint,
- * but it's a convenient point to schedule this.
+ * slot can need them.
  *
- * NB: We run this during checkpoints even if logical decoding is disabled so
- * we cleanup old slots at some point after it got disabled.
+ * NB: We run this even if logical decoding is disabled so we cleanup old slots
+ * at some point after it got disabled.
  */
 void
-CheckPointSnapBuild(void)
+RemoveOldSerializedSnapshots(void)
 {
 	XLogRecPtr	cutoff;
 	XLogRecPtr	redo;
diff --git a/src/include/postmaster/custodian.h b/src/include/postmaster/custodian.h
index 80890ceadd..37334941cc 100644
--- a/src/include/postmaster/custodian.h
+++ b/src/include/postmaster/custodian.h
@@ -19,6 +19,7 @@
 typedef enum CustodianTask
 {
 	CUSTODIAN_REMOVE_TEMP_FILES,
+	CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS,
 
 	NUM_CUSTODIAN_TASKS,			/* new tasks go above */
 	INVALID_CUSTODIAN_TASK
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index 2a697e57c3..9eba403e0c 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -57,7 +57,7 @@ struct ReorderBuffer;
 struct xl_heap_new_cid;
 struct xl_running_xacts;
 
-extern void CheckPointSnapBuild(void);
+extern void RemoveOldSerializedSnapshots(void);
 
 extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *reorder,
 										  TransactionId xmin_horizon, XLogRecPtr start_lsn,
-- 
2.25.1

>From 1490ce000e87bdca7edcb9e3e952a04ffdea8335 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossa...@amazon.com>
Date: Sun, 12 Dec 2021 22:07:11 -0800
Subject: [PATCH v13 6/6] Move removal of old logical rewrite mapping files to
 custodian.

If there are many such files to remove, checkpoints can take much
longer.  To avoid this, move this work to the newly-introduced
custodian process.

Since the mapping files include 32-bit transaction IDs, there is a
risk of wraparound if the files are not cleaned up fast enough.
Removing these files in checkpoints offered decent wraparound
protection simply due to the relatively high frequency of
checkpointing.  With this change, servers should still clean up
mapping files with decently high frequency, but in theory the
wraparound risk might worsen for some (e.g., if the custodian is
spending a lot of time on a different task).  Given this is an
existing problem, this change makes no effort to handle the
wraparound risk, and it is left as a future exercise.
---
 src/backend/access/heap/rewriteheap.c | 80 +++++++++++++++++++++++----
 src/backend/postmaster/custodian.c    | 43 ++++++++++++++
 src/include/access/rewriteheap.h      |  1 +
 src/include/postmaster/custodian.h    |  4 ++
 4 files changed, 118 insertions(+), 10 deletions(-)

diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 2fe9e48e50..07976504cc 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -116,6 +116,7 @@
 #include "lib/ilist.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/custodian.h"
 #include "replication/logical.h"
 #include "replication/slot.h"
 #include "storage/bufmgr.h"
@@ -123,6 +124,7 @@
 #include "storage/procarray.h"
 #include "storage/smgr.h"
 #include "utils/memutils.h"
+#include "utils/pg_lsn.h"
 #include "utils/rel.h"
 
 /*
@@ -1179,7 +1181,8 @@ heap_xlog_logical_rewrite(XLogReaderState *r)
  * Perform a checkpoint for logical rewrite mappings
  *
  * This serves two tasks:
- * 1) Remove all mappings not needed anymore based on the logical restart LSN
+ * 1) Alert the custodian to remove all mappings not needed anymore based on the
+ *    logical restart LSN
  * 2) Flush all remaining mappings to disk, so that replay after a checkpoint
  *	  only has to deal with the parts of a mapping that have been written out
  *	  after the checkpoint started.
@@ -1207,6 +1210,11 @@ CheckPointLogicalRewriteHeap(void)
 	if (cutoff != InvalidXLogRecPtr && redo < cutoff)
 		cutoff = redo;
 
+	/* let the custodian know what it can remove */
+	RequestCustodian(CUSTODIAN_REMOVE_REWRITE_MAPPINGS,
+					 !IsUnderPostmaster,
+					 LSNGetDatum(cutoff));
+
 	mappings_dir = AllocateDir("pg_logical/mappings");
 	while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
 	{
@@ -1239,15 +1247,7 @@ CheckPointLogicalRewriteHeap(void)
 
 		lsn = ((uint64) hi) << 32 | lo;
 
-		if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
-		{
-			elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
-			if (unlink(path) < 0)
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not remove file \"%s\": %m", path)));
-		}
-		else
+		if (lsn >= cutoff && cutoff != InvalidXLogRecPtr)
 		{
 			/* on some operating systems fsyncing a file requires O_RDWR */
 			int			fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
@@ -1285,3 +1285,63 @@ CheckPointLogicalRewriteHeap(void)
 	/* persist directory entries to disk */
 	fsync_fname("pg_logical/mappings", true);
 }
+
+/*
+ * Remove all mappings not needed anymore based on the logical restart LSN saved
+ * by the checkpointer.  We use this saved value instead of calling
+ * ReplicationSlotsComputeLogicalRestartLSN() so that we don't try to remove
+ * files that a concurrent call to CheckPointLogicalRewriteHeap() is trying to
+ * flush to disk.
+ */
+void
+RemoveOldLogicalRewriteMappings(void)
+{
+	XLogRecPtr	cutoff;
+	DIR		   *mappings_dir;
+	struct dirent *mapping_de;
+	char		path[MAXPGPATH + 20];
+
+	cutoff = CustodianGetLogicalRewriteCutoff();
+
+	mappings_dir = AllocateDir("pg_logical/mappings");
+	while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
+	{
+		Oid			dboid;
+		Oid			relid;
+		XLogRecPtr	lsn;
+		TransactionId rewrite_xid;
+		TransactionId create_xid;
+		uint32		hi,
+					lo;
+		PGFileType	de_type;
+
+		if (strcmp(mapping_de->d_name, ".") == 0 ||
+			strcmp(mapping_de->d_name, "..") == 0)
+			continue;
+
+		snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
+		de_type = get_dirent_type(path, mapping_de, false, DEBUG1);
+
+		if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
+			continue;			/* type is known and is not a regular file */
+
+		/* Skip over files that cannot be ours. */
+		if (strncmp(mapping_de->d_name, "map-", 4) != 0)
+			continue;
+
+		if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
+				   &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
+			elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
+
+		lsn = ((uint64) hi) << 32 | lo;	/* combine the two parsed 32-bit halves */
+		if (lsn >= cutoff && cutoff != InvalidXLogRecPtr)
+			continue;			/* at or beyond a valid cutoff: keep the file */
+
+		elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
+		if (unlink(path) < 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not remove file \"%s\": %m", path)));
+	}
+	FreeDir(mappings_dir);
+}
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
index 855a756ca0..d4be19e5de 100644
--- a/src/backend/postmaster/custodian.c
+++ b/src/backend/postmaster/custodian.c
@@ -21,6 +21,7 @@
  */
 #include "postgres.h"
 
+#include "access/rewriteheap.h"
 #include "libpq/pqsignal.h"
 #include "pgstat.h"
 #include "postmaster/custodian.h"
@@ -33,11 +34,13 @@
 #include "storage/procsignal.h"
 #include "storage/smgr.h"
 #include "utils/memutils.h"
+#include "utils/pg_lsn.h"
 
 static void DoCustodianTasks(bool retry);
 static CustodianTask CustodianGetNextTask(void);
 static void CustodianEnqueueTask(CustodianTask task);
 static const struct cust_task_funcs_entry *LookupCustodianFunctions(CustodianTask task);
+static void CustodianSetLogicalRewriteCutoff(Datum arg);
 
 typedef struct
 {
@@ -45,6 +48,8 @@ typedef struct
 
 	CustodianTask task_queue_elems[NUM_CUSTODIAN_TASKS];
 	int			task_queue_head;
+
+	XLogRecPtr  logical_rewrite_mappings_cutoff;    /* can remove older mappings */
 } CustodianShmemStruct;
 
 static CustodianShmemStruct *CustodianShmem;
@@ -73,6 +78,7 @@ struct cust_task_funcs_entry
 static const struct cust_task_funcs_entry cust_task_functions[] = {
 	{CUSTODIAN_REMOVE_TEMP_FILES, RemovePgTempFiles, NULL},
 	{CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS, RemoveOldSerializedSnapshots, NULL},
+	{CUSTODIAN_REMOVE_REWRITE_MAPPINGS, RemoveOldLogicalRewriteMappings, CustodianSetLogicalRewriteCutoff},
 	{INVALID_CUSTODIAN_TASK, NULL, NULL}	/* must be last */
 };
 
@@ -384,3 +390,40 @@ LookupCustodianFunctions(CustodianTask task)
 	elog(ERROR, "could not lookup functions for custodian task %d", task);
 	pg_unreachable();
 }
+
+/*
+ * Stores the provided cutoff LSN in the custodian's shared memory.
+ *
+ * It's okay if the cutoff LSN is updated before a previously set cutoff has
+ * been used for cleaning up files.  If that happens, it just means that the
+ * next invocation of RemoveOldLogicalRewriteMappings() will use a more accurate
+ * cutoff.
+ */
+static void
+CustodianSetLogicalRewriteCutoff(Datum arg)
+{
+	SpinLockAcquire(&CustodianShmem->cust_lck);
+	CustodianShmem->logical_rewrite_mappings_cutoff = DatumGetLSN(arg);
+	SpinLockRelease(&CustodianShmem->cust_lck);
+
+	/* if pass-by-ref, free Datum memory */
+#ifndef USE_FLOAT8_BYVAL
+	pfree(DatumGetPointer(arg));
+#endif
+}
+
+/*
+ * Used by the custodian to determine which logical rewrite mapping files it can
+ * remove.
+ */
+XLogRecPtr
+CustodianGetLogicalRewriteCutoff(void)
+{
+	XLogRecPtr  cutoff;
+
+	SpinLockAcquire(&CustodianShmem->cust_lck);
+	cutoff = CustodianShmem->logical_rewrite_mappings_cutoff;
+	SpinLockRelease(&CustodianShmem->cust_lck);
+
+	return cutoff;
+}
diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h
index 5cc04756a5..bc875330d7 100644
--- a/src/include/access/rewriteheap.h
+++ b/src/include/access/rewriteheap.h
@@ -53,5 +53,6 @@ typedef struct LogicalRewriteMappingData
  */
 #define LOGICAL_REWRITE_FORMAT "map-%x-%x-%X_%X-%x-%x"
 extern void CheckPointLogicalRewriteHeap(void);
+extern void RemoveOldLogicalRewriteMappings(void);
 
 #endif							/* REWRITE_HEAP_H */
diff --git a/src/include/postmaster/custodian.h b/src/include/postmaster/custodian.h
index 37334941cc..f177d55159 100644
--- a/src/include/postmaster/custodian.h
+++ b/src/include/postmaster/custodian.h
@@ -12,6 +12,8 @@
 #ifndef _CUSTODIAN_H
 #define _CUSTODIAN_H
 
+#include "access/xlogdefs.h"
+
 /*
  * If you add a new task here, be sure to add its corresponding function
  * pointers to cust_task_functions in custodian.c.
@@ -20,6 +22,7 @@ typedef enum CustodianTask
 {
 	CUSTODIAN_REMOVE_TEMP_FILES,
 	CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS,
+	CUSTODIAN_REMOVE_REWRITE_MAPPINGS,
 
 	NUM_CUSTODIAN_TASKS,			/* new tasks go above */
 	INVALID_CUSTODIAN_TASK
@@ -29,5 +32,6 @@ extern void CustodianMain(void) pg_attribute_noreturn();
 extern Size CustodianShmemSize(void);
 extern void CustodianShmemInit(void);
 extern void RequestCustodian(CustodianTask task, bool immediate, Datum arg);
+extern XLogRecPtr CustodianGetLogicalRewriteCutoff(void);
 
 #endif						/* _CUSTODIAN_H */
-- 
2.25.1

Reply via email to