From 94c6673707af74638f85c33ae33c253c22f9ad63 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Mon, 16 Aug 2021 15:49:12 +0530
Subject: [PATCH] Better usage of sharedfileset in apply worker

Instead of using a separate shared fileset for each xid, use one shared
fileset for whole lifetime of the worker.  So for each xid, just create
shared buffile under that shared fileset and remove the file whenever we
are done with the file.  For subxact file we only need to create once
we get the first subtransaction and for detecting that we also extend the
buffile open and buffile delete interfaces to allow the missing files.
---
 src/backend/replication/logical/worker.c  | 217 +++++++-----------------------
 src/backend/storage/file/buffile.c        |  21 ++-
 src/backend/storage/file/sharedfileset.c  |  79 -----------
 src/backend/utils/sort/logtape.c          |   2 +-
 src/backend/utils/sort/sharedtuplestore.c |   2 +-
 src/include/storage/buffile.h             |   5 +-
 6 files changed, 68 insertions(+), 258 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ecaed15..b747593 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -221,20 +221,6 @@ typedef struct ApplyExecutionData
 	PartitionTupleRouting *proute;	/* partition routing info */
 } ApplyExecutionData;
 
-/*
- * Stream xid hash entry. Whenever we see a new xid we create this entry in the
- * xidhash and along with it create the streaming file and store the fileset handle.
- * The subxact file is created iff there is any subxact info under this xid. This
- * entry is used on the subsequent streams for the xid to get the corresponding
- * fileset handles, so storing them in hash makes the search faster.
- */
-typedef struct StreamXidHash
-{
-	TransactionId xid;			/* xid is the hash key and must be first */
-	SharedFileSet *stream_fileset;	/* shared file set for stream data */
-	SharedFileSet *subxact_fileset; /* shared file set for subxact info */
-} StreamXidHash;
-
 static MemoryContext ApplyMessageContext = NULL;
 MemoryContext ApplyContext = NULL;
 
@@ -255,10 +241,12 @@ static bool in_streamed_transaction = false;
 static TransactionId stream_xid = InvalidTransactionId;
 
 /*
- * Hash table for storing the streaming xid information along with shared file
- * set for streaming and subxact files.
+ * Shared fileset for storing the changes and subxact information for the
+ * streaming transaction.  We will use only one shared fileset and for each
+ * xid a separate changes and subxact files will be created under the same
+ * shared fileset.
  */
-static HTAB *xidhash = NULL;
+static SharedFileSet *xidfileset = NULL;
 
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
@@ -1129,7 +1117,6 @@ static void
 apply_handle_stream_start(StringInfo s)
 {
 	bool		first_segment;
-	HASHCTL		hash_ctl;
 
 	if (in_streamed_transaction)
 		ereport(ERROR,
@@ -1157,17 +1144,22 @@ apply_handle_stream_start(StringInfo s)
 				 errmsg_internal("invalid transaction ID in streamed replication transaction")));
 
 	/*
-	 * Initialize the xidhash table if we haven't yet. This will be used for
+	 * Initialize the xidfileset if we haven't yet. This will be used for
 	 * the entire duration of the apply worker so create it in permanent
 	 * context.
 	 */
-	if (xidhash == NULL)
+	if (xidfileset == NULL)
 	{
-		hash_ctl.keysize = sizeof(TransactionId);
-		hash_ctl.entrysize = sizeof(StreamXidHash);
-		hash_ctl.hcxt = ApplyContext;
-		xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
-							  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+		MemoryContext oldctx;
+
+		/*
+		 * We need to maintain shared fileset across multiple stream
+		 * start/stop calls.  So, need to allocate it in a persistent context.
+		 */
+		oldctx = MemoryContextSwitchTo(ApplyContext);
+		xidfileset = palloc(sizeof(SharedFileSet));
+		SharedFileSetInit(xidfileset, NULL);
+		MemoryContextSwitchTo(oldctx);
 	}
 
 	/* open the spool file for this transaction */
@@ -1258,7 +1250,6 @@ apply_handle_stream_abort(StringInfo s)
 		BufFile    *fd;
 		bool		found = false;
 		char		path[MAXPGPATH];
-		StreamXidHash *ent;
 
 		subidx = -1;
 		begin_replication_step();
@@ -1287,19 +1278,9 @@ apply_handle_stream_abort(StringInfo s)
 			return;
 		}
 
