I think there are some good ideas here.  I started to take a look at the
patches, and I've attached a rebased version of the patch set.  Apologies
if I am repeating any discussions from upthread.

First, I tested the time difference in ALTER TABLE SET UNLOGGED/LOGGED with
the patch applied, and the results looked pretty impressive.

        before:
        postgres=# alter table test set unlogged;
        ALTER TABLE
        Time: 5108.071 ms (00:05.108)
        postgres=# alter table test set logged;
        ALTER TABLE
        Time: 6747.648 ms (00:06.748)

        after:
        postgres=# alter table test set unlogged;
        ALTER TABLE
        Time: 25.609 ms
        postgres=# alter table test set logged;
        ALTER TABLE
        Time: 1241.800 ms (00:01.242)

My first question is whether 0001 is a prerequisite to 0002.  I'm assuming
it is, but the reason wasn't immediately obvious to me.  If it's just
nice-to-have, perhaps we could simplify the patch set a bit.  I see that
Heikki had some general concerns with the marker file approach [0], so
perhaps it is at least worth brainstorming some alternatives if we _do_
need it.

[0] https://postgr.es/m/9827ebd3-de2e-fd52-4091-a568387b1fc2%40iki.fi

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From 5a4fb063a8b5e8a731373c4d06e51ad7fbeebebd Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota....@gmail.com>
Date: Thu, 2 Mar 2023 17:25:12 +0900
Subject: [PATCH v29 1/3] Introduce storage mark files

In specific scenarios, certain operations followed by a crash-restart
may generate orphaned storage files that cannot be removed through
standard procedures or cause the server to fail during restart. This
commit introduces 'mark files' to convey information about the storage
file. In particular, an "UNCOMMITTED" mark file is implemented to
identify uncommitted files for removal during the subsequent startup.
---
 src/backend/access/rmgrdesc/smgrdesc.c    |  37 +++
 src/backend/access/transam/README         |  10 +
 src/backend/access/transam/xact.c         |   7 +
 src/backend/access/transam/xlogrecovery.c |  18 ++
 src/backend/backup/basebackup.c           |   9 +-
 src/backend/catalog/storage.c             | 268 +++++++++++++++++++-
 src/backend/storage/file/fd.c             |   4 +-
 src/backend/storage/file/reinit.c         | 287 +++++++++++++++-------
 src/backend/storage/smgr/md.c             |  94 ++++++-
 src/backend/storage/smgr/smgr.c           |  32 +++
 src/backend/storage/sync/sync.c           |  26 +-
 src/bin/pg_rewind/parsexlog.c             |  16 ++
 src/common/relpath.c                      |  47 ++--
 src/include/catalog/storage.h             |   3 +
 src/include/catalog/storage_xlog.h        |  35 ++-
 src/include/common/relpath.h              |   9 +-
 src/include/storage/fd.h                  |   1 +
 src/include/storage/md.h                  |   8 +-
 src/include/storage/reinit.h              |   8 +-
 src/include/storage/smgr.h                |  17 ++
 src/test/recovery/t/013_crash_restart.pl  |  21 ++
 src/tools/pgindent/typedefs.list          |   6 +
 22 files changed, 834 insertions(+), 129 deletions(-)

diff --git a/src/backend/access/rmgrdesc/smgrdesc.c b/src/backend/access/rmgrdesc/smgrdesc.c
index bd841b96e8..f8187385c4 100644
--- a/src/backend/access/rmgrdesc/smgrdesc.c
+++ b/src/backend/access/rmgrdesc/smgrdesc.c
@@ -40,6 +40,37 @@ smgr_desc(StringInfo buf, XLogReaderState *record)
 						 xlrec->blkno, xlrec->flags);
 		pfree(path);
 	}
+	else if (info == XLOG_SMGR_UNLINK)
+	{
+		xl_smgr_unlink *xlrec = (xl_smgr_unlink *) rec;
+		char	   *path = relpathperm(xlrec->rlocator, xlrec->forkNum);
+
+		appendStringInfoString(buf, path);
+		pfree(path);
+	}
+	else if (info == XLOG_SMGR_MARK)
+	{
+		xl_smgr_mark *xlrec = (xl_smgr_mark *) rec;
+		char	   *path = GetRelationPath(xlrec->rlocator.dbOid,
+										   xlrec->rlocator.spcOid,
+										   xlrec->rlocator.relNumber,
+										   InvalidBackendId,
+										   xlrec->forkNum, xlrec->mark);
+		char	   *action = "<none>";
+
+		switch (xlrec->action)
+		{
+			case XLOG_SMGR_MARK_CREATE:
+				action = "CREATE";
+				break;
+			case XLOG_SMGR_MARK_UNLINK:
+				action = "DELETE";
+				break;
+		}
+
+		appendStringInfo(buf, "%s %s", action, path);
+		pfree(path);
+	}
 }
 
 const char *
@@ -55,6 +86,12 @@ smgr_identify(uint8 info)
 		case XLOG_SMGR_TRUNCATE:
 			id = "TRUNCATE";
 			break;
+		case XLOG_SMGR_UNLINK:
+			id = "UNLINK";
+			break;
+		case XLOG_SMGR_MARK:
+			id = "MARK";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/access/transam/README b/src/backend/access/transam/README
index 22c8ae9755..e10f6af0e3 100644
--- a/src/backend/access/transam/README
+++ b/src/backend/access/transam/README
@@ -741,6 +741,16 @@ we must panic and abort recovery.  The DBA will have to manually clean up
 then restart recovery.  This is part of the reason for not writing a WAL
 entry until we've successfully done the original action.
 
+================================
+Smgr MARK files
+--------------------------------
+
+A storage manager (smgr) mark file is an empty file created alongside a
+new relation storage file, indicating that the storage file requires
+cleanup during the recovery process.  Unlike the previous four actions
+mentioned, failure to remove these marker files may lead to data loss,
+causing the server to shut down.
+
 
 Skipping WAL for New RelFileLocator
 --------------------------------
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8daaa535ed..2b0af2a938 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2224,6 +2224,9 @@ CommitTransaction(void)
 	 */
 	smgrDoPendingSyncs(true, is_parallel_worker);
 
+	/* Likewise delete mark files for files created during this transaction. */
+	smgrDoPendingCleanups(true);
+
 	/* close large objects before lower-level cleanup */
 	AtEOXact_LargeObject(true);
 
@@ -2475,6 +2478,9 @@ PrepareTransaction(void)
 	 */
 	smgrDoPendingSyncs(true, false);
 
+	/* Likewise delete mark files for files created during this transaction. */
+	smgrDoPendingCleanups(true);
+
 	/* close large objects before lower-level cleanup */
 	AtEOXact_LargeObject(true);
 
@@ -2799,6 +2805,7 @@ AbortTransaction(void)
 	AfterTriggerEndXact(false); /* 'false' means it's abort */
 	AtAbort_Portals();
 	smgrDoPendingSyncs(false, is_parallel_worker);
+	smgrDoPendingCleanups(false);
 	AtEOXact_LargeObject(false);
 	AtAbort_Notify();
 	AtEOXact_RelationMap(false, is_parallel_worker);
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index becc2bda62..8e09936993 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -42,6 +42,7 @@
 #include "access/xlogutils.h"
 #include "backup/basebackup.h"
 #include "catalog/pg_control.h"
+#include "catalog/storage.h"
 #include "commands/tablespace.h"
 #include "common/file_utils.h"
 #include "miscadmin.h"
@@ -56,6 +57,7 @@
 #include "storage/pmsignal.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
+#include "storage/reinit.h"
 #include "storage/spin.h"
 #include "utils/builtins.h"
 #include "utils/datetime.h"
@@ -1795,6 +1797,14 @@ PerformWalRecovery(void)
 
 		RmgrCleanup();
 
+		/* cleanup garbage files left during crash recovery */
+		if (!InArchiveRecovery)
+			ResetUnloggedRelations(UNLOGGED_RELATION_DROP_BUFFER |
+								   UNLOGGED_RELATION_CLEANUP);
+
+		/* run rollback cleanup if any */
+		smgrDoPendingDeletes(false);
+
 		ereport(LOG,
 				(errmsg("redo done at %X/%X system usage: %s",
 						LSN_FORMAT_ARGS(xlogreader->ReadRecPtr),
@@ -3153,6 +3163,14 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 			{
 				ereport(DEBUG1,
 						(errmsg_internal("reached end of WAL in pg_wal, entering archive recovery")));
+
+				/* cleanup garbage files left during crash recovery */
+				ResetUnloggedRelations(UNLOGGED_RELATION_DROP_BUFFER |
+									   UNLOGGED_RELATION_CLEANUP);
+
+				/* run rollback cleanup if any */
+				smgrDoPendingDeletes(false);
+
 				InArchiveRecovery = true;
 				if (StandbyModeRequested)
 					EnableStandbyMode();
diff --git a/src/backend/backup/basebackup.c b/src/backend/backup/basebackup.c
index 45be21131c..9af0982fe1 100644
--- a/src/backend/backup/basebackup.c
+++ b/src/backend/backup/basebackup.c
@@ -1191,6 +1191,7 @@ sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
 		ForkNumber	relForkNum; /* Type of fork if file is a relation */
 		int			relnumchars;	/* Chars in filename that are the
 									 * relnumber */
+		StorageMarks mark;		/* marker file sign */
 
 		/* Skip special stuff */
 		if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
@@ -1241,7 +1242,7 @@ sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
 		/* Exclude all forks for unlogged tables except the init fork */
 		if (isDbDir &&
 			parse_filename_for_nontemp_relation(de->d_name, &relnumchars,
-												&relForkNum))
+												&relForkNum, &mark))
 		{
 			/* Never exclude init forks */
 			if (relForkNum != INIT_FORKNUM)
@@ -1448,6 +1449,7 @@ is_checksummed_file(const char *fullpath, const char *filename)
 		strncmp(fullpath, "/", 1) == 0)
 	{
 		int			excludeIdx;
+		char	   *p;
 
 		/* Compare file against noChecksumFiles skip list */
 		for (excludeIdx = 0; noChecksumFiles[excludeIdx].name != NULL; excludeIdx++)
@@ -1461,6 +1463,11 @@ is_checksummed_file(const char *fullpath, const char *filename)
 				return false;
 		}
 
+		/* exclude mark files */
+		p = strchr(filename, '.');
+		if (p && isalpha(p[1]) && p[2] == 0)
+			return false;
+
 		return true;
 	}
 	else
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index 2add053489..fe06c3c31d 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -19,6 +19,7 @@
 
 #include "postgres.h"
 
+#include "access/amapi.h"
 #include "access/parallel.h"
 #include "access/visibilitymap.h"
 #include "access/xact.h"
@@ -66,6 +67,21 @@ typedef struct PendingRelDelete
 	struct PendingRelDelete *next;	/* linked-list link */
 } PendingRelDelete;
 
+#define	PCOP_UNLINK_FORK		(1 << 0)
+#define	PCOP_UNLINK_MARK		(1 << 1)
+
+typedef struct PendingCleanup
+{
+	RelFileLocator rlocator;	/* relation that need a cleanup */
+	int			op;				/* operation mask */
+	ForkNumber	unlink_forknum; /* forknum to unlink */
+	StorageMarks unlink_mark;	/* mark to unlink */
+	BackendId	backend;		/* InvalidBackendId if not a temp rel */
+	bool		atCommit;		/* T=delete at commit; F=delete at abort */
+	int			nestLevel;		/* xact nesting level of request */
+	struct PendingCleanup *next;	/* linked-list link */
+}			PendingCleanup;
+
 typedef struct PendingRelSync
 {
 	RelFileLocator rlocator;
@@ -73,6 +89,7 @@ typedef struct PendingRelSync
 } PendingRelSync;
 
 static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
+static PendingCleanup * pendingCleanups = NULL; /* head of linked list */
 static HTAB *pendingSyncHash = NULL;
 
 
