From 601065cf3a2f54fefba9472e53207a181afb168b Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@enterprisedb.com>
Date: Thu, 17 Aug 2017 23:15:09 +1200
Subject: [PATCH 7/9] Add BufFileSet for sharing temporary files between
 backends.

BufFileSet allows temporary files to be created by one backend and
then exported for read-only access by other backends, with clean-up
managed by reference counting associated with a DSM segment.  This includes
changes to fd.c and buffile.c to support new kinds of temporary file.
A BufFileSet is backed by a directory full of temporary files.

Since File closing and unlinking are now disconnected, with the former
performed in every backends and the latter only in the last to detach,
there was an order-of-operations problem in resowner.c: DSM detach callbacks
are invoked before closing File handles.  Move DSM detach handling to the very
last moment in resowner's cleanup.  Without this reordering, unlinking would
risk failing on Windows where open files prevent that.

Introduce a new reusable function pair for deduplicating sorted arrays as
infrastructure.

Author: Thomas Munro
Reviewed-By: Peter Geoghegan, Andres Freund, Robert Haas
Discussion: https://postgr.es/m/CAEepm=2W=cOkiZxcg6qiFQP-dHUe09aqTrEMM7yJDrHMhDv_RA@mail.gmail.com
Discussion: https://postgr.es/m/CAH2-WznJ_UgLux=_jTgCQ4yFz0iBntudsNKa1we3kN1BAG=88w@mail.gmail.com
Discussion: https://postgr.es/m/CAEepm%3D2vmFTNpAmwbGGD2WaryM6T3hSDVKQPfUwjdD_5XY6vAA%40mail.gmail.com
---
 src/backend/storage/file/buffile.c    | 478 +++++++++++++++++++++++++++++++++-
 src/backend/storage/file/fd.c         | 281 ++++++++++++++++++--
 src/backend/utils/resowner/resowner.c |  20 +-
 src/include/lib/arrayutils.h          |  68 +++++
 src/include/storage/buffile.h         |  13 +-
 src/include/storage/fd.h              |  10 +-
 6 files changed, 829 insertions(+), 41 deletions(-)

diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index a053ca459c2..ce6a24d2221 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -31,16 +31,31 @@
  * BufFile also supports temporary files that exceed the OS file size limit
  * (by opening multiple fd.c temporary files).  This is an essential feature
  * for sorts and hashjoins on large amounts of data.
- *-------------------------------------------------------------------------
+ *
+ * BufFile supports temporary files that can be made read-only and shared with
+ * other backends, as infrastructure for parallel execution.  Such files need
+ * to be created as a member of a BufFileSet that all participants are
+ * attached to.  The BufFileSet mechanism provides a namespace so that files
+ * can be discovered by name, and a shared ownership semantics so that shared
+ * files survive until the last user detaches.
+ *
+ * -------------------------------------------------------------------------
  */
 
 #include "postgres.h"
 
+#include "catalog/catalog.h"
+#include "catalog/pg_tablespace.h"
+#include "commands/tablespace.h"
 #include "executor/instrument.h"
+#include "lib/arrayutils.h"
+#include "miscadmin.h"
 #include "pgstat.h"
 #include "storage/fd.h"
 #include "storage/buffile.h"
 #include "storage/buf_internals.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
 #include "utils/resowner.h"
 
 /*
@@ -51,6 +66,29 @@
 #define MAX_PHYSICAL_FILESIZE	0x40000000
 #define BUFFILE_SEG_SIZE		(MAX_PHYSICAL_FILESIZE / BLCKSZ)
 
+/*
+ * A private type used internally by buffile.c.
+ */
+typedef struct BufFileSetDesc
+{
+	pid_t	creator_pid;		/* PID of the creating process */
+	int		set_number;			/* per-PID identifier for a set of files */
+} BufFileSetDesc;
+
+/*
+ * A set of BufFiles that can be shared with other backends.
+ */
+struct BufFileSet
+{
+	slock_t	mutex;				/* mutex protecting the reference count */
+	int		refcnt;				/* number of attached backends */
+	BufFileSetDesc descriptor;	/* descriptor identifying directory */
+	int		nstripes;			/* number of stripes */
+
+	/* an array of tablespaces indexed by stripe number */
+	Oid		stripe_tablespaces[FLEXIBLE_ARRAY_MEMBER];
+};
+
 /*
  * This data structure represents a buffered file that consists of one or
  * more physical files (each accessed through a virtual file descriptor
@@ -70,6 +108,16 @@ struct BufFile
 
 	bool		isInterXact;	/* keep open over transactions? */
 	bool		dirty;			/* does buffer need to be written? */
+	bool		readOnly;		/* has the file been set to read only? */
+
+	/*
+	 * Shared BufFiles use deterministic paths for their segment files.  We
+	 * need to record enough information to construct path names for new
+	 * segments when extending the file.
+	 */
+	BufFileSetDesc set_desc;	/* info used to construct directory name */
+	Oid			tablespace;		/* tablespace for this shared BufFile */
+	char		name[MAXPGPATH];		/* name of this BufFile within set */
 
 	/*
 	 * resowner is the ResourceOwner to use for underlying temp files.  (We
@@ -94,6 +142,10 @@ static void extendBufFile(BufFile *file);
 static void BufFileLoadBuffer(BufFile *file);
 static void BufFileDumpBuffer(BufFile *file);
 static int	BufFileFlush(BufFile *file);
+static File MakeSharedSegment(Oid tablespace,
+							  const BufFileSetDesc *desc,
+							  const char *name,
+							  int segment_number);
 
 
 /*
@@ -112,12 +164,19 @@ makeBufFile(File firstfile)
 	file->offsets[0] = 0L;
 	file->isInterXact = false;
 	file->dirty = false;
+	file->readOnly = false;
 	file->resowner = CurrentResourceOwner;
 	file->curFile = 0;
 	file->curOffset = 0L;
 	file->pos = 0;
 	file->nbytes = 0;
 
+	/* Clear the members used only for shared BufFiles. */
+	file->set_desc.creator_pid = InvalidPid;
+	file->set_desc.set_number = 0;
+	file->tablespace = InvalidOid;
+	file->name[0] = '\0';
+
 	return file;
 }
 