-		ent = (StreamXidHash *) hash_search(xidhash,
-											(void *) &xid,
-											HASH_FIND,
-											NULL);
-		if (!ent)
-			ereport(ERROR,
-					(errcode(ERRCODE_PROTOCOL_VIOLATION),
-					 errmsg_internal("transaction %u not found in stream XID hash table",
-									 xid)));
-
 		/* open the changes file */
 		changes_filename(path, MyLogicalRepWorker->subid, xid);
-		fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
+		fd = BufFileOpenShared(xidfileset, path, O_RDWR, false);
 
 		/* OK, truncate the file at the right offset */
 		BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno,
@@ -1327,7 +1308,6 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
 	int			nchanges;
 	char		path[MAXPGPATH];
 	char	   *buffer = NULL;
-	StreamXidHash *ent;
 	MemoryContext oldcxt;
 	BufFile    *fd;
 
@@ -1345,17 +1325,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
 	changes_filename(path, MyLogicalRepWorker->subid, xid);
 	elog(DEBUG1, "replaying changes from file \"%s\"", path);
 
-	ent = (StreamXidHash *) hash_search(xidhash,
-										(void *) &xid,
-										HASH_FIND,
-										NULL);
-	if (!ent)
-		ereport(ERROR,
-				(errcode(ERRCODE_PROTOCOL_VIOLATION),
-				 errmsg_internal("transaction %u not found in stream XID hash table",
-								 xid)));
-
-	fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
+	fd = BufFileOpenShared(xidfileset, path, O_RDONLY, false);
 
 	buffer = palloc(BLCKSZ);
 	initStringInfo(&s2);
@@ -2509,6 +2479,16 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 }
 
 /*
+* Cleanup shared fileset if created.
+*/
+static void
+worker_cleanup(int code, Datum arg)
+{
+	if (xidfileset != NULL)
+		SharedFileSetDeleteAll(xidfileset);
+}
+
+/*
  * Apply main loop.
  */
 static void
@@ -2534,6 +2514,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 													"LogicalStreamingContext",
 													ALLOCSET_DEFAULT_SIZES);
 
+	/* do cleanup on worker exit (e.g. after DROP SUBSCRIPTION) */
+	on_shmem_exit(worker_cleanup, (Datum) 0);
+
 	/* mark as idle, before starting to loop */
 	pgstat_report_activity(STATE_IDLE, NULL);
 