@@ -123,6 +140,7 @@ RelationCreateStorage(RelFileLocator rlocator, char relpersistence,
 	SMgrRelation srel;
 	BackendId	backend;
 	bool		needs_wal;
+	PendingCleanup *pendingclean;
 
 	Assert(!IsInParallelMode());	/* couldn't update pendingSyncHash */
 
@@ -145,7 +163,21 @@ RelationCreateStorage(RelFileLocator rlocator, char relpersistence,
 			return NULL;		/* placate compiler */
 	}
 
+	/*
+	 * We are going to create a new storage file. If the server crashes before
+	 * the current transaction ends, the file needs to be cleaned up. The
+	 * SMGR_MARK_UNCOMMITTED mark file prompts that work at the next startup.
+	 * We don't need this during WAL-logged CREATE DATABASE. See
+	 * CreateAndCopyRelationData for details.
+	 */
 	srel = smgropen(rlocator, backend);
+
+	if (register_delete)
+	{
+		log_smgrcreatemark(&rlocator, MAIN_FORKNUM, SMGR_MARK_UNCOMMITTED);
+		smgrcreatemark(srel, MAIN_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
+	}
+
 	smgrcreate(srel, MAIN_FORKNUM, false);
 
 	if (needs_wal)
@@ -157,16 +189,29 @@ RelationCreateStorage(RelFileLocator rlocator, char relpersistence,
 	 */
 	if (register_delete)
 	{
-		PendingRelDelete *pending;
+		PendingRelDelete *pendingdel;
 
-		pending = (PendingRelDelete *)
+		pendingdel = (PendingRelDelete *)
 			MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
-		pending->rlocator = rlocator;
-		pending->backend = backend;
-		pending->atCommit = false;	/* delete if abort */
-		pending->nestLevel = GetCurrentTransactionNestLevel();
-		pending->next = pendingDeletes;
-		pendingDeletes = pending;
+		pendingdel->rlocator = rlocator;
+		pendingdel->backend = backend;
+		pendingdel->atCommit = false;	/* delete if abort */
+		pendingdel->nestLevel = GetCurrentTransactionNestLevel();
+		pendingdel->next = pendingDeletes;
+		pendingDeletes = pendingdel;
+
+		/* drop mark files at commit */
+		pendingclean = (PendingCleanup *)
+			MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+		pendingclean->rlocator = rlocator;
+		pendingclean->op = PCOP_UNLINK_MARK;
+		pendingclean->unlink_forknum = MAIN_FORKNUM;
+		pendingclean->unlink_mark = SMGR_MARK_UNCOMMITTED;
+		pendingclean->backend = backend;
+		pendingclean->atCommit = true;
+		pendingclean->nestLevel = GetCurrentTransactionNestLevel();
+		pendingclean->next = pendingCleanups;
+		pendingCleanups = pendingclean;
 	}
 
 	if (relpersistence == RELPERSISTENCE_PERMANENT && !XLogIsNeeded())
@@ -197,6 +242,69 @@ log_smgrcreate(const RelFileLocator *rlocator, ForkNumber forkNum)
 	XLogInsert(RM_SMGR_ID, XLOG_SMGR_CREATE | XLR_SPECIAL_REL_UPDATE);
 }
 
+/*
+ * Perform XLogInsert of an XLOG_SMGR_UNLINK record to WAL.
+ */
+void
+log_smgrunlink(const RelFileLocator *rlocator, ForkNumber forkNum)
+{
+	xl_smgr_unlink xlrec;
+
+	/*
+	 * Make an XLOG entry reporting the file unlink.
+	 */
+	xlrec.rlocator = *rlocator;
+	xlrec.forkNum = forkNum;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+	XLogInsert(RM_SMGR_ID, XLOG_SMGR_UNLINK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_MARK record (action CREATE) to WAL.
+ */
+void
+log_smgrcreatemark(const RelFileLocator *rlocator, ForkNumber forkNum,
+				   StorageMarks mark)
+{
+	xl_smgr_mark xlrec;
+
+	/*
+	 * Make an XLOG entry reporting the mark file creation.
+	 */
+	xlrec.rlocator = *rlocator;
+	xlrec.forkNum = forkNum;
+	xlrec.mark = mark;
+	xlrec.action = XLOG_SMGR_MARK_CREATE;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+	XLogInsert(RM_SMGR_ID, XLOG_SMGR_MARK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_MARK record (action UNLINK) to WAL.
+ */
+void
+log_smgrunlinkmark(const RelFileLocator *rlocator, ForkNumber forkNum,
+				   StorageMarks mark)
+{
+	xl_smgr_mark xlrec;
+
+	/*
+	 * Make an XLOG entry reporting the mark file removal.
+	 */
+	xlrec.rlocator = *rlocator;
+	xlrec.forkNum = forkNum;
+	xlrec.mark = mark;
+	xlrec.action = XLOG_SMGR_MARK_UNLINK;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+	XLogInsert(RM_SMGR_ID, XLOG_SMGR_MARK | XLR_SPECIAL_REL_UPDATE);
+}
+
 /*
  * RelationDropStorage
  *		Schedule unlinking of physical storage at transaction commit.
@@ -711,6 +819,76 @@ smgrDoPendingDeletes(bool isCommit)
 	}
 }
 
+/*
+ *	smgrDoPendingCleanups() -- Clean up work that emits WAL records
+ *
+ *  The operations handled in this function emit WAL records, which must be
+ *  part of the current transaction.
+ */
+void
+smgrDoPendingCleanups(bool isCommit)
+{
+	int			nestLevel = GetCurrentTransactionNestLevel();
+	PendingCleanup *pending;
+	PendingCleanup *prev;
+	PendingCleanup *next;
+
+	prev = NULL;
+	for (pending = pendingCleanups; pending != NULL; pending = next)
+	{
+		next = pending->next;
+		if (pending->nestLevel < nestLevel)
+		{
+			/* outer-level entries should not be processed yet */
+			prev = pending;
+		}
+		else
+		{
+			/* unlink list entry first, so we don't retry on failure */
+			if (prev)
+				prev->next = next;
+			else
+				pendingCleanups = next;
+
+			/* do cleanup if called for */
+			if (pending->atCommit == isCommit)
+			{
+				SMgrRelation srel;
+
+				srel = smgropen(pending->rlocator, pending->backend);
+
+				Assert((pending->op &
+						~(PCOP_UNLINK_FORK | PCOP_UNLINK_MARK)) == 0);
+
+				if (pending->op & PCOP_UNLINK_FORK)
+				{
+					/* Don't emit WAL during recovery. */
+					if (!InRecovery)
+						log_smgrunlink(&pending->rlocator,
+									   pending->unlink_forknum);
+					smgrunlink(srel, pending->unlink_forknum, false);
+				}
+
+				if (pending->op & PCOP_UNLINK_MARK)
+				{
+					if (!InRecovery)
+						log_smgrunlinkmark(&pending->rlocator,
+										   pending->unlink_forknum,
+										   pending->unlink_mark);
+
+					smgrunlinkmark(srel, pending->unlink_forknum,
+								   pending->unlink_mark, InRecovery);
+					smgrclose(srel);
+				}
+			}
+
+			/* must explicitly free the list entry */
+			pfree(pending);
+			/* prev does not change */
+		}
+	}
+}
+
 /*
  *	smgrDoPendingSyncs() -- Take care of relation syncs at end of xact.
  */
@@ -971,6 +1149,15 @@ smgr_redo(XLogReaderState *record)
 		reln = smgropen(xlrec->rlocator, InvalidBackendId);
 		smgrcreate(reln, xlrec->forkNum, true);
 	}
+	else if (info == XLOG_SMGR_UNLINK)
+	{
+		xl_smgr_unlink *xlrec = (xl_smgr_unlink *) XLogRecGetData(record);
+		SMgrRelation reln;
+
+		reln = smgropen(xlrec->rlocator, InvalidBackendId);
+		smgrunlink(reln, xlrec->forkNum, true);
+		smgrclose(reln);
+	}
 	else if (info == XLOG_SMGR_TRUNCATE)
 	{
 		xl_smgr_truncate *xlrec = (xl_smgr_truncate *) XLogRecGetData(record);
@@ -1059,6 +1246,71 @@ smgr_redo(XLogReaderState *record)
 
 		FreeFakeRelcacheEntry(rel);
 	}
+	else if (info == XLOG_SMGR_MARK)
+	{
+		xl_smgr_mark *xlrec = (xl_smgr_mark *) XLogRecGetData(record);
+		SMgrRelation reln;
+		PendingCleanup *pending;
+		bool		created = false;
+
+		reln = smgropen(xlrec->rlocator, InvalidBackendId);
+
+		switch (xlrec->action)
+		{
+			case XLOG_SMGR_MARK_CREATE:
+				smgrcreatemark(reln, xlrec->forkNum, xlrec->mark, true);
+				created = true;
+				break;
+			case XLOG_SMGR_MARK_UNLINK:
+				smgrunlinkmark(reln, xlrec->forkNum, xlrec->mark, true);
+				break;
+			default:
+				elog(ERROR, "unknown smgr_mark action \"%c\"", xlrec->mark);
+		}
+
+		if (created)
+		{
+			/* revert mark file operation at abort */
+			pending = (PendingCleanup *)
+				MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+			pending->rlocator = xlrec->rlocator;
+			pending->op = PCOP_UNLINK_MARK;
+			pending->unlink_forknum = xlrec->forkNum;
+			pending->unlink_mark = xlrec->mark;
+			pending->backend = InvalidBackendId;
+			pending->atCommit = false;
+			pending->nestLevel = GetCurrentTransactionNestLevel();
+			pending->next = pendingCleanups;
+			pendingCleanups = pending;
+		}
+		else
+		{
+			/*
+			 * Delete pending action for this mark file if any. We should have
+			 * at most one entry for this action.
+			 */
+			PendingCleanup *prev = NULL;
+
+			for (pending = pendingCleanups; pending != NULL;
+				 pending = pending->next)
+			{
+				if (RelFileLocatorEquals(xlrec->rlocator, pending->rlocator) &&
+					pending->unlink_forknum == xlrec->forkNum &&
+					(pending->op & PCOP_UNLINK_MARK) != 0)
+				{
+					if (prev)
+						prev->next = pending->next;
+					else
+						pendingCleanups = pending->next;
+
+					pfree(pending);
+					break;
+				}
+
+				prev = pending;
+			}
+		}
+	}
 	else
 		elog(PANIC, "smgr_redo: unknown op code %u", info);
 }
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index a027a8aabc..a841c2eab8 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -351,8 +351,6 @@ static void pre_sync_fname(const char *fname, bool isdir, int elevel);
 static void datadir_fsync_fname(const char *fname, bool isdir, int elevel);
 static void unlink_if_exists_fname(const char *fname, bool isdir, int elevel);
 
-static int	fsync_parent_path(const char *fname, int elevel);
-
 
 /*
  * pg_fsync --- do fsync with or without writethrough
@@ -3814,7 +3812,7 @@ fsync_fname_ext(const char *fname, bool isdir, bool ignore_perm, int elevel)
  * This is aimed at making file operations persistent on disk in case of
  * an OS crash or power failure.
  */
-static int
+int
 fsync_parent_path(const char *fname, int elevel)
 {
 	char		parentpath[MAXPGPATH];
diff --git a/src/backend/storage/file/reinit.c b/src/backend/storage/file/reinit.c
index fb55371b1b..e84fcbf884 100644
--- a/src/backend/storage/file/reinit.c
+++ b/src/backend/storage/file/reinit.c
@@ -16,29 +16,45 @@
 
 #include <unistd.h>
 
+#include "access/xlogrecovery.h"
+#include "catalog/pg_tablespace_d.h"
 #include "common/relpath.h"
 #include "postmaster/startup.h"
+#include "storage/bufmgr.h"
 #include "storage/copydir.h"
 #include "storage/fd.h"
+#include "storage/md.h"
 #include "storage/reinit.h"
+#include "storage/smgr.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
 
 static void ResetUnloggedRelationsInTablespaceDir(const char *tsdirname,
-												  int op);
+												  Oid tspid, int op);
 static void ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
-											   int op);
+											   Oid tspid, Oid dbid, int op);
 
 typedef struct
 {
-	Oid			reloid;			/* hash key */
-} unlogged_relation_entry;
+	RelFileNumber	relNumber;		/* hash key */
+	bool			has_init;		/* has INIT fork */
+	bool			dirty_all;		/* needs to remove all forks */
+}  relfile_entry;
 
 /*
- * Reset unlogged relations from before the last restart.
+ * Clean up and reset relation files from before the last restart.
  *
- * If op includes UNLOGGED_RELATION_CLEANUP, we remove all forks of any
- * relation with an "init" fork, except for the "init" fork itself.
+ * If op includes UNLOGGED_RELATION_CLEANUP, we perform different operations
+ * depending on the existence of mark files.
+ *
+ * If SMGR_MARK_UNCOMMITTED mark file for main fork is present we remove the
+ * whole relation along with the mark file.
+ *
+ * Otherwise, if the "init" fork is found, we remove all forks of any relation
+ * with the "init" fork, except for the "init" fork itself.
+ *
+ * If op includes UNLOGGED_RELATION_DROP_BUFFER, we drop all buffers for all
+ * relations that are to be cleaned up.
  *
  * If op includes UNLOGGED_RELATION_INIT, we copy the "init" fork to the main
  * fork.
@@ -72,7 +88,7 @@ ResetUnloggedRelations(int op)
 	/*
 	 * First process unlogged files in pg_default ($PGDATA/base)
 	 */
-	ResetUnloggedRelationsInTablespaceDir("base", op);
+	ResetUnloggedRelationsInTablespaceDir("base", DEFAULTTABLESPACE_OID, op);
 
 	/*
 	 * Cycle through directories for all non-default tablespaces.
@@ -81,13 +97,19 @@ ResetUnloggedRelations(int op)
 
 	while ((spc_de = ReadDir(spc_dir, "pg_tblspc")) != NULL)
 	{
+		Oid			tspid;
+
 		if (strcmp(spc_de->d_name, ".") == 0 ||
 			strcmp(spc_de->d_name, "..") == 0)
 			continue;
 
 		snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s",
 				 spc_de->d_name, TABLESPACE_VERSION_DIRECTORY);
-		ResetUnloggedRelationsInTablespaceDir(temp_path, op);
+
+		tspid = atooid(spc_de->d_name);
+
+		Assert(tspid != 0);
+		ResetUnloggedRelationsInTablespaceDir(temp_path, tspid, op);
 	}
 
 	FreeDir(spc_dir);
@@ -103,7 +125,8 @@ ResetUnloggedRelations(int op)
  * Process one tablespace directory for ResetUnloggedRelations
  */
 static void
-ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
+ResetUnloggedRelationsInTablespaceDir(const char *tsdirname,
+									  Oid tspid, int op)
 {
 	DIR		   *ts_dir;
 	struct dirent *de;
@@ -130,6 +153,8 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
 
 	while ((de = ReadDir(ts_dir, tsdirname)) != NULL)
 	{
+		Oid			dbid;
+
 		/*
 		 * We're only interested in the per-database directories, which have
 		 * numeric names.  Note that this code will also (properly) ignore "."
@@ -148,7 +173,10 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
 			ereport_startup_progress("resetting unlogged relations (cleanup), elapsed time: %ld.%02d s, current path: %s",
 									 dbspace_path);
 
-		ResetUnloggedRelationsInDbspaceDir(dbspace_path, op);
+		dbid = atooid(de->d_name);
+		Assert(dbid != 0);
+
+		ResetUnloggedRelationsInDbspaceDir(dbspace_path, tspid, dbid, op);
 	}
 
 	FreeDir(ts_dir);
@@ -158,125 +186,200 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
  * Process one per-dbspace directory for ResetUnloggedRelations
  */
 static void
-ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
+ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
+								   Oid tspid, Oid dbid, int op)
 {
 	DIR		   *dbspace_dir;
 	struct dirent *de;
 	char		rm_path[MAXPGPATH * 2];
+	HTAB	   *hash;
+	HASHCTL		ctl;
 
 	/* Caller must specify at least one operation. */
-	Assert((op & (UNLOGGED_RELATION_CLEANUP | UNLOGGED_RELATION_INIT)) != 0);
+	Assert((op & (UNLOGGED_RELATION_CLEANUP |
+				  UNLOGGED_RELATION_DROP_BUFFER |
+				  UNLOGGED_RELATION_INIT)) != 0);
 
 	/*
 	 * Cleanup is a two-pass operation.  First, we go through and identify all
 	 * the files with init forks.  Then, we go through again and nuke
 	 * everything with the same OID except the init fork.
 	 */
-	if ((op & UNLOGGED_RELATION_CLEANUP) != 0)
+
+	/*
+	 * It's possible that someone could create tons of unlogged relations in
+	 * the same database & tablespace, so we'd better use a hash table rather
+	 * than an array or linked list to keep track of which files need to be
+	 * reset.  Otherwise, this cleanup operation would be O(n^2).
+	 */
+	memset(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(RelFileNumber);
+	ctl.entrysize = sizeof(relfile_entry);
+	hash = hash_create("unlogged relation RelFileNumbers",
+					   32, &ctl, HASH_ELEM | HASH_BLOBS);
+
+	/* Collect INIT fork and mark files in the directory. */
+	dbspace_dir = AllocateDir(dbspacedirname);
+	while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
 	{
-		HTAB	   *hash;
-		HASHCTL		ctl;
+		ForkNumber	forkNum;
+		int			relnumchars;
+		StorageMarks mark;
 
-		/*
-		 * It's possible that someone could create a ton of unlogged relations
-		 * in the same database & tablespace, so we'd better use a hash table
-		 * rather than an array or linked list to keep track of which files
-		 * need to be reset.  Otherwise, this cleanup operation would be
-		 * O(n^2).
-		 */
-		ctl.keysize = sizeof(Oid);
-		ctl.entrysize = sizeof(unlogged_relation_entry);
-		ctl.hcxt = CurrentMemoryContext;
-		hash = hash_create("unlogged relation OIDs", 32, &ctl,
-						   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+		/* Skip anything that doesn't look like a relation data file. */
+		if (!parse_filename_for_nontemp_relation(de->d_name, &relnumchars,
+												 &forkNum, &mark))
+			continue;
 
-		/* Scan the directory. */
-		dbspace_dir = AllocateDir(dbspacedirname);
-		while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
+		if (forkNum == INIT_FORKNUM || mark == SMGR_MARK_UNCOMMITTED)
 		{
-			ForkNumber	forkNum;
-			int			relnumchars;
-			unlogged_relation_entry ent;
-
-			/* Skip anything that doesn't look like a relation data file. */
-			if (!parse_filename_for_nontemp_relation(de->d_name, &relnumchars,
-													 &forkNum))
-				continue;
-
-			/* Also skip it unless this is the init fork. */
-			if (forkNum != INIT_FORKNUM)
-				continue;
+			RelFileNumber key;
+			relfile_entry *ent;
+			bool		found;
 
 			/*
-			 * Put the OID portion of the name into the hash table, if it
-			 * isn't already.
+			 * Put the OID portion of the name into the hash table,
+			 * if it isn't already.  If it has SMGR_MARK_UNCOMMITTED mark
+			 * files, the storage file is in dirty state, where clean up is
+			 * needed.
 			 */
-			ent.reloid = atooid(de->d_name);
-			(void) hash_search(hash, &ent, HASH_ENTER, NULL);
+			key = atooid(de->d_name);
+			ent = hash_search(hash, &key, HASH_ENTER, &found);
+
+			if (!found)
+			{
+				ent->has_init = false;
+				ent->dirty_all = false;
+			}
+
+			if (forkNum == MAIN_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
+				ent->dirty_all = true;
+			else
+			{
+				Assert(forkNum == INIT_FORKNUM);
+				ent->has_init = true;
+			}
 		}
+	}
 
-		/* Done with the first pass. */
-		FreeDir(dbspace_dir);
+	/* Done with the first pass. */
+	FreeDir(dbspace_dir);
+
+	/* nothing to do if we found neither init forks nor mark files */
+	if (hash_get_num_entries(hash) < 1)
+	{
+		hash_destroy(hash);
+		return;
+	}
 
+	if ((op & UNLOGGED_RELATION_DROP_BUFFER) != 0)
+	{
 		/*
-		 * If we didn't find any init forks, there's no point in continuing;
-		 * we can bail out now.
+		 * When we come here after recovery, smgr object for this file might
+		 * have been created. In that case we need to drop all buffers then the
+		 * smgr object.  Otherwise checkpointer wrongly tries to flush buffers
+		 * for nonexistent relation storage. This is safe as long as no other
+		 * backends have accessed the relation before starting archive
+		 * recovery.
 		 */
-		if (hash_get_num_entries(hash) == 0)
+		HASH_SEQ_STATUS status;
+		relfile_entry *ent;
+		SMgrRelation *srels = palloc(sizeof(SMgrRelation) * 8);
+		int			maxrels = 8;
+		int			nrels = 0;
+		int			i;
+
+		Assert(!HotStandbyActive());
+
+		hash_seq_init(&status, hash);
+		while ((ent = (relfile_entry *) hash_seq_search(&status)) != NULL)
 		{
-			hash_destroy(hash);
-			return;
+			RelFileLocatorBackend rel;
+
+			if (maxrels <= nrels)
+			{
+				maxrels *= 2;
+				srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
+			}
+
+			rel.backend = InvalidBackendId;
+			rel.locator.spcOid = tspid;
+			rel.locator.dbOid = dbid;
+			rel.locator.relNumber = ent->relNumber;
+
+			srels[nrels++] = smgropen(rel.locator, InvalidBackendId);
 		}
 
-		/*
-		 * Now, make a second pass and remove anything that matches.
-		 */
+		DropRelationsAllBuffers(srels, nrels);
+
+		for (i = 0; i < nrels; i++)
+			smgrclose(srels[i]);
+	}
+
+	/*
+	 * Now, make a second pass and remove anything that matches.
+	 */
+	if ((op & UNLOGGED_RELATION_CLEANUP) != 0)
+	{
 		dbspace_dir = AllocateDir(dbspacedirname);
 		while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
 		{
 			ForkNumber	forkNum;
+			StorageMarks mark;
 			int			relnumchars;
-			unlogged_relation_entry ent;
+			RelFileNumber key;
+			relfile_entry *ent;
+			RelFileLocatorBackend rel;
 
 			/* Skip anything that doesn't look like a relation data file. */
 			if (!parse_filename_for_nontemp_relation(de->d_name, &relnumchars,
-													 &forkNum))
-				continue;
-
-			/* We never remove the init fork. */
-			if (forkNum == INIT_FORKNUM)
+													 &forkNum, &mark))
 				continue;
 
 			/*
 			 * See whether the OID portion of the name shows up in the hash
 			 * table.  If so, nuke it!
 			 */
-			ent.reloid = atooid(de->d_name);
-			if (hash_search(hash, &ent, HASH_FIND, NULL))
+			key = atooid(de->d_name);
+			ent = hash_search(hash, &key, HASH_FIND, NULL);
+
+			if (!ent)
+				continue;
+
+			if (!ent->dirty_all)
 			{
-				snprintf(rm_path, sizeof(rm_path), "%s/%s",
-						 dbspacedirname, de->d_name);
-				if (unlink(rm_path) < 0)
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("could not remove file \"%s\": %m",
-									rm_path)));
-				else
-					elog(DEBUG2, "unlinked file \"%s\"", rm_path);
+				/* clean permanent relations don't need cleanup */
+				if (!ent->has_init)
+					continue;
+
+				if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_NONE)
+					continue;
 			}
+
+			/* so, nuke it! */
+			snprintf(rm_path, sizeof(rm_path), "%s/%s",
+					 dbspacedirname, de->d_name);
+			if (unlink(rm_path) < 0)
+				ereport(ERROR,
+						errcode_for_file_access(),
+						errmsg("could not remove file \"%s\": %m",
+							   rm_path));
+
+			rel.backend = InvalidBackendId;
+			rel.locator.spcOid = tspid;
+			rel.locator.dbOid = dbid;
+			rel.locator.relNumber = atooid(de->d_name);
+
+			ForgetRelationForkSyncRequests(rel, forkNum);
 		}
 
 		/* Cleanup is complete. */
 		FreeDir(dbspace_dir);
-		hash_destroy(hash);
 	}
 
 	/*
 	 * Initialization happens after cleanup is complete: we copy each init
-	 * fork file to the corresponding main fork file.  Note that if we are
-	 * asked to do both cleanup and init, we may never get here: if the
-	 * cleanup code determines that there are no init forks in this dbspace,
-	 * it will return before we get to this point.
+	 * fork file to the corresponding main fork file.
 	 */
 	if ((op & UNLOGGED_RELATION_INIT) != 0)
 	{
@@ -285,6 +388,7 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
 		while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
 		{
 			ForkNumber	forkNum;
+			StorageMarks mark;
 			int			relnumchars;
 			char		relnumbuf[OIDCHARS + 1];
 			char		srcpath[MAXPGPATH * 2];
@@ -292,9 +396,11 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
 
 			/* Skip anything that doesn't look like a relation data file. */
 			if (!parse_filename_for_nontemp_relation(de->d_name, &relnumchars,
-													 &forkNum))
+													 &forkNum, &mark))
 				continue;
 
+			Assert(mark == SMGR_MARK_NONE);
+
 			/* Also skip it unless this is the init fork. */
 			if (forkNum != INIT_FORKNUM)
 				continue;
@@ -328,15 +434,18 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
 		while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
 		{
 			ForkNumber	forkNum;
+			StorageMarks mark;
 			int			relnumchars;
 			char		relnumbuf[OIDCHARS + 1];
 			char		mainpath[MAXPGPATH];
 
 			/* Skip anything that doesn't look like a relation data file. */
 			if (!parse_filename_for_nontemp_relation(de->d_name, &relnumchars,
-													 &forkNum))
+													 &forkNum, &mark))
 				continue;
 