@@ -134,7 +193,12 @@ extendBufFile(BufFile *file)
 	oldowner = CurrentResourceOwner;
 	CurrentResourceOwner = file->resowner;
 
-	pfile = OpenTemporaryFile(file->isInterXact);
+	if (file->set_desc.creator_pid == InvalidPid)
+		pfile = OpenTemporaryFile(file->isInterXact);
+	else
+		pfile = MakeSharedSegment(file->tablespace, &file->set_desc,
+								  file->name, file->numFiles);
+
 	Assert(pfile >= 0);
 
 	CurrentResourceOwner = oldowner;
@@ -190,6 +254,414 @@ BufFileCreate(File file)
 }
 #endif
 
+/*
+ * Build the path of the temp directory in a given tablespace.
+ */
+static void
+MakeTempDirectoryPath(char *path, Oid tablespace)
+{
+	if (tablespace == InvalidOid ||
+		tablespace == DEFAULTTABLESPACE_OID ||
+		tablespace == GLOBALTABLESPACE_OID)
+		snprintf(path, MAXPGPATH, "base/%s", PG_TEMP_FILES_DIR);
+	else
+	{
+		snprintf(path, MAXPGPATH, "pg_tblspc/%u/%s/%s",
+				 tablespace, TABLESPACE_VERSION_DIRECTORY,
+				 PG_TEMP_FILES_DIR);
+	}
+}
+
+/*
+ * Build the path for the directory holding the files backing a BufFileSet in
+ * a given tablespace.
+ */
+static void
+MakeBufFileSetPath(char *path, Oid tablespace, const BufFileSetDesc *desc)
+{
+	char tempdirpath[MAXPGPATH];
+
+	MakeTempDirectoryPath(tempdirpath, tablespace);
+	snprintf(path, MAXPGPATH, "%s/%s%d.%d" PG_TEMP_FILE_SET_DIR_SUFFIX,
+			 tempdirpath, PG_TEMP_FILE_PREFIX,
+			 desc->creator_pid, desc->set_number);
+}
+
+/*
+ * Build the path of a single segment file for a shared BufFile.
+ */
+static void
+MakeSharedSegmentPath(char *path,
+					  Oid tablespace,
+					  const BufFileSetDesc *desc,
+					  const char *name,
+					  int segment_number)
+{
+	char setdirpath[MAXPGPATH];
+
+	MakeBufFileSetPath(setdirpath, tablespace, desc);
+	snprintf(path, MAXPGPATH, "%s/" PG_TEMP_FILE_PREFIX ".%s.%d", setdirpath, name, segment_number);
+}
+
+/*
+ * Create a new segment file backing a shared BufFile.
+ */
+static File
+MakeSharedSegment(Oid tablespace,
+				  const BufFileSetDesc *desc,
+				  const char *name,
+				  int segment_number)
+{
+	File		file;
+	char		segmentpath[MAXPGPATH];
+
+	MakeSharedSegmentPath(segmentpath, tablespace, desc, name, segment_number);
+	file = PathNameCreateTemporaryFile(segmentpath);
+	if (file <= 0)
+		elog(ERROR, "could not create temporary file \"%s\": %m",
+			 segmentpath);
+
+	return file;
+}
+
+/*
+ * Callback function that will be invoked when this backend detaches from a
+ * DSM segment holding a BufFileSet that it has creaed or attached to.  If we
+ * are the last to detach, then try to remove the directory holding this set,
+ * and everything in it.  We can't raise an error, because this runs in error
+ * cleanup paths.
+ */
+static void
+BufFileSetOnDetach(dsm_segment *segment, Datum datum)
+{
+	bool unlink_all = false;
+	BufFileSet *set = (BufFileSet *) DatumGetPointer(datum);
+
+	SpinLockAcquire(&set->mutex);
+	Assert(set->refcnt > 0);
+	if (--set->refcnt == 0)
+		unlink_all = true;
+	SpinLockRelease(&set->mutex);
+
+	/*
+	 * If we are the last to detach, we delete the directory in all referenced
+	 * tablespaces.  Note that we are still actually attached for the rest of
+	 * this function so we can safely access its data.
+	 */
+	if (unlink_all)
+	{
+		size_t num_oids;
+		Oid *oids;
+		char dirpath[MAXPGPATH];
+		int i;
+
+		/* Find the set of unique tablespace OIDs. */
+		oids = (Oid *) palloc(sizeof(Oid) * set->nstripes);
+		for (i = 0; i < set->nstripes; ++i)
+			oids[i] = set->stripe_tablespaces[i];
+		qsort(oids, set->nstripes, sizeof(Oid), oid_cmp);
+		num_oids = qunique(oids, set->nstripes, sizeof(Oid), oid_cmp);
+
+		/*
+		 * Delete the directory we created in each tablespace.  Can't fail,
+		 * but can generate LOG message on IO error.
+		 */
+		for (i = 0; i < num_oids; ++i)
+		{
+			MakeBufFileSetPath(dirpath, oids[i], &set->descriptor);
+			PathNameDeleteTemporaryDirRecursively(dirpath);
+		}
+
+		pfree(oids);
+	}
+}
+
+/*
+ * Determine the size that a BufFileSet with the given number of stripes would
+ * occupy.
+ */
+size_t
+BufFileSetEstimate(int stripes)
+{
+	return offsetof(BufFileSet, stripe_tablespaces) + sizeof(Oid) * stripes;
+}
+
+/*
+ * Create a set of named BufFiles that can be opened for read-only access by
+ * other backends, in the shared memory pointed to by 'set'.  The provided
+ * memory area must have enough space for the number of bytes estimated by
+ * BufFileSetEstimate(stripes).  Other backends must attach to it before
+ * accessing it.  Associate this set of BufFiles with 'seg'.  The set of files
+ * will be deleted when no backends are attached.
+ *
+ * Files will be physically striped over the tablespace configured in
+ * temp_tablespaces.  It is up to client code to determine how files should be
+ * mapped to stripes, but the ParallelWorkerNumber of the backend writing a
+ * file is one straightforward way.  While creating the set, 'stripes' should
+ * be set to the number of stripes that will be used to created and open
+ * files.  If it is set to 3, then BufFileSetCreateTemporaryFile can be called
+ * with values 0, 1 or 2.  Stripe numbers will be mapped to the configured
+ * tablespaces in a round-robin fashion.  'stripes' must be at least 1.
+ *
+ * Under the covers the set is one or more directories which will eventually
+ * be deleted when there are no backends attached.
+ */
+void
+BufFileSetCreate(BufFileSet *set, dsm_segment *seg, int stripes)
+{
+	size_t num_oids;
+	Oid *oids;
+	int i;
+	static int counter = 0;
+
+	if (stripes < 1)
+		elog(ERROR, "cannot create BufFileSet with fewer than 1 stripe");
+
+	SpinLockInit(&set->mutex);
+	set->refcnt = 1;
+	set->descriptor.creator_pid = MyProcPid;
+	set->nstripes = stripes;
+
+	/*
+	 * The name of the directory will consist of the creator's PID and a
+	 * counter that starts at startup for each process.  That's not ideal,
+	 * because a BufFileSet could in theory live longer than the creator, and
+	 * the operating system will eventually create a process with the same PID
+	 * again.  We may want to use a shmem counter instead in future.
+	 */
+	++counter;
+	if (counter == INT_MAX)
+		counter = 1;
+	set->descriptor.set_number = counter;
+
+	/* Map each stripe to a tablespace round-robin. */
+	PrepareTempTablespaces();
+	oids = (Oid *) palloc(sizeof(Oid) * stripes);
+	for (i = 0; i < stripes; ++i)
+		oids[i] = set->stripe_tablespaces[i] = GetNextTempTableSpace();
+
+	/* Find the set of unique tablespace OIDs. */
+	qsort(oids, stripes, sizeof(Oid), oid_cmp);
+	num_oids = qunique(oids, stripes, sizeof(Oid), oid_cmp);
+
+	/* Create the set's directory in every tablespace OID. */
+	for (i = 0; i < num_oids; ++i)
+	{
+		char tempdirpath[MAXPGPATH];
+		char setdirpath[MAXPGPATH];
+
+		/*
+		 * We need to compute both the top-level temporary directory's path,
+		 * and the directory for our set within it.  That's because fd.c needs
+		 * to be able to create the former on demand if creating the latter
+		 * fails.
+		 */
+		MakeTempDirectoryPath(tempdirpath, oids[i]);
+		MakeBufFileSetPath(setdirpath, oids[i], &set->descriptor);
+		PathNameCreateTemporaryDir(tempdirpath, setdirpath);
+	}
+
+	pfree(oids);
+
+	/* Register our cleanup callback. */
+	on_dsm_detach(seg, BufFileSetOnDetach, PointerGetDatum(set));
+}
+
+/*
+ * Attach to a set of named BufFiles that was created with BufFileSetCreate.
+ */
+void
+BufFileSetAttach(BufFileSet *set, dsm_segment *seg)
+{
+	bool success;
+
+	SpinLockAcquire(&set->mutex);
+	if (set->refcnt == 0)
+		success = false;
+	else
+	{
+		++set->refcnt;
+		success = true;
+	}
+	SpinLockRelease(&set->mutex);
+
+	if (!success)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not attach to a BufFileSet that is alreayd destroyed")));
+
+	/* Register our cleanup callback. */
+	on_dsm_detach(seg, BufFileSetOnDetach, PointerGetDatum(set));
+}
+
+/*
+ * Create a BufFile that can be discovered and opened read-only by other
+ * backends that are also attached to 'set' using the same 'name' and 'stripe'
+ * values.  Even though both 'name' and 'stripe' must be provided and the same
+ * pair must be used to open the file from another backend, 'name' alone must
+ * be unique within the set.  The caller must have either created or attached
+ * to 'set'.
+ */
+BufFile *
+BufFileCreateShared(const BufFileSet *set, const char *name, int stripe)
+{
+	BufFile    *file;
+
+	if (stripe < 0 || stripe >= set->nstripes)
+		elog(ERROR, "stripe number out of range");
+
+	file = (BufFile *) palloc(sizeof(BufFile));
+	file->numFiles = 1;
+	file->files = (File *) palloc(sizeof(File));
+	file->files[0] = MakeSharedSegment(set->stripe_tablespaces[stripe],
+									   &set->descriptor, name, 0);
+	file->offsets = (off_t *) palloc(sizeof(off_t));
+	file->offsets[0] = 0L;
+	file->isInterXact = false;
+	file->dirty = false;
+	file->resowner = CurrentResourceOwner;
+	file->curFile = 0;
+	file->curOffset = 0L;
+	file->pos = 0;
+	file->nbytes = 0;
+	file->readOnly = false;
+	file->set_desc = set->descriptor;
+	file->tablespace = set->stripe_tablespaces[stripe];
+	strcpy(file->name, name);
+
+	return file;
+}
+
+/*
+ * Open a file that was previously created in another backend with
+ * BufFileCreateShared in the same BufFileSet, using the same 'name' and
+ * 'stripe' values.  The backend that created the file must have called
+ * BufFileClose() or BufFileExport() to make sure that it is ready to be
+ * opened by other backends and render it read-only.
+ *
+ * The caller must have either created or attached to 'set'.
+ */
+BufFile *
+BufFileOpenShared(const BufFileSet *set, const char *name, int stripe)
+{
+	BufFile    *file = (BufFile *) palloc(sizeof(BufFile));
+	char		path[MAXPGPATH];
+	Size		capacity = 16;
+	File	   *files = palloc(sizeof(File) * capacity);
+	int			nfiles = 0;
+
+	if (stripe < 0 || stripe >= set->nstripes)
+		elog(ERROR, "stripe number out of range");
+
+	file = (BufFile *) palloc(sizeof(BufFile));
+	files = palloc(sizeof(File) * capacity);
+
+	/*
+	 * We don't know how many segments there are, so we'll probe the
+	 * filesystem to find out.
+	 */
+	for (;;)
+	{
+		/* See if we need to expand our file segment array. */
+		if (nfiles + 1 > capacity)
+		{
+			capacity *= 2;
+			files = repalloc(files, sizeof(File) * capacity);
+		}
+		/* Try to load a segment. */
+		MakeSharedSegmentPath(path, set->stripe_tablespaces[stripe],
+							  &set->descriptor, name, nfiles);
+		files[nfiles] = PathNameOpenTemporaryFile(path);
+		if (files[nfiles] <= 0)
+			break;
+		++nfiles;
+	}
+
+	/*
+	 * If we didn't find any files at all, then no BufFile exists with this
+	 * tag.
+	 */
+	if (nfiles == 0)
+		return NULL;
+
+	file->numFiles = nfiles;
+	file->files = files;
+	file->offsets = (off_t *) palloc0(sizeof(off_t) * nfiles);
+	file->isInterXact = false;
+	file->dirty = false;
+	file->resowner = CurrentResourceOwner; /* Unused, can't extend */
+	file->curFile = 0;
+	file->curOffset = 0L;
+	file->pos = 0;
+	file->nbytes = 0;
+	file->readOnly = true; /* Can't write to files opened this way */
+
+	/* The following values aren't ever used but are set for consistency. */
+	file->set_desc = set->descriptor;
+	file->tablespace = set->stripe_tablespaces[stripe];
+	strcpy(file->name, name);
+
+	return file;
+}
+
+/*
+ * Delete a BufFile that was created by BufFileCreateShared using the given
+ * 'name' and 'stripe' values.
+ *
+ * It is not necessary to delete files explicitly with this function.  It is
+ * provided only as a way to delete files proactively, rather than waiting for
+ * the BufFileSet to be cleaned up.
+ *
+ * Only one backend should attempt to delete a given (name, stripe), and
+ * should know that it exists and has been exported with BufFileExportShared.
+ *
+ * The caller must have either created or attached to 'set'.
+ */
+void
+BufFileDeleteShared(const BufFileSet *set, const char *name, int stripe)
+{
+	char		path[MAXPGPATH];
+	int			segment = 0;
+	bool		found = false;
+
+	if (stripe < 0 || stripe >= set->nstripes)
+		elog(ERROR, "stripe number out of range");
+
+	/*
+	 * We don't know how many segments the file has.  We'll keep deleting
+	 * until we run out.  If we don't manage to find even an initial segment,
+	 * raise an error.
+	 */
+	for (;;)
+	{
+		MakeSharedSegmentPath(path, set->stripe_tablespaces[stripe],
+							  &set->descriptor, name, segment);
+		if (!PathNameDeleteTemporaryFile(path, true))
+			break;
+		found = true;
+		++segment;
+	}
+
+	if (!found)
+		elog(ERROR, "could not delete unknown shared BufFile \"%s\"", name);
+}
+
+/*
+ * BufFileExportShared --- flush and make read-only, in preparation for sharing.
+ */
+void
+BufFileExportShared(BufFile *file)
+{
+	/* Must be a file belonging to a BufFileSet. */
+	Assert(file->set_desc.creator_pid != InvalidPid);
+
+	/* It's probably a bug if someone calls this twice. */
+	Assert(!file->readOnly);
+
+	BufFileFlush(file);
+	file->readOnly = true;
+}
+
 /*
  * Close a BufFile
  *
@@ -409,6 +881,8 @@ BufFileWrite(BufFile *file, void *ptr, size_t size)
 	size_t		nwritten = 0;
 	size_t		nthistime;
 
+	Assert(!file->readOnly);
+
 	while (size > 0)
 	{
 		if (file->pos >= BLCKSZ)
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index b0c174284b4..54a9061a6fb 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -39,6 +39,17 @@
  * for a long time, like relation files. It is the caller's responsibility
  * to close them, there is no automatic mechanism in fd.c for that.
  *
+ * PathNameCreateTemporaryFile, PathNameOpenTemporaryFile and
+ * PathNameDeleteTemporaryFile are used for temporary files that may be shared
+ * between backends.  A File created or opened with these functions is not
+ * automatically deleted when the file is closed, but it is automatically
+ * closed and end of transaction and counts agains the temporary file limit of
+ * the backend that created it.  Any File created this way must be explicitly
+ * deleted with PathNameDeleteTemporaryFile.  Automatic file deletion is not
+ * provided because this interface is designed for use by buffile.c and
+ * indirectly by sharedbuffile.c to implement temporary files with shared
+ * ownership and cleanup.
+ *
  * AllocateFile, AllocateDir, OpenPipeStream and OpenTransientFile are
  * wrappers around fopen(3), opendir(3), popen(3) and open(2), respectively.
  * They behave like the corresponding native functions, except that the handle
@@ -175,8 +186,9 @@ int			max_safe_fds = 32;	/* default if not changed */
 #define FilePosIsUnknown(pos) ((pos) < 0)
 
 /* these are the assigned bits in fdstate below: */
