On Fri, Aug 27, 2021 at 10:56 AM houzj.f...@fujitsu.com
<houzj.f...@fujitsu.com> wrote:
>
> On Thu, Aug 26, 2021 2:18 PM Dilip Kumar <dilipbal...@gmail.com> wrote:
>
> > The patch looks good to me, I have rebased 0002 atop
> > this patch and also done some cosmetic fixes in 0002.
>
> Here are some comments for the 0002 patch.
>
> 1)
>
> - * toplevel transaction. Each subscription has a separate set of files.
> + * toplevel transaction. Each subscription has a separate files.
>
> a separate files => a separate file

Done

> 2)
> +        * Otherwise, just open it file for writing, in append mode.
>          */
>
> open it file => open the file

Done

>
> 3)
>         if (subxact_data.nsubxacts == 0)
>         {
> -               if (ent->subxact_fileset)
> -               {
> -                       cleanup_subxact_info();
> -                       FileSetDeleteAll(ent->subxact_fileset);
> -                       pfree(ent->subxact_fileset);
> -                       ent->subxact_fileset = NULL;
> -               }
> +               cleanup_subxact_info();
> +               BufFileDeleteFileSet(stream_fileset, path, true);
> +
>
> Before applying the patch, the code invoked cleanup_subxact_info() only when
> the file existed. After applying the patch, it invokes cleanup_subxact_info()
> whether the file exists or not. Is that correct?

I think this is just resetting the structure at the end of the stream.
Earlier, the hash told us whether we had ever dirtied that structure,
but now we are not maintaining that hash, so we just reset it at the
end of the stream.  I don't think that's bad; in fact, I think it is
much cheaper than maintaining the hash.
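
For reference, here is a simplified sketch of what subxact_info_write()
looks like with 0002 applied (condensed from the patch below; the write
path and error handling are omitted):

    subxact_filename(path, subid, xid);

    if (subxact_data.nsubxacts == 0)
    {
        /* Reset the in-memory subxact info and remove any existing file. */
        cleanup_subxact_info();
        BufFileDeleteFileSet(stream_fileset, path, true);
        return;
    }

    /* Open the subxact file; create it if it doesn't exist yet. */
    fd = BufFileOpenFileSet(stream_fileset, path, O_RDWR, true);
    if (fd == NULL)
        fd = BufFileCreateFileSet(stream_fileset, path);

So the reset is now unconditional, but it only clears a small in-memory
structure, which should be cheap.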

>
> 4)
>         /*
> -        * If this is the first streamed segment, the file must not exist, so 
> make
> -        * sure we're the ones creating it. Otherwise just open the file for
> -        * writing, in append mode.
> +        * If this is the first streamed segment, create the changes file.
> +        * Otherwise, just open it file for writing, in append mode.
>          */
>         if (first_segment)
> -       {
>         ...
> -               if (found)
> -                       ereport(ERROR,
> -                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
> -                                        errmsg_internal("incorrect 
> first-segment flag for streamed replication transaction")));
>         ...
> -       }
> +               stream_fd = BufFileCreateFileSet(stream_fileset, path);
>
>
> Since the function BufFileCreateFileSet() doesn't check the file's existence,
> the change here seems to remove the file existence check that the old code did.

Not really; we were just doing a sanity check on the in-memory hash
entry.  Now that we no longer maintain that hash, the check isn't
needed.
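
To illustrate, with 0002 the first_segment handling in stream_open_file()
reduces to roughly the following (simplified; see the patch for the exact
hunk), since there is no longer a hash entry whose "found" flag needs to
be cross-checked:

    if (first_segment)
        stream_fd = BufFileCreateFileSet(stream_fileset, path);
    else
    {
        /* Append to the previously created per-transaction changes file. */
        stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDWR, false);
        BufFileSeek(stream_fd, 0, 0, SEEK_END);
    }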

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
From dfae7d9e3ae875c8db2de792e9a95ef86fef2b00 Mon Sep 17 00:00:00 2001
From: Amit Kapila <akap...@postgresql.org>
Date: Wed, 25 Aug 2021 15:00:19 +0530
Subject: [PATCH v6 1/2] Refactor sharedfileset.c to separate out fileset
 implementation.

Move fileset related implementation out of sharedfileset.c to allow its
usage by backends that don't want to share filesets among different
processes. After this split, fileset infrastructure is used by both
sharedfileset.c and worker.c for the named temporary files that survive
across transactions.

Author: Dilip Kumar, based on suggestion by Andres Freund
Reviewed-by: Hou Zhijie, Masahiko Sawada, Amit Kapila
Discussion: https://postgr.es/m/e1mcc6u-0004ik...@gemulon.postgresql.org
---
 src/backend/replication/logical/launcher.c |   3 +
 src/backend/replication/logical/worker.c   |  82 ++++++----
 src/backend/storage/file/Makefile          |   1 +
 src/backend/storage/file/buffile.c         |  84 +++++-----
 src/backend/storage/file/fd.c              |   2 +-
 src/backend/storage/file/fileset.c         | 205 ++++++++++++++++++++++++
 src/backend/storage/file/sharedfileset.c   | 244 +----------------------------
 src/backend/utils/sort/logtape.c           |   8 +-
 src/backend/utils/sort/sharedtuplestore.c  |   5 +-
 src/include/replication/worker_internal.h  |   1 +
 src/include/storage/buffile.h              |  14 +-
 src/include/storage/fileset.h              |  40 +++++
 src/include/storage/sharedfileset.h        |  14 +-
 src/tools/pgindent/typedefs.list           |   1 +
 14 files changed, 368 insertions(+), 336 deletions(-)
 create mode 100644 src/backend/storage/file/fileset.c
 create mode 100644 src/include/storage/fileset.h

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e3b11da..8b1772d 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -648,6 +648,9 @@ logicalrep_worker_onexit(int code, Datum arg)
 
 	logicalrep_worker_detach();
 
+	/* Cleanup filesets used for streaming transactions. */
+	logicalrep_worker_cleanupfileset();
+
 	ApplyLauncherWakeup();
 }
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 295b1e0..bfb7d1a 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -39,13 +39,13 @@
  * BufFile infrastructure supports temporary files that exceed the OS file size
  * limit, (b) provides a way for automatic clean up on the error and (c) provides
  * a way to survive these files across local transactions and allow to open and
- * close at stream start and close. We decided to use SharedFileSet
+ * close at stream start and close. We decided to use FileSet
  * infrastructure as without that it deletes the files on the closure of the
  * file and if we decide to keep stream files open across the start/stop stream
  * then it will consume a lot of memory (more than 8K for each BufFile and
  * there could be multiple such BufFiles as the subscriber could receive
  * multiple start/stop streams for different transactions before getting the
- * commit). Moreover, if we don't use SharedFileSet then we also need to invent
+ * commit). Moreover, if we don't use FileSet then we also need to invent
  * a new way to pass filenames to BufFile APIs so that we are allowed to open
  * the file we desired across multiple stream-open calls for the same
  * transaction.
@@ -246,8 +246,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
 typedef struct StreamXidHash
 {
 	TransactionId xid;			/* xid is the hash key and must be first */
-	SharedFileSet *stream_fileset;	/* shared file set for stream data */
-	SharedFileSet *subxact_fileset; /* shared file set for subxact info */
+	FileSet    *stream_fileset; /* file set for stream data */
+	FileSet    *subxact_fileset;	/* file set for subxact info */
 } StreamXidHash;
 
 static MemoryContext ApplyMessageContext = NULL;
@@ -270,8 +270,8 @@ static bool in_streamed_transaction = false;
 static TransactionId stream_xid = InvalidTransactionId;
 
 /*
- * Hash table for storing the streaming xid information along with shared file
- * set for streaming and subxact files.
+ * Hash table for storing the streaming xid information along with filesets
+ * for streaming and subxact files.
  */
 static HTAB *xidhash = NULL;
 
@@ -1297,11 +1297,11 @@ apply_handle_stream_abort(StringInfo s)
 
 		/* open the changes file */
 		changes_filename(path, MyLogicalRepWorker->subid, xid);
-		fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
+		fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
 
 		/* OK, truncate the file at the right offset */
-		BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno,
-							  subxact_data.subxacts[subidx].offset);
+		BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
+							   subxact_data.subxacts[subidx].offset);
 		BufFileClose(fd);
 
 		/* discard the subxacts added later */
@@ -1355,7 +1355,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
 				 errmsg_internal("transaction %u not found in stream XID hash table",
 								 xid)));
 
-	fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
+	fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY);
 
 	buffer = palloc(BLCKSZ);
 	initStringInfo(&s2);
@@ -2542,6 +2542,30 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 }
 
 /*
+ * Cleanup filesets.
+ */
+void
+logicalrep_worker_cleanupfileset(void)
+{
+	HASH_SEQ_STATUS status;
+	StreamXidHash *hentry;
+
+	/* Remove all the pending stream and subxact filesets. */
+	if (xidhash)
+	{
+		hash_seq_init(&status, xidhash);
+		while ((hentry = (StreamXidHash *) hash_seq_search(&status)) != NULL)
+		{
+			FileSetDeleteAll(hentry->stream_fileset);
+
+			/* Delete the subxact fileset iff it is created. */
+			if (hentry->subxact_fileset)
+				FileSetDeleteAll(hentry->subxact_fileset);
+		}
+	}
+}
+
+/*
  * Apply main loop.
  */
 static void
@@ -3024,7 +3048,7 @@ subxact_info_write(Oid subid, TransactionId xid)
 		if (ent->subxact_fileset)
 		{
 			cleanup_subxact_info();
-			SharedFileSetDeleteAll(ent->subxact_fileset);
+			FileSetDeleteAll(ent->subxact_fileset);
 			pfree(ent->subxact_fileset);
 			ent->subxact_fileset = NULL;
 		}
@@ -3042,18 +3066,18 @@ subxact_info_write(Oid subid, TransactionId xid)
 		MemoryContext oldctx;
 
 		/*
-		 * We need to maintain shared fileset across multiple stream
-		 * start/stop calls.  So, need to allocate it in a persistent context.
+		 * We need to maintain fileset across multiple stream start/stop
+		 * calls. So, need to allocate it in a persistent context.
 		 */
 		oldctx = MemoryContextSwitchTo(ApplyContext);
-		ent->subxact_fileset = palloc(sizeof(SharedFileSet));
-		SharedFileSetInit(ent->subxact_fileset, NULL);
+		ent->subxact_fileset = palloc(sizeof(FileSet));
+		FileSetInit(ent->subxact_fileset);
 		MemoryContextSwitchTo(oldctx);
 
-		fd = BufFileCreateShared(ent->subxact_fileset, path);
+		fd = BufFileCreateFileSet(ent->subxact_fileset, path);
 	}
 	else
-		fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
+		fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR);
 
 	len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
 
@@ -3107,7 +3131,7 @@ subxact_info_read(Oid subid, TransactionId xid)
 
 	subxact_filename(path, subid, xid);
 
-	fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
+	fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY);
 
 	/* read number of subxact items */
 	if (BufFileRead(fd, &subxact_data.nsubxacts,
@@ -3264,7 +3288,7 @@ stream_cleanup_files(Oid subid, TransactionId xid)
 
 	/* Delete the change file and release the stream fileset memory */
 	changes_filename(path, subid, xid);
-	SharedFileSetDeleteAll(ent->stream_fileset);
+	FileSetDeleteAll(ent->stream_fileset);
 	pfree(ent->stream_fileset);
 	ent->stream_fileset = NULL;
 
@@ -3272,7 +3296,7 @@ stream_cleanup_files(Oid subid, TransactionId xid)
 	if (ent->subxact_fileset)
 	{
 		subxact_filename(path, subid, xid);
-		SharedFileSetDeleteAll(ent->subxact_fileset);
+		FileSetDeleteAll(ent->subxact_fileset);
 		pfree(ent->subxact_fileset);
 		ent->subxact_fileset = NULL;
 	}
@@ -3288,8 +3312,8 @@ stream_cleanup_files(Oid subid, TransactionId xid)
  *
  * Open a file for streamed changes from a toplevel transaction identified
  * by stream_xid (global variable). If it's the first chunk of streamed
- * changes for this transaction, initialize the shared fileset and create the
- * buffile, otherwise open the previously created file.
+ * changes for this transaction, initialize the fileset and create the buffile,
+ * otherwise open the previously created file.
  *
  * This can only be called at the beginning of a "streaming" block, i.e.
  * between stream_start/stream_stop messages from the upstream.
@@ -3330,7 +3354,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
 	if (first_segment)
 	{
 		MemoryContext savectx;
-		SharedFileSet *fileset;
+		FileSet    *fileset;
 
 		if (found)
 			ereport(ERROR,
@@ -3338,16 +3362,16 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
 					 errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
 
 		/*
-		 * We need to maintain shared fileset across multiple stream
-		 * start/stop calls. So, need to allocate it in a persistent context.
+		 * We need to maintain fileset across multiple stream start/stop
+		 * calls. So, need to allocate it in a persistent context.
 		 */
 		savectx = MemoryContextSwitchTo(ApplyContext);
-		fileset = palloc(sizeof(SharedFileSet));
+		fileset = palloc(sizeof(FileSet));
 
-		SharedFileSetInit(fileset, NULL);
+		FileSetInit(fileset);
 		MemoryContextSwitchTo(savectx);
 
-		stream_fd = BufFileCreateShared(fileset, path);
+		stream_fd = BufFileCreateFileSet(fileset, path);
 
 		/* Remember the fileset for the next stream of the same transaction */
 		ent->xid = xid;
@@ -3365,7 +3389,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
 		 * Open the file and seek to the end of the file because we always
 		 * append the changes file.
 		 */
-		stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
+		stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
 		BufFileSeek(stream_fd, 0, 0, SEEK_END);
 	}
 
diff --git a/src/backend/storage/file/Makefile b/src/backend/storage/file/Makefile
index 5e1291b..660ac51 100644
--- a/src/backend/storage/file/Makefile
+++ b/src/backend/storage/file/Makefile
@@ -16,6 +16,7 @@ OBJS = \
 	buffile.o \
 	copydir.o \
 	fd.o \
+	fileset.o \
 	reinit.o \
 	sharedfileset.o
 
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index a4be5fe..5e5409d 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -39,7 +39,7 @@
  * BufFile also supports temporary files that can be used by the single backend
  * when the corresponding files need to be survived across the transaction and
  * need to be opened and closed multiple times.  Such files need to be created
- * as a member of a SharedFileSet.
+ * as a member of a FileSet.
  *-------------------------------------------------------------------------
  */
 
@@ -77,8 +77,8 @@ struct BufFile
 	bool		dirty;			/* does buffer need to be written? */
 	bool		readOnly;		/* has the file been set to read only? */
 
-	SharedFileSet *fileset;		/* space for segment files if shared */
-	const char *name;			/* name of this BufFile if shared */
+	FileSet    *fileset;		/* space for fileset based segment files */
+	const char *name;			/* name of fileset based BufFile */
 
 	/*
 	 * resowner is the ResourceOwner to use for underlying temp files.  (We
@@ -104,7 +104,7 @@ static void extendBufFile(BufFile *file);
 static void BufFileLoadBuffer(BufFile *file);
 static void BufFileDumpBuffer(BufFile *file);
 static void BufFileFlush(BufFile *file);
-static File MakeNewSharedSegment(BufFile *file, int segment);
+static File MakeNewFileSetSegment(BufFile *file, int segment);
 
 /*
  * Create BufFile and perform the common initialization.
@@ -160,7 +160,7 @@ extendBufFile(BufFile *file)
 	if (file->fileset == NULL)
 		pfile = OpenTemporaryFile(file->isInterXact);
 	else
-		pfile = MakeNewSharedSegment(file, file->numFiles);
+		pfile = MakeNewFileSetSegment(file, file->numFiles);
 
 	Assert(pfile >= 0);
 
@@ -214,34 +214,34 @@ BufFileCreateTemp(bool interXact)
  * Build the name for a given segment of a given BufFile.
  */
 static void
-SharedSegmentName(char *name, const char *buffile_name, int segment)
+FileSetSegmentName(char *name, const char *buffile_name, int segment)
 {
 	snprintf(name, MAXPGPATH, "%s.%d", buffile_name, segment);
 }
 
 /*
- * Create a new segment file backing a shared BufFile.
+ * Create a new segment file backing a fileset based BufFile.
  */
 static File
-MakeNewSharedSegment(BufFile *buffile, int segment)
+MakeNewFileSetSegment(BufFile *buffile, int segment)
 {
 	char		name[MAXPGPATH];
 	File		file;
 
 	/*
 	 * It is possible that there are files left over from before a crash
-	 * restart with the same name.  In order for BufFileOpenShared() not to
+	 * restart with the same name.  In order for BufFileOpenFileSet() not to
 	 * get confused about how many segments there are, we'll unlink the next
 	 * segment number if it already exists.
 	 */
-	SharedSegmentName(name, buffile->name, segment + 1);
-	SharedFileSetDelete(buffile->fileset, name, true);
+	FileSetSegmentName(name, buffile->name, segment + 1);
+	FileSetDelete(buffile->fileset, name, true);
 
 	/* Create the new segment. */
-	SharedSegmentName(name, buffile->name, segment);
-	file = SharedFileSetCreate(buffile->fileset, name);
+	FileSetSegmentName(name, buffile->name, segment);
+	file = FileSetCreate(buffile->fileset, name);
 
-	/* SharedFileSetCreate would've errored out */
+	/* FileSetCreate would've errored out */
 	Assert(file > 0);
 
 	return file;
@@ -251,15 +251,15 @@ MakeNewSharedSegment(BufFile *buffile, int segment)
  * Create a BufFile that can be discovered and opened read-only by other
  * backends that are attached to the same SharedFileSet using the same name.
  *
- * The naming scheme for shared BufFiles is left up to the calling code.  The
- * name will appear as part of one or more filenames on disk, and might
+ * The naming scheme for fileset based BufFiles is left up to the calling code.
+ * The name will appear as part of one or more filenames on disk, and might
  * provide clues to administrators about which subsystem is generating
  * temporary file data.  Since each SharedFileSet object is backed by one or
  * more uniquely named temporary directory, names don't conflict with
  * unrelated SharedFileSet objects.
  */
 BufFile *
-BufFileCreateShared(SharedFileSet *fileset, const char *name)
+BufFileCreateFileSet(FileSet *fileset, const char *name)
 {
 	BufFile    *file;
 
@@ -267,7 +267,7 @@ BufFileCreateShared(SharedFileSet *fileset, const char *name)
 	file->fileset = fileset;
 	file->name = pstrdup(name);
 	file->files = (File *) palloc(sizeof(File));
-	file->files[0] = MakeNewSharedSegment(file, 0);
+	file->files[0] = MakeNewFileSetSegment(file, 0);
 	file->readOnly = false;
 
 	return file;
@@ -275,13 +275,13 @@ BufFileCreateShared(SharedFileSet *fileset, const char *name)
 
 /*
  * Open a file that was previously created in another backend (or this one)
- * with BufFileCreateShared in the same SharedFileSet using the same name.
+ * with BufFileCreateFileSet in the same FileSet using the same name.
  * The backend that created the file must have called BufFileClose() or
- * BufFileExportShared() to make sure that it is ready to be opened by other
+ * BufFileExportFileSet() to make sure that it is ready to be opened by other
  * backends and render it read-only.
  */
 BufFile *
-BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
+BufFileOpenFileSet(FileSet *fileset, const char *name, int mode)
 {
 	BufFile    *file;
 	char		segment_name[MAXPGPATH];
@@ -304,8 +304,8 @@ BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
 			files = repalloc(files, sizeof(File) * capacity);
 		}
 		/* Try to load a segment. */
-		SharedSegmentName(segment_name, name, nfiles);
-		files[nfiles] = SharedFileSetOpen(fileset, segment_name, mode);
+		FileSetSegmentName(segment_name, name, nfiles);
+		files[nfiles] = FileSetOpen(fileset, segment_name, mode);
 		if (files[nfiles] <= 0)
 			break;
 		++nfiles;
@@ -333,18 +333,18 @@ BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
 }
 
 /*
- * Delete a BufFile that was created by BufFileCreateShared in the given
- * SharedFileSet using the given name.
+ * Delete a BufFile that was created by BufFileCreateFileSet in the given
+ * FileSet using the given name.
  *
  * It is not necessary to delete files explicitly with this function.  It is
  * provided only as a way to delete files proactively, rather than waiting for
- * the SharedFileSet to be cleaned up.
+ * the FileSet to be cleaned up.
  *
  * Only one backend should attempt to delete a given name, and should know
  * that it exists and has been exported or closed.
  */
 void
-BufFileDeleteShared(SharedFileSet *fileset, const char *name)
+BufFileDeleteFileSet(FileSet *fileset, const char *name)
 {
 	char		segment_name[MAXPGPATH];
 	int			segment = 0;
@@ -357,8 +357,8 @@ BufFileDeleteShared(SharedFileSet *fileset, const char *name)
 	 */
 	for (;;)
 	{
-		SharedSegmentName(segment_name, name, segment);
-		if (!SharedFileSetDelete(fileset, segment_name, true))
+		FileSetSegmentName(segment_name, name, segment);
+		if (!FileSetDelete(fileset, segment_name, true))
 			break;
 		found = true;
 		++segment;
@@ -367,16 +367,16 @@ BufFileDeleteShared(SharedFileSet *fileset, const char *name)
 	}
 
 	if (!found)
-		elog(ERROR, "could not delete unknown shared BufFile \"%s\"", name);
+		elog(ERROR, "could not delete unknown BufFile \"%s\"", name);
 }
 
 /*
- * BufFileExportShared --- flush and make read-only, in preparation for sharing.
+ * BufFileExportFileSet --- flush and make read-only, in preparation for sharing.
  */
 void
-BufFileExportShared(BufFile *file)
+BufFileExportFileSet(BufFile *file)
 {
-	/* Must be a file belonging to a SharedFileSet. */
+	/* Must be a file belonging to a FileSet. */
 	Assert(file->fileset != NULL);
 
 	/* It's probably a bug if someone calls this twice. */
@@ -785,7 +785,7 @@ BufFileTellBlock(BufFile *file)
 #endif
 
 /*
- * Return the current shared BufFile size.
+ * Return the current fileset based BufFile size.
  *
  * Counts any holes left behind by BufFileAppend as part of the size.
  * ereport()s on failure.
@@ -811,8 +811,8 @@ BufFileSize(BufFile *file)
 }
 
 /*
- * Append the contents of source file (managed within shared fileset) to
- * end of target file (managed within same shared fileset).
+ * Append the contents of source file (managed within fileset) to
+ * end of target file (managed within same fileset).
  *
  * Note that operation subsumes ownership of underlying resources from
  * "source".  Caller should never call BufFileClose against source having
@@ -854,11 +854,11 @@ BufFileAppend(BufFile *target, BufFile *source)
 }
 
 /*
- * Truncate a BufFile created by BufFileCreateShared up to the given fileno and
- * the offset.
+ * Truncate a BufFile created by BufFileCreateFileSet up to the given fileno
+ * and the offset.
  */
 void
-BufFileTruncateShared(BufFile *file, int fileno, off_t offset)
+BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset)
 {
 	int			numFiles = file->numFiles;
 	int			newFile = fileno;
@@ -876,12 +876,12 @@ BufFileTruncateShared(BufFile *file, int fileno, off_t offset)
 	{
 		if ((i != fileno || offset == 0) && i != 0)
 		{
-			SharedSegmentName(segment_name, file->name, i);
+			FileSetSegmentName(segment_name, file->name, i);
 			FileClose(file->files[i]);
-			if (!SharedFileSetDelete(file->fileset, segment_name, true))
+			if (!FileSetDelete(file->fileset, segment_name, true))
 				ereport(ERROR,
 						(errcode_for_file_access(),
-						 errmsg("could not delete shared fileset \"%s\": %m",
+						 errmsg("could not delete fileset \"%s\": %m",
 								segment_name)));
 			numFiles--;
 			newOffset = MAX_PHYSICAL_FILESIZE;
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index b58b399..433e283 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -1921,7 +1921,7 @@ PathNameDeleteTemporaryFile(const char *path, bool error_on_failure)
 
 	/*
 	 * Unlike FileClose's automatic file deletion code, we tolerate
-	 * non-existence to support BufFileDeleteShared which doesn't know how
+	 * non-existence to support BufFileDeleteFileSet which doesn't know how
 	 * many segments it has to delete until it runs out.
 	 */
 	if (stat_errno == ENOENT)
diff --git a/src/backend/storage/file/fileset.c b/src/backend/storage/file/fileset.c
new file mode 100644
index 0000000..282ff12
--- /dev/null
+++ b/src/backend/storage/file/fileset.c
@@ -0,0 +1,205 @@
+/*-------------------------------------------------------------------------
+ *
+ * fileset.c
+ *	  Management of named temporary files.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/storage/file/fileset.c
+ *
+ * FileSets provide a temporary namespace (think directory) so that files can
+ * be discovered by name.
+ *
+ * FileSets can be used by backends when the temporary files need to be
+ * opened/closed multiple times and the underlying files need to survive across
+ * transactions.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <limits.h>
+
+#include "catalog/pg_tablespace.h"
+#include "commands/tablespace.h"
+#include "common/hashfn.h"
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/fileset.h"
+#include "utils/builtins.h"
+
+static void FileSetPath(char *path, FileSet *fileset, Oid tablespace);
+static void FilePath(char *path, FileSet *fileset, const char *name);
+static Oid	ChooseTablespace(const FileSet *fileset, const char *name);
+
+/*
+ * Initialize a space for temporary files. This API can be used by shared
+ * fileset as well as if the temporary files are used only by single backend
+ * but the files need to be opened and closed multiple times and also the
+ * underlying files need to survive across transactions.
+ *
+ * The callers are expected to explicitly remove such files by using
+ * FileSetDelete/FileSetDeleteAll.
+ *
+ * Files will be distributed over the tablespaces configured in
+ * temp_tablespaces.
+ *
+ * Under the covers the set is one or more directories which will eventually
+ * be deleted.
+ */
+void
+FileSetInit(FileSet *fileset)
+{
+	static uint32 counter = 0;
+
+	fileset->creator_pid = MyProcPid;
+	fileset->number = counter;
+	counter = (counter + 1) % INT_MAX;
+
+	/* Capture the tablespace OIDs so that all backends agree on them. */
+	PrepareTempTablespaces();
+	fileset->ntablespaces =
+		GetTempTablespaces(&fileset->tablespaces[0],
+						   lengthof(fileset->tablespaces));
+	if (fileset->ntablespaces == 0)
+	{
+		/* If the GUC is empty, use current database's default tablespace */
+		fileset->tablespaces[0] = MyDatabaseTableSpace;
+		fileset->ntablespaces = 1;
+	}
+	else
+	{
+		int			i;
+
+		/*
+		 * An entry of InvalidOid means use the default tablespace for the
+		 * current database.  Replace that now, to be sure that all users of
+		 * the FileSet agree on what to do.
+		 */
+		for (i = 0; i < fileset->ntablespaces; i++)
+		{
+			if (fileset->tablespaces[i] == InvalidOid)
+				fileset->tablespaces[i] = MyDatabaseTableSpace;
+		}
+	}
+}
+
+/*
+ * Create a new file in the given set.
+ */
+File
+FileSetCreate(FileSet *fileset, const char *name)
+{
+	char		path[MAXPGPATH];
+	File		file;
+
+	FilePath(path, fileset, name);
+	file = PathNameCreateTemporaryFile(path, false);
+
+	/* If we failed, see if we need to create the directory on demand. */
+	if (file <= 0)
+	{
+		char		tempdirpath[MAXPGPATH];
+		char		filesetpath[MAXPGPATH];
+		Oid			tablespace = ChooseTablespace(fileset, name);
+
+		TempTablespacePath(tempdirpath, tablespace);
+		FileSetPath(filesetpath, fileset, tablespace);
+		PathNameCreateTemporaryDir(tempdirpath, filesetpath);
+		file = PathNameCreateTemporaryFile(path, true);
+	}
+
+	return file;
+}
+
+/*
+ * Open a file that was created with FileSetCreate() */
+File
+FileSetOpen(FileSet *fileset, const char *name, int mode)
+{
+	char		path[MAXPGPATH];
+	File		file;
+
+	FilePath(path, fileset, name);
+	file = PathNameOpenTemporaryFile(path, mode);
+
+	return file;
+}
+
+/*
+ * Delete a file that was created with FileSetCreate().
+ *
+ * Return true if the file existed, false if didn't.
+ */
+bool
+FileSetDelete(FileSet *fileset, const char *name,
+			  bool error_on_failure)
+{
+	char		path[MAXPGPATH];
+
+	FilePath(path, fileset, name);
+
+	return PathNameDeleteTemporaryFile(path, error_on_failure);
+}
+
+/*
+ * Delete all files in the set.
+ */
+void
+FileSetDeleteAll(FileSet *fileset)
+{
+	char		dirpath[MAXPGPATH];
+	int			i;
+
+	/*
+	 * Delete the directory we created in each tablespace.  Doesn't fail
+	 * because we use this in error cleanup paths, but can generate LOG
+	 * message on IO error.
+	 */
+	for (i = 0; i < fileset->ntablespaces; ++i)
+	{
+		FileSetPath(dirpath, fileset, fileset->tablespaces[i]);
+		PathNameDeleteTemporaryDir(dirpath);
+	}
+}
+
+/*
+ * Build the path for the directory holding the files backing a FileSet in a
+ * given tablespace.
+ */
+static void
+FileSetPath(char *path, FileSet *fileset, Oid tablespace)
+{
+	char		tempdirpath[MAXPGPATH];
+
+	TempTablespacePath(tempdirpath, tablespace);
+	snprintf(path, MAXPGPATH, "%s/%s%lu.%u.fileset",
+			 tempdirpath, PG_TEMP_FILE_PREFIX,
+			 (unsigned long) fileset->creator_pid, fileset->number);
+}
+
+/*
+ * Sorting has to determine which tablespace a given temporary file belongs in.
+ */
+static Oid
+ChooseTablespace(const FileSet *fileset, const char *name)
+{
+	uint32		hash = hash_any((const unsigned char *) name, strlen(name));
+
+	return fileset->tablespaces[hash % fileset->ntablespaces];
+}
+
+/*
+ * Compute the full path of a file in a FileSet.
+ */
+static void
+FilePath(char *path, FileSet *fileset, const char *name)
+{
+	char		dirpath[MAXPGPATH];
+
+	FileSetPath(dirpath, fileset, ChooseTablespace(fileset, name));
+	snprintf(path, MAXPGPATH, "%s/%s", dirpath, name);
+}
diff --git a/src/backend/storage/file/sharedfileset.c b/src/backend/storage/file/sharedfileset.c
index ed37c94..6a33fac 100644
--- a/src/backend/storage/file/sharedfileset.c
+++ b/src/backend/storage/file/sharedfileset.c
@@ -13,10 +13,6 @@
  * files can be discovered by name, and a shared ownership semantics so that
  * shared files survive until the last user detaches.
  *
- * SharedFileSets can be used by backends when the temporary files need to be
- * opened/closed multiple times and the underlying files need to survive across
- * transactions.
- *
  *-------------------------------------------------------------------------
  */
 
@@ -33,13 +29,7 @@
 #include "storage/sharedfileset.h"
 #include "utils/builtins.h"
 
-static List *filesetlist = NIL;
-
 static void SharedFileSetOnDetach(dsm_segment *segment, Datum datum);
-static void SharedFileSetDeleteOnProcExit(int status, Datum arg);
-static void SharedFileSetPath(char *path, SharedFileSet *fileset, Oid tablespace);
-static void SharedFilePath(char *path, SharedFileSet *fileset, const char *name);
-static Oid	ChooseTablespace(const SharedFileSet *fileset, const char *name);
 
 /*
  * Initialize a space for temporary files that can be opened by other backends.
@@ -47,77 +37,22 @@ static Oid	ChooseTablespace(const SharedFileSet *fileset, const char *name);
  * SharedFileSet with 'seg'.  Any contained files will be deleted when the
  * last backend detaches.
  *
- * We can also use this interface if the temporary files are used only by
- * single backend but the files need to be opened and closed multiple times
- * and also the underlying files need to survive across transactions.  For
- * such cases, dsm segment 'seg' should be passed as NULL.  Callers are
- * expected to explicitly remove such files by using SharedFileSetDelete/
- * SharedFileSetDeleteAll or we remove such files on proc exit.
- *
- * Files will be distributed over the tablespaces configured in
- * temp_tablespaces.
- *
  * Under the covers the set is one or more directories which will eventually
  * be deleted.
  */
 void
 SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
 {
-	static uint32 counter = 0;
-
+	/* Initialize the shared fileset specific members. */
 	SpinLockInit(&fileset->mutex);
 	fileset->refcnt = 1;
-	fileset->creator_pid = MyProcPid;
-	fileset->number = counter;
-	counter = (counter + 1) % INT_MAX;
-
-	/* Capture the tablespace OIDs so that all backends agree on them. */
-	PrepareTempTablespaces();
-	fileset->ntablespaces =
-		GetTempTablespaces(&fileset->tablespaces[0],
-						   lengthof(fileset->tablespaces));
-	if (fileset->ntablespaces == 0)
-	{
-		/* If the GUC is empty, use current database's default tablespace */
-		fileset->tablespaces[0] = MyDatabaseTableSpace;
-		fileset->ntablespaces = 1;
-	}
-	else
-	{
-		int			i;
 
-		/*
-		 * An entry of InvalidOid means use the default tablespace for the
-		 * current database.  Replace that now, to be sure that all users of
-		 * the SharedFileSet agree on what to do.
-		 */
-		for (i = 0; i < fileset->ntablespaces; i++)
-		{
-			if (fileset->tablespaces[i] == InvalidOid)
-				fileset->tablespaces[i] = MyDatabaseTableSpace;
-		}
-	}
+	/* Initialize the fileset. */
+	FileSetInit(&fileset->fs);
 
 	/* Register our cleanup callback. */
 	if (seg)
 		on_dsm_detach(seg, SharedFileSetOnDetach, PointerGetDatum(fileset));
-	else
-	{
-		static bool registered_cleanup = false;
-
-		if (!registered_cleanup)
-		{
-			/*
-			 * We must not have registered any fileset before registering the
-			 * fileset clean up.
-			 */
-			Assert(filesetlist == NIL);
-			on_proc_exit(SharedFileSetDeleteOnProcExit, 0);
-			registered_cleanup = true;
-		}
-
-		filesetlist = lcons((void *) fileset, filesetlist);
-	}
 }
 
 /*
@@ -148,86 +83,12 @@ SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
 }
 
 /*
- * Create a new file in the given set.
- */
-File
-SharedFileSetCreate(SharedFileSet *fileset, const char *name)
-{
-	char		path[MAXPGPATH];
-	File		file;
-
-	SharedFilePath(path, fileset, name);
-	file = PathNameCreateTemporaryFile(path, false);
-
-	/* If we failed, see if we need to create the directory on demand. */
-	if (file <= 0)
-	{
-		char		tempdirpath[MAXPGPATH];
-		char		filesetpath[MAXPGPATH];
-		Oid			tablespace = ChooseTablespace(fileset, name);
-
-		TempTablespacePath(tempdirpath, tablespace);
-		SharedFileSetPath(filesetpath, fileset, tablespace);
-		PathNameCreateTemporaryDir(tempdirpath, filesetpath);
-		file = PathNameCreateTemporaryFile(path, true);
-	}
-
-	return file;
-}
-
-/*
- * Open a file that was created with SharedFileSetCreate(), possibly in
- * another backend.
- */
-File
-SharedFileSetOpen(SharedFileSet *fileset, const char *name, int mode)
-{
-	char		path[MAXPGPATH];
-	File		file;
-
-	SharedFilePath(path, fileset, name);
-	file = PathNameOpenTemporaryFile(path, mode);
-
-	return file;
-}
-
-/*
- * Delete a file that was created with SharedFileSetCreate().
- * Return true if the file existed, false if didn't.
- */
-bool
-SharedFileSetDelete(SharedFileSet *fileset, const char *name,
-					bool error_on_failure)
-{
-	char		path[MAXPGPATH];
-
-	SharedFilePath(path, fileset, name);
-
-	return PathNameDeleteTemporaryFile(path, error_on_failure);
-}
-
-/*
  * Delete all files in the set.
  */
 void
 SharedFileSetDeleteAll(SharedFileSet *fileset)
 {
-	char		dirpath[MAXPGPATH];
-	int			i;
-
-	/*
-	 * Delete the directory we created in each tablespace.  Doesn't fail
-	 * because we use this in error cleanup paths, but can generate LOG
-	 * message on IO error.
-	 */
-	for (i = 0; i < fileset->ntablespaces; ++i)
-	{
-		SharedFileSetPath(dirpath, fileset, fileset->tablespaces[i]);
-		PathNameDeleteTemporaryDir(dirpath);
-	}
-
-	/* Unregister the shared fileset */
-	SharedFileSetUnregister(fileset);
+	FileSetDeleteAll(&fileset->fs);
 }
 
 /*
@@ -255,100 +116,5 @@ SharedFileSetOnDetach(dsm_segment *segment, Datum datum)
 	 * this function so we can safely access its data.
 	 */
 	if (unlink_all)
-		SharedFileSetDeleteAll(fileset);
-}
-
-/*
- * Callback function that will be invoked on the process exit.  This will
- * process the list of all the registered sharedfilesets and delete the
- * underlying files.
- */
-static void
-SharedFileSetDeleteOnProcExit(int status, Datum arg)
-{
-	/*
-	 * Remove all the pending shared fileset entries. We don't use foreach()
-	 * here because SharedFileSetDeleteAll will remove the current element in
-	 * filesetlist. Though we have used foreach_delete_current() to remove the
-	 * element from filesetlist it could only fix up the state of one of the
-	 * loops, see SharedFileSetUnregister.
-	 */
-	while (list_length(filesetlist) > 0)
-	{
-		SharedFileSet *fileset = (SharedFileSet *) linitial(filesetlist);
-
-		SharedFileSetDeleteAll(fileset);
-	}
-
-	filesetlist = NIL;
-}
-
-/*
- * Unregister the shared fileset entry registered for cleanup on proc exit.
- */
-void
-SharedFileSetUnregister(SharedFileSet *input_fileset)
-{
-	ListCell   *l;
-
-	/*
-	 * If the caller is following the dsm based cleanup then we don't maintain
-	 * the filesetlist so return.
-	 */
-	if (filesetlist == NIL)
-		return;
-
-	foreach(l, filesetlist)
-	{
-		SharedFileSet *fileset = (SharedFileSet *) lfirst(l);
-
-		/* Remove the entry from the list */
-		if (input_fileset == fileset)
-		{
-			filesetlist = foreach_delete_current(filesetlist, l);
-			return;
-		}
-	}
-
-	/* Should have found a match */
-	Assert(false);
-}
-
-/*
- * Build the path for the directory holding the files backing a SharedFileSet
- * in a given tablespace.
- */
-static void
-SharedFileSetPath(char *path, SharedFileSet *fileset, Oid tablespace)
-{
-	char		tempdirpath[MAXPGPATH];
-
-	TempTablespacePath(tempdirpath, tablespace);
-	snprintf(path, MAXPGPATH, "%s/%s%lu.%u.sharedfileset",
-			 tempdirpath, PG_TEMP_FILE_PREFIX,
-			 (unsigned long) fileset->creator_pid, fileset->number);
-}
-
-/*
- * Sorting hat to determine which tablespace a given shared temporary file
- * belongs in.
- */
-static Oid
-ChooseTablespace(const SharedFileSet *fileset, const char *name)
-{
-	uint32		hash = hash_any((const unsigned char *) name, strlen(name));
-
-	return fileset->tablespaces[hash % fileset->ntablespaces];
-}
-
-/*
- * Compute the full path of a file in a SharedFileSet.
- */
-static void
-SharedFilePath(char *path, SharedFileSet *fileset, const char *name)
-{
-	char		dirpath[MAXPGPATH];
-
-	SharedFileSetPath(dirpath, fileset, ChooseTablespace(fileset, name));
-	snprintf(path, MAXPGPATH, "%s/%s", dirpath, name);
+		FileSetDeleteAll(&fileset->fs);
 }
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index cafc087..f7994d7 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -564,7 +564,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
 		lt = &lts->tapes[i];
 
 		pg_itoa(i, filename);
-		file = BufFileOpenShared(fileset, filename, O_RDONLY);
+		file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY);
 		filesize = BufFileSize(file);
 
 		/*
@@ -610,7 +610,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
 	 * offset).
 	 *
 	 * The only thing that currently prevents writing to the leader tape from
-	 * working is the fact that BufFiles opened using BufFileOpenShared() are
+	 * working is the fact that BufFiles opened using BufFileOpenFileSet() are
 	 * read-only by definition, but that could be changed if it seemed
 	 * worthwhile.  For now, writing to the leader tape will raise a "Bad file
 	 * descriptor" error, so tuplesort must avoid writing to the leader tape
@@ -722,7 +722,7 @@ LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared,
 		char		filename[MAXPGPATH];
 
 		pg_itoa(worker, filename);
-		lts->pfile = BufFileCreateShared(fileset, filename);
+		lts->pfile = BufFileCreateFileSet(&fileset->fs, filename);
 	}
 	else
 		lts->pfile = BufFileCreateTemp(false);
@@ -1096,7 +1096,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
 	/* Handle extra steps when caller is to share its tapeset */
 	if (share)
 	{
-		BufFileExportShared(lts->pfile);
+		BufFileExportFileSet(lts->pfile);
 		share->firstblocknumber = lt->firstBlockNumber;
 	}
 }
diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c
index 57e35db..504ef1c 100644
--- a/src/backend/utils/sort/sharedtuplestore.c
+++ b/src/backend/utils/sort/sharedtuplestore.c
@@ -310,7 +310,8 @@ sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
 
 		/* Create one.  Only this backend will write into it. */
 		sts_filename(name, accessor, accessor->participant);
-		accessor->write_file = BufFileCreateShared(accessor->fileset, name);
+		accessor->write_file =
+			BufFileCreateFileSet(&accessor->fileset->fs, name);
 
 		/* Set up the shared state for this backend's file. */
 		participant = &accessor->sts->participants[accessor->participant];
@@ -559,7 +560,7 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
 
 				sts_filename(name, accessor, accessor->read_participant);
 				accessor->read_file =
-					BufFileOpenShared(accessor->fileset, name, O_RDONLY);
+					BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY);
 			}
 
 			/* Seek and load the chunk header. */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 41c7487..a6c9d4e 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -79,6 +79,7 @@ extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
+extern void logicalrep_worker_cleanupfileset(void);
 
 extern int	logicalrep_sync_worker_count(Oid subid);
 
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index 566523d..143eada 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -26,7 +26,7 @@
 #ifndef BUFFILE_H
 #define BUFFILE_H
 
-#include "storage/sharedfileset.h"
+#include "storage/fileset.h"
 
 /* BufFile is an opaque type whose details are not known outside buffile.c. */
 
@@ -46,11 +46,11 @@ extern int	BufFileSeekBlock(BufFile *file, long blknum);
 extern int64 BufFileSize(BufFile *file);
 extern long BufFileAppend(BufFile *target, BufFile *source);
 
-extern BufFile *BufFileCreateShared(SharedFileSet *fileset, const char *name);
-extern void BufFileExportShared(BufFile *file);
-extern BufFile *BufFileOpenShared(SharedFileSet *fileset, const char *name,
-								  int mode);
-extern void BufFileDeleteShared(SharedFileSet *fileset, const char *name);
-extern void BufFileTruncateShared(BufFile *file, int fileno, off_t offset);
+extern BufFile *BufFileCreateFileSet(FileSet *fileset, const char *name);
+extern void BufFileExportFileSet(BufFile *file);
+extern BufFile *BufFileOpenFileSet(FileSet *fileset, const char *name,
+								   int mode);
+extern void BufFileDeleteFileSet(FileSet *fileset, const char *name);
+extern void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset);
 
 #endif							/* BUFFILE_H */
diff --git a/src/include/storage/fileset.h b/src/include/storage/fileset.h
new file mode 100644
index 0000000..be0e097
--- /dev/null
+++ b/src/include/storage/fileset.h
@@ -0,0 +1,40 @@
+/*-------------------------------------------------------------------------
+ *
+ * fileset.h
+ *	  Management of named temporary files.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/fileset.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef FILESET_H
+#define FILESET_H
+
+#include "storage/fd.h"
+
+/*
+ * A set of temporary files.
+ */
+typedef struct FileSet
+{
+	pid_t		creator_pid;	/* PID of the creating process */
+	uint32		number;			/* per-PID identifier */
+	int			ntablespaces;	/* number of tablespaces to use */
+	Oid			tablespaces[8]; /* OIDs of tablespaces to use. Assumes that
+								 * it's rare that there more than temp
+								 * tablespaces. */
+} FileSet;
+
+extern void FileSetInit(FileSet *fileset);
+extern File FileSetCreate(FileSet *fileset, const char *name);
+extern File FileSetOpen(FileSet *fileset, const char *name,
+						int mode);
+extern bool FileSetDelete(FileSet *fileset, const char *name,
+						  bool error_on_failure);
+extern void FileSetDeleteAll(FileSet *fileset);
+
+#endif
diff --git a/src/include/storage/sharedfileset.h b/src/include/storage/sharedfileset.h
index 09ba121..59becfb 100644
--- a/src/include/storage/sharedfileset.h
+++ b/src/include/storage/sharedfileset.h
@@ -17,6 +17,7 @@
 
 #include "storage/dsm.h"
 #include "storage/fd.h"
+#include "storage/fileset.h"
 #include "storage/spin.h"
 
 /*
@@ -24,24 +25,13 @@
  */
 typedef struct SharedFileSet
 {
-	pid_t		creator_pid;	/* PID of the creating process */
-	uint32		number;			/* per-PID identifier */
+	FileSet		fs;
 	slock_t		mutex;			/* mutex protecting the reference count */
 	int			refcnt;			/* number of attached backends */
-	int			ntablespaces;	/* number of tablespaces to use */
-	Oid			tablespaces[8]; /* OIDs of tablespaces to use. Assumes that
-								 * it's rare that there more than temp
-								 * tablespaces. */
 } SharedFileSet;
 
 extern void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg);
 extern void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg);
-extern File SharedFileSetCreate(SharedFileSet *fileset, const char *name);
-extern File SharedFileSetOpen(SharedFileSet *fileset, const char *name,
-							  int mode);
-extern bool SharedFileSetDelete(SharedFileSet *fileset, const char *name,
-								bool error_on_failure);
 extern void SharedFileSetDeleteAll(SharedFileSet *fileset);
-extern void SharedFileSetUnregister(SharedFileSet *input_fileset);
 
 #endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 621d0cb..f31a1e4 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -709,6 +709,7 @@ File
 FileFdwExecutionState
 FileFdwPlanState
 FileNameMap
+FileSet
 FileTag
 FinalPathExtraData
 FindColsContext
-- 
1.8.3.1

From 99a71be71a9db8b118890b71aab3797d0904285a Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Fri, 27 Aug 2021 11:49:12 +0530
Subject: [PATCH v6 2/2] Using fileset more effectively in the apply worker

Instead of using a separate fileset for each xid, use one fileset for
the worker's entire lifetime. Now the changes/subxacts files for every
streaming transaction will be created under the same fileset, and the
files will be deleted after the transaction is completed. The fileset
itself will remain until the worker exits.
---
 src/backend/replication/logical/launcher.c |   2 +-
 src/backend/replication/logical/worker.c   | 249 ++++++-----------------------
 src/backend/storage/file/buffile.c         |  23 ++-
 src/backend/utils/sort/logtape.c           |   2 +-
 src/backend/utils/sort/sharedtuplestore.c  |   3 +-
 src/include/storage/buffile.h              |   5 +-
 6 files changed, 75 insertions(+), 209 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 8b1772d..644a9c2 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -648,7 +648,7 @@ logicalrep_worker_onexit(int code, Datum arg)
 
 	logicalrep_worker_detach();
 
-	/* Cleanup filesets used for streaming transactions. */
+	/* Cleanup fileset used for streaming transactions. */
 	logicalrep_worker_cleanupfileset();
 
 	ApplyLauncherWakeup();
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index bfb7d1a..c3e4114 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -236,20 +236,6 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
 	.ts = 0,
 };
 
-/*
- * Stream xid hash entry. Whenever we see a new xid we create this entry in the
- * xidhash and along with it create the streaming file and store the fileset handle.
- * The subxact file is created iff there is any subxact info under this xid. This
- * entry is used on the subsequent streams for the xid to get the corresponding
- * fileset handles, so storing them in hash makes the search faster.
- */
-typedef struct StreamXidHash
-{
-	TransactionId xid;			/* xid is the hash key and must be first */
-	FileSet    *stream_fileset; /* file set for stream data */
-	FileSet    *subxact_fileset;	/* file set for subxact info */
-} StreamXidHash;
-
 static MemoryContext ApplyMessageContext = NULL;
 MemoryContext ApplyContext = NULL;
 
@@ -270,10 +256,13 @@ static bool in_streamed_transaction = false;
 static TransactionId stream_xid = InvalidTransactionId;
 
 /*
- * Hash table for storing the streaming xid information along with filesets
- * for streaming and subxact files.
+ * The fileset is used by the worker to create the changes and subxact files
+ * for the streaming transaction.  Upon the arrival of the first streaming
+ * transaction, the fileset will be initialized, and it will be deleted when
+ * the worker exits.  Within this fileset, separate buffiles will be created
+ * for each transaction and will be deleted after the transaction is completed.
  */
-static HTAB *xidhash = NULL;
+static FileSet *stream_fileset = NULL;
 
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
@@ -1118,7 +1107,6 @@ static void
 apply_handle_stream_start(StringInfo s)
 {
 	bool		first_segment;
-	HASHCTL		hash_ctl;
 
 	if (in_streamed_transaction)
 		ereport(ERROR,
@@ -1148,17 +1136,20 @@ apply_handle_stream_start(StringInfo s)
 	set_apply_error_context_xact(stream_xid, 0);
 
 	/*
-	 * Initialize the xidhash table if we haven't yet. This will be used for
+	 * Initialize the stream_fileset if we haven't yet. This will be used for
 	 * the entire duration of the apply worker so create it in permanent
 	 * context.
 	 */
-	if (xidhash == NULL)
+	if (stream_fileset == NULL)
 	{
-		hash_ctl.keysize = sizeof(TransactionId);
-		hash_ctl.entrysize = sizeof(StreamXidHash);
-		hash_ctl.hcxt = ApplyContext;
-		xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
-							  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+		MemoryContext oldctx;
+
+		oldctx = MemoryContextSwitchTo(ApplyContext);
+
+		stream_fileset = palloc(sizeof(FileSet));
+		FileSetInit(stream_fileset);
+
+		MemoryContextSwitchTo(oldctx);
 	}
 
 	/* open the spool file for this transaction */
@@ -1253,7 +1244,6 @@ apply_handle_stream_abort(StringInfo s)
 		BufFile    *fd;
 		bool		found = false;
 		char		path[MAXPGPATH];
-		StreamXidHash *ent;
 
 		set_apply_error_context_xact(subxid, 0);
 
@@ -1285,19 +1275,9 @@ apply_handle_stream_abort(StringInfo s)
 			return;
 		}
 
-		ent = (StreamXidHash *) hash_search(xidhash,
-											(void *) &xid,
-											HASH_FIND,
-											NULL);
-		if (!ent)
-			ereport(ERROR,
-					(errcode(ERRCODE_PROTOCOL_VIOLATION),
-					 errmsg_internal("transaction %u not found in stream XID hash table",
-									 xid)));
-
 		/* open the changes file */
 		changes_filename(path, MyLogicalRepWorker->subid, xid);
-		fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
+		fd = BufFileOpenFileSet(stream_fileset, path, O_RDWR, false);
 
 		/* OK, truncate the file at the right offset */
 		BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
@@ -1327,7 +1307,6 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
 	int			nchanges;
 	char		path[MAXPGPATH];
 	char	   *buffer = NULL;
-	StreamXidHash *ent;
 	MemoryContext oldcxt;
 	BufFile    *fd;
 
@@ -1345,17 +1324,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
 	changes_filename(path, MyLogicalRepWorker->subid, xid);
 	elog(DEBUG1, "replaying changes from file \"%s\"", path);
 
-	ent = (StreamXidHash *) hash_search(xidhash,
-										(void *) &xid,
-										HASH_FIND,
-										NULL);
-	if (!ent)
-		ereport(ERROR,
-				(errcode(ERRCODE_PROTOCOL_VIOLATION),
-				 errmsg_internal("transaction %u not found in stream XID hash table",
-								 xid)));
-
-	fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY);
+	fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
 
 	buffer = palloc(BLCKSZ);
 	initStringInfo(&s2);
@@ -2542,27 +2511,14 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 }
 
 /*
- * Cleanup filesets.
+ * Cleanup fileset.
  */
 void
-logicalrep_worker_cleanupfileset(void)
+logicalrep_worker_cleanupfileset()
 {
-	HASH_SEQ_STATUS status;
-	StreamXidHash *hentry;
-
-	/* Remove all the pending stream and subxact filesets. */
-	if (xidhash)
-	{
-		hash_seq_init(&status, xidhash);
-		while ((hentry = (StreamXidHash *) hash_seq_search(&status)) != NULL)
-		{
-			FileSetDeleteAll(hentry->stream_fileset);
-
-			/* Delete the subxact fileset iff it is created. */
-			if (hentry->subxact_fileset)
-				FileSetDeleteAll(hentry->subxact_fileset);
-		}
-	}
+	/* If the fileset is created, clean the underlying files. */
+	if (stream_fileset != NULL)
+		FileSetDeleteAll(stream_fileset);
 }
 
 /*
@@ -3026,58 +2982,29 @@ subxact_info_write(Oid subid, TransactionId xid)
 {
 	char		path[MAXPGPATH];
 	Size		len;
-	StreamXidHash *ent;
 	BufFile    *fd;
 
 	Assert(TransactionIdIsValid(xid));
 
-	/* Find the xid entry in the xidhash */
-	ent = (StreamXidHash *) hash_search(xidhash,
-										(void *) &xid,
-										HASH_FIND,
-										NULL);
-	/* By this time we must have created the transaction entry */
-	Assert(ent);
+	/* Get the subxact filename. */
+	subxact_filename(path, subid, xid);
 
 	/*
-	 * If there is no subtransaction then nothing to do, but if already have
-	 * subxact file then delete that.
+	 * If there are no subtransactions, there is nothing to be done, but if a
+	 * subxact file already exists, delete it.
 	 */
 	if (subxact_data.nsubxacts == 0)
 	{
-		if (ent->subxact_fileset)
-		{
-			cleanup_subxact_info();
-			FileSetDeleteAll(ent->subxact_fileset);
-			pfree(ent->subxact_fileset);
-			ent->subxact_fileset = NULL;
-		}
+		cleanup_subxact_info();
+		BufFileDeleteFileSet(stream_fileset, path, true);
+
 		return;
 	}
 
-	subxact_filename(path, subid, xid);
-
-	/*
-	 * Create the subxact file if it not already created, otherwise open the
-	 * existing file.
-	 */
-	if (ent->subxact_fileset == NULL)
-	{
-		MemoryContext oldctx;
-
-		/*
-		 * We need to maintain fileset across multiple stream start/stop
-		 * calls. So, need to allocate it in a persistent context.
-		 */
-		oldctx = MemoryContextSwitchTo(ApplyContext);
-		ent->subxact_fileset = palloc(sizeof(FileSet));
-		FileSetInit(ent->subxact_fileset);
-		MemoryContextSwitchTo(oldctx);
-
-		fd = BufFileCreateFileSet(ent->subxact_fileset, path);
-	}
-	else
-		fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR);
+	/* Open the subxact file, if it does not exist, create it. */
+	fd = BufFileOpenFileSet(stream_fileset, path, O_RDWR, true);
+	if (fd == NULL)
+		fd = BufFileCreateFileSet(stream_fileset, path);
 
 	len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
 
@@ -3104,34 +3031,20 @@ subxact_info_read(Oid subid, TransactionId xid)
 	char		path[MAXPGPATH];
 	Size		len;
 	BufFile    *fd;
-	StreamXidHash *ent;
 	MemoryContext oldctx;
 
 	Assert(!subxact_data.subxacts);
 	Assert(subxact_data.nsubxacts == 0);
 	Assert(subxact_data.nsubxacts_max == 0);
 
-	/* Find the stream xid entry in the xidhash */
-	ent = (StreamXidHash *) hash_search(xidhash,
-										(void *) &xid,
-										HASH_FIND,
-										NULL);
-	if (!ent)
-		ereport(ERROR,
-				(errcode(ERRCODE_PROTOCOL_VIOLATION),
-				 errmsg_internal("transaction %u not found in stream XID hash table",
-								 xid)));
-
 	/*
-	 * If subxact_fileset is not valid that mean we don't have any subxact
-	 * info
+	 * Open the subxact file for the input streaming xid, just return if the
+	 * file does not exist.
 	 */
-	if (ent->subxact_fileset == NULL)
-		return;
-
 	subxact_filename(path, subid, xid);
-
-	fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY);
+	fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, true);
+	if (fd == NULL)
+		return;
 
 	/* read number of subxact items */
 	if (BufFileRead(fd, &subxact_data.nsubxacts,
@@ -3267,42 +3180,20 @@ changes_filename(char *path, Oid subid, TransactionId xid)
  *	  Cleanup files for a subscription / toplevel transaction.
  *
  * Remove files with serialized changes and subxact info for a particular
- * toplevel transaction. Each subscription has a separate set of files.
+ * toplevel transaction. Each subscription has a separate file.
  */
 static void
 stream_cleanup_files(Oid subid, TransactionId xid)
 {
 	char		path[MAXPGPATH];
-	StreamXidHash *ent;
-
-	/* Find the xid entry in the xidhash */
-	ent = (StreamXidHash *) hash_search(xidhash,
-										(void *) &xid,
-										HASH_FIND,
-										NULL);
-	if (!ent)
-		ereport(ERROR,
-				(errcode(ERRCODE_PROTOCOL_VIOLATION),
-				 errmsg_internal("transaction %u not found in stream XID hash table",
-								 xid)));
 
-	/* Delete the change file and release the stream fileset memory */
+	/* Delete the changes file. */
 	changes_filename(path, subid, xid);
-	FileSetDeleteAll(ent->stream_fileset);
-	pfree(ent->stream_fileset);
-	ent->stream_fileset = NULL;
-
-	/* Delete the subxact file and release the memory, if it exist */
-	if (ent->subxact_fileset)
-	{
-		subxact_filename(path, subid, xid);
-		FileSetDeleteAll(ent->subxact_fileset);
-		pfree(ent->subxact_fileset);
-		ent->subxact_fileset = NULL;
-	}
+	BufFileDeleteFileSet(stream_fileset, path, false);
 
-	/* Remove the xid entry from the stream xid hash */
-	hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL);
+	/* Delete the subxact file, if it exists. */
+	subxact_filename(path, subid, xid);
+	BufFileDeleteFileSet(stream_fileset, path, true);
 }
 
 /*
@@ -3312,8 +3203,8 @@ stream_cleanup_files(Oid subid, TransactionId xid)
  *
  * Open a file for streamed changes from a toplevel transaction identified
  * by stream_xid (global variable). If it's the first chunk of streamed
- * changes for this transaction, initialize the fileset and create the buffile,
- * otherwise open the previously created file.
+ * changes for this transaction, create the buffile; otherwise, open the
+ * previously created file.
  *
  * This can only be called at the beginning of a "streaming" block, i.e.
  * between stream_start/stream_stop messages from the upstream.
@@ -3322,20 +3213,13 @@ static void
 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
 {
 	char		path[MAXPGPATH];
-	bool		found;
 	MemoryContext oldcxt;
-	StreamXidHash *ent;
 
 	Assert(in_streamed_transaction);
 	Assert(OidIsValid(subid));
 	Assert(TransactionIdIsValid(xid));
 	Assert(stream_fd == NULL);
 
-	/* create or find the xid entry in the xidhash */
-	ent = (StreamXidHash *) hash_search(xidhash,
-										(void *) &xid,
-										HASH_ENTER,
-										&found);
 
 	changes_filename(path, subid, xid);
 	elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
@@ -3347,49 +3231,18 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
 	oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
 
 	/*
-	 * If this is the first streamed segment, the file must not exist, so make
-	 * sure we're the ones creating it. Otherwise just open the file for
-	 * writing, in append mode.
+	 * If this is the first streamed segment, create the changes file.
+	 * Otherwise, just open the file for writing, in append mode.
 	 */
 	if (first_segment)
-	{
-		MemoryContext savectx;
-		FileSet    *fileset;
-
-		if (found)
-			ereport(ERROR,
-					(errcode(ERRCODE_PROTOCOL_VIOLATION),
-					 errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
-
-		/*
-		 * We need to maintain fileset across multiple stream start/stop
-		 * calls. So, need to allocate it in a persistent context.
-		 */
-		savectx = MemoryContextSwitchTo(ApplyContext);
-		fileset = palloc(sizeof(FileSet));
-
-		FileSetInit(fileset);
-		MemoryContextSwitchTo(savectx);
-
-		stream_fd = BufFileCreateFileSet(fileset, path);
-
-		/* Remember the fileset for the next stream of the same transaction */
-		ent->xid = xid;
-		ent->stream_fileset = fileset;
-		ent->subxact_fileset = NULL;
-	}
+		stream_fd = BufFileCreateFileSet(stream_fileset, path);
 	else
 	{
-		if (!found)
-			ereport(ERROR,
-					(errcode(ERRCODE_PROTOCOL_VIOLATION),
-					 errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
-
 		/*
 		 * Open the file and seek to the end of the file because we always
 		 * append the changes file.
 		 */
-		stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
+		stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDWR, false);
 		BufFileSeek(stream_fd, 0, 0, SEEK_END);
 	}
 
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index 5e5409d..d96b25d 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -278,10 +278,12 @@ BufFileCreateFileSet(FileSet *fileset, const char *name)
  * with BufFileCreateFileSet in the same FileSet using the same name.
  * The backend that created the file must have called BufFileClose() or
  * BufFileExportFileSet() to make sure that it is ready to be opened by other
- * backends and render it read-only.
+ * backends and render it read-only.  If missing_ok is true, it will return
+ * NULL if the file does not exist; otherwise, it will throw an error.
  */
 BufFile *
-BufFileOpenFileSet(FileSet *fileset, const char *name, int mode)
+BufFileOpenFileSet(FileSet *fileset, const char *name, int mode,
+				   bool missing_ok)
 {
 	BufFile    *file;
 	char		segment_name[MAXPGPATH];
@@ -318,10 +320,18 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode)
 	 * name.
 	 */
 	if (nfiles == 0)
+	{
+		/* Free the memory allocated for the files array. */
+		pfree(files);
+
+		if (missing_ok)
+			return NULL;
+
 		ereport(ERROR,
 				(errcode_for_file_access(),
 				 errmsg("could not open temporary file \"%s\" from BufFile \"%s\": %m",
 						segment_name, name)));
+	}
 
 	file = makeBufFileCommon(nfiles);
 	file->files = files;
@@ -341,10 +351,11 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode)
  * the FileSet to be cleaned up.
  *
  * Only one backend should attempt to delete a given name, and should know
- * that it exists and has been exported or closed.
+ * that it exists and has been exported or closed; otherwise, missing_ok
+ * should be passed as true.
  */
 void
-BufFileDeleteFileSet(FileSet *fileset, const char *name)
+BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
 {
 	char		segment_name[MAXPGPATH];
 	int			segment = 0;
@@ -358,7 +369,7 @@ BufFileDeleteFileSet(FileSet *fileset, const char *name)
 	for (;;)
 	{
 		FileSetSegmentName(segment_name, name, segment);
-		if (!FileSetDelete(fileset, segment_name, true))
+		if (!FileSetDelete(fileset, segment_name, !missing_ok))
 			break;
 		found = true;
 		++segment;
@@ -366,7 +377,7 @@ BufFileDeleteFileSet(FileSet *fileset, const char *name)
 		CHECK_FOR_INTERRUPTS();
 	}
 
-	if (!found)
+	if (!found && !missing_ok)
 		elog(ERROR, "could not delete unknown BufFile \"%s\"", name);
 }
 
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index f7994d7..debf12e1 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -564,7 +564,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
 		lt = &lts->tapes[i];
 
 		pg_itoa(i, filename);
-		file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY);
+		file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY, false);
 		filesize = BufFileSize(file);
 
 		/*
diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c
index 504ef1c..033088f 100644
--- a/src/backend/utils/sort/sharedtuplestore.c
+++ b/src/backend/utils/sort/sharedtuplestore.c
@@ -560,7 +560,8 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
 
 				sts_filename(name, accessor, accessor->read_participant);
 				accessor->read_file =
-					BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY);
+					BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
+									   false);
 			}
 
 			/* Seek and load the chunk header. */
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index 143eada..7ae5ea2 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -49,8 +49,9 @@ extern long BufFileAppend(BufFile *target, BufFile *source);
 extern BufFile *BufFileCreateFileSet(FileSet *fileset, const char *name);
 extern void BufFileExportFileSet(BufFile *file);
 extern BufFile *BufFileOpenFileSet(FileSet *fileset, const char *name,
-								   int mode);
-extern void BufFileDeleteFileSet(FileSet *fileset, const char *name);
+								   int mode, bool missing_ok);
+extern void BufFileDeleteFileSet(FileSet *fileset, const char *name,
+								 bool missing_ok);
 extern void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset);
 
 #endif							/* BUFFILE_H */
-- 
1.8.3.1
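
For reviewers skimming the patch, here is a minimal sketch (not part of the
patch) of how a caller is expected to use the new missing_ok argument,
following the open-or-create and cleanup patterns that worker.c adopts above.
The fileset, path, and function names prefixed with "example_" are
illustrative assumptions only, and the includes assume the fileset.h split
introduced by the 0001 patch:

#include "postgres.h"

#include <fcntl.h>

#include "storage/buffile.h"
#include "storage/fileset.h"

static void
example_write_or_create(FileSet *fileset, const char *path)
{
	BufFile    *fd;

	/*
	 * With missing_ok = true, BufFileOpenFileSet() returns NULL instead of
	 * raising an error when the file is absent, so the caller can fall back
	 * to creating the file itself.
	 */
	fd = BufFileOpenFileSet(fileset, path, O_RDWR, true);
	if (fd == NULL)
		fd = BufFileCreateFileSet(fileset, path);

	/* ... write the serialized data with BufFileWrite() ... */

	BufFileClose(fd);
}

static void
example_cleanup(FileSet *fileset, const char *path)
{
	/*
	 * With missing_ok = true, deleting a file that was never created is not
	 * an error, which is how the optional subxact file is cleaned up above.
	 */
	BufFileDeleteFileSet(fileset, path, true);
}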