+			Assert(mark == SMGR_MARK_NONE);
+
 			/* Also skip it unless this is the init fork. */
 			if (forkNum != INIT_FORKNUM)
 				continue;
@@ -379,7 +488,7 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
  */
 bool
 parse_filename_for_nontemp_relation(const char *name, int *relnumchars,
-									ForkNumber *fork)
+									ForkNumber *fork, StorageMarks *mark)
 {
 	int			pos;
 
@@ -410,11 +519,19 @@ parse_filename_for_nontemp_relation(const char *name, int *relnumchars,
 
 		for (segchar = 1; isdigit((unsigned char) name[pos + segchar]); ++segchar)
 			;
-		if (segchar <= 1)
-			return false;
-		pos += segchar;
+		if (segchar > 1)
+			pos += segchar;
 	}
 
+	/* mark file? */
+	if (name[pos] == '.' && name[pos + 1] != 0)
+	{
+		*mark = name[pos + 1];
+		pos += 2;
+	}
+	else
+		*mark = SMGR_MARK_NONE;
+
 	/* Now we should be at the end. */
 	if (name[pos] != '\0')
 		return false;
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index fdecbad170..6eb50d40a0 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -141,6 +141,8 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forknum,
 							 BlockNumber blkno, bool skipFsync, int behavior);
 static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
 							  MdfdVec *seg);
+static bool mdmarkexists(SMgrRelation reln, ForkNumber forkNum,
+						 StorageMarks mark);
 
 static inline int
 _mdfd_open_flags(void)
@@ -183,6 +185,82 @@ mdexists(SMgrRelation reln, ForkNumber forknum)
 	return (mdopenfork(reln, forknum, EXTENSION_RETURN_NULL) != NULL);
 }
 
+/*
+ *  mdcreatemark() -- Create a mark file.
+ *
+ * If isRedo is true, it's okay for the file to exist already.
+ */
+void
+mdcreatemark(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark,
+			 bool isRedo)
+{
+	char	   *path = markpath(reln->smgr_rlocator, forkNum, mark);
+	int			fd;
+
+	/* See mdcreate for details. */
+	TablespaceCreateDbspace(reln->smgr_rlocator.locator.spcOid,
+							reln->smgr_rlocator.locator.dbOid,
+							isRedo);
+
+	fd = BasicOpenFile(path, O_WRONLY | O_CREAT | O_EXCL);
+	if (fd < 0 && (!isRedo || errno != EEXIST))
+		ereport(ERROR,
+				errcode_for_file_access(),
+				errmsg("could not create mark file \"%s\": %m", path));
+
+	/* fd < 0 here means the file already existed during redo: skip it. */
+	if (fd >= 0)
+	{
+		pg_fsync(fd);
+		close(fd);
+	}
+
+	/* fsync the parent directory to make the file's creation persistent. */
+	fsync_parent_path(path, ERROR);
+	pfree(path);
+}
+
+
+/*
+ *  mdunlinkmark()  -- Delete the mark file
+ *
+ * If isRedo is true, it's okay if the file is not found.
+ */
+void
+mdunlinkmark(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark,
+			 bool isRedo)
+{
+	char	   *path = markpath(reln->smgr_rlocator, forkNum, mark);
+
+	if (!isRedo || mdmarkexists(reln, forkNum, mark))
+		durable_unlink(path, ERROR);
+
+	pfree(path);
+}
+
+/*
+ *  mdmarkexists()  -- Check if the file exists.
+ */
+static bool
+mdmarkexists(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark)
+{
+	char	   *path = markpath(reln->smgr_rlocator, forkNum, mark);
+	int			fd;
+
+	fd = BasicOpenFile(path, O_RDONLY);
+	if (fd < 0 && errno != ENOENT)
+		ereport(ERROR,
+				errcode_for_file_access(),
+				errmsg("could not access mark file \"%s\": %m", path));
+	pfree(path);
+
+	if (fd < 0)
+		return false;
+
+	close(fd);
+	return true;
+}
+
 /*
  * mdcreate() -- Create a new relation on magnetic disk.
  *
@@ -1227,6 +1305,16 @@ register_forget_request(RelFileLocatorBackend rlocator, ForkNumber forknum,
 	RegisterSyncRequest(&tag, SYNC_FORGET_REQUEST, true /* retryOnError */ );
 }
 
+/*
+ * ForgetRelationForkSyncRequests -- forget any fsyncs and unlinks for a fork
+ */
+void
+ForgetRelationForkSyncRequests(RelFileLocatorBackend rlocator,
+							   ForkNumber forknum)
+{
+	register_forget_request(rlocator, forknum, 0);
+}
+
 /*
  * ForgetDatabaseSyncRequests -- forget any fsyncs and unlinks for a DB
  */
@@ -1592,12 +1680,14 @@ mdsyncfiletag(const FileTag *ftag, char *path)
  * Return 0 on success, -1 on failure, with errno set.
  */
 int
-mdunlinkfiletag(const FileTag *ftag, char *path)
+mdunlinkfiletag(const FileTag *ftag, char *path, StorageMarks mark)
 {
 	char	   *p;
 
 	/* Compute the path. */
-	p = relpathperm(ftag->rlocator, MAIN_FORKNUM);
+	p = GetRelationPath(ftag->rlocator.dbOid, ftag->rlocator.spcOid,
+						ftag->rlocator.relNumber,InvalidBackendId,
+						MAIN_FORKNUM, mark);
 	strlcpy(path, p, MAXPGPATH);
 	pfree(p);
 
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index f76c4605db..eafe14bb5e 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -65,6 +65,10 @@ typedef struct f_smgr
 	void		(*smgr_truncate) (SMgrRelation reln, ForkNumber forknum,
 								  BlockNumber nblocks);
 	void		(*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum);
+	void		(*smgr_createmark) (SMgrRelation reln, ForkNumber forknum,
+									StorageMarks mark, bool isRedo);
+	void		(*smgr_unlinkmark) (SMgrRelation reln, ForkNumber forknum,
+									StorageMarks mark, bool isRedo);
 } f_smgr;
 
 static const f_smgr smgrsw[] = {
@@ -86,6 +90,8 @@ static const f_smgr smgrsw[] = {
 		.smgr_nblocks = mdnblocks,
 		.smgr_truncate = mdtruncate,
 		.smgr_immedsync = mdimmedsync,
+		.smgr_createmark = mdcreatemark,
+		.smgr_unlinkmark = mdunlinkmark,
 	}
 };
 
@@ -375,6 +381,26 @@ smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo)
 	smgrsw[reln->smgr_which].smgr_create(reln, forknum, isRedo);
 }
 