-#define FD_TEMPORARY		(1 << 0)	/* T = delete when closed */
-#define FD_XACT_TEMPORARY	(1 << 1)	/* T = delete at eoXact */
+#define FD_DELETE_AT_CLOSE	(1 << 0)	/* T = delete when closed */
+#define FD_CLOSE_AT_EOXACT	(1 << 1)	/* T = close at eoXact */
+#define FD_TEMP_FILE_LIMIT	(1 << 2)	/* T = respect temp_file_limit */
 
 typedef struct vfd
 {
@@ -326,6 +338,7 @@ static void walkdir(const char *path,
 static void pre_sync_fname(const char *fname, bool isdir, int elevel);
 #endif
 static void datadir_fsync_fname(const char *fname, bool isdir, int elevel);
+static void unlink_if_exists_fname(const char *fname, bool isdir, int elevel);
 
 static int	fsync_fname_ext(const char *fname, bool isdir, bool ignore_perm, int elevel);
 static int	fsync_parent_path(const char *fname, int elevel);
@@ -1294,6 +1307,23 @@ FileAccess(File file)
 	return 0;
 }
 
+/*
+ * Called whenever a temporary file is deleted to report its size.
+ */
+static void
+ReportTemporaryFileUsage(const char *path, off_t size)
+{
+	pgstat_report_tempfile(size);
+
+	if (log_temp_files >= 0)
+	{
+		if ((size / 1024) >= log_temp_files)
+			ereport(LOG,
+					(errmsg("temporary file: path \"%s\", size %lu",
+							path, (unsigned long) size)));
+	}
+}
+
 /*
  *	Called when we get a shared invalidation message on some relation.
  */
@@ -1378,6 +1408,106 @@ PathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode)
 	return file;
 }
 