@@ -2957,18 +2940,11 @@ subxact_info_write(Oid subid, TransactionId xid)
 {
 	char		path[MAXPGPATH];
 	Size		len;
-	StreamXidHash *ent;
 	BufFile    *fd;
 
 	Assert(TransactionIdIsValid(xid));
 
-	/* Find the xid entry in the xidhash */
-	ent = (StreamXidHash *) hash_search(xidhash,
-										(void *) &xid,
-										HASH_FIND,
-										NULL);
-	/* By this time we must have created the transaction entry */
-	Assert(ent);
+	subxact_filename(path, subid, xid);
 
 	/*
 	 * If there is no subtransaction then nothing to do, but if already have
@@ -2976,39 +2952,17 @@ subxact_info_write(Oid subid, TransactionId xid)
 	 */
 	if (subxact_data.nsubxacts == 0)
 	{
-		if (ent->subxact_fileset)
-		{
-			cleanup_subxact_info();
-			SharedFileSetDeleteAll(ent->subxact_fileset);
-			pfree(ent->subxact_fileset);
-			ent->subxact_fileset = NULL;
-		}
+		cleanup_subxact_info();
+		BufFileDeleteShared(xidfileset, path, true);
 		return;
 	}
 
 	subxact_filename(path, subid, xid);
 
-	/*
-	 * Create the subxact file if it not already created, otherwise open the
-	 * existing file.
-	 */
-	if (ent->subxact_fileset == NULL)
-	{
-		MemoryContext oldctx;
-
-		/*
-		 * We need to maintain shared fileset across multiple stream
-		 * start/stop calls.  So, need to allocate it in a persistent context.
-		 */
-		oldctx = MemoryContextSwitchTo(ApplyContext);
-		ent->subxact_fileset = palloc(sizeof(SharedFileSet));
-		SharedFileSetInit(ent->subxact_fileset, NULL);
-		MemoryContextSwitchTo(oldctx);
-
-		fd = BufFileCreateShared(ent->subxact_fileset, path);
-	}
-	else
-		fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
+	/* Try to open the subxact file, if it doesn't exist then create it */
+	fd = BufFileOpenShared(xidfileset, path, O_RDWR, true);
+	if (fd == NULL)
+		fd = BufFileCreateShared(xidfileset, path);
 
 	len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
 
@@ -3035,35 +2989,22 @@ subxact_info_read(Oid subid, TransactionId xid)
 	char		path[MAXPGPATH];
 	Size		len;
 	BufFile    *fd;
-	StreamXidHash *ent;
 	MemoryContext oldctx;
 
 	Assert(!subxact_data.subxacts);
 	Assert(subxact_data.nsubxacts == 0);
 	Assert(subxact_data.nsubxacts_max == 0);
 
-	/* Find the stream xid entry in the xidhash */
-	ent = (StreamXidHash *) hash_search(xidhash,
-										(void *) &xid,
-										HASH_FIND,
-										NULL);
-	if (!ent)
-		ereport(ERROR,
-				(errcode(ERRCODE_PROTOCOL_VIOLATION),
-				 errmsg_internal("transaction %u not found in stream XID hash table",
-								 xid)));
+	subxact_filename(path, subid, xid);
 
 	/*
-	 * If subxact_fileset is not valid that mean we don't have any subxact
-	 * info
+	 * Open the subxact file.  If subxact file is not created that mean we
+	 * don't have any subxact info so nothing to be done.
 	 */
-	if (ent->subxact_fileset == NULL)
+	fd = BufFileOpenShared(xidfileset, path, O_RDONLY, true);
+	if (fd == NULL)
 		return;
 
-	subxact_filename(path, subid, xid);
-
-	fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
-
 	/* read number of subxact items */
 	if (BufFileRead(fd, &subxact_data.nsubxacts,
 					sizeof(subxact_data.nsubxacts)) !=
@@ -3204,36 +3145,14 @@ static void
 stream_cleanup_files(Oid subid, TransactionId xid)
 {
 	char		path[MAXPGPATH];
-	StreamXidHash *ent;
-
-	/* Find the xid entry in the xidhash */
-	ent = (StreamXidHash *) hash_search(xidhash,
-										(void *) &xid,
-										HASH_FIND,
-										NULL);
-	if (!ent)
-		ereport(ERROR,
-				(errcode(ERRCODE_PROTOCOL_VIOLATION),
-				 errmsg_internal("transaction %u not found in stream XID hash table",
-								 xid)));
 
 	/* Delete the change file and release the stream fileset memory */
 	changes_filename(path, subid, xid);
-	SharedFileSetDeleteAll(ent->stream_fileset);
-	pfree(ent->stream_fileset);
-	ent->stream_fileset = NULL;
+	BufFileDeleteShared(xidfileset, path, false);
 
 	/* Delete the subxact file and release the memory, if it exist */
-	if (ent->subxact_fileset)
-	{
-		subxact_filename(path, subid, xid);
-		SharedFileSetDeleteAll(ent->subxact_fileset);
-		pfree(ent->subxact_fileset);
-		ent->subxact_fileset = NULL;
-	}
-
-	/* Remove the xid entry from the stream xid hash */
-	hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL);
+	subxact_filename(path, subid, xid);
+	SharedFileSetDelete(xidfileset, path, true);
 }
 
 /*
@@ -3253,21 +3172,13 @@ static void
 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
 {
 	char		path[MAXPGPATH];
-	bool		found;
 	MemoryContext oldcxt;
-	StreamXidHash *ent;
 
 	Assert(in_streamed_transaction);
 	Assert(OidIsValid(subid));
 	Assert(TransactionIdIsValid(xid));
 	Assert(stream_fd == NULL);
 
-	/* create or find the xid entry in the xidhash */
-	ent = (StreamXidHash *) hash_search(xidhash,
-										(void *) &xid,
-										HASH_ENTER,
-										&found);
-
 	changes_filename(path, subid, xid);
 	elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
 
@@ -3283,44 +3194,14 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
 	 * writing, in append mode.
 	 */
 	if (first_segment)
-	{
-		MemoryContext savectx;
-		SharedFileSet *fileset;
-
-		if (found)
-			ereport(ERROR,
-					(errcode(ERRCODE_PROTOCOL_VIOLATION),
-					 errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
-
-		/*
-		 * We need to maintain shared fileset across multiple stream
-		 * start/stop calls. So, need to allocate it in a persistent context.
-		 */
-		savectx = MemoryContextSwitchTo(ApplyContext);
-		fileset = palloc(sizeof(SharedFileSet));
-
-		SharedFileSetInit(fileset, NULL);
-		MemoryContextSwitchTo(savectx);
-
-		stream_fd = BufFileCreateShared(fileset, path);
-
-		/* Remember the fileset for the next stream of the same transaction */
-		ent->xid = xid;
-		ent->stream_fileset = fileset;
-		ent->subxact_fileset = NULL;
-	}
+		stream_fd = BufFileCreateShared(xidfileset, path);
 	else
 	{
-		if (!found)
-			ereport(ERROR,
-					(errcode(ERRCODE_PROTOCOL_VIOLATION),
-					 errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
-
 		/*
 		 * Open the file and seek to the end of the file because we always
 		 * append the changes file.
 		 */
-		stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
+		stream_fd = BufFileOpenShared(xidfileset, path, O_RDWR, false);
 		BufFileSeek(stream_fd, 0, 0, SEEK_END);
 	}
 
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index a4be5fe..99a1d3d 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -278,10 +278,12 @@ BufFileCreateShared(SharedFileSet *fileset, const char *name)
  * with BufFileCreateShared in the same SharedFileSet using the same name.
  * The backend that created the file must have called BufFileClose() or
  * BufFileExportShared() to make sure that it is ready to be opened by other
- * backends and render it read-only.
+ * backends and render it read-only.  If missing_ok is true then it will return
+ * NULL if file doesn not exist otherwise error.
  */
 BufFile *
-BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
+BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode,
+				  bool missing_ok)
 {
 	BufFile    *file;
 	char		segment_name[MAXPGPATH];
@@ -318,10 +320,14 @@ BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
 	 * name.
 	 */
 	if (nfiles == 0)
+	{
+		if (missing_ok)
+			return NULL;
 		ereport(ERROR,
 				(errcode_for_file_access(),
-				 errmsg("could not open temporary file \"%s\" from BufFile \"%s\": %m",
+				errmsg("could not open temporary file \"%s\" from BufFile \"%s\": %m",
 						segment_name, name)));
+	}
 
 	file = makeBufFileCommon(nfiles);
 	file->files = files;
@@ -341,10 +347,11 @@ BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
  * the SharedFileSet to be cleaned up.
  *
  * Only one backend should attempt to delete a given name, and should know
- * that it exists and has been exported or closed.
+ * that it exists and has been exported or closed otherwise missing_ok should
+ * be passed true.
  */
 void
-BufFileDeleteShared(SharedFileSet *fileset, const char *name)
+BufFileDeleteShared(SharedFileSet *fileset, const char *name, bool missing_ok)
 {
 	char		segment_name[MAXPGPATH];
 	int			segment = 0;
@@ -358,7 +365,7 @@ BufFileDeleteShared(SharedFileSet *fileset, const char *name)
 	for (;;)
 	{
 		SharedSegmentName(segment_name, name, segment);
-		if (!SharedFileSetDelete(fileset, segment_name, true))
+		if (!SharedFileSetDelete(fileset, segment_name, !missing_ok))
 			break;
 		found = true;
 		++segment;
@@ -366,7 +373,7 @@ BufFileDeleteShared(SharedFileSet *fileset, const char *name)
 		CHECK_FOR_INTERRUPTS();
 	}
 
-	if (!found)
+	if (!found && !missing_ok)
 		elog(ERROR, "could not delete unknown shared BufFile \"%s\"", name);
 }
 
diff --git a/src/backend/storage/file/sharedfileset.c b/src/backend/storage/file/sharedfileset.c
index ed37c94..832b0a6 100644
--- a/src/backend/storage/file/sharedfileset.c
+++ b/src/backend/storage/file/sharedfileset.c
@@ -33,10 +33,7 @@
 #include "storage/sharedfileset.h"
 #include "utils/builtins.h"
 
-static List *filesetlist = NIL;
-
 static void SharedFileSetOnDetach(dsm_segment *segment, Datum datum);
-static void SharedFileSetDeleteOnProcExit(int status, Datum arg);
 static void SharedFileSetPath(char *path, SharedFileSet *fileset, Oid tablespace);
 static void SharedFilePath(char *path, SharedFileSet *fileset, const char *name);
 static Oid	ChooseTablespace(const SharedFileSet *fileset, const char *name);
@@ -101,23 +98,6 @@ SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
 	/* Register our cleanup callback. */
 	if (seg)
 		on_dsm_detach(seg, SharedFileSetOnDetach, PointerGetDatum(fileset));
-	else
-	{
-		static bool registered_cleanup = false;
-
-		if (!registered_cleanup)
-		{
-			/*
-			 * We must not have registered any fileset before registering the
-			 * fileset clean up.
-			 */
-			Assert(filesetlist == NIL);
-			on_proc_exit(SharedFileSetDeleteOnProcExit, 0);
-			registered_cleanup = true;
-		}
-
-		filesetlist = lcons((void *) fileset, filesetlist);
-	}
 }
 
 /*
@@ -225,9 +205,6 @@ SharedFileSetDeleteAll(SharedFileSet *fileset)
 		SharedFileSetPath(dirpath, fileset, fileset->tablespaces[i]);
 		PathNameDeleteTemporaryDir(dirpath);
 	}
-
-	/* Unregister the shared fileset */
-	SharedFileSetUnregister(fileset);
 }
 
 /*
@@ -259,62 +236,6 @@ SharedFileSetOnDetach(dsm_segment *segment, Datum datum)
 }
 
 /*
- * Callback function that will be invoked on the process exit.  This will
- * process the list of all the registered sharedfilesets and delete the
- * underlying files.
- */
-static void
-SharedFileSetDeleteOnProcExit(int status, Datum arg)
-{
-	/*
-	 * Remove all the pending shared fileset entries. We don't use foreach()
-	 * here because SharedFileSetDeleteAll will remove the current element in
-	 * filesetlist. Though we have used foreach_delete_current() to remove the
-	 * element from filesetlist it could only fix up the state of one of the
-	 * loops, see SharedFileSetUnregister.
-	 */
-	while (list_length(filesetlist) > 0)
-	{
-		SharedFileSet *fileset = (SharedFileSet *) linitial(filesetlist);
-
-		SharedFileSetDeleteAll(fileset);
-	}
-
-	filesetlist = NIL;
-}
-
-/*
- * Unregister the shared fileset entry registered for cleanup on proc exit.
- */
-void
-SharedFileSetUnregister(SharedFileSet *input_fileset)
-{
-	ListCell   *l;
-
-	/*
-	 * If the caller is following the dsm based cleanup then we don't maintain
-	 * the filesetlist so return.
-	 */
-	if (filesetlist == NIL)
-		return;
-
-	foreach(l, filesetlist)
-	{
-		SharedFileSet *fileset = (SharedFileSet *) lfirst(l);
-
-		/* Remove the entry from the list */
-		if (input_fileset == fileset)
-		{
-			filesetlist = foreach_delete_current(filesetlist, l);
-			return;
-		}
-	}
-
-	/* Should have found a match */
-	Assert(false);
-}
-
-/*
  * Build the path for the directory holding the files backing a SharedFileSet
  * in a given tablespace.
  */
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index cafc087..08612f0 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -564,7 +564,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
 		lt = &lts->tapes[i];
 
 		pg_itoa(i, filename);
-		file = BufFileOpenShared(fileset, filename, O_RDONLY);
+		file = BufFileOpenShared(fileset, filename, O_RDONLY, false);
 		filesize = BufFileSize(file);
 
 		/*
diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c
index 57e35db..ad18991 100644
--- a/src/backend/utils/sort/sharedtuplestore.c
+++ b/src/backend/utils/sort/sharedtuplestore.c
@@ -559,7 +559,7 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
 
 				sts_filename(name, accessor, accessor->read_participant);
 				accessor->read_file =
-					BufFileOpenShared(accessor->fileset, name, O_RDONLY);
+					BufFileOpenShared(accessor->fileset, name, O_RDONLY, false);
 			}
 
 			/* Seek and load the chunk header. */
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index 566523d..3f997e1 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -49,8 +49,9 @@ extern long BufFileAppend(BufFile *target, BufFile *source);
 extern BufFile *BufFileCreateShared(SharedFileSet *fileset, const char *name);
 extern void BufFileExportShared(BufFile *file);
 extern BufFile *BufFileOpenShared(SharedFileSet *fileset, const char *name,
-								  int mode);
-extern void BufFileDeleteShared(SharedFileSet *fileset, const char *name);
+								  int mode, bool missing_ok);
+extern void BufFileDeleteShared(SharedFileSet *fileset, const char *name,
+								bool missing_ok);
 extern void BufFileTruncateShared(BufFile *file, int fileno, off_t offset);
 
 #endif							/* BUFFILE_H */
-- 
1.8.3.1