+/*
+ *	smgrcreatemark() -- Create a mark file
+ */
+void
+smgrcreatemark(SMgrRelation reln, ForkNumber forknum, StorageMarks mark,
+			   bool isRedo)
+{
+	smgrsw[reln->smgr_which].smgr_createmark(reln, forknum, mark, isRedo);
+}
+
+/*
+ *	smgrunlinkmark() -- Delete a mark file
+ */
+void
+smgrunlinkmark(SMgrRelation reln, ForkNumber forknum, StorageMarks mark,
+			   bool isRedo)
+{
+	smgrsw[reln->smgr_which].smgr_unlinkmark(reln, forknum, mark, isRedo);
+}
+
 /*
  * smgrdosyncall() -- Immediately sync all forks of all given relations
  *
@@ -722,6 +748,12 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
 	smgrsw[reln->smgr_which].smgr_immedsync(reln, forknum);
 }
 
+void
+smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo)
+{
+	smgrsw[reln->smgr_which].smgr_unlink(reln->smgr_rlocator, forknum, isRedo);
+}
+
 /*
  * AtEOXact_SMgr
  *
diff --git a/src/backend/storage/sync/sync.c b/src/backend/storage/sync/sync.c
index 04fcb06056..6b072bcdd0 100644
--- a/src/backend/storage/sync/sync.c
+++ b/src/backend/storage/sync/sync.c
@@ -89,7 +89,8 @@ static CycleCtr checkpoint_cycle_ctr = 0;
 typedef struct SyncOps
 {
 	int			(*sync_syncfiletag) (const FileTag *ftag, char *path);
-	int			(*sync_unlinkfiletag) (const FileTag *ftag, char *path);
+	int			(*sync_unlinkfiletag) (const FileTag *ftag, char *path,
+									   StorageMarks mark);
 	bool		(*sync_filetagmatches) (const FileTag *ftag,
 										const FileTag *candidate);
 } SyncOps;
@@ -233,7 +234,8 @@ SyncPostCheckpoint(void)
 
 		/* Unlink the file */
 		if (syncsw[entry->tag.handler].sync_unlinkfiletag(&entry->tag,
-														  path) < 0)
+														  path,
+														  SMGR_MARK_NONE) < 0)
 		{
 			/*
 			 * There's a race condition, when the database is dropped at the
@@ -242,6 +244,26 @@ SyncPostCheckpoint(void)
 			 * here. rmtree() also has to ignore ENOENT errors, to deal with
 			 * the possibility that we delete the file first.
 			 */
+			if (errno != ENOENT)
+				ereport(WARNING,
+						errcode_for_file_access(),
+						errmsg("could not remove file \"%s\": %m", path));
+		}
+		else if (syncsw[entry->tag.handler].sync_unlinkfiletag(&entry->tag,
+															   path,
+															   SMGR_MARK_UNCOMMITTED)
+				 < 0)
+		{
+			/*
+			 * We might also have SMGR_MARK_UNCOMMITTED file.  Remove it if the
+			 * fork file has been successfully removed. It's fine if the file
+			 * does not exist. Since we have successfully removed the storage
+			 * file, it's no big deal if the mark file can't be removed. It
+			 * will be eventually removed during a future startup. If that
+			 * removal fails, the leftover mark file prevents the creation of
+			 * the corresponding storage file so that mark files won't result
+			 * in unexpected removal of the correct storage files.
+			 */
 			if (errno != ENOENT)
 				ereport(WARNING,
 						(errcode_for_file_access(),
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 27782237d0..e9e4bafb01 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -418,6 +418,22 @@ extractPageInfo(XLogReaderState *record)
 		 * source system.
 		 */
 	}
+	else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_UNLINK)
+	{
+		/*
+		 * We can safely ignore these.  We'll see that the files don't exist in
+		 * the target data dir, and copy them in from the source system. No
+		 * need to do anything special here.
+		 */
+	}
+	else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_MARK)
+	{
+		/*
+		 * We can safely ignore these.  The file will be removed from the
+		 * target if it doesn't exist in the source system.  The files are
+		 * empty, so we don't need to bother with their content.
+		 */
+	}
 	else if (rmid == RM_XACT_ID &&
 			 ((rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT ||
 			  (rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT_PREPARED ||
diff --git a/src/common/relpath.c b/src/common/relpath.c
index 87de5f6c96..b1f6832cfa 100644
--- a/src/common/relpath.c
+++ b/src/common/relpath.c
@@ -139,9 +139,15 @@ GetDatabasePath(Oid dbOid, Oid spcOid)
  */
 char *
 GetRelationPath(Oid dbOid, Oid spcOid, RelFileNumber relNumber,
-				int backendId, ForkNumber forkNumber)
+				int backendId, ForkNumber forkNumber, char mark)
 {
 	char	   *path;
+	char		markstr[4];
+
+	if (mark == 0)
+		markstr[0] = 0;
+	else
+		snprintf(markstr, sizeof(markstr), ".%c", mark);
 
 	if (spcOid == GLOBALTABLESPACE_OID)
 	{
@@ -149,10 +155,10 @@ GetRelationPath(Oid dbOid, Oid spcOid, RelFileNumber relNumber,
 		Assert(dbOid == 0);
 		Assert(backendId == InvalidBackendId);
 		if (forkNumber != MAIN_FORKNUM)
-			path = psprintf("global/%u_%s",
-							relNumber, forkNames[forkNumber]);
+			path = psprintf("global/%u_%s%s",
+							relNumber, forkNames[forkNumber], markstr);
 		else
-			path = psprintf("global/%u", relNumber);
+			path = psprintf("global/%u%s", relNumber, markstr);
 	}
 	else if (spcOid == DEFAULTTABLESPACE_OID)
 	{
@@ -160,22 +166,22 @@ GetRelationPath(Oid dbOid, Oid spcOid, RelFileNumber relNumber,
 		if (backendId == InvalidBackendId)
 		{
 			if (forkNumber != MAIN_FORKNUM)
-				path = psprintf("base/%u/%u_%s",
+				path = psprintf("base/%u/%u_%s%s",
 								dbOid, relNumber,
-								forkNames[forkNumber]);
+								forkNames[forkNumber], markstr);
 			else
-				path = psprintf("base/%u/%u",
-								dbOid, relNumber);
+				path = psprintf("base/%u/%u%s",
+								dbOid, relNumber, markstr);
 		}
 		else
 		{
 			if (forkNumber != MAIN_FORKNUM)
-				path = psprintf("base/%u/t%d_%u_%s",
+				path = psprintf("base/%u/t%d_%u_%s%s",
 								dbOid, backendId, relNumber,
-								forkNames[forkNumber]);
+								forkNames[forkNumber], markstr);
 			else
-				path = psprintf("base/%u/t%d_%u",
-								dbOid, backendId, relNumber);
+				path = psprintf("base/%u/t%d_%u%s",
+								dbOid, backendId, relNumber, markstr);
 		}
 	}
 	else
@@ -184,27 +190,28 @@ GetRelationPath(Oid dbOid, Oid spcOid, RelFileNumber relNumber,
 		if (backendId == InvalidBackendId)
 		{
 			if (forkNumber != MAIN_FORKNUM)
-				path = psprintf("pg_tblspc/%u/%s/%u/%u_%s",
+				path = psprintf("pg_tblspc/%u/%s/%u/%u_%s%s",
 								spcOid, TABLESPACE_VERSION_DIRECTORY,
 								dbOid, relNumber,
-								forkNames[forkNumber]);
+								forkNames[forkNumber], markstr);
 			else
-				path = psprintf("pg_tblspc/%u/%s/%u/%u",
+				path = psprintf("pg_tblspc/%u/%s/%u/%u%s",
 								spcOid, TABLESPACE_VERSION_DIRECTORY,
-								dbOid, relNumber);
+								dbOid, relNumber, markstr);
 		}
 		else
 		{
 			if (forkNumber != MAIN_FORKNUM)
-				path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u_%s",
+				path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u_%s%s",
 								spcOid, TABLESPACE_VERSION_DIRECTORY,
 								dbOid, backendId, relNumber,
-								forkNames[forkNumber]);
+								forkNames[forkNumber], markstr);
 			else
-				path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u",
+				path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u%s",
 								spcOid, TABLESPACE_VERSION_DIRECTORY,
-								dbOid, backendId, relNumber);
+								dbOid, backendId, relNumber, markstr);
 		}
 	}
+
 	return path;
 }
diff --git a/src/include/catalog/storage.h b/src/include/catalog/storage.h
index 45a3c7835c..0b39c6ef56 100644
--- a/src/include/catalog/storage.h
+++ b/src/include/catalog/storage.h
@@ -25,6 +25,8 @@ extern PGDLLIMPORT int wal_skip_threshold;
 extern SMgrRelation RelationCreateStorage(RelFileLocator rlocator,
 										  char relpersistence,
 										  bool register_delete);
+extern void RelationCreateInitFork(Relation rel);
+extern void RelationDropInitFork(Relation rel);
 extern void RelationDropStorage(Relation rel);
 extern void RelationPreserveStorage(RelFileLocator rlocator, bool atCommit);
 extern void RelationPreTruncate(Relation rel);
@@ -43,6 +45,7 @@ extern void RestorePendingSyncs(char *startAddress);
 extern void smgrDoPendingDeletes(bool isCommit);
 extern void smgrDoPendingSyncs(bool isCommit, bool isParallelWorker);
 extern int	smgrGetPendingDeletes(bool forCommit, RelFileLocator **ptr);
+extern void smgrDoPendingCleanups(bool isCommit);
 extern void AtSubCommit_smgr(void);
 extern void AtSubAbort_smgr(void);
 extern void PostPrepare_smgr(void);
diff --git a/src/include/catalog/storage_xlog.h b/src/include/catalog/storage_xlog.h
index 6b0a7aa3df..a36646c6ee 100644
--- a/src/include/catalog/storage_xlog.h
+++ b/src/include/catalog/storage_xlog.h
@@ -18,17 +18,23 @@
 #include "lib/stringinfo.h"
 #include "storage/block.h"
 #include "storage/relfilelocator.h"
+#include "storage/smgr.h"
 
 /*
  * Declarations for smgr-related XLOG records
  *
- * Note: we log file creation and truncation here, but logging of deletion
- * actions is handled by xact.c, because it is part of transaction commit.
+ * Note: we log file creation, truncation and buffer persistence change here,
+ * but logging of deletion actions is handled mainly by xact.c, because it is
+ * part of transaction commit in most cases.  However, there's a case where
+ * init forks are deleted outside control of transaction.
  */
 
 /* XLOG gives us high 4 bits */
 #define XLOG_SMGR_CREATE	0x10
 #define XLOG_SMGR_TRUNCATE	0x20
+#define XLOG_SMGR_UNLINK	0x30
+#define XLOG_SMGR_MARK		0x40
+#define XLOG_SMGR_BUFPERSISTENCE	0x50
 
 typedef struct xl_smgr_create
 {
@@ -36,6 +42,26 @@ typedef struct xl_smgr_create
 	ForkNumber	forkNum;
 } xl_smgr_create;
 
+typedef struct xl_smgr_unlink
+{
+	RelFileLocator rlocator;
+	ForkNumber	forkNum;
+} xl_smgr_unlink;
+
+typedef enum smgr_mark_action
+{
+	XLOG_SMGR_MARK_CREATE = 'c',
+	XLOG_SMGR_MARK_UNLINK = 'u'
+} smgr_mark_action;
+
+typedef struct xl_smgr_mark
+{
+	RelFileLocator rlocator;
+	ForkNumber	forkNum;
+	StorageMarks mark;
+	smgr_mark_action action;
+} xl_smgr_mark;
+
 /* flags for xl_smgr_truncate */
 #define SMGR_TRUNCATE_HEAP		0x0001
 #define SMGR_TRUNCATE_VM		0x0002
@@ -51,6 +77,11 @@ typedef struct xl_smgr_truncate
 } xl_smgr_truncate;
 
 extern void log_smgrcreate(const RelFileLocator *rlocator, ForkNumber forkNum);
+extern void log_smgrunlink(const RelFileLocator *rlocator, ForkNumber forkNum);
+extern void log_smgrcreatemark(const RelFileLocator *rlocator,
+							   ForkNumber forkNum, StorageMarks mark);
+extern void log_smgrunlinkmark(const RelFileLocator *rlocator,
+							   ForkNumber forkNum, StorageMarks mark);
 
 extern void smgr_redo(XLogReaderState *record);
 extern void smgr_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/common/relpath.h b/src/include/common/relpath.h
index 511c21682e..28c9dbcd13 100644
--- a/src/include/common/relpath.h
+++ b/src/include/common/relpath.h
@@ -74,7 +74,7 @@ extern int	forkname_chars(const char *str, ForkNumber *fork);
 extern char *GetDatabasePath(Oid dbOid, Oid spcOid);
 
 extern char *GetRelationPath(Oid dbOid, Oid spcOid, RelFileNumber relNumber,
-							 int backendId, ForkNumber forkNumber);
+							 int backendId, ForkNumber forkNumber, char mark);
 
 /*
  * Wrapper macros for GetRelationPath.  Beware of multiple
@@ -84,7 +84,7 @@ extern char *GetRelationPath(Oid dbOid, Oid spcOid, RelFileNumber relNumber,
 /* First argument is a RelFileLocator */
 #define relpathbackend(rlocator, backend, forknum) \
 	GetRelationPath((rlocator).dbOid, (rlocator).spcOid, (rlocator).relNumber, \
-					backend, forknum)
+					backend, forknum, 0)
 
 /* First argument is a RelFileLocator */
 #define relpathperm(rlocator, forknum) \
@@ -94,4 +94,9 @@ extern char *GetRelationPath(Oid dbOid, Oid spcOid, RelFileNumber relNumber,
 #define relpath(rlocator, forknum) \
 	relpathbackend((rlocator).locator, (rlocator).backend, forknum)
 
+/* First argument is a RelFileLocatorBackend */
+#define markpath(rlocator, forknum, mark)								\
+	GetRelationPath((rlocator).locator.dbOid, (rlocator).locator.spcOid, \
+					(rlocator).locator.relNumber,						\
+					(rlocator).backend, forknum, mark)
 #endif							/* RELPATH_H */
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 6791a406fc..35d022d8e1 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -190,6 +190,7 @@ extern void pg_flush_data(int fd, off_t offset, off_t nbytes);
 extern int	pg_truncate(const char *path, off_t length);
 extern void fsync_fname(const char *fname, bool isdir);
 extern int	fsync_fname_ext(const char *fname, bool isdir, bool ignore_perm, int elevel);
+extern int	fsync_parent_path(const char *fname, int elevel);
 extern int	durable_rename(const char *oldfile, const char *newfile, int elevel);
 extern int	durable_unlink(const char *fname, int elevel);
 extern void SyncDataDirectory(void);
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 941879ee6a..de49863245 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -23,6 +23,10 @@
 extern void mdinit(void);
 extern void mdopen(SMgrRelation reln);
 extern void mdclose(SMgrRelation reln, ForkNumber forknum);
+extern void mdcreatemark(SMgrRelation reln, ForkNumber forknum,
+						 StorageMarks mark, bool isRedo);
+extern void mdunlinkmark(SMgrRelation reln, ForkNumber forknum,
+						 StorageMarks mark, bool isRedo);
 extern void mdcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern bool mdexists(SMgrRelation reln, ForkNumber forknum);
 extern void mdunlink(RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo);
@@ -43,12 +47,14 @@ extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber nblocks);
 extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
 