+/*
+ * Delete a file that was created with PathNameCreateTemporaryFile.
+ * Return true if the file existed, false if didn't.
+ */
+bool
+PathNameDeleteTemporaryFile(const char *fileName, bool error_on_failure)
+{
+	struct stat filestats;
+	int			stat_errno;
+
+	/* Get the final size for pgstat reporting. */
+	if (stat(fileName, &filestats) != 0)
+		stat_errno = errno;
+	else
+		stat_errno = 0;
+
+	/*
+	 * Unlike FileClose's automatic file deletion code, we tolerate
+	 * non-existence to support BufFileDeleteShared which doesn't know how
+	 * many segments it has to delete until it runs out.
+	 */
+	if (stat_errno == ENOENT)
+		return false;
+
+	if (unlink(fileName) < 0)
+	{
+		if (errno != ENOENT)
+			elog(error_on_failure ? ERROR : LOG,
+				 "cannot unlink temporary file \"%s\": %m", fileName);
+		return false;
+	}
+
+	if (stat_errno == 0)
+		ReportTemporaryFileUsage(fileName, filestats.st_size);
+	else
+	{
+		errno = stat_errno;
+		elog(LOG, "could not stat file \"%s\": %m", fileName);
+	}
+
+	return true;
+}
+
+/*
+ * Create directory 'directory'.  If necessary, create 'basedir', which must
+ * be the directory above it.  This is designed for creating the top-level
+ * temporary directory on demand before creating a directory underneath it.
+ * If necessary, delete a pre-existing directory of the same name, on the
+ * theory that crash-restarts can leave junk behind that we'll eventually
+ * crash into (see RemovePgTempFiles).
+ */
+void
+PathNameCreateTemporaryDir(const char *basedir, const char *directory)
+{
+	if (mkdir(directory, S_IRWXU) < 0)
+	{
+		/*
+		 * Failed.  Try to create basedir first in case it's missing.
+		 * Tolerate ENOENT to close a race against another process following
+		 * the same algorithm.
+		 */
+		if (mkdir(basedir, S_IRWXU) < 0 && errno != ENOENT)
+			elog(ERROR, "cannot create temporary directory \"%s\": %m",
+				 basedir);
+
+		/* Try again. */
+		if (mkdir(directory, S_IRWXU) < 0)
+		{
+			elog(LOG, "cannot create temporary subdirectory \"%s\": %m (will try to unlink and then try again)",
+				 directory);
+			/*
+			 * Failed again.  Perhaps there is a directory in the way that is
+			 * left-over from a crash-restart?  Try to delete it.
+			 */
+			PathNameDeleteTemporaryDirRecursively(directory);
+
+			/* Now try one more time. */
+			if (mkdir(directory, S_IRWXU) < 0)
+				elog(ERROR, "cannot create temporary subdirectory \"%s\": %m",
+					 directory);
+		}
+	}
+}
+
+/*
+ * Delete a directory and everything in it.
+ */
+void
+PathNameDeleteTemporaryDirRecursively(const char *dirname)
+{
+	/*
+	 * Currently, walkdir doesn't offer a way for our passed in function to
+	 * maintain state.  Perhaps it should, so that we could tell the caller
+	 * whether this operation succeeded or failed.  Since this operation is
+	 * used in a cleanup path, we wouldn't actually behave differently: we'll
+	 * just log failures.
+	 */
+	walkdir(dirname, unlink_if_exists_fname, false, LOG);
+}
+
 /*
  * Open a temporary file that will disappear when we close it.
  *
@@ -1426,12 +1556,12 @@ OpenTemporaryFile(bool interXact)
 											 true);
 
 	/* Mark it for deletion at close */
-	VfdCache[file].fdstate |= FD_TEMPORARY;
+	VfdCache[file].fdstate |= FD_DELETE_AT_CLOSE;
 
 	/* Register it with the current resource owner */
 	if (!interXact)
 	{
-		VfdCache[file].fdstate |= FD_XACT_TEMPORARY;
+		VfdCache[file].fdstate |= FD_CLOSE_AT_EOXACT;
 
 		ResourceOwnerEnlargeFiles(CurrentResourceOwner);
 		ResourceOwnerRememberFile(CurrentResourceOwner, file);
@@ -1509,6 +1639,81 @@ OpenTemporaryFileInTablespace(Oid tblspcOid, bool rejectError)
 	return file;
 }
 
+/*
+ * Create a new file.  The directory containing it must already exist.  Files
+ * created this way are subject to temp_file_limit and are automatically
+ * closed at end of transaction, but are not automatically deleted on close
+ * because they are intended to be shared between cooperating backends.
+ */
+File
+PathNameCreateTemporaryFile(const char *tempfilepath)
+{
+	File file;
+
+	/*
+	 * Open the file.  Note: we don't use O_EXCL, in case there is an orphaned
+	 * temp file that can be reused.
+	 */
+	file = PathNameOpenFile(tempfilepath,
+							O_RDWR | O_CREAT | O_TRUNC | PG_BINARY);
+	if (file <= 0)
+		elog(ERROR, "could not create temporary file \"%s\": %m",
+			 tempfilepath);
+
+	/* Mark it for temp_file_limit accounting. */
+	VfdCache[file].fdstate |= FD_TEMP_FILE_LIMIT;
+
+	/*
+	 * We don't set FD_DELETE_AT_CLOSE for files opened this way, but we
+	 * still want to make sure they get closed at end of xact.
+	 */
+	ResourceOwnerEnlargeFiles(CurrentResourceOwner);
+	ResourceOwnerRememberFile(CurrentResourceOwner, file);
+	VfdCache[file].resowner = CurrentResourceOwner;
+
+	/* Backup mechanism for closing at end of xact. */
+	VfdCache[file].fdstate |= FD_CLOSE_AT_EOXACT;
+	have_xact_temporary_files = true;
+
+	return file;
+}
+
+/*
+ * Open a file that was created with PathNameCreateTemporaryFile in another
+ * backend.  Files opened this way don't count agains the temp_file_limit of
+ * the caller, are read-only and are automatically closed at the end of the
+ * transaction but are not deleted on close.
+ */
+File
+PathNameOpenTemporaryFile(const char *tempfilepath)
+{
+	File file;
+
+	/* We open the file read-only. */
+	file = PathNameOpenFile(tempfilepath, O_RDONLY | PG_BINARY);
+
+	/* If no such file, then we don't raise an error. */
+	if (file <= 0 && errno != ENOENT)
+		elog(ERROR, "could not open temporary file \"%s\": %m", tempfilepath);
+
+	if (file > 0)
+	{
+		/*
+		 * We don't set FD_DELETE_AT_CLOSE for files opened this way, but we
+		 * still want to make sure they get closed at end of xact.
+		 */
+		ResourceOwnerEnlargeFiles(CurrentResourceOwner);
+		ResourceOwnerRememberFile(CurrentResourceOwner, file);
+		VfdCache[file].resowner = CurrentResourceOwner;
+
+		/* Backup mechanism for closing at end of xact. */
+		VfdCache[file].fdstate |= FD_CLOSE_AT_EOXACT;
+		have_xact_temporary_files = true;
+	}
+
+	return file;
+}
+
 /*
  * close a file when done with it
  */