+extern void ForgetRelationForkSyncRequests(RelFileLocatorBackend rlocator,
+										   ForkNumber forknum);
 extern void ForgetDatabaseSyncRequests(Oid dbid);
 extern void DropRelationFiles(RelFileLocator *delrels, int ndelrels, bool isRedo);
 
 /* md sync callbacks */
 extern int	mdsyncfiletag(const FileTag *ftag, char *path);
-extern int	mdunlinkfiletag(const FileTag *ftag, char *path);
+extern int	mdunlinkfiletag(const FileTag *ftag, char *path, StorageMarks mark);
 extern bool mdfiletagmatches(const FileTag *ftag, const FileTag *candidate);
 
 #endif							/* MD_H */
diff --git a/src/include/storage/reinit.h b/src/include/storage/reinit.h
index e2bbb5abe9..119dac1505 100644
--- a/src/include/storage/reinit.h
+++ b/src/include/storage/reinit.h
@@ -16,14 +16,16 @@
 #define REINIT_H
 
 #include "common/relpath.h"
-
+#include "storage/smgr.h"
 
 extern void ResetUnloggedRelations(int op);
 extern bool parse_filename_for_nontemp_relation(const char *name,
 												int *relnumchars,
-												ForkNumber *fork);
+												ForkNumber *fork,
+												StorageMarks *mark);
 
 #define UNLOGGED_RELATION_CLEANUP		0x0001
-#define UNLOGGED_RELATION_INIT			0x0002
+#define UNLOGGED_RELATION_DROP_BUFFER	0x0002
+#define UNLOGGED_RELATION_INIT			0x0004
 
 #endif							/* REINIT_H */
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index a9a179aaba..4964146106 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -18,6 +18,18 @@
 #include "storage/block.h"
 #include "storage/relfilelocator.h"
 
+/*
+ * A storage mark is a file whose existence conveys something about another
+ * file. Such files are named "<filename>.<mark>", where the mark is one of
+ * the values of StorageMarks. Since ".<digit>" denotes a segment file, don't
+ * use digits for the mark character.
+ */
+typedef enum StorageMarks
+{
+	SMGR_MARK_NONE = 0,
+	SMGR_MARK_UNCOMMITTED = 'u' /* the file is not committed yet */
+} StorageMarks;
+
 /*
  * smgr.c maintains a table of SMgrRelation objects, which are essentially
  * cached file handles.  An SMgrRelation is created (if not already present)
@@ -87,7 +99,12 @@ extern void smgrcloseall(void);
 extern void smgrcloserellocator(RelFileLocatorBackend rlocator);
 extern void smgrrelease(SMgrRelation reln);
 extern void smgrreleaseall(void);
+extern void smgrcreatemark(SMgrRelation reln, ForkNumber forknum,
+						   StorageMarks mark, bool isRedo);
+extern void smgrunlinkmark(SMgrRelation reln, ForkNumber forknum,
+						   StorageMarks mark, bool isRedo);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
+extern void smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
 extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
diff --git a/src/test/recovery/t/013_crash_restart.pl b/src/test/recovery/t/013_crash_restart.pl
index ce57792f31..d32a1b7aa8 100644
--- a/src/test/recovery/t/013_crash_restart.pl
+++ b/src/test/recovery/t/013_crash_restart.pl
@@ -86,6 +86,24 @@ ok( pump_until(
 $killme_stdout = '';
 $killme_stderr = '';
 
+#create a table that should *not* survive, but has rows.
+#the table's contents are required to cause access to the storage file
+#after a restart.
+$killme_stdin .= q[
+CREATE TABLE not_alive AS SELECT 1 as a;
+SELECT pg_relation_filepath('not_alive');
+];
+ok( pump_until(
+		$killme,         $psql_timeout,
+		\$killme_stdout, qr/[[:alnum:]\/]+[\r\n]$/m),
+	'added in-creation table');
+my $not_alive_relfile = $node->data_dir . "/" . $killme_stdout;
+chomp($not_alive_relfile);
+$killme_stdout = '';
+$killme_stderr = '';
+
+# The relfile must exist now
+ok ( -e $not_alive_relfile, 'relfile for in-creation table');
 
 # Start longrunning query in second session; its failure will signal that
 # crash-restart has occurred.  The initial wait for the trivial select is to
@@ -144,6 +162,9 @@ $killme->run();
 ($monitor_stdin, $monitor_stdout, $monitor_stderr) = ('', '', '');
 $monitor->run();
 
+# The relfile must have been removed due to the recent restart.
+ok ( ! -e $not_alive_relfile,
+	 'relfile for the in-creation table should be removed after restart');
 
 # Acquire pid of new backend
 $killme_stdin .= q[
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 51b7951ad8..057e3d3104 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1997,6 +1997,7 @@ PatternInfoArray
 Pattern_Prefix_Status
 Pattern_Type
 PendingFsyncEntry
+PendingMarkCleanup
 PendingRelDelete
 PendingRelSync
 PendingUnlinkEntry
@@ -2639,6 +2640,7 @@ StdRdOptIndexCleanup
 StdRdOptions
 Step
 StopList
+StorageMarks
 StrategyNumber
 StreamCtl
 StreamStopReason
@@ -3674,6 +3676,7 @@ registered_buffer
 regmatch_t
 regoff_t
 regproc
+relfile_entry
 relopt_bool
 relopt_enum
 relopt_enum_elt_def
@@ -3729,6 +3732,7 @@ slist_iter
 slist_mutable_iter
 slist_node
 slock_t
+smgr_mark_action
 socket_set
 socklen_t
 spgBulkDeleteState
@@ -3936,7 +3940,9 @@ xl_restore_point
 xl_running_xacts
 xl_seq_rec
 xl_smgr_create
+xl_smgr_mark
 xl_smgr_truncate
+xl_smgr_unlink
 xl_standby_lock
 xl_standby_locks
 xl_tblspc_create_rec
-- 
2.25.1

>From 1ba019a026c66c71bafdc9c71393bef4251917b2 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota....@gmail.com>
Date: Tue, 25 Apr 2023 15:49:10 +0900
Subject: [PATCH v29 2/3] In-place table persistence change

Previously, the command caused a large amount of file I/O due to heap
rewrites, even though ALTER TABLE SET UNLOGGED does not require any
data rewrites.  This patch eliminates the need for
rewrites. Additionally, ALTER TABLE SET LOGGED is updated to emit
XLOG_FPI records instead of numerous HEAP_INSERTs when wal_level >
minimal, reducing resource consumption.
---
 src/backend/access/rmgrdesc/smgrdesc.c |  12 +
 src/backend/catalog/storage.c          | 295 ++++++++++++++++++++++++-
 src/backend/commands/tablecmds.c       | 270 ++++++++++++++++++----
 src/backend/storage/buffer/bufmgr.c    |  84 +++++++
 src/backend/storage/file/reinit.c      |  51 ++++-
 src/bin/pg_rewind/parsexlog.c          |   6 +
 src/include/catalog/storage_xlog.h     |   8 +
 src/include/storage/bufmgr.h           |   2 +
 src/tools/pgindent/typedefs.list       |   1 +
 9 files changed, 675 insertions(+), 54 deletions(-)

diff --git a/src/backend/access/rmgrdesc/smgrdesc.c b/src/backend/access/rmgrdesc/smgrdesc.c
index f8187385c4..e2998a3ee4 100644
--- a/src/backend/access/rmgrdesc/smgrdesc.c
+++ b/src/backend/access/rmgrdesc/smgrdesc.c
@@ -71,6 +71,15 @@ smgr_desc(StringInfo buf, XLogReaderState *record)
 		appendStringInfo(buf, "%s %s", action, path);
 		pfree(path);
 	}
+	else if (info == XLOG_SMGR_BUFPERSISTENCE)
+	{
+		xl_smgr_bufpersistence *xlrec = (xl_smgr_bufpersistence *) rec;
+		char	   *path = relpathperm(xlrec->rlocator, MAIN_FORKNUM);
+
+		appendStringInfoString(buf, path);
+		appendStringInfo(buf, " persistence %d", xlrec->persistence);
+		pfree(path);
+	}
 }
 
 const char *
@@ -92,6 +101,9 @@ smgr_identify(uint8 info)
 		case XLOG_SMGR_MARK:
 			id = "MARK";
 			break;
+		case XLOG_SMGR_BUFPERSISTENCE:
+			id = "BUFPERSISTENCE";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index fe06c3c31d..6106376525 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -69,11 +69,13 @@ typedef struct PendingRelDelete
 
 #define	PCOP_UNLINK_FORK		(1 << 0)
 #define	PCOP_UNLINK_MARK		(1 << 1)
+#define	PCOP_SET_PERSISTENCE	(1 << 2)
 
 typedef struct PendingCleanup
 {
 	RelFileLocator rlocator;	/* relation that need a cleanup */
 	int			op;				/* operation mask */
+	bool		bufpersistence; /* buffer persistence to set */
 	ForkNumber	unlink_forknum; /* forknum to unlink */
 	StorageMarks unlink_mark;	/* mark to unlink */
 	BackendId	backend;		/* InvalidBackendId if not a temp rel */
@@ -223,6 +225,202 @@ RelationCreateStorage(RelFileLocator rlocator, char relpersistence,
 	return srel;
 }
 