@@ -1537,10 +1742,17 @@ FileClose(File file)
 		Delete(file);
 	}
 
+	if (vfdP->fdstate & FD_TEMP_FILE_LIMIT)
+	{
+		/* Subtract its size from current usage (do first in case of error) */
+		temporary_files_size -= vfdP->fileSize;
+		vfdP->fileSize = 0;
+	}
+
 	/*
 	 * Delete the file if it was temporary, and make a log entry if wanted
 	 */
-	if (vfdP->fdstate & FD_TEMPORARY)
+	if (vfdP->fdstate & FD_DELETE_AT_CLOSE)
 	{
 		struct stat filestats;
 		int			stat_errno;
@@ -1552,11 +1764,8 @@ FileClose(File file)
 		 * is arranged to ensure that the worst-case consequence is failing to
 		 * emit log message(s), not failing to attempt the unlink.
 		 */
-		vfdP->fdstate &= ~FD_TEMPORARY;
+		vfdP->fdstate &= ~FD_DELETE_AT_CLOSE;
 
-		/* Subtract its size from current usage (do first in case of error) */
-		temporary_files_size -= vfdP->fileSize;
-		vfdP->fileSize = 0;
 
 		/* first try the stat() */
 		if (stat(vfdP->fileName, &filestats))
@@ -1570,18 +1779,7 @@ FileClose(File file)
 
 		/* and last report the stat results */
 		if (stat_errno == 0)
-		{
-			pgstat_report_tempfile(filestats.st_size);
-
-			if (log_temp_files >= 0)
-			{
-				if ((filestats.st_size / 1024) >= log_temp_files)
-					ereport(LOG,
-							(errmsg("temporary file: path \"%s\", size %lu",
-									vfdP->fileName,
-									(unsigned long) filestats.st_size)));
-			}
-		}
+			ReportTemporaryFileUsage(vfdP->fileName, filestats.st_size);
 		else
 		{
 			errno = stat_errno;
@@ -1755,7 +1953,7 @@ FileWrite(File file, char *buffer, int amount, uint32 wait_event_info)
 	 * message if we do that.  All current callers would just throw error
 	 * immediately anyway, so this is safe at present.
 	 */
-	if (temp_file_limit >= 0 && (vfdP->fdstate & FD_TEMPORARY))
+	if (temp_file_limit >= 0 && (vfdP->fdstate & FD_TEMP_FILE_LIMIT))
 	{
 		off_t		newPos;
 
@@ -1808,7 +2006,7 @@ retry:
 		 * get here in that state if we're not enforcing temporary_files_size,
 		 * so we don't care.
 		 */
-		if (vfdP->fdstate & FD_TEMPORARY)
+		if (vfdP->fdstate & FD_TEMP_FILE_LIMIT)
 		{
 			off_t		newPos = vfdP->seekPos;
 
@@ -1979,7 +2177,7 @@ FileTruncate(File file, off_t offset, uint32 wait_event_info)
 	if (returnCode == 0 && VfdCache[file].fileSize > offset)
 	{
 		/* adjust our state for truncation of a temp file */
-		Assert(VfdCache[file].fdstate & FD_TEMPORARY);
+		Assert(VfdCache[file].fdstate & FD_TEMP_FILE_LIMIT);
 		temporary_files_size -= VfdCache[file].fileSize - offset;
 		VfdCache[file].fileSize = offset;
 	}
@@ -2690,7 +2888,8 @@ CleanupTempFiles(bool isProcExit)
 		{
 			unsigned short fdstate = VfdCache[i].fdstate;
 
-			if ((fdstate & FD_TEMPORARY) && VfdCache[i].fileName != NULL)
+			if (((fdstate & FD_DELETE_AT_CLOSE) || (fdstate & FD_CLOSE_AT_EOXACT)) &&
+				VfdCache[i].fileName != NULL)
 			{
 				/*
 				 * If we're in the process of exiting a backend process, close
@@ -2701,7 +2900,7 @@ CleanupTempFiles(bool isProcExit)
 				 */
 				if (isProcExit)
 					FileClose(i);
-				else if (fdstate & FD_XACT_TEMPORARY)
+				else if (fdstate & FD_CLOSE_AT_EOXACT)
 				{
 					elog(WARNING,
 						 "temporary file %s not closed at end-of-transaction",
@@ -2810,7 +3009,20 @@ RemovePgTempFilesInDir(const char *tmpdirname)
 		if (strncmp(temp_de->d_name,
 					PG_TEMP_FILE_PREFIX,
 					strlen(PG_TEMP_FILE_PREFIX)) == 0)
-			unlink(rm_path);	/* note we ignore any error */
+		{
+			size_t name_len = strlen(temp_de->d_name);
+			size_t suffix_len = strlen(PG_TEMP_FILE_SET_DIR_SUFFIX);
+
+			if (name_len > suffix_len &&
+				strcmp(temp_de->d_name + name_len - suffix_len,
+					   PG_TEMP_FILE_SET_DIR_SUFFIX) == 0)
+			{
+				RemovePgTempFilesInDir(rm_path);
+				rmdir(rm_path);		/* note we ignore any error */
+			}
+			else
+				unlink(rm_path);	/* note we ignore any error */
+		}
 		else
 			elog(LOG,
 				 "unexpected file found in temporary-files directory: \"%s\"",
@@ -3146,6 +3358,21 @@ datadir_fsync_fname(const char *fname, bool isdir, int elevel)
 	fsync_fname_ext(fname, isdir, true, elevel);
 }
 
+static void
+unlink_if_exists_fname(const char *fname, bool isdir, int elevel)
+{
+	if (isdir)
+	{
+		if (rmdir(fname) != 0 && errno != ENOENT)
+			elog(elevel, "could not rmdir directory \"%s\": %m", fname);
+	}
+	else
+	{
+		/* Use PathNameDeleteTemporaryFile to report filesize */
+		PathNameDeleteTemporaryFile(fname, false);
+	}
+}
+
 /*
  * fsync_fname_ext -- Try to fsync a file or directory
  *
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index bd19fad77e9..db1d688bc97 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -542,16 +542,6 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 				PrintRelCacheLeakWarning(res);
 			RelationClose(res);
 		}
-
-		/* Ditto for dynamic shared memory segments */
-		while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
-		{
-			dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
-
-			if (isCommit)
-				PrintDSMLeakWarning(res);
-			dsm_detach(res);
-		}
 	}
 	else if (phase == RESOURCE_RELEASE_LOCKS)
 	{
@@ -668,6 +658,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 				PrintFileLeakWarning(res);
 			FileClose(res);
 		}
+
+		/* Ditto for dynamic shared memory segments */
+		while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
+		{
+			dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
+
+			if (isCommit)
+				PrintDSMLeakWarning(res);
+			dsm_detach(res);
+		}
 	}
 
 	/* Let add-on modules get a chance too */
diff --git a/src/include/lib/arrayutils.h b/src/include/lib/arrayutils.h
new file mode 100644
index 00000000000..4a39ba74865
--- /dev/null
+++ b/src/include/lib/arrayutils.h
@@ -0,0 +1,68 @@
+/*-------------------------------------------------------------------------
+ *
+ * arrayutils.h
+ *		inline C-array utility functions
+ *
+ * Portions Copyright (c) 2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *		src/include/lib/arrayutils.h
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef ARRAYUTILS_H
+#define ARRAYUTILS_H
+
+/*
+ * Remove duplicates from a pre-sorted array, according to a user-supplied
+ * comparator.  Usually the array should have been sorted with qsort using the
+ * same arguments.  Return the new size.
+ *
+ * The name reflects the fact that the arguments are compatible with the
+ * standard qsort function.
+ */
+static inline size_t
+qunique(void *array, size_t elements, size_t width,
+		int (*compare)(const void *, const void *))
+{
+	int i, j;
+	char *bytes = array;
+
+	if (elements <= 1)
+		return elements;
+
+	for (i = 1, j = 0; i < elements; ++i)
+	{
+		if (compare(bytes + i * width, bytes + j * width) != 0)
+			memcpy(bytes + ++j * width, bytes + i * width, width);
+	}
+
+	return j + 1;
+}
+
+/*
+ * Like qunique, but takes a comparator with an extra user data argument which
+ * is passed through, for compatibility with qsort_arg.
+ */
+static inline size_t
+qunique_arg(void *array, size_t elements, size_t width,
+			int (*compare)(const void *, const void *, void *),
+			void *arg)
+{
+	int i, j;
+	char *bytes = array;
+
+	if (elements <= 1)
+		return elements;
+
+	for (i = 1, j = 0; i < elements; ++i)
+	{
+		if (compare(bytes + i * width, bytes + j * width, arg) != 0)
+			memcpy(bytes + ++j * width, bytes + i * width, width);
+	}
+
+	return j + 1;
+}
+
+#endif
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index fafcb3f0898..d6d69cd0d0c 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -26,9 +26,12 @@
 #ifndef BUFFILE_H
 #define BUFFILE_H
 
-/* BufFile is an opaque type whose details are not known outside buffile.c. */
+#include "storage/dsm.h"
+
+/* Opaque types whose details are not known outside buffile.c. */
 
 typedef struct BufFile BufFile;
+typedef struct BufFileSet BufFileSet;
 
 /*
  * prototypes for functions in buffile.c
@@ -42,4 +45,12 @@ extern int	BufFileSeek(BufFile *file, int fileno, off_t offset, int whence);
 extern void BufFileTell(BufFile *file, int *fileno, off_t *offset);
 extern int	BufFileSeekBlock(BufFile *file, long blknum);
 
+extern size_t BufFileSetEstimate(int stripes);
+extern void BufFileSetCreate(BufFileSet *set, dsm_segment *seg, int stripes);
+extern void BufFileSetAttach(BufFileSet *set, dsm_segment *seg);
+extern BufFile *BufFileCreateShared(const BufFileSet *set, const char *name, int stripe);
+extern void BufFileExportShared(BufFile *file);
+extern BufFile *BufFileOpenShared(const BufFileSet *set, const char *name, int stripe);
+extern void BufFileDeleteShared(const BufFileSet *set, const char *name, int stripe);
+
 #endif							/* BUFFILE_H */
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 6ea26e63b84..26d65e97e35 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -79,6 +79,13 @@ extern int	FileGetRawDesc(File file);
 extern int	FileGetRawFlags(File file);
 extern mode_t FileGetRawMode(File file);
 
+/* Operations used by buffile.c for named shared temporary files */
+extern void PathNameCreateTemporaryDir(const char *basedir, const char *directory);
+extern File PathNameCreateTemporaryFile(const char *filename);
+extern File PathNameOpenTemporaryFile(const char *filename);
+extern bool PathNameDeleteTemporaryFile(const char *filename, bool error_on_failure);
+extern void PathNameDeleteTemporaryDirRecursively(const char *filename);
+
 /* Operations that allow use of regular stdio --- USE WITH CAUTION */
 extern FILE *AllocateFile(const char *name, const char *mode);
 extern int	FreeFile(FILE *file);
@@ -124,8 +131,9 @@ extern int	durable_unlink(const char *fname, int loglevel);
 extern int	durable_link_or_rename(const char *oldfile, const char *newfile, int loglevel);
 extern void SyncDataDirectory(void);
 
-/* Filename components for OpenTemporaryFile */
+/* Filename components */
 #define PG_TEMP_FILES_DIR "pgsql_tmp"
 #define PG_TEMP_FILE_PREFIX "pgsql_tmp"
+#define PG_TEMP_FILE_SET_DIR_SUFFIX ".set"
 
 #endif							/* FD_H */
-- 
2.14.1