+/*
+ * RelationCreateInitFork
+ *		Create physical storage for the init fork of a relation.
+ *
+ * Create the init fork for the relation.
+ *
+ * This function is transactional. The creation is WAL-logged, and if the
+ * transaction aborts later on, the init fork will be removed.
+ */
+void
+RelationCreateInitFork(Relation rel)
+{
+	RelFileLocator rlocator = rel->rd_locator;
+	PendingCleanup *pending;
+	PendingCleanup *prev;
+	PendingCleanup *next;
+	SMgrRelation srel;
+	bool		create = true;
+
+	/* switch buffer persistence */
+	SetRelationBuffersPersistence(RelationGetSmgr(rel), false, false);
+
+	/*
+	 * If a pending-unlink exists for this relation's init-fork, it indicates
+	 * the init-fork existed before the current transaction; this function
+	 * reverts the pending-unlink by removing the entry. See
+	 * RelationDropInitFork.
+	 */
+	prev = NULL;
+	for (pending = pendingCleanups; pending != NULL; pending = next)
+	{
+		next = pending->next;
+
+		if (RelFileLocatorEquals(rlocator, pending->rlocator) &&
+			pending->unlink_forknum == INIT_FORKNUM)
+		{
+			if (prev)
+				prev->next = next;
+			else
+				pendingCleanups = next;
+
+			pfree(pending);
+			/* prev does not change */
+
+			create = false;
+		}
+		else
+			prev = pending;
+	}
+
+	if (!create)
+		return;
+
+	/* create the init fork, along with the mark file */
+	srel = smgropen(rlocator, InvalidBackendId);
+	log_smgrcreatemark(&rlocator, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED);
+	smgrcreatemark(srel, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
+
+	/* We don't have existing init fork, create it. */
+	smgrcreate(srel, INIT_FORKNUM, false);
+
+	/*
+	 * For index relations, WAL-logging and file sync are handled by
+	 * ambuildempty. In contrast, for heap relations, these tasks are performed
+	 * directly.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_INDEX)
+		rel->rd_indam->ambuildempty(rel);
+	else
+	{
+		log_smgrcreate(&rlocator, INIT_FORKNUM);
+		smgrimmedsync(srel, INIT_FORKNUM);
+	}
+
+	/* drop the init fork, mark file then revert persistence at abort */
+	pending = (PendingCleanup *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+	pending->rlocator = rlocator;
+	pending->op = PCOP_UNLINK_FORK | PCOP_UNLINK_MARK | PCOP_SET_PERSISTENCE;
+	pending->unlink_forknum = INIT_FORKNUM;
+	pending->unlink_mark = SMGR_MARK_UNCOMMITTED;
+	pending->bufpersistence = true;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = false;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingCleanups;
+	pendingCleanups = pending;
+
+	/* drop mark file at commit */
+	pending = (PendingCleanup *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+	pending->rlocator = rlocator;
+	pending->op = PCOP_UNLINK_MARK;
+	pending->unlink_forknum = INIT_FORKNUM;
+	pending->unlink_mark = SMGR_MARK_UNCOMMITTED;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = true;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingCleanups;
+	pendingCleanups = pending;
+}
+
+/*
+ * RelationDropInitFork
+ *		Delete physical storage for the init fork of a relation.
+ */
+void
+RelationDropInitFork(Relation rel)
+{
+	RelFileLocator rlocator = rel->rd_locator;
+	PendingCleanup *pending;
+	PendingCleanup *prev;
+	PendingCleanup *next;
+	bool		inxact_created = false;
+
+	/* switch buffer persistence */
+	SetRelationBuffersPersistence(RelationGetSmgr(rel), true, false);
+
+	/*
+	 * Search for pending cleanups registered for the init-fork of this
+	 * relation (see RelationCreateInitFork). Their presence indicates that
+	 * the init-fork was created within the current transaction; drop them.
+	 */
+	prev = NULL;
+	for (pending = pendingCleanups; pending != NULL; pending = next)
+	{
+		next = pending->next;
+
+		if (RelFileLocatorEquals(rlocator, pending->rlocator) &&
+			pending->unlink_forknum == INIT_FORKNUM)
+		{
+			/* unlink list entry */
+			if (prev)
+				prev->next = next;
+			else
+				pendingCleanups = next;
+
+			pfree(pending);
+
+			/* prev does not change */
+
+			inxact_created = true;
+		}
+		else
+			prev = pending;
+	}
+
+	/*
+	 * If the init-fork was created in this transaction, remove both the
+	 * init-fork and mark file. Otherwise, register an at-commit pending-unlink
+	 * for the existing init-fork. See RelationCreateInitFork.
+	 */
+	if (inxact_created)
+	{
+		SMgrRelation srel = smgropen(rlocator, InvalidBackendId);
+		ForkNumber	 forknum = INIT_FORKNUM;
+		BlockNumber	 firstblock = 0;
+
+		/*
+		 * Some AMs initialize init-fork via the buffer manager. To properly
+		 * drop the init-fork, first drop all buffers for the init-fork, then
+		 * unlink the init-fork and the mark file.
+		 */
+		DropRelationBuffers(srel, &forknum, 1, &firstblock);
+		log_smgrunlinkmark(&rlocator, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED);
+		smgrunlinkmark(srel, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
+		log_smgrunlink(&rlocator, INIT_FORKNUM);
+		smgrunlink(srel, INIT_FORKNUM, false);
+		return;
+	}
+
+	/* register drop of this init fork file at commit */
+	pending = (PendingCleanup *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+	pending->rlocator = rlocator;
+	pending->op = PCOP_UNLINK_FORK;
+	pending->unlink_forknum = INIT_FORKNUM;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = true;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingCleanups;
+	pendingCleanups = pending;
+
+	/* revert persistence at abort; zeroed so unlink_forknum is well-defined */
+	pending = (PendingCleanup *)
+		MemoryContextAllocZero(TopMemoryContext, sizeof(PendingCleanup));
+	pending->rlocator = rlocator;
+	pending->op = PCOP_SET_PERSISTENCE;
+	pending->bufpersistence = false;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = false;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingCleanups;
+	pendingCleanups = pending;
+}
+
 /*
  * Perform XLogInsert of an XLOG_SMGR_CREATE record to WAL.
  */
@@ -305,6 +503,25 @@ log_smgrunlinkmark(const RelFileLocator *rlocator, ForkNumber forkNum,
 	XLogInsert(RM_SMGR_ID, XLOG_SMGR_MARK | XLR_SPECIAL_REL_UPDATE);
 }
 
+/*
+ * WAL-log a buffer persistence change (XLOG_SMGR_BUFPERSISTENCE record).
+ */
+void
+log_smgrbufpersistence(const RelFileLocator rlocator, bool persistence)
+{
+	xl_smgr_bufpersistence xlrec;
+
+	/*
+	 * Record which relation file changes and its new buffer persistence.
+	 */
+	xlrec.rlocator = rlocator;
+	xlrec.persistence = persistence;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+	XLogInsert(RM_SMGR_ID, XLOG_SMGR_BUFPERSISTENCE | XLR_SPECIAL_REL_UPDATE);
+}
+
 /*
  * RelationDropStorage
  *		Schedule unlinking of physical storage at transaction commit.
@@ -858,10 +1075,29 @@ smgrDoPendingCleanups(bool isCommit)
 				srel = smgropen(pending->rlocator, pending->backend);
 
 				Assert((pending->op &
-						~(PCOP_UNLINK_FORK | PCOP_UNLINK_MARK)) == 0);
+						~(PCOP_UNLINK_FORK | PCOP_UNLINK_MARK |
+						  PCOP_SET_PERSISTENCE)) == 0);
+
+				if (pending->op & PCOP_SET_PERSISTENCE)
+				{
+					SetRelationBuffersPersistence(srel, pending->bufpersistence,
+												  InRecovery);
+				}
 
 				if (pending->op & PCOP_UNLINK_FORK)
 				{
+					BlockNumber firstblock = 0;
+
+					/*
+					 * Unlink the fork file. Currently this operation is
+					 * applied only to init-forks. As it is not certain that
+					 * the init-fork is not loaded on shared buffers, drop all
+					 * buffers for it.
+					 */
+					Assert(pending->unlink_forknum == INIT_FORKNUM);
+					DropRelationBuffers(srel, &pending->unlink_forknum, 1,
+										&firstblock);
+
 					/* Don't emit wal while recovery. */
 					if (!InRecovery)
 						log_smgrunlink(&pending->rlocator,
@@ -1286,8 +1522,8 @@ smgr_redo(XLogReaderState *record)
 		else
 		{
 			/*
-			 * Delete pending action for this mark file if any. We should have
-			 * at most one entry for this action.
+			 * Delete any pending action for this mark file, if present. There
+			 * should be at most one entry for this action.
 			 */
 			PendingCleanup *prev = NULL;
 
@@ -1311,6 +1547,59 @@ smgr_redo(XLogReaderState *record)
 			}
 		}
 	}
+	else if (info == XLOG_SMGR_BUFPERSISTENCE)
+	{
+		xl_smgr_bufpersistence *xlrec =
+		(xl_smgr_bufpersistence *) XLogRecGetData(record);
+		SMgrRelation reln;
+		PendingCleanup *pending;
+		PendingCleanup *prev = NULL;
+
+		reln = smgropen(xlrec->rlocator, InvalidBackendId);
+		SetRelationBuffersPersistence(reln, xlrec->persistence, true);
+
+		/*
+		 * Delete any pending action for persistence change, if present. There
+		 * should be at most one entry for this action.
+		 */
+		for (pending = pendingCleanups; pending != NULL;
+			 pending = pending->next)
+		{
+			if (RelFileLocatorEquals(xlrec->rlocator, pending->rlocator) &&
+				(pending->op & PCOP_SET_PERSISTENCE) != 0)
+			{
+				Assert(pending->bufpersistence == xlrec->persistence);
+
+				if (prev)
+					prev->next = pending->next;
+				else
+					pendingCleanups = pending->next;
+
+				pfree(pending);
+				break;
+			}
+
+			prev = pending;
+		}
+
+		/*
+		 * During abort, revert any changes to buffer persistence made in
+		 * this transaction.
+		 */
+		if (!pending)
+		{
+			pending = (PendingCleanup *)
+				MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+			pending->rlocator = xlrec->rlocator;
+			pending->op = PCOP_SET_PERSISTENCE;
+			pending->bufpersistence = !xlrec->persistence;
+			pending->backend = InvalidBackendId;
+			pending->atCommit = false;
+			pending->nestLevel = GetCurrentTransactionNestLevel();
+			pending->next = pendingCleanups;
+			pendingCleanups = pending;
+		}
+	}
 	else
 		elog(PANIC, "smgr_redo: unknown op code %u", info);
 }
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 727f151750..3192a97b0e 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -55,6 +55,7 @@
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/policy.h"
+#include "commands/progress.h"
 #include "commands/sequence.h"
 #include "commands/tablecmds.h"
 #include "commands/tablespace.h"
@@ -5464,6 +5465,188 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	return newcmd;
 }
 
+/*
+ * RelationChangePersistence: perform in-place persistence change of a relation
+ */
+static void
+RelationChangePersistence(AlteredTableInfo *tab, char persistence,
+						  LOCKMODE lockmode)
+{
+	Relation	rel;
+	Relation	classRel;
+	HeapTuple	tuple,
+				newtuple;
+	Datum		new_val[Natts_pg_class];
+	bool		new_null[Natts_pg_class],
+				new_repl[Natts_pg_class];
+	int			i;
+	List	   *relids;
+	ListCell   *lc_oid;
+
+	Assert(tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE);
+	Assert(lockmode == AccessExclusiveLock);
+
+	/*
+	 * This function cannot handle any of the following; callers must use
+	 * ATRewriteTable instead when any of them is set.
+	 */
+	Assert(tab->constraints == NULL && tab->partition_constraint == NULL &&
+		   tab->newvals == NULL && !tab->verify_new_notnull);
+
+	rel = table_open(tab->relid, lockmode);
+
+	Assert(rel->rd_rel->relpersistence != persistence);
+
+	elog(DEBUG1, "perform in-place persistence change");
+
+	/*
+	 * First, gather all relations that require a persistence change.
+	 */
+
+	/* Collect OIDs of the relation itself and its indexes */
+	relids = RelationGetIndexList(rel);
+	relids = lcons_oid(rel->rd_id, relids);
+
+	/* Add toast relation and its indexes, if any */
+	if (OidIsValid(rel->rd_rel->reltoastrelid))
+	{
+		List	   *toastidx;
+		Relation	toastrel = table_open(rel->rd_rel->reltoastrelid, lockmode);
+
+		relids = lappend_oid(relids, rel->rd_rel->reltoastrelid);
+		toastidx = RelationGetIndexList(toastrel);
+		relids = list_concat(relids, toastidx);
+		pfree(toastidx);
+		table_close(toastrel, NoLock);
+	}
+
+	table_close(rel, NoLock);
+
+	/* Make changes in storage */
+	classRel = table_open(RelationRelationId, RowExclusiveLock);
+
+	foreach(lc_oid, relids)
+	{
+		Oid			reloid = lfirst_oid(lc_oid);
+		Relation	r = relation_open(reloid, lockmode);
+
+		/*
+		 * XXX: Some access methods don't support in-place persistence
+		 * changes. GiST uses page LSNs to figure out whether a block has been
+		 * modified. However, UNLOGGED GiST indexes use fake LSNs, which are
+		 * incompatible with the real LSNs used for LOGGED indexes.
+		 *
+		 * Potentially, if gistGetFakeLSN behaved similarly for both permanent
+		 * and unlogged indexes, we could avoid index rebuilds by emitting
+		 * extra WAL records while the index is unlogged.
+		 *
+		 * Compare relam against a positive list to ensure the hard way is
+		 * taken for unknown AMs.
+		 */
+		if (r->rd_rel->relkind == RELKIND_INDEX &&
+		/* GiST is excluded */
+			r->rd_rel->relam != BTREE_AM_OID &&
+			r->rd_rel->relam != HASH_AM_OID &&
+			r->rd_rel->relam != GIN_AM_OID &&
+			r->rd_rel->relam != SPGIST_AM_OID &&
+			r->rd_rel->relam != BRIN_AM_OID)
+		{
+			int			reindex_flags;
+			ReindexParams params = {0};
+
+			/* reindex doesn't allow concurrent use of the index */
+			table_close(r, NoLock);
+
+			reindex_flags =
+				REINDEX_REL_SUPPRESS_INDEX_USE |
+				REINDEX_REL_CHECK_CONSTRAINTS;
+
+			/* Set the same persistence as the parent relation. */
+			if (persistence == RELPERSISTENCE_UNLOGGED)
+				reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED;
+			else
+				reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT;
+
+			reindex_index(reloid, reindex_flags, persistence, &params);
+
+			continue;
+		}
+
+		/* Create or drop init fork */
+		if (persistence == RELPERSISTENCE_UNLOGGED)
+			RelationCreateInitFork(r);
+		else
+			RelationDropInitFork(r);
+
+		/*
+		 * If this relation becomes WAL-logged, immediately sync all files
+		 * except the init-fork to establish the initial state on storage.  The
+		 * buffers should have already been flushed out by
+		 * RelationCreate(Drop)InitFork called just above. The init-fork should
+		 * already be synchronized as required.
+		 */
+		if (persistence == RELPERSISTENCE_PERMANENT)
+		{
+			for (i = 0; i < INIT_FORKNUM; i++)
+			{
+				if (smgrexists(RelationGetSmgr(r), i))
+					smgrimmedsync(RelationGetSmgr(r), i);
+			}
+		}
+
+		/* Update catalog */
+		tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(reloid));
+		if (!HeapTupleIsValid(tuple))
+			elog(ERROR, "cache lookup failed for relation %u", reloid);
+
+		memset(new_val, 0, sizeof(new_val));
+		memset(new_null, false, sizeof(new_null));
+		memset(new_repl, false, sizeof(new_repl));
+
+		new_val[Anum_pg_class_relpersistence - 1] = CharGetDatum(persistence);
+		new_null[Anum_pg_class_relpersistence - 1] = false;
+		new_repl[Anum_pg_class_relpersistence - 1] = true;
+
+		newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
+									 new_val, new_null, new_repl);
+
+		CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
+		heap_freetuple(newtuple);
+
+		/*
+		 * If wal_level >= replica, switching to LOGGED necessitates WAL-logging
+		 * the relation content for later recovery. This is not emitted when
+		 * wal_level = minimal.
+		 */
+		if (persistence == RELPERSISTENCE_PERMANENT && XLogIsNeeded())
+		{
+			ForkNumber	fork;
+			xl_smgr_truncate xlrec;
+
+			xlrec.blkno = 0;
+			xlrec.rlocator = r->rd_locator;
+			xlrec.flags = SMGR_TRUNCATE_ALL;
+
+			XLogBeginInsert();
+			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+
+			XLogInsert(RM_SMGR_ID, XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
+
+			for (fork = 0; fork < INIT_FORKNUM; fork++)
+			{
+				if (smgrexists(RelationGetSmgr(r), fork))
+					log_newpage_range(r, fork, 0,
+									  smgrnblocks(RelationGetSmgr(r), fork),
+									  false);
+			}
+		}
+
+		table_close(r, NoLock);
+	}
+
+	table_close(classRel, NoLock);
+}
+
 /*
  * ATRewriteTables: ALTER TABLE phase 3
  */
@@ -5594,48 +5777,55 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode,
 										 tab->relid,
 										 tab->rewrite);
 
-			/*
-			 * Create transient table that will receive the modified data.
-			 *
-			 * Ensure it is marked correctly as logged or unlogged.  We have
-			 * to do this here so that buffers for the new relfilenumber will
-			 * have the right persistence set, and at the same time ensure
-			 * that the original filenumbers's buffers will get read in with
-			 * the correct setting (i.e. the original one).  Otherwise a
-			 * rollback after the rewrite would possibly result with buffers
-			 * for the original filenumbers having the wrong persistence
-			 * setting.
-			 *
-			 * NB: This relies on swap_relation_files() also swapping the
-			 * persistence. That wouldn't work for pg_class, but that can't be
-			 * unlogged anyway.
-			 */
-			OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, NewAccessMethod,
-									   persistence, lockmode);
+			if (tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE)
+				RelationChangePersistence(tab, persistence, lockmode);
+			else
+			{
+				/*
+				 * Create transient table that will receive the modified data.
+				 *
+				 * Ensure it is marked correctly as logged or unlogged.  We
+				 * have to do this here so that buffers for the new
+				 * relfilenumber will have the right persistence set, and at
+				 * the same time ensure that the original filenumbers's buffers
+				 * will get read in with the correct setting (i.e. the original
+				 * one).  Otherwise a rollback after the rewrite would possibly
+				 * result with buffers for the original filenumbers having the
+				 * wrong persistence setting.
+				 *
+				 * NB: This relies on swap_relation_files() also swapping the
+				 * persistence. That wouldn't work for pg_class, but that
+				 * can't be unlogged anyway.
+				 */
+				OIDNewHeap = make_new_heap(tab->relid, NewTableSpace,
+										   NewAccessMethod,
+										   persistence, lockmode);
 
-			/*
-			 * Copy the heap data into the new table with the desired
-			 * modifications, and test the current data within the table
-			 * against new constraints generated by ALTER TABLE commands.
-			 */
-			ATRewriteTable(tab, OIDNewHeap, lockmode);
+				/*
+				 * Copy the heap data into the new table with the desired
+				 * modifications, and test the current data within the table
+				 * against new constraints generated by ALTER TABLE commands.
+				 */
+				ATRewriteTable(tab, OIDNewHeap, lockmode);
 
-			/*
-			 * Swap the physical files of the old and new heaps, then rebuild
-			 * indexes and discard the old heap.  We can use RecentXmin for
-			 * the table's new relfrozenxid because we rewrote all the tuples
-			 * in ATRewriteTable, so no older Xid remains in the table.  Also,
-			 * we never try to swap toast tables by content, since we have no
-			 * interest in letting this code work on system catalogs.
-			 */
-			finish_heap_swap(tab->relid, OIDNewHeap,
-							 false, false, true,
-							 !OidIsValid(tab->newTableSpace),
-							 RecentXmin,
-							 ReadNextMultiXactId(),
-							 persistence);
-
-			InvokeObjectPostAlterHook(RelationRelationId, tab->relid, 0);
+				/*
+				 * Swap the physical files of the old and new heaps, then
+				 * rebuild indexes and discard the old heap.  We can use
+				 * RecentXmin for the table's new relfrozenxid because we
+				 * rewrote all the tuples in ATRewriteTable, so no older Xid
+				 * remains in the table.  Also, we never try to swap toast
+				 * tables by content, since we have no interest in letting
+				 * this code work on system catalogs.
+				 */
+				finish_heap_swap(tab->relid, OIDNewHeap,
+								 false, false, true,
+								 !OidIsValid(tab->newTableSpace),
+								 RecentXmin,
+								 ReadNextMultiXactId(),
+								 persistence);
+
+				InvokeObjectPostAlterHook(RelationRelationId, tab->relid, 0);
+			}
 		}
 		else if (tab->rewrite > 0 && tab->relkind == RELKIND_SEQUENCE)
 		{
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index df22aaa1c5..7d21fd5ac5 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -3708,6 +3708,90 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum,
 	}
 }
 
+/* ---------------------------------------------------------------------
+ *		SetRelationBuffersPersistence
+ *
+ *		Set (permanent) or clear (!permanent) BM_PERMANENT on all buffers of
+ *		the relation.  When switching to PERMANENT, init-fork buffers are
+ *		invalidated and valid dirty buffers are flushed so that the kernel
+ *		has an up-to-date view of the relation.  Unless isRedo, the change
+ *		is WAL-logged first via log_smgrbufpersistence.
+ *
+ *		The caller must be holding AccessExclusiveLock on the target relation
+ *		to ensure no other backend is busy dirtying more blocks.
+ *
+ *		XXX currently it sequentially searches the buffer pool.  This routine
+ *		isn't used in performance-critical code paths, so it's not worth
+ *		additional overhead to make it go faster; see also DropRelationBuffers.
+ *		--------------------------------------------------------------------
+ */
+void
+SetRelationBuffersPersistence(SMgrRelation srel, bool permanent, bool isRedo)
+{
+	int			i;
+	RelFileLocatorBackend rlocator = srel->smgr_rlocator;
+
+	Assert(!RelFileLocatorBackendIsTemp(rlocator));
+
+	if (!isRedo)
+		log_smgrbufpersistence(srel->smgr_rlocator.locator, permanent);
+
+	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
+	for (i = 0; i < NBuffers; i++)
+	{
+		BufferDesc *bufHdr = GetBufferDescriptor(i);
+		uint32		buf_state;
+
+		if (!RelFileLocatorEquals(BufTagGetRelFileLocator(&bufHdr->tag),
+								  rlocator.locator))
+			continue;
+
+		ReservePrivateRefCountEntry();
+
+		buf_state = LockBufHdr(bufHdr);
+
+		if (!RelFileLocatorEquals(BufTagGetRelFileLocator(&bufHdr->tag),
+								  rlocator.locator))
+		{
+			UnlockBufHdr(bufHdr, buf_state);
+			continue;
+		}
+
+		if (permanent)
+		{
+			/* The init fork is being dropped, drop buffers for it. */
+			if (BufTagGetForkNum(&bufHdr->tag) == INIT_FORKNUM)
+			{
+				InvalidateBuffer(bufHdr);
+				continue;
+			}
+
+			buf_state |= BM_PERMANENT;
+			pg_atomic_write_u32(&bufHdr->state, buf_state);
+
+			/* flush this buffer when switching to PERMANENT */
+			if ((buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
+			{
+				PinBuffer_Locked(bufHdr);
+				LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
+							  LW_SHARED);
+				FlushBuffer(bufHdr, srel, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
+				LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+				UnpinBuffer(bufHdr);
+			}
+			else
+				UnlockBufHdr(bufHdr, buf_state);
+		}
+		else
+		{
+			/* There shouldn't be an init fork; others become non-permanent */
+			Assert(BufTagGetForkNum(&bufHdr->tag) != INIT_FORKNUM);
+			UnlockBufHdr(bufHdr, buf_state & ~BM_PERMANENT);
+		}
+	}
+}
+
 /* ---------------------------------------------------------------------
  *		DropRelationsAllBuffers
  *
diff --git a/src/backend/storage/file/reinit.c b/src/backend/storage/file/reinit.c
index e84fcbf884..a5d8763e15 100644
--- a/src/backend/storage/file/reinit.c
+++ b/src/backend/storage/file/reinit.c
@@ -38,6 +38,7 @@ typedef struct
 {
 	RelFileNumber	relNumber;		/* hash key */
 	bool			has_init;		/* has INIT fork */
+	bool			dirty_init;		/* needs to remove INIT fork */
 	bool			dirty_all;		/* needs to remove all forks */
 }  relfile_entry;
 
@@ -45,7 +46,10 @@ typedef struct
  * Clean up and reset relation files from before the last restart.
  *
  * If op includes UNLOGGED_RELATION_CLEANUP, we perform different operations
- * depending on the existence of mark files.
+ * depending on the existence of the "cleanup" forks.
+ *
+ * If SMGR_MARK_UNCOMMITTED mark file for init fork is present, we remove the
+ * init fork along with the mark file.
  *
  * If SMGR_MARK_UNCOMMITTED mark file for main fork is present we remove the
  * whole relation along with the mark file.
@@ -54,7 +58,7 @@ typedef struct
  * with the "init" fork, except for the "init" fork itself.
  *
  * If op includes UNLOGGED_RELATION_DROP_BUFFER, we drop all buffers for all
- * relations that are to be cleaned up.
+ * relations that have the "cleanup" and/or the "init" forks.
  *
  * If op includes UNLOGGED_RELATION_INIT, we copy the "init" fork to the main
  * fork.
@@ -241,7 +245,7 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
 			 * Put the OID portion of the name into the hash table,
 			 * if it isn't already.  If it has SMGR_MARK_UNCOMMITTED mark
 			 * files, the storage file is in dirty state, where clean up is
-			 * needed.
+			 * needed.
 			 */
 			key = atooid(de->d_name);
 			ent = hash_search(hash, &key, HASH_ENTER, &found);
@@ -249,10 +253,13 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
 			if (!found)
 			{
 				ent->has_init = false;
+				ent->dirty_init = false;
 				ent->dirty_all = false;
 			}
 
-			if (forkNum == MAIN_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
+			if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
+				ent->dirty_init = true;
+			else if (forkNum == MAIN_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
 				ent->dirty_all = true;
 			else
 			{
@@ -276,11 +283,10 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
 	{
 		/*
 		 * When we come here after recovery, smgr object for this file might
-		 * have been created. In that case we need to drop all buffers then the
-		 * smgr object.  Otherwise checkpointer wrongly tries to flush buffers
-		 * for nonexistent relation storage. This is safe as far as no other
-		 * backends have accessed the relation before starting archive
-		 * recovery.
+		 * have been created. In that case we need to drop all buffers then
+		 * the smgr object before initializing the unlogged relation.  This is
+		 * safe as far as no other backends have accessed the relation before
+		 * starting archive recovery.
 		 */
 		HASH_SEQ_STATUS status;
 		relfile_entry *ent;
@@ -296,6 +302,13 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
 		{
 			RelFileLocatorBackend rel;
 
+			/*
+			 * The relation is persistent and stays persistent. Don't drop the
+			 * buffers for this relation.
+			 */
+			if (ent->has_init && ent->dirty_init)
+				continue;
+
 			if (maxrels <= nrels)
 			{
 				maxrels *= 2;
@@ -352,8 +365,24 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
 				if (!ent->has_init)
 					continue;
 
-				if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_NONE)
-					continue;
+				if (ent->dirty_init)
+				{
+					/*
+					 * The crashed transaction did SET UNLOGGED. This relation
+					 * is restored to a LOGGED relation.
+					 */
+					if (forkNum != INIT_FORKNUM)
+						continue;
+				}
+				else
+				{
+					/*
+					 * We don't remove the INIT fork of a non-dirty
+					 * relation file.
+					 */
+					if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_NONE)
+						continue;
+				}
 			}
 
 			/* so, nuke it! */
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index e9e4bafb01..ddc8014e55 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -434,6 +434,12 @@ extractPageInfo(XLogReaderState *record)
 		 * empty so we don't need to bother the content.
 		 */
 	}
+	else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_BUFPERSISTENCE)
+	{
+		/*
+		 * We can safely ignore these. These don't make any on-disk changes.
+		 */
+	}
 	else if (rmid == RM_XACT_ID &&
 			 ((rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT ||
 			  (rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT_PREPARED ||
diff --git a/src/include/catalog/storage_xlog.h b/src/include/catalog/storage_xlog.h
index a36646c6ee..847660b6af 100644
--- a/src/include/catalog/storage_xlog.h
+++ b/src/include/catalog/storage_xlog.h
@@ -62,6 +62,12 @@ typedef struct xl_smgr_mark
 	smgr_mark_action action;
 } xl_smgr_mark;
 
+typedef struct xl_smgr_bufpersistence
+{
+	RelFileLocator rlocator;	/* relation whose buffers are changed */
+	bool		persistence;	/* true to make the buffers permanent */
+} xl_smgr_bufpersistence;
+
 /* flags for xl_smgr_truncate */
 #define SMGR_TRUNCATE_HEAP		0x0001
 #define SMGR_TRUNCATE_VM		0x0002
@@ -82,6 +88,8 @@ extern void log_smgrcreatemark(const RelFileLocator *rlocator,
 							   ForkNumber forkNum, StorageMarks mark);
 extern void log_smgrunlinkmark(const RelFileLocator *rlocator,
 							   ForkNumber forkNum, StorageMarks mark);
+extern void log_smgrbufpersistence(const RelFileLocator rlocator,
+								   bool persistence);
 
 extern void smgr_redo(XLogReaderState *record);
 extern void smgr_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 0f5fb6be00..e3d9273710 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -222,6 +222,8 @@ extern void DropRelationBuffers(struct SMgrRelationData *smgr_reln,
 								int nforks, BlockNumber *firstDelBlock);
 extern void DropRelationsAllBuffers(struct SMgrRelationData **smgr_reln,
 									int nlocators);
+extern void SetRelationBuffersPersistence(struct SMgrRelationData *srel,
+										  bool permanent, bool isRedo);
 extern void DropDatabaseBuffers(Oid dbid);
 
 #define RelationGetNumberOfBlocks(reln) \
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 057e3d3104..f72c5c8a9e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3939,6 +3939,7 @@ xl_replorigin_set
 xl_restore_point
 xl_running_xacts
 xl_seq_rec
+xl_smgr_bufpersistence
 xl_smgr_create
 xl_smgr_mark
 xl_smgr_truncate
-- 
2.25.1

>From 429251a8d0db5cef2a21226a969ea6cc482b811d Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota....@gmail.com>
Date: Tue, 25 Apr 2023 16:12:23 +0900
Subject: [PATCH v29 3/3] New command ALTER TABLE ALL IN TABLESPACE SET
 LOGGED/UNLOGGED

Simplifies ALTER TABLE SET LOGGED/UNLOGGED invocation by allowing
users to specify relations based on tablespace or owner.
---
 doc/src/sgml/ref/alter_table.sgml        |  15 +++
 src/backend/commands/tablecmds.c         | 140 +++++++++++++++++++++++
 src/backend/parser/gram.y                |  42 +++++++
 src/backend/tcop/utility.c               |  11 ++
 src/include/commands/tablecmds.h         |   2 +
 src/include/nodes/parsenodes.h           |  10 ++
 src/test/regress/expected/tablespace.out |  76 ++++++++++++
 src/test/regress/sql/tablespace.sql      |  41 +++++++
 src/tools/pgindent/typedefs.list         |   1 +
 9 files changed, 338 insertions(+)

diff --git a/doc/src/sgml/ref/alter_table.sgml b/doc/src/sgml/ref/alter_table.sgml
index d4d93eeb7c..7ee09ca9cf 100644
--- a/doc/src/sgml/ref/alter_table.sgml
+++ b/doc/src/sgml/ref/alter_table.sgml
@@ -33,6 +33,8 @@ ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
     SET SCHEMA <replaceable class="parameter">new_schema</replaceable>
 ALTER TABLE ALL IN TABLESPACE <replaceable class="parameter">name</replaceable> [ OWNED BY <replaceable class="parameter">role_name</replaceable> [, ... ] ]
     SET TABLESPACE <replaceable class="parameter">new_tablespace</replaceable> [ NOWAIT ]
+ALTER TABLE ALL IN TABLESPACE <replaceable class="parameter">name</replaceable> [ OWNED BY <replaceable class="parameter">role_name</replaceable> [, ... ] ]
+    SET { LOGGED | UNLOGGED } [ NOWAIT ]
 ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
     ATTACH PARTITION <replaceable class="parameter">partition_name</replaceable> { FOR VALUES <replaceable class="parameter">partition_bound_spec</replaceable> | DEFAULT }
 ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
@@ -769,6 +771,19 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
       (for identity or serial columns).  However, it is also possible to
       change the persistence of such sequences separately.
      </para>
+     <para>
+      The persistence of all tables in the current database in a tablespace
+      can be changed by using the <literal>ALL IN TABLESPACE</literal> form,
+      which will first lock all tables to be changed and then change each
+      one.  This form also supports
+      <literal>OWNED BY</literal>, which will only change tables owned by the
+      specified roles.  If the <literal>NOWAIT</literal> option is specified,
+      then the command will fail if it is unable to immediately acquire all of
+      the locks required.  The <literal>information_schema</literal> relations
+      are not considered part of the system catalogs and will be changed.  See
+      also
+      <link linkend="sql-createtablespace"><command>CREATE TABLESPACE</command></link>.
+     </para>
     </listitem>
    </varlistentry>
 
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 3192a97b0e..24c6dd2aeb 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -14966,6 +14966,146 @@ AlterTableMoveAll(AlterTableMoveAllStmt *stmt)
 	return new_tablespaceoid;
 }
 
+/*
+ * Alter Table ALL ... SET LOGGED/UNLOGGED
+ *
+ * Allows a user to modify the persistence of all objects in a specific
+ * tablespace in the current database.  Objects can be filtered by owner,
+ * enabling users to update the persistence of only their objects. The primary
+ * permission handling is managed by the lower-level change persistence
+ * function.
+ *
+ * All objects to be modified are locked first. If NOWAIT is specified and the
+ * lock can't be acquired, an ERROR is thrown.
+ */
+void
+AlterTableSetLoggedAll(AlterTableSetLoggedAllStmt * stmt)
+{
+	List	   *relations = NIL;
+	ListCell   *l;
+	ScanKeyData key[1];
+	Relation	rel;
+	TableScanDesc scan;
+	HeapTuple	tuple;
+	Oid			tablespaceoid;
+	List	   *role_oids = roleSpecsToIds(stmt->roles);
+
+	/* Ensure we were not asked to change something we can't */
+	if (stmt->objtype != OBJECT_TABLE)
+		ereport(ERROR,
+				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				errmsg("only tables can be specified"));
+
+	/* Get the tablespace OID */
+	tablespaceoid = get_tablespace_oid(stmt->tablespacename, false);
+
+	/*
+	 * Now that the checks are done, check if we should set it to
+	 * InvalidOid because it is our database's default tablespace.
+	 */
+	if (tablespaceoid == MyDatabaseTableSpace)
+		tablespaceoid = InvalidOid;
+
+	/*
+	 * Walk the list of objects in the tablespace to pick them up. This will
+	 * only find objects in our database, of course.
+	 */
+	ScanKeyInit(&key[0],
+				Anum_pg_class_reltablespace,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(tablespaceoid));
+
+	rel = table_open(RelationRelationId, AccessShareLock);
+	scan = table_beginscan_catalog(rel, 1, key);
+	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+	{
+		Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+		Oid			relOid = relForm->oid;
+
+		/*
+		 * Do not pick-up objects in pg_catalog as part of this, if an admin
+		 * really wishes to do so, they can issue the individual ALTER
+		 * commands directly.
+		 *
+		 * Also, explicitly avoid any shared tables, temp tables, or TOAST
+		 * (TOAST will be changed with the main table).
+		 */
+		if (IsCatalogNamespace(relForm->relnamespace) ||
+			relForm->relisshared ||
+			isAnyTempNamespace(relForm->relnamespace) ||
+			IsToastNamespace(relForm->relnamespace))
+			continue;
+
+		/* Only pick up the object type requested */
+		if (relForm->relkind != RELKIND_RELATION)
+			continue;
+
+		/* Check if we are only picking-up objects owned by certain roles */
+		if (role_oids != NIL && !list_member_oid(role_oids, relForm->relowner))
+			continue;
+
+		/*
+		 * Handle permissions-checking here since we are locking the tables
+		 * and also to avoid doing a bunch of work only to fail part-way. Note
+		 * that permissions will also be checked by AlterTableInternal().
+		 *
+		 * Caller must be considered an owner of the table whose persistence
+		 * we're going to change.
+		 */
+		if (!object_ownercheck(RelationRelationId, relOid, GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relOid)),
+						   NameStr(relForm->relname));
+
+		if (stmt->nowait &&
+			!ConditionalLockRelationOid(relOid, AccessExclusiveLock))
+			ereport(ERROR,
+					errcode(ERRCODE_OBJECT_IN_USE),
+					errmsg("aborting because lock on relation \"%s.%s\" is not available",
+						   get_namespace_name(relForm->relnamespace),
+						   NameStr(relForm->relname)));
+		else
+			LockRelationOid(relOid, AccessExclusiveLock);
+
+		/*
+		 * Add to our list of objects whose persistence we're going to
+		 * change.
+		 */
+		relations = lappend_oid(relations, relOid);
+	}
+
+	table_endscan(scan);
+	table_close(rel, AccessShareLock);
+
+	if (relations == NIL)
+		ereport(NOTICE,
+				errcode(ERRCODE_NO_DATA_FOUND),
+				errmsg("no matching relations in tablespace \"%s\" found",
+					   tablespaceoid == InvalidOid ? "(database default)" :
+					   get_tablespace_name(tablespaceoid)));
+
+	/*
+	 * Everything is locked, loop through and change persistence of all of the
+	 * relations.
+	 */
+	foreach(l, relations)
+	{
+		List	   *cmds = NIL;
+		AlterTableCmd *cmd = makeNode(AlterTableCmd);
+
+		if (stmt->logged)
+			cmd->subtype = AT_SetLogged;
+		else
+			cmd->subtype = AT_SetUnLogged;
+
+		cmds = lappend(cmds, cmd);
+
+		EventTriggerAlterTableStart((Node *) stmt);
+		/* OID is set by AlterTableInternal */
+		AlterTableInternal(lfirst_oid(l), cmds, false);
+		EventTriggerAlterTableEnd();
+	}
+}
+
 static void
 index_copy_data(Relation rel, RelFileLocator newrlocator)
 {
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index b3bdf947b6..5b2ae92608 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -2110,6 +2110,48 @@ AlterTableStmt:
 					n->nowait = $13;
 					$$ = (Node *) n;
 				}
+		|	ALTER TABLE ALL IN_P TABLESPACE name SET LOGGED opt_nowait
+				{
+					AlterTableSetLoggedAllStmt *n =
+						makeNode(AlterTableSetLoggedAllStmt);
+					n->tablespacename = $6;
+					n->objtype = OBJECT_TABLE;
+					n->logged = true;	/* SET LOGGED */
+					n->nowait = $9;
+					$$ = (Node *) n;
+				}
+		|	ALTER TABLE ALL IN_P TABLESPACE name OWNED BY role_list SET LOGGED opt_nowait
+				{
+					AlterTableSetLoggedAllStmt *n =
+						makeNode(AlterTableSetLoggedAllStmt);
+					n->tablespacename = $6;
+					n->objtype = OBJECT_TABLE;
+					n->roles = $9;		/* OWNED BY role_list */
+					n->logged = true;	/* SET LOGGED */
+					n->nowait = $12;
+					$$ = (Node *) n;
+				}
+		|	ALTER TABLE ALL IN_P TABLESPACE name SET UNLOGGED opt_nowait
+				{
+					AlterTableSetLoggedAllStmt *n =
+						makeNode(AlterTableSetLoggedAllStmt);
+					n->tablespacename = $6;
+					n->objtype = OBJECT_TABLE;
+					n->logged = false;	/* SET UNLOGGED */
+					n->nowait = $9;
+					$$ = (Node *) n;
+				}
+		|	ALTER TABLE ALL IN_P TABLESPACE name OWNED BY role_list SET UNLOGGED opt_nowait
+				{
+					AlterTableSetLoggedAllStmt *n =
+						makeNode(AlterTableSetLoggedAllStmt);
+					n->tablespacename = $6;
+					n->objtype = OBJECT_TABLE;
+					n->roles = $9;		/* OWNED BY role_list */
+					n->logged = false;	/* SET UNLOGGED */
+					n->nowait = $12;
+					$$ = (Node *) n;
+				}
 		|	ALTER INDEX qualified_name alter_table_cmds
 				{
 					AlterTableStmt *n = makeNode(AlterTableStmt);
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index e3ccf6c7f7..fcf550c839 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -164,6 +164,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
 		case T_AlterTSConfigurationStmt:
 		case T_AlterTSDictionaryStmt:
 		case T_AlterTableMoveAllStmt:
+		case T_AlterTableSetLoggedAllStmt:
 		case T_AlterTableSpaceOptionsStmt:
 		case T_AlterTableStmt:
 		case T_AlterTypeStmt:
@@ -1771,6 +1772,12 @@ ProcessUtilitySlow(ParseState *pstate,
 				commandCollected = true;
 				break;
 
+			case T_AlterTableSetLoggedAllStmt:
+				AlterTableSetLoggedAll((AlterTableSetLoggedAllStmt *) parsetree);
+				/* commands are stashed in AlterTableSetLoggedAll */
+				commandCollected = true;
+				break;
+
 			case T_DropStmt:
 				ExecDropStmt((DropStmt *) parsetree, isTopLevel);
 				/* no commands stashed for DROP */
@@ -2699,6 +2706,10 @@ CreateCommandTag(Node *parsetree)
 			tag = AlterObjectTypeCommandTag(((AlterTableMoveAllStmt *) parsetree)->objtype);
 			break;
 
+		case T_AlterTableSetLoggedAllStmt:
+			tag = AlterObjectTypeCommandTag(((AlterTableSetLoggedAllStmt *) parsetree)->objtype);
+			break;
+
 		case T_AlterTableStmt:
 			tag = AlterObjectTypeCommandTag(((AlterTableStmt *) parsetree)->objtype);
 			break;
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 16b6126669..28ef0dc8c0 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -42,6 +42,8 @@ extern void AlterTableInternal(Oid relid, List *cmds, bool recurse);
 
 extern Oid	AlterTableMoveAll(AlterTableMoveAllStmt *stmt);
 
+extern void AlterTableSetLoggedAll(AlterTableSetLoggedAllStmt *stmt);
+
 extern ObjectAddress AlterTableNamespace(AlterObjectSchemaStmt *stmt,
 										 Oid *oldschema);
 
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 2565348303..373c8c5650 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2681,6 +2681,16 @@ typedef struct AlterTableMoveAllStmt
 	bool		nowait;
 } AlterTableMoveAllStmt;
 
+typedef struct AlterTableSetLoggedAllStmt
+{
+	NodeTag		type;
+	char	   *tablespacename; /* Tablespace whose relations are altered */
+	ObjectType	objtype;		/* Object type to change persistence of */
+	List	   *roles;			/* List of roles to change objects of */
+	bool		logged;			/* true for SET LOGGED, false for SET UNLOGGED */
+	bool		nowait;			/* Error out if a lock is not available */
+} AlterTableSetLoggedAllStmt;
+
 /* ----------------------
  *		Create/Alter Extension Statements
  * ----------------------
diff --git a/src/test/regress/expected/tablespace.out b/src/test/regress/expected/tablespace.out
index 9aabb85349..35b150b297 100644
--- a/src/test/regress/expected/tablespace.out
+++ b/src/test/regress/expected/tablespace.out
@@ -964,5 +964,81 @@ drop cascades to table testschema.part
 drop cascades to table testschema.atable
 drop cascades to materialized view testschema.amv
 drop cascades to table testschema.tablespace_acl
+--
+-- Check persistence change in a tablespace
+CREATE SCHEMA testschema;
+GRANT CREATE ON SCHEMA testschema TO regress_tablespace_user1;
+CREATE TABLESPACE regress_tablespace LOCATION '';
+GRANT CREATE ON TABLESPACE regress_tablespace TO regress_tablespace_user1;
+CREATE TABLE testschema.lsu(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.usu(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lsu(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._usu(a int) TABLESPACE pg_default;
+SET ROLE regress_tablespace_user1;
+CREATE TABLE testschema.lu1(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.uu1(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lu1(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._uu1(a int) TABLESPACE pg_default;
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+ relname |      spcname       | relpersistence 
+---------+--------------------+----------------
+ lsu     | regress_tablespace | p
+ usu     | regress_tablespace | u
+ lu1     | regress_tablespace | p
+ uu1     | regress_tablespace | u
+ _lsu    |                    | p
+ _usu    |                    | u
+ _lu1    |                    | p
+ _uu1    |                    | u
+(8 rows)
+
+ALTER TABLE ALL IN TABLESPACE regress_tablespace
+	  OWNED BY regress_tablespace_user1 SET LOGGED;
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+ relname |      spcname       | relpersistence 
+---------+--------------------+----------------
+ lsu     | regress_tablespace | p
+ usu     | regress_tablespace | u
+ lu1     | regress_tablespace | p
+ uu1     | regress_tablespace | p
+ _lsu    |                    | p
+ _usu    |                    | u
+ _lu1    |                    | p
+ _uu1    |                    | u
+(8 rows)
+
+RESET ROLE;
+ALTER TABLE ALL IN TABLESPACE regress_tablespace SET UNLOGGED;
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+ relname |      spcname       | relpersistence 
+---------+--------------------+----------------
+ lsu     | regress_tablespace | u
+ usu     | regress_tablespace | u
+ lu1     | regress_tablespace | u
+ uu1     | regress_tablespace | u
+ _lsu    |                    | p
+ _usu    |                    | u
+ _lu1    |                    | p
+ _uu1    |                    | u
+(8 rows)
+
+-- Should succeed
+DROP SCHEMA testschema CASCADE;
+NOTICE:  drop cascades to 8 other objects
+DETAIL:  drop cascades to table testschema.lsu
+drop cascades to table testschema.usu
+drop cascades to table testschema._lsu
+drop cascades to table testschema._usu
+drop cascades to table testschema.lu1
+drop cascades to table testschema.uu1
+drop cascades to table testschema._lu1
+drop cascades to table testschema._uu1
+DROP TABLESPACE regress_tablespace;
 DROP ROLE regress_tablespace_user1;
 DROP ROLE regress_tablespace_user2;
diff --git a/src/test/regress/sql/tablespace.sql b/src/test/regress/sql/tablespace.sql
index d274d9615e..eb8e247a1d 100644
--- a/src/test/regress/sql/tablespace.sql
+++ b/src/test/regress/sql/tablespace.sql
@@ -429,5 +429,46 @@ DROP TABLESPACE regress_tblspace_renamed;
 
 DROP SCHEMA testschema CASCADE;
 
+
+--
+-- Check persistence change in a tablespace
+CREATE SCHEMA testschema;
+GRANT CREATE ON SCHEMA testschema TO regress_tablespace_user1;
+CREATE TABLESPACE regress_tablespace LOCATION '';
+GRANT CREATE ON TABLESPACE regress_tablespace TO regress_tablespace_user1;
+
+CREATE TABLE testschema.lsu(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.usu(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lsu(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._usu(a int) TABLESPACE pg_default;
+SET ROLE regress_tablespace_user1;
+CREATE TABLE testschema.lu1(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.uu1(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lu1(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._uu1(a int) TABLESPACE pg_default;
+
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+
+ALTER TABLE ALL IN TABLESPACE regress_tablespace
+	  OWNED BY regress_tablespace_user1 SET LOGGED;
+
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+
+RESET ROLE;
+
+ALTER TABLE ALL IN TABLESPACE regress_tablespace SET UNLOGGED;
+
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+
+-- Should succeed
+DROP SCHEMA testschema CASCADE;
+DROP TABLESPACE regress_tablespace;
+
 DROP ROLE regress_tablespace_user1;
 DROP ROLE regress_tablespace_user2;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index f72c5c8a9e..8a6c101bdf 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -97,6 +97,7 @@ AlterTSConfigurationStmt
 AlterTSDictionaryStmt
 AlterTableCmd
 AlterTableMoveAllStmt
+AlterTableSetLoggedAllStmt
 AlterTableSpaceOptionsStmt
 AlterTableStmt
 AlterTableType
-- 
2.25.1

Reply via email to