On 2018-05-19 18:12:52 +1200, Thomas Munro wrote:
> On Sat, May 19, 2018 at 4:51 PM, Thomas Munro
> <thomas.mu...@enterprisedb.com> wrote:
> > Next, make check hangs in initdb on both of my pet OSes when md.c
> > raises an error (fseek fails) and we raise an error while raising an
> > error and deadlock against ourselves.  Backtrace here:
> > https://paste.debian.net/1025336/
> 
> Ah, I see now that something similar is happening on Linux too, so I
> guess you already knew this.

I didn't. I cleaned something up and only tested installcheck
after... Singleuser mode was broken.

Attached is a new version.

I've changed my previous attempt at using transient files to using File
type files, but unlinked from the LRU so that they're kept open. Not sure
if that's perfect, but seems cleaner.

Greetings,

Andres Freund
>From 96435b05c9546b6da829043fb10b2a7309216bd2 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 21 May 2018 15:43:30 -0700
Subject: [PATCH v2 1/6] freespace: Don't constantly close files when reading
 buffer.

fsm_readbuf() used to always do an smgrexists() when reading a buffer
beyond the known file size. That currently implies closing the md.c
handle, losing all the data cached therein.  Change this to only
check for file existence when not already known to be larger than 0
blocks.

Author: Andres Freund
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/storage/freespace/freespace.c | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c
index 65c4e74999f..d7569cec5ed 100644
--- a/src/backend/storage/freespace/freespace.c
+++ b/src/backend/storage/freespace/freespace.c
@@ -556,7 +556,7 @@ fsm_readbuf(Relation rel, FSMAddress addr, bool extend)
 	 * not on extension.)
 	 */
 	if (rel->rd_smgr->smgr_fsm_nblocks == InvalidBlockNumber ||
-		blkno >= rel->rd_smgr->smgr_fsm_nblocks)
+		rel->rd_smgr->smgr_fsm_nblocks == 0)
 	{
 		if (smgrexists(rel->rd_smgr, FSM_FORKNUM))
 			rel->rd_smgr->smgr_fsm_nblocks = smgrnblocks(rel->rd_smgr,
@@ -564,6 +564,9 @@ fsm_readbuf(Relation rel, FSMAddress addr, bool extend)
 		else
 			rel->rd_smgr->smgr_fsm_nblocks = 0;
 	}
+	else if (blkno >= rel->rd_smgr->smgr_fsm_nblocks)
+		rel->rd_smgr->smgr_fsm_nblocks = smgrnblocks(rel->rd_smgr,
+													 FSM_FORKNUM);
 
 	/* Handle requests beyond EOF */
 	if (blkno >= rel->rd_smgr->smgr_fsm_nblocks)
-- 
2.17.0.rc1.dirty

>From aa533828e6164731006dab92665fa92b7b058d6f Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 21 May 2018 15:43:30 -0700
Subject: [PATCH v2 2/6] Add functions to send/receive data & FD over a unix
 domain socket.

This'll be used by a followup patch changing how the fsync request
queue works, to make it safe on linux.

TODO: This probably should live elsewhere.

Author: Andres Freund
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/storage/file/fd.c | 102 ++++++++++++++++++++++++++++++++++
 src/include/storage/fd.h      |   4 ++
 2 files changed, 106 insertions(+)

diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 441f18dcf56..65e46483a44 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -3572,3 +3572,105 @@ MakePGDirectory(const char *directoryName)
 {
 	return mkdir(directoryName, pg_dir_create_mode);
 }
+
+/*
+ * Send data over a unix domain socket, optionally (when fd >= 0) including a
+ * file descriptor.
+ */
+ssize_t
+pg_uds_send_with_fd(int sock, void *buf, ssize_t buflen, int fd)
+{
+	ssize_t     size;
+	struct msghdr   msg = {0};
+	struct iovec    iov;
+	/* cmsg header, union for correct alignment */
+	union
+	{
+		struct cmsghdr  cmsghdr;
+		char        control[CMSG_SPACE(sizeof (int))];
+	} cmsgu;
+	struct cmsghdr  *cmsg;
+
+	iov.iov_base = buf;
+	iov.iov_len = buflen;
+
+	msg.msg_name = NULL;
+	msg.msg_namelen = 0;
+	msg.msg_iov = &iov;
+	msg.msg_iovlen = 1;
+
+	if (fd >= 0)
+	{
+		msg.msg_control = cmsgu.control;
+		msg.msg_controllen = sizeof(cmsgu.control);
+
+		cmsg = CMSG_FIRSTHDR(&msg);
+		cmsg->cmsg_len = CMSG_LEN(sizeof (int));
+		cmsg->cmsg_level = SOL_SOCKET;
+		cmsg->cmsg_type = SCM_RIGHTS;
+
+		*((int *) CMSG_DATA(cmsg)) = fd;
+	}
+
+	size = sendmsg(sock, &msg, 0);
+
+	/* errors are returned directly */
+	return size;
+}
+
+/*
+ * Receive data from a unix domain socket. If a file descriptor is sent over
+ * the socket, store it in *fd; otherwise (or on error) set *fd to -1.
+ */
+ssize_t
+pg_uds_recv_with_fd(int sock, void *buf, ssize_t bufsize, int *fd)
+{
+	ssize_t     size;
+	struct msghdr   msg;
+	struct iovec    iov;
+	/* cmsg header, union for correct alignment */
+	union
+	{
+		struct cmsghdr  cmsghdr;
+		char        control[CMSG_SPACE(sizeof (int))];
+	} cmsgu;
+	struct cmsghdr  *cmsg;
+
+	Assert(fd != NULL);
+
+	iov.iov_base = buf;
+	iov.iov_len = bufsize;
+
+	msg.msg_name = NULL;
+	msg.msg_namelen = 0;
+	msg.msg_iov = &iov;
+	msg.msg_iovlen = 1;
+	msg.msg_control = cmsgu.control;
+	msg.msg_controllen = sizeof(cmsgu.control);
+
+	size = recvmsg (sock, &msg, 0);
+
+	if (size < 0)
+	{
+		*fd = -1;
+		return size;
+	}
+
+	cmsg = CMSG_FIRSTHDR(&msg);
+	if (cmsg && cmsg->cmsg_len == CMSG_LEN(sizeof(int)))
+	{
+		if (cmsg->cmsg_level != SOL_SOCKET)
+			elog(FATAL, "unexpected cmsg_level");
+
+		if (cmsg->cmsg_type != SCM_RIGHTS)
+			elog(FATAL, "unexpected cmsg_type");
+
+		*fd = *((int *) CMSG_DATA(cmsg));
+
+		/* FIXME: check / handle additional cmsg structures */
+	}
+	else
+		*fd = -1;
+
+	return size;
+}
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 8e7c9728f4b..5e016d69a5a 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -143,4 +143,8 @@ extern void SyncDataDirectory(void);
 #define PG_TEMP_FILES_DIR "pgsql_tmp"
 #define PG_TEMP_FILE_PREFIX "pgsql_tmp"
 
+/* XXX; This should probably go elsewhere */
+ssize_t pg_uds_send_with_fd(int sock, void *buf, ssize_t buflen, int fd);
+ssize_t pg_uds_recv_with_fd(int sock, void *buf, ssize_t bufsize, int *fd);
+
 #endif							/* FD_H */
-- 
2.17.0.rc1.dirty

>From 87a6fa9b8d478504b0a4323a3e104b353a239f1a Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 21 May 2018 15:43:30 -0700
Subject: [PATCH v2 3/6] Make FileGetRawDesc() ensure there's an associated
 kernel FD.

Author: Andres Freund
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/storage/file/fd.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 65e46483a44..8ae13a51ec1 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -2232,6 +2232,10 @@ int
 FileGetRawDesc(File file)
 {
 	Assert(FileIsValid(file));
+
+	if (FileAccess(file))
+		return -1;
+
 	return VfdCache[file].fd;
 }
 
-- 
2.17.0.rc1.dirty

>From 4cdfdef2ff441f5db01dc989a90aecce4dc6b272 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 21 May 2018 15:43:30 -0700
Subject: [PATCH v2 4/6] WIP: Add FileOpenForFd().

Author: Andres Freund
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/storage/file/fd.c | 68 +++++++++++++++++++++++++++++++----
 src/include/storage/fd.h      |  2 ++
 2 files changed, 64 insertions(+), 6 deletions(-)

diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 8ae13a51ec1..50a1cb930f6 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -180,6 +180,7 @@ int			max_safe_fds = 32;	/* default if not changed */
 #define FD_DELETE_AT_CLOSE	(1 << 0)	/* T = delete when closed */
 #define FD_CLOSE_AT_EOXACT	(1 << 1)	/* T = close at eoXact */
 #define FD_TEMP_FILE_LIMIT	(1 << 2)	/* T = respect temp_file_limit */
+#define FD_NOT_IN_LRU		(1 << 3)	/* T = not in LRU */
 
 typedef struct vfd
 {
@@ -304,7 +305,6 @@ static void LruDelete(File file);
 static void Insert(File file);
 static int	LruInsert(File file);
 static bool ReleaseLruFile(void);
-static void ReleaseLruFiles(void);
 static File AllocateVfd(void);
 static void FreeVfd(File file);
 
@@ -1176,7 +1176,7 @@ ReleaseLruFile(void)
  * Release kernel FDs as needed to get under the max_safe_fds limit.
  * After calling this, it's OK to try to open another file.
  */
-static void
+void
 ReleaseLruFiles(void)
 {
 	while (nfile + numAllocatedDescs >= max_safe_fds)
@@ -1289,9 +1289,11 @@ FileAccess(File file)
 		 * We now know that the file is open and that it is not the last one
 		 * accessed, so we need to move it to the head of the Lru ring.
 		 */
-
-		Delete(file);
-		Insert(file);
+		if (!(VfdCache[file].fdstate & FD_NOT_IN_LRU))
+		{
+			Delete(file);
+			Insert(file);
+		}
 	}
 
 	return 0;
@@ -1414,6 +1416,56 @@ PathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode)
 	return file;
 }
 
+/*
+ * Open a File for a pre-existing file descriptor.
+ *
+ * Note that these files will not be closed on an LRU basis, therefore the
+ * caller is responsible for limiting the number of open file descriptors.
+ *
+ * The passed in name is purely for informational purposes.
+ */
+File
+FileOpenForFd(int fd, const char *fileName)
+{
+	char	   *fnamecopy;
+	File		file;
+	Vfd		   *vfdP;
+
+	/*
+	 * We need a malloc'd copy of the file name; fail cleanly if no room.
+	 */
+	fnamecopy = strdup(fileName);
+	if (fnamecopy == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory")));
+
+	file = AllocateVfd();
+	vfdP = &VfdCache[file];
+
+	/* Close excess kernel FDs. */
+	ReleaseLruFiles();
+
+	vfdP->fd = fd;
+	++nfile;
+
+	DO_DB(elog(LOG, "FileOpenForFd: success %d/%d (%s)",
+			   file, fd, fnamecopy));
+
+	/* NB: Explicitly not inserted into LRU! */
+
+	vfdP->fileName = fnamecopy;
+	/* Saved flags are adjusted to be OK for re-opening file */
+	vfdP->fileFlags = 0;
+	vfdP->fileMode = 0;
+	vfdP->seekPos = 0;
+	vfdP->fileSize = 0;
+	vfdP->fdstate = FD_NOT_IN_LRU;
+	vfdP->resowner = NULL;
+
+	return file;
+}
+
 /*
  * Create directory 'directory'.  If necessary, create 'basedir', which must
  * be the directory above it.  This is designed for creating the top-level
@@ -1760,7 +1812,11 @@ FileClose(File file)
 		vfdP->fd = VFD_CLOSED;
 
 		/* remove the file from the lru ring */
-		Delete(file);
+		if (!(vfdP->fdstate & FD_NOT_IN_LRU))
+		{
+			vfdP->fdstate &= ~FD_NOT_IN_LRU;
+			Delete(file);
+		}
 	}
 
 	if (vfdP->fdstate & FD_TEMP_FILE_LIMIT)
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 5e016d69a5a..e96e8b13982 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -65,6 +65,7 @@ extern int	max_safe_fds;
 /* Operations on virtual Files --- equivalent to Unix kernel file ops */
 extern File PathNameOpenFile(const char *fileName, int fileFlags);
 extern File PathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode);
+extern File FileOpenForFd(int fd, const char *fileName);
 extern File OpenTemporaryFile(bool interXact);
 extern void FileClose(File file);
 extern int	FilePrefetch(File file, off_t offset, int amount, uint32 wait_event_info);
@@ -127,6 +128,7 @@ extern void AtEOSubXact_Files(bool isCommit, SubTransactionId mySubid,
 				  SubTransactionId parentSubid);
 extern void RemovePgTempFiles(void);
 extern bool looks_like_temp_rel_name(const char *name);
+extern void ReleaseLruFiles(void);
 
 extern int	pg_fsync(int fd);
 extern int	pg_fsync_no_writethrough(int fd);
-- 
2.17.0.rc1.dirty

>From bd7dcdafa752802566d0fef3ac9c126c41f276e4 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 21 May 2018 15:43:30 -0700
Subject: [PATCH v2 5/6] WIP: Optimize register_dirty_segment() to not
 repeatedly queue fsync requests.

Author: Andres Freund
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/postmaster/checkpointer.c | 36 ++++++++++++-------
 src/backend/storage/smgr/md.c         | 50 +++++++++++++++++++--------
 src/include/postmaster/bgwriter.h     |  3 ++
 3 files changed, 63 insertions(+), 26 deletions(-)

diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 0950ada6019..333eb91c9de 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -46,6 +46,7 @@
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "port/atomics.h"
 #include "postmaster/bgwriter.h"
 #include "replication/syncrep.h"
 #include "storage/bufmgr.h"
@@ -126,8 +127,9 @@ typedef struct
 
 	int			ckpt_flags;		/* checkpoint flags, as defined in xlog.h */
 
-	uint32		num_backend_writes; /* counts user backend buffer writes */
-	uint32		num_backend_fsync;	/* counts user backend fsync calls */
+	pg_atomic_uint32 num_backend_writes; /* counts user backend buffer writes */
+	pg_atomic_uint32 num_backend_fsync;	/* counts user backend fsync calls */
+	pg_atomic_uint32 ckpt_cycle; /* cycle */
 
 	int			num_requests;	/* current # of requests */
 	int			max_requests;	/* allocated array size */
@@ -943,6 +945,9 @@ CheckpointerShmemInit(void)
 		MemSet(CheckpointerShmem, 0, size);
 		SpinLockInit(&CheckpointerShmem->ckpt_lck);
 		CheckpointerShmem->max_requests = NBuffers;
+		pg_atomic_init_u32(&CheckpointerShmem->ckpt_cycle, 0);
+		pg_atomic_init_u32(&CheckpointerShmem->num_backend_writes, 0);
+		pg_atomic_init_u32(&CheckpointerShmem->num_backend_fsync, 0);
 	}
 }
 
@@ -1133,10 +1138,6 @@ ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
 
 	LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
 
-	/* Count all backend writes regardless of if they fit in the queue */
-	if (!AmBackgroundWriterProcess())
-		CheckpointerShmem->num_backend_writes++;
-
 	/*
 	 * If the checkpointer isn't running or the request queue is full, the
 	 * backend will have to perform its own fsync request.  But before forcing
@@ -1151,7 +1152,7 @@ ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
 		 * fsync
 		 */
 		if (!AmBackgroundWriterProcess())
-			CheckpointerShmem->num_backend_fsync++;
+			pg_atomic_fetch_add_u32(&CheckpointerShmem->num_backend_fsync, 1);
 		LWLockRelease(CheckpointerCommLock);
 		return false;
 	}
@@ -1312,11 +1313,10 @@ AbsorbFsyncRequests(void)
 	LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
 
 	/* Transfer stats counts into pending pgstats message */
-	BgWriterStats.m_buf_written_backend += CheckpointerShmem->num_backend_writes;
-	BgWriterStats.m_buf_fsync_backend += CheckpointerShmem->num_backend_fsync;
-
-	CheckpointerShmem->num_backend_writes = 0;
-	CheckpointerShmem->num_backend_fsync = 0;
+	BgWriterStats.m_buf_written_backend +=
+		pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_writes, 0);
+	BgWriterStats.m_buf_fsync_backend +=
+		pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_fsync, 0);
 
 	/*
 	 * We try to avoid holding the lock for a long time by copying the request
@@ -1390,3 +1390,15 @@ FirstCallSinceLastCheckpoint(void)
 
 	return FirstCall;
 }
+
+uint32
+GetCheckpointSyncCycle(void)
+{
+	return pg_atomic_read_u32(&CheckpointerShmem->ckpt_cycle);
+}
+
+uint32
+IncCheckpointSyncCycle(void)
+{
+	return pg_atomic_fetch_add_u32(&CheckpointerShmem->ckpt_cycle, 1);
+}
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 2ec103e6047..555774320b5 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -109,6 +109,7 @@ typedef struct _MdfdVec
 {
 	File		mdfd_vfd;		/* fd number in fd.c's pool */
 	BlockNumber mdfd_segno;		/* segment number, from 0 */
+	uint32		mdfd_dirtied_cycle;
 } MdfdVec;
 
 static MemoryContext MdCxt;		/* context for all MdfdVec objects */
@@ -133,12 +134,12 @@ static MemoryContext MdCxt;		/* context for all MdfdVec objects */
  * (Regular backends do not track pending operations locally, but forward
  * them to the checkpointer.)
  */
-typedef uint16 CycleCtr;		/* can be any convenient integer size */
+typedef uint32 CycleCtr;		/* can be any convenient integer size */
 
 typedef struct
 {
 	RelFileNode rnode;			/* hash table key (must be first!) */
-	CycleCtr	cycle_ctr;		/* mdsync_cycle_ctr of oldest request */
+	CycleCtr	cycle_ctr;		/* sync cycle of oldest request */
 	/* requests[f] has bit n set if we need to fsync segment n of fork f */
 	Bitmapset  *requests[MAX_FORKNUM + 1];
 	/* canceled[f] is true if we canceled fsyncs for fork "recently" */
@@ -155,7 +156,6 @@ static HTAB *pendingOpsTable = NULL;
 static List *pendingUnlinks = NIL;
 static MemoryContext pendingOpsCxt; /* context for the above  */
 
-static CycleCtr mdsync_cycle_ctr = 0;
 static CycleCtr mdckpt_cycle_ctr = 0;
 
 
@@ -333,6 +333,7 @@ mdcreate(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
 	mdfd = &reln->md_seg_fds[forkNum][0];
 	mdfd->mdfd_vfd = fd;
 	mdfd->mdfd_segno = 0;
+	mdfd->mdfd_dirtied_cycle = GetCheckpointSyncCycle() - 1;
 }
 
 /*
@@ -614,6 +615,7 @@ mdopen(SMgrRelation reln, ForkNumber forknum, int behavior)
 	mdfd = &reln->md_seg_fds[forknum][0];
 	mdfd->mdfd_vfd = fd;
 	mdfd->mdfd_segno = 0;
+	mdfd->mdfd_dirtied_cycle = GetCheckpointSyncCycle() - 1;
 
 	Assert(_mdnblocks(reln, forknum, mdfd) <= ((BlockNumber) RELSEG_SIZE));
 
@@ -1089,9 +1091,9 @@ mdsync(void)
 	 * To avoid excess fsync'ing (in the worst case, maybe a never-terminating
 	 * checkpoint), we want to ignore fsync requests that are entered into the
 	 * hashtable after this point --- they should be processed next time,
-	 * instead.  We use mdsync_cycle_ctr to tell old entries apart from new
-	 * ones: new ones will have cycle_ctr equal to the incremented value of
-	 * mdsync_cycle_ctr.
+	 * instead.  We use GetCheckpointSyncCycle() to tell old entries apart
+	 * from new ones: new ones will have cycle_ctr equal to
+	 * IncCheckpointSyncCycle().
 	 *
 	 * In normal circumstances, all entries present in the table at this point
 	 * will have cycle_ctr exactly equal to the current (about to be old)
@@ -1115,16 +1117,16 @@ mdsync(void)
 		hash_seq_init(&hstat, pendingOpsTable);
 		while ((entry = (PendingOperationEntry *) hash_seq_search(&hstat)) != NULL)
 		{
-			entry->cycle_ctr = mdsync_cycle_ctr;
+			entry->cycle_ctr = GetCheckpointSyncCycle();
 		}
 	}
 
-	/* Advance counter so that new hashtable entries are distinguishable */
-	mdsync_cycle_ctr++;
-
 	/* Set flag to detect failure if we don't reach the end of the loop */
 	mdsync_in_progress = true;
 
+	/* Advance counter so that new hashtable entries are distinguishable */
+	IncCheckpointSyncCycle();
+
 	/* Now scan the hashtable for fsync requests to process */
 	absorb_counter = FSYNCS_PER_ABSORB;
 	hash_seq_init(&hstat, pendingOpsTable);
@@ -1137,11 +1139,11 @@ mdsync(void)
 		 * contain multiple fsync-request bits, but they are all new.  Note
 		 * "continue" bypasses the hash-remove call at the bottom of the loop.
 		 */
-		if (entry->cycle_ctr == mdsync_cycle_ctr)
+		if (entry->cycle_ctr == GetCheckpointSyncCycle())
 			continue;
 
 		/* Else assert we haven't missed it */
-		Assert((CycleCtr) (entry->cycle_ctr + 1) == mdsync_cycle_ctr);
+		Assert((CycleCtr) (entry->cycle_ctr + 1) == GetCheckpointSyncCycle());
 
 		/*
 		 * Scan over the forks and segments represented by the entry.
@@ -1308,7 +1310,7 @@ mdsync(void)
 				break;
 		}
 		if (forknum <= MAX_FORKNUM)
-			entry->cycle_ctr = mdsync_cycle_ctr;
+			entry->cycle_ctr = GetCheckpointSyncCycle();
 		else
 		{
 			/* Okay to remove it */
@@ -1427,18 +1429,37 @@ mdpostckpt(void)
 static void
 register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
 {
+	uint32 cycle;
+
 	/* Temp relations should never be fsync'd */
 	Assert(!SmgrIsTemp(reln));
 
+	pg_memory_barrier();
+	cycle = GetCheckpointSyncCycle();
+
+	/*
+	 * Don't repeatedly register the same segment as dirty.
+	 *
+	 * FIXME: This doesn't correctly deal with overflows yet! We could
+	 * e.g. emit an smgr invalidation every now and then, or use a 64bit
+	 * counter.  Or just error out if the cycle reaches UINT32_MAX.
+	 */
+	if (seg->mdfd_dirtied_cycle == cycle)
+		return;
+
 	if (pendingOpsTable)
 	{
 		/* push it into local pending-ops table */
 		RememberFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno);
+		seg->mdfd_dirtied_cycle = cycle;
 	}
 	else
 	{
 		if (ForwardFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno))
+		{
+			seg->mdfd_dirtied_cycle = cycle;
 			return;				/* passed it off successfully */
+		}
 
 		ereport(DEBUG1,
 				(errmsg("could not forward fsync request because request queue is full")));
@@ -1623,7 +1644,7 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
 		/* if new entry, initialize it */
 		if (!found)
 		{
-			entry->cycle_ctr = mdsync_cycle_ctr;
+			entry->cycle_ctr = GetCheckpointSyncCycle();
 			MemSet(entry->requests, 0, sizeof(entry->requests));
 			MemSet(entry->canceled, 0, sizeof(entry->canceled));
 		}
@@ -1793,6 +1814,7 @@ _mdfd_openseg(SMgrRelation reln, ForkNumber forknum, BlockNumber segno,
 	v = &reln->md_seg_fds[forknum][segno];
 	v->mdfd_vfd = fd;
 	v->mdfd_segno = segno;
+	v->mdfd_dirtied_cycle = GetCheckpointSyncCycle() - 1;
 
 	Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));
 
diff --git a/src/include/postmaster/bgwriter.h b/src/include/postmaster/bgwriter.h
index 941c6aba7d1..87a5cfad415 100644
--- a/src/include/postmaster/bgwriter.h
+++ b/src/include/postmaster/bgwriter.h
@@ -38,6 +38,9 @@ extern void AbsorbFsyncRequests(void);
 extern Size CheckpointerShmemSize(void);
 extern void CheckpointerShmemInit(void);
 
+extern uint32 GetCheckpointSyncCycle(void);
+extern uint32 IncCheckpointSyncCycle(void);
+
 extern bool FirstCallSinceLastCheckpoint(void);
 
 #endif							/* _BGWRITER_H */
-- 
2.17.0.rc1.dirty

>From 306975ba3856c5be31eca5d8efe6d7fe9eee06c6 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 21 May 2018 15:43:30 -0700
Subject: [PATCH v2 6/6] Heavily-WIP: Send file descriptors to checkpointer for
 fsyncing.

This addresses the issue that, at least on linux, fsyncs only reliably
see errors that occurred after they've been opened.

Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/access/transam/xlog.c     |   7 +-
 src/backend/postmaster/checkpointer.c | 354 +++++++----------
 src/backend/postmaster/postmaster.c   |  38 ++
 src/backend/storage/smgr/md.c         | 549 ++++++++++++++++----------
 src/include/postmaster/bgwriter.h     |   8 +-
 src/include/postmaster/postmaster.h   |   5 +
 src/include/storage/smgr.h            |   3 +-
 7 files changed, 543 insertions(+), 421 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index adbd6a21264..427774152eb 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -8634,8 +8634,10 @@ CreateCheckPoint(int flags)
 	 * Note: because it is possible for log_checkpoints to change while a
 	 * checkpoint proceeds, we always accumulate stats, even if
 	 * log_checkpoints is currently off.
+	 *
+	 * Note #2: this is reset at the end of the checkpoint, not here, because
+	 * we might have to fsync before getting here (see mdsync()).
 	 */
-	MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
 	CheckpointStats.ckpt_start_t = GetCurrentTimestamp();
 
 	/*
@@ -8999,6 +9001,9 @@ CreateCheckPoint(int flags)
 									 CheckpointStats.ckpt_segs_recycled);
 
 	LWLockRelease(CheckpointLock);
+
+	/* reset stats */
+	MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
 }
 
 /*
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 333eb91c9de..c2be529bca4 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -48,6 +48,7 @@
 #include "pgstat.h"
 #include "port/atomics.h"
 #include "postmaster/bgwriter.h"
+#include "postmaster/postmaster.h"
 #include "replication/syncrep.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
@@ -102,19 +103,21 @@
  *
  * The requests array holds fsync requests sent by backends and not yet
  * absorbed by the checkpointer.
- *
- * Unlike the checkpoint fields, num_backend_writes, num_backend_fsync, and
- * the requests fields are protected by CheckpointerCommLock.
  *----------
  */
 typedef struct
 {
+	uint32		type;
 	RelFileNode rnode;
 	ForkNumber	forknum;
 	BlockNumber segno;			/* see md.c for special values */
+	bool		contains_fd;
 	/* might add a real request-type field later; not needed yet */
 } CheckpointerRequest;
 
+#define CKPT_REQUEST_RNODE			1
+#define CKPT_REQUEST_SYN			2
+
 typedef struct
 {
 	pid_t		checkpointer_pid;	/* PID (0 if not started) */
@@ -131,8 +134,6 @@ typedef struct
 	pg_atomic_uint32 num_backend_fsync;	/* counts user backend fsync calls */
 	pg_atomic_uint32 ckpt_cycle; /* cycle */
 
-	int			num_requests;	/* current # of requests */
-	int			max_requests;	/* allocated array size */
 	CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
 } CheckpointerShmemStruct;
 
@@ -168,13 +169,17 @@ static double ckpt_cached_elapsed;
 static pg_time_t last_checkpoint_time;
 static pg_time_t last_xlog_switch_time;
 
+static BlockNumber next_syn_rqst;
+static BlockNumber received_syn_rqst;
+
 /* Prototypes for private functions */
 
 static void CheckArchiveTimeout(void);
 static bool IsCheckpointOnSchedule(double progress);
 static bool ImmediateCheckpointRequested(void);
-static bool CompactCheckpointerRequestQueue(void);
 static void UpdateSharedMemoryConfig(void);
+static void SendFsyncRequest(CheckpointerRequest *request, int fd);
+static bool AbsorbFsyncRequest(void);
 
 /* Signal handlers */
 
@@ -557,10 +562,11 @@ CheckpointerMain(void)
 			cur_timeout = Min(cur_timeout, XLogArchiveTimeout - elapsed_secs);
 		}
 
-		rc = WaitLatch(MyLatch,
-					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
-					   cur_timeout * 1000L /* convert to ms */ ,
-					   WAIT_EVENT_CHECKPOINTER_MAIN);
+		rc = WaitLatchOrSocket(MyLatch,
+							   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE,
+							   fsync_fds[FSYNC_FD_PROCESS],
+							   cur_timeout * 1000L /* convert to ms */ ,
+							   WAIT_EVENT_CHECKPOINTER_MAIN);
 
 		/*
 		 * Emergency bailout if postmaster has died.  This is to avoid the
@@ -910,12 +916,7 @@ CheckpointerShmemSize(void)
 {
 	Size		size;
 
-	/*
-	 * Currently, the size of the requests[] array is arbitrarily set equal to
-	 * NBuffers.  This may prove too large or small ...
-	 */
 	size = offsetof(CheckpointerShmemStruct, requests);
-	size = add_size(size, mul_size(NBuffers, sizeof(CheckpointerRequest)));
 
 	return size;
 }
@@ -938,13 +939,10 @@ CheckpointerShmemInit(void)
 	if (!found)
 	{
 		/*
-		 * First time through, so initialize.  Note that we zero the whole
-		 * requests array; this is so that CompactCheckpointerRequestQueue can
-		 * assume that any pad bytes in the request structs are zeroes.
+		 * First time through, so initialize.
 		 */
 		MemSet(CheckpointerShmem, 0, size);
 		SpinLockInit(&CheckpointerShmem->ckpt_lck);
-		CheckpointerShmem->max_requests = NBuffers;
 		pg_atomic_init_u32(&CheckpointerShmem->ckpt_cycle, 0);
 		pg_atomic_init_u32(&CheckpointerShmem->num_backend_writes, 0);
 		pg_atomic_init_u32(&CheckpointerShmem->num_backend_fsync, 0);
@@ -1124,176 +1122,61 @@ RequestCheckpoint(int flags)
  * the queue is full and contains no duplicate entries.  In that case, we
  * let the backend know by returning false.
  */
-bool
-ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
+void
+ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno,
+					File file)
 {
-	CheckpointerRequest *request;
-	bool		too_full;
+	CheckpointerRequest request = {0};
 
 	if (!IsUnderPostmaster)
-		return false;			/* probably shouldn't even get here */
+		elog(ERROR, "ForwardFsyncRequest must not be called in single user mode");
 
 	if (AmCheckpointerProcess())
 		elog(ERROR, "ForwardFsyncRequest must not be called in checkpointer");
 
-	LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
+	request.type = CKPT_REQUEST_RNODE;
+	request.rnode = rnode;
+	request.forknum = forknum;
+	request.segno = segno;
+	request.contains_fd = file != -1;
 
-	/*
-	 * If the checkpointer isn't running or the request queue is full, the
-	 * backend will have to perform its own fsync request.  But before forcing
-	 * that to happen, we can try to compact the request queue.
-	 */
-	if (CheckpointerShmem->checkpointer_pid == 0 ||
-		(CheckpointerShmem->num_requests >= CheckpointerShmem->max_requests &&
-		 !CompactCheckpointerRequestQueue()))
-	{
-		/*
-		 * Count the subset of writes where backends have to do their own
-		 * fsync
-		 */
-		if (!AmBackgroundWriterProcess())
-			pg_atomic_fetch_add_u32(&CheckpointerShmem->num_backend_fsync, 1);
-		LWLockRelease(CheckpointerCommLock);
-		return false;
-	}
-
-	/* OK, insert request */
-	request = &CheckpointerShmem->requests[CheckpointerShmem->num_requests++];
-	request->rnode = rnode;
-	request->forknum = forknum;
-	request->segno = segno;
-
-	/* If queue is more than half full, nudge the checkpointer to empty it */
-	too_full = (CheckpointerShmem->num_requests >=
-				CheckpointerShmem->max_requests / 2);
-
-	LWLockRelease(CheckpointerCommLock);
-
-	/* ... but not till after we release the lock */
-	if (too_full && ProcGlobal->checkpointerLatch)
-		SetLatch(ProcGlobal->checkpointerLatch);
-
-	return true;
-}
-
-/*
- * CompactCheckpointerRequestQueue
- *		Remove duplicates from the request queue to avoid backend fsyncs.
- *		Returns "true" if any entries were removed.
- *
- * Although a full fsync request queue is not common, it can lead to severe
- * performance problems when it does happen.  So far, this situation has
- * only been observed to occur when the system is under heavy write load,
- * and especially during the "sync" phase of a checkpoint.  Without this
- * logic, each backend begins doing an fsync for every block written, which
- * gets very expensive and can slow down the whole system.
- *
- * Trying to do this every time the queue is full could lose if there
- * aren't any removable entries.  But that should be vanishingly rare in
- * practice: there's one queue entry per shared buffer.
- */
-static bool
-CompactCheckpointerRequestQueue(void)
-{
-	struct CheckpointerSlotMapping
-	{
-		CheckpointerRequest request;
-		int			slot;
-	};
-
-	int			n,
-				preserve_count;
-	int			num_skipped = 0;
-	HASHCTL		ctl;
-	HTAB	   *htab;
-	bool	   *skip_slot;
-
-	/* must hold CheckpointerCommLock in exclusive mode */
-	Assert(LWLockHeldByMe(CheckpointerCommLock));
-
-	/* Initialize skip_slot array */
-	skip_slot = palloc0(sizeof(bool) * CheckpointerShmem->num_requests);
-
-	/* Initialize temporary hash table */
-	MemSet(&ctl, 0, sizeof(ctl));
-	ctl.keysize = sizeof(CheckpointerRequest);
-	ctl.entrysize = sizeof(struct CheckpointerSlotMapping);
-	ctl.hcxt = CurrentMemoryContext;
-
-	htab = hash_create("CompactCheckpointerRequestQueue",
-					   CheckpointerShmem->num_requests,
-					   &ctl,
-					   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
-	/*
-	 * The basic idea here is that a request can be skipped if it's followed
-	 * by a later, identical request.  It might seem more sensible to work
-	 * backwards from the end of the queue and check whether a request is
-	 * *preceded* by an earlier, identical request, in the hopes of doing less
-	 * copying.  But that might change the semantics, if there's an
-	 * intervening FORGET_RELATION_FSYNC or FORGET_DATABASE_FSYNC request, so
-	 * we do it this way.  It would be possible to be even smarter if we made
-	 * the code below understand the specific semantics of such requests (it
-	 * could blow away preceding entries that would end up being canceled
-	 * anyhow), but it's not clear that the extra complexity would buy us
-	 * anything.
-	 */
-	for (n = 0; n < CheckpointerShmem->num_requests; n++)
-	{
-		CheckpointerRequest *request;
-		struct CheckpointerSlotMapping *slotmap;
-		bool		found;
-
-		/*
-		 * We use the request struct directly as a hashtable key.  This
-		 * assumes that any padding bytes in the structs are consistently the
-		 * same, which should be okay because we zeroed them in
-		 * CheckpointerShmemInit.  Note also that RelFileNode had better
-		 * contain no pad bytes.
-		 */
-		request = &CheckpointerShmem->requests[n];
-		slotmap = hash_search(htab, request, HASH_ENTER, &found);
-		if (found)
-		{
-			/* Duplicate, so mark the previous occurrence as skippable */
-			skip_slot[slotmap->slot] = true;
-			num_skipped++;
-		}
-		/* Remember slot containing latest occurrence of this request value */
-		slotmap->slot = n;
-	}
-
-	/* Done with the hash table. */
-	hash_destroy(htab);
-
-	/* If no duplicates, we're out of luck. */
-	if (!num_skipped)
-	{
-		pfree(skip_slot);
-		return false;
-	}
-
-	/* We found some duplicates; remove them. */
-	preserve_count = 0;
-	for (n = 0; n < CheckpointerShmem->num_requests; n++)
-	{
-		if (skip_slot[n])
-			continue;
-		CheckpointerShmem->requests[preserve_count++] = CheckpointerShmem->requests[n];
-	}
-	ereport(DEBUG1,
-			(errmsg("compacted fsync request queue from %d entries to %d entries",
-					CheckpointerShmem->num_requests, preserve_count)));
-	CheckpointerShmem->num_requests = preserve_count;
-
-	/* Cleanup. */
-	pfree(skip_slot);
-	return true;
+	SendFsyncRequest(&request, request.contains_fd ? FileGetRawDesc(file) : -1);
 }
 
 /*
  * AbsorbFsyncRequests
- *		Retrieve queued fsync requests and pass them to local smgr.
+ *		Retrieve queued fsync requests and pass them to local smgr. Stop when
+ *		resources would be exhausted by absorbing more.
+ *
+ * This is exported because we want to continue accepting requests during
+ * mdsync().
+ */
+void
+AbsorbFsyncRequests(void)
+{
+	if (!AmCheckpointerProcess())
+		return;
+
+	/* Transfer stats counts into pending pgstats message */
+	BgWriterStats.m_buf_written_backend +=
+		pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_writes, 0);
+	BgWriterStats.m_buf_fsync_backend +=
+		pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_fsync, 0);
+
+	while (true)
+	{
+		if (!FlushFsyncRequestQueueIfNecessary())
+			break;
+
+		if (!AbsorbFsyncRequest())
+			break;
+	}
+}
+
+/*
+ * AbsorbAllFsyncRequests
+ *		Retrieve all already pending fsync requests and pass them to local
+ *		smgr.
  *
  * This is exported because it must be called during CreateCheckPoint;
  * we have to be sure we have accepted all pending requests just before
@@ -1301,17 +1184,13 @@ CompactCheckpointerRequestQueue(void)
  * non-checkpointer processes, do nothing if not checkpointer.
  */
 void
-AbsorbFsyncRequests(void)
+AbsorbAllFsyncRequests(void)
 {
-	CheckpointerRequest *requests = NULL;
-	CheckpointerRequest *request;
-	int			n;
+	CheckpointerRequest request = {0};
 
 	if (!AmCheckpointerProcess())
 		return;
 
-	LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
-
 	/* Transfer stats counts into pending pgstats message */
 	BgWriterStats.m_buf_written_backend +=
 		pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_writes, 0);
@@ -1319,35 +1198,65 @@ AbsorbFsyncRequests(void)
 		pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_fsync, 0);
 
 	/*
-	 * We try to avoid holding the lock for a long time by copying the request
-	 * array, and processing the requests after releasing the lock.
-	 *
-	 * Once we have cleared the requests from shared memory, we have to PANIC
-	 * if we then fail to absorb them (eg, because our hashtable runs out of
-	 * memory).  This is because the system cannot run safely if we are unable
-	 * to fsync what we have been told to fsync.  Fortunately, the hashtable
-	 * is so small that the problem is quite unlikely to arise in practice.
+	 * For mdsync()'s guarantees to work, all pending fsync requests need to
+	 * be executed. Absorbing until the queue is empty could take a long
+	 * while, so instead we enqueue a SYN marker and absorb until it arrives.
 	 */
-	n = CheckpointerShmem->num_requests;
-	if (n > 0)
+	request.type = CKPT_REQUEST_SYN;
+	request.segno = ++next_syn_rqst;
+	SendFsyncRequest(&request, -1);
+
+	received_syn_rqst = next_syn_rqst + 1;
+	while (received_syn_rqst != request.segno)
 	{
-		requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
-		memcpy(requests, CheckpointerShmem->requests, n * sizeof(CheckpointerRequest));
+		if (!FlushFsyncRequestQueueIfNecessary())
+			elog(FATAL, "may not happen");
+
+		if (!AbsorbFsyncRequest())
+			break;
 	}
+}
+
+/*
+ * AbsorbFsyncRequest
+ *		Retrieve one queued fsync request and pass it to local smgr.
+ */
+static bool
+AbsorbFsyncRequest(void)
+{
+	CheckpointerRequest req;
+	int fd;
+	int ret;
+
+	ReleaseLruFiles();
 
 	START_CRIT_SECTION();
+	ret = pg_uds_recv_with_fd(fsync_fds[FSYNC_FD_PROCESS], &req, sizeof(req), &fd);
+	if (ret < 0 && (errno == EWOULDBLOCK || errno == EAGAIN))
+	{
+		END_CRIT_SECTION();
+		return false;
+	}
+	else if (ret < 0)
+		elog(ERROR, "recvmsg failed: %m");
 
-	CheckpointerShmem->num_requests = 0;
-
-	LWLockRelease(CheckpointerCommLock);
-
-	for (request = requests; n > 0; request++, n--)
-		RememberFsyncRequest(request->rnode, request->forknum, request->segno);
+	if (req.contains_fd != (fd != -1))
+	{
+		elog(FATAL, "message should have fd associated, but doesn't");
+	}
 
+	if (req.type == CKPT_REQUEST_SYN)
+	{
+		received_syn_rqst = req.segno;
+		Assert(fd == -1);
+	}
+	else
+	{
+		RememberFsyncRequest(req.rnode, req.forknum, req.segno, fd);
+	}
 	END_CRIT_SECTION();
 
-	if (requests)
-		pfree(requests);
+	return true;
 }
 
 /*
@@ -1402,3 +1311,42 @@ IncCheckpointSyncCycle(void)
 {
 	return pg_atomic_fetch_add_u32(&CheckpointerShmem->ckpt_cycle, 1);
 }
+
+void
+CountBackendWrite(void)
+{
+	pg_atomic_fetch_add_u32(&CheckpointerShmem->num_backend_writes, 1);
+}
+
+static void
+SendFsyncRequest(CheckpointerRequest *request, int fd)
+{
+	ssize_t ret;
+
+	while (true)
+	{
+		ret = pg_uds_send_with_fd(fsync_fds[FSYNC_FD_SUBMIT], request, sizeof(*request),
+								  request->contains_fd ? fd : -1);
+
+		if (ret >= 0)
+		{
+			/*
+			 * Don't think short writes will ever happen in realistic
+			 * implementations, but better make sure that's true...
+			 */
+			if (ret != sizeof(*request))
+				elog(FATAL, "oops, gotta do better");
+			break;
+		}
+		else if (errno == EWOULDBLOCK || errno == EAGAIN)
+		{
+			/* blocked on write - wait for socket to become writable */
+			/* FIXME: postmaster death? Other interrupts? */
+			WaitLatchOrSocket(NULL, WL_SOCKET_WRITEABLE, fsync_fds[FSYNC_FD_SUBMIT], -1, 0);
+		}
+		else
+		{
+			ereport(FATAL, (errmsg("could not receive fsync request: %m")));
+		}
+	}
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index a4b53b33cdd..135aa29bfeb 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -70,6 +70,7 @@
 #include <time.h>
 #include <sys/wait.h>
 #include <ctype.h>
+#include <sys/types.h>
 #include <sys/stat.h>
 #include <sys/socket.h>
 #include <fcntl.h>
@@ -434,6 +435,7 @@ static pid_t StartChildProcess(AuxProcType type);
 static void StartAutovacuumWorker(void);
 static void MaybeStartWalReceiver(void);
 static void InitPostmasterDeathWatchHandle(void);
+static void InitFsyncFdSocketPair(void);
 
 /*
  * Archiver is allowed to start up at the current postmaster state?
@@ -568,6 +570,8 @@ int			postmaster_alive_fds[2] = {-1, -1};
 HANDLE		PostmasterHandle;
 #endif
 
+int			fsync_fds[2] = {-1, -1};
+
 /*
  * Postmaster main entry point
  */
@@ -1195,6 +1199,11 @@ PostmasterMain(int argc, char *argv[])
 	 */
 	InitPostmasterDeathWatchHandle();
 
+	/*
+	 * Initialize socket pair used to transport file descriptors over.
+	 */
+	InitFsyncFdSocketPair();
+
 #ifdef WIN32
 
 	/*
@@ -6443,3 +6452,32 @@ InitPostmasterDeathWatchHandle(void)
 								 GetLastError())));
 #endif							/* WIN32 */
 }
+
+/* Create socket used for requesting fsyncs by checkpointer */
+static void
+InitFsyncFdSocketPair(void)
+{
+	Assert(MyProcPid == PostmasterPid);
+	if (socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0, fsync_fds) < 0)
+		ereport(FATAL,
+				(errcode_for_file_access(),
+				 errmsg_internal("could not create fsync sockets: %m")));
+
+	/*
+	 * Set O_NONBLOCK on both fds.
+	 */
+	if (fcntl(fsync_fds[FSYNC_FD_PROCESS], F_SETFL, O_NONBLOCK) == -1)
+		ereport(FATAL,
+				(errcode_for_socket_access(),
+				 errmsg_internal("could not set fsync process socket to nonblocking mode: %m")));
+
+	if (fcntl(fsync_fds[FSYNC_FD_SUBMIT], F_SETFL, O_NONBLOCK) == -1)
+		ereport(FATAL,
+				(errcode_for_socket_access(),
+				 errmsg_internal("could not set fsync submit socket to nonblocking mode: %m")));
+
+	/*
+	 * FIXME: do DuplicateHandle dance for windows - can that work
+	 * trivially?
+	 */
+}
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 555774320b5..ae3a5bf023f 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -142,8 +142,8 @@ typedef struct
 	CycleCtr	cycle_ctr;		/* sync cycle of oldest request */
 	/* requests[f] has bit n set if we need to fsync segment n of fork f */
 	Bitmapset  *requests[MAX_FORKNUM + 1];
-	/* canceled[f] is true if we canceled fsyncs for fork "recently" */
-	bool		canceled[MAX_FORKNUM + 1];
+	File	   *syncfds[MAX_FORKNUM + 1];
+	int			syncfd_len[MAX_FORKNUM + 1];
 } PendingOperationEntry;
 
 typedef struct
@@ -152,6 +152,8 @@ typedef struct
 	CycleCtr	cycle_ctr;		/* mdckpt_cycle_ctr when request was made */
 } PendingUnlinkEntry;
 
+static uint32 open_fsync_queue_files = 0;
+static bool mdsync_in_progress = false;
 static HTAB *pendingOpsTable = NULL;
 static List *pendingUnlinks = NIL;
 static MemoryContext pendingOpsCxt; /* context for the above  */
@@ -196,6 +198,8 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forkno,
 			 BlockNumber blkno, bool skipFsync, int behavior);
 static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
 		   MdfdVec *seg);
+static char *mdpath(RelFileNode rnode, ForkNumber forknum, BlockNumber segno);
+static void mdsyncpass(bool include_current);
 
 
 /*
@@ -1049,43 +1053,28 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum)
 }
 
 /*
- *	mdsync() -- Sync previous writes to stable storage.
+ * Do one pass over the fsync request hashtable and perform the necessary
+ * fsyncs. Increments the mdsync cycle counter.
+ *
+ * If include_current is true perform all fsyncs (this is done if too many
+ * files are open), otherwise only perform the fsyncs belonging to the cycle
+ * valid at call time.
  */
-void
-mdsync(void)
+static void
+mdsyncpass(bool include_current)
 {
-	static bool mdsync_in_progress = false;
-
 	HASH_SEQ_STATUS hstat;
 	PendingOperationEntry *entry;
 	int			absorb_counter;
 
 	/* Statistics on sync times */
-	int			processed = 0;
 	instr_time	sync_start,
 				sync_end,
 				sync_diff;
 	uint64		elapsed;
-	uint64		longest = 0;
-	uint64		total_elapsed = 0;
-
-	/*
-	 * This is only called during checkpoints, and checkpoints should only
-	 * occur in processes that have created a pendingOpsTable.
-	 */
-	if (!pendingOpsTable)
-		elog(ERROR, "cannot sync without a pendingOpsTable");
-
-	/*
-	 * If we are in the checkpointer, the sync had better include all fsync
-	 * requests that were queued by backends up to this point.  The tightest
-	 * race condition that could occur is that a buffer that must be written
-	 * and fsync'd for the checkpoint could have been dumped by a backend just
-	 * before it was visited by BufferSync().  We know the backend will have
-	 * queued an fsync request before clearing the buffer's dirtybit, so we
-	 * are safe as long as we do an Absorb after completing BufferSync().
-	 */
-	AbsorbFsyncRequests();
+	int			processed = CheckpointStats.ckpt_sync_rels;
+	uint64		longest = CheckpointStats.ckpt_longest_sync;
+	uint64		total_elapsed = CheckpointStats.ckpt_agg_sync_time;
 
 	/*
 	 * To avoid excess fsync'ing (in the worst case, maybe a never-terminating
@@ -1133,17 +1122,27 @@ mdsync(void)
 	while ((entry = (PendingOperationEntry *) hash_seq_search(&hstat)) != NULL)
 	{
 		ForkNumber	forknum;
+		bool has_remaining;
 
 		/*
-		 * If the entry is new then don't process it this time; it might
-		 * contain multiple fsync-request bits, but they are all new.  Note
-		 * "continue" bypasses the hash-remove call at the bottom of the loop.
+		 * If processing fsync requests because of too many file handles, close
+		 * regardless of cycle. Otherwise nothing to be closed might be found,
+		 * and we want to make room as quickly as possible so more requests
+		 * can be absorbed.
 		 */
-		if (entry->cycle_ctr == GetCheckpointSyncCycle())
-			continue;
+		if (!include_current)
+		{
+			/*
+			 * If the entry is new then don't process it this time; it might
+			 * contain multiple fsync-request bits, but they are all new.  Note
+			 * "continue" bypasses the hash-remove call at the bottom of the loop.
+			 */
+			if (entry->cycle_ctr == GetCheckpointSyncCycle())
+				continue;
 
-		/* Else assert we haven't missed it */
-		Assert((CycleCtr) (entry->cycle_ctr + 1) == GetCheckpointSyncCycle());
+			/* Else assert we haven't missed it */
+			Assert((CycleCtr) (entry->cycle_ctr + 1) == GetCheckpointSyncCycle());
+		}
 
 		/*
 		 * Scan over the forks and segments represented by the entry.
@@ -1158,158 +1157,144 @@ mdsync(void)
 		 */
 		for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
 		{
-			Bitmapset  *requests = entry->requests[forknum];
 			int			segno;
 
-			entry->requests[forknum] = NULL;
-			entry->canceled[forknum] = false;
-
-			while ((segno = bms_first_member(requests)) >= 0)
+			segno = -1;
+			while ((segno = bms_next_member(entry->requests[forknum], segno)) >= 0)
 			{
-				int			failures;
+				int			returnCode;
+
+				/*
+				 * Temporarily mark as processed. Have to do so before
+				 * absorbing further requests, otherwise we might delete a new
+				 * request in a new cycle.
+				 */
+				bms_del_member(entry->requests[forknum], segno);
+
+				if (entry->syncfd_len[forknum] <= segno ||
+					entry->syncfds[forknum][segno] == -1)
+				{
+					/*
+					 * Optionally open file, if we want to support not
+					 * transporting fds as well.
+					 */
+					elog(FATAL, "file not opened");
+				}
 
 				/*
 				 * If fsync is off then we don't have to bother opening the
 				 * file at all.  (We delay checking until this point so that
 				 * changing fsync on the fly behaves sensibly.)
+				 *
+				 * XXX: Why is that an important goal? Doesn't give any
+				 * interesting guarantees afaict?
 				 */
-				if (!enableFsync)
-					continue;
-
-				/*
-				 * If in checkpointer, we want to absorb pending requests
-				 * every so often to prevent overflow of the fsync request
-				 * queue.  It is unspecified whether newly-added entries will
-				 * be visited by hash_seq_search, but we don't care since we
-				 * don't need to process them anyway.
-				 */
-				if (--absorb_counter <= 0)
+				if (enableFsync)
 				{
-					AbsorbFsyncRequests();
-					absorb_counter = FSYNCS_PER_ABSORB;
-				}
-
-				/*
-				 * The fsync table could contain requests to fsync segments
-				 * that have been deleted (unlinked) by the time we get to
-				 * them. Rather than just hoping an ENOENT (or EACCES on
-				 * Windows) error can be ignored, what we do on error is
-				 * absorb pending requests and then retry.  Since mdunlink()
-				 * queues a "cancel" message before actually unlinking, the
-				 * fsync request is guaranteed to be marked canceled after the
-				 * absorb if it really was this case. DROP DATABASE likewise
-				 * has to tell us to forget fsync requests before it starts
-				 * deletions.
-				 */
-				for (failures = 0;; failures++) /* loop exits at "break" */
-				{
-					SMgrRelation reln;
-					MdfdVec    *seg;
-					char	   *path;
-					int			save_errno;
-
 					/*
-					 * Find or create an smgr hash entry for this relation.
-					 * This may seem a bit unclean -- md calling smgr?	But
-					 * it's really the best solution.  It ensures that the
-					 * open file reference isn't permanently leaked if we get
-					 * an error here. (You may say "but an unreferenced
-					 * SMgrRelation is still a leak!" Not really, because the
-					 * only case in which a checkpoint is done by a process
-					 * that isn't about to shut down is in the checkpointer,
-					 * and it will periodically do smgrcloseall(). This fact
-					 * justifies our not closing the reln in the success path
-					 * either, which is a good thing since in non-checkpointer
-					 * cases we couldn't safely do that.)
+					 * The fsync table could contain requests to fsync
+					 * segments that have been deleted (unlinked) by the time
+					 * we get to them.  That used to be problematic, but now
+					 * we have a filehandle to the deleted file. That means we
+					 * might fsync an empty file superfluously, in a
+					 * relatively tight window, which is acceptable.
 					 */
-					reln = smgropen(entry->rnode, InvalidBackendId);
-
-					/* Attempt to open and fsync the target segment */
-					seg = _mdfd_getseg(reln, forknum,
-									   (BlockNumber) segno * (BlockNumber) RELSEG_SIZE,
-									   false,
-									   EXTENSION_RETURN_NULL
-									   | EXTENSION_DONT_CHECK_SIZE);
 
 					INSTR_TIME_SET_CURRENT(sync_start);
 
-					if (seg != NULL &&
-						FileSync(seg->mdfd_vfd, WAIT_EVENT_DATA_FILE_SYNC) >= 0)
+					returnCode = FileSync(entry->syncfds[forknum][segno], WAIT_EVENT_DATA_FILE_SYNC);
+
+					if (returnCode < 0)
 					{
-						/* Success; update statistics about sync timing */
-						INSTR_TIME_SET_CURRENT(sync_end);
-						sync_diff = sync_end;
-						INSTR_TIME_SUBTRACT(sync_diff, sync_start);
-						elapsed = INSTR_TIME_GET_MICROSEC(sync_diff);
-						if (elapsed > longest)
-							longest = elapsed;
-						total_elapsed += elapsed;
-						processed++;
-						if (log_checkpoints)
-							elog(DEBUG1, "checkpoint sync: number=%d file=%s time=%.3f msec",
-								 processed,
-								 FilePathName(seg->mdfd_vfd),
-								 (double) elapsed / 1000);
+						/* XXX: decide on policy */
+						bms_add_member(entry->requests[forknum], segno);
 
-						break;	/* out of retry loop */
-					}
-
-					/* Compute file name for use in message */
-					save_errno = errno;
-					path = _mdfd_segpath(reln, forknum, (BlockNumber) segno);
-					errno = save_errno;
-
-					/*
-					 * It is possible that the relation has been dropped or
-					 * truncated since the fsync request was entered.
-					 * Therefore, allow ENOENT, but only if we didn't fail
-					 * already on this file.  This applies both for
-					 * _mdfd_getseg() and for FileSync, since fd.c might have
-					 * closed the file behind our back.
-					 *
-					 * XXX is there any point in allowing more than one retry?
-					 * Don't see one at the moment, but easy to change the
-					 * test here if so.
-					 */
-					if (!FILE_POSSIBLY_DELETED(errno) ||
-						failures > 0)
 						ereport(ERROR,
 								(errcode_for_file_access(),
 								 errmsg("could not fsync file \"%s\": %m",
-										path)));
-					else
+										FilePathName(entry->syncfds[forknum][segno]))));
+					}
+
+					/* Success; update statistics about sync timing */
+					INSTR_TIME_SET_CURRENT(sync_end);
+					sync_diff = sync_end;
+					INSTR_TIME_SUBTRACT(sync_diff, sync_start);
+					elapsed = INSTR_TIME_GET_MICROSEC(sync_diff);
+					if (elapsed > longest)
+						longest = elapsed;
+					total_elapsed += elapsed;
+					processed++;
+					if (log_checkpoints)
 						ereport(DEBUG1,
-								(errcode_for_file_access(),
-								 errmsg("could not fsync file \"%s\" but retrying: %m",
-										path)));
-					pfree(path);
+								(errmsg("checkpoint sync: number=%d file=%s time=%.3f msec",
+										processed,
+										FilePathName(entry->syncfds[forknum][segno]),
+										(double) elapsed / 1000),
+								 errhidestmt(true),
+								 errhidecontext(true)));
+				}
 
+				/*
+				 * It shouldn't be possible for a new request to arrive during
+				 * the fsync (on error this will not be reached).
+				 */
+				Assert(!bms_is_member(segno, entry->requests[forknum]));
+
+				/*
+				 * Close file.  XXX: centralize code.
+				 */
+				{
+					open_fsync_queue_files--;
+					FileClose(entry->syncfds[forknum][segno]);
+					entry->syncfds[forknum][segno] = -1;
+				}
+
+				/*
+				 * If in checkpointer, we want to absorb pending requests every so
+				 * often to prevent overflow of the fsync request queue.  It is
+				 * unspecified whether newly-added entries will be visited by
+				 * hash_seq_search, but we don't care since we don't need to process
+				 * them anyway.
+				 */
+				if (absorb_counter-- <= 0)
+				{
 					/*
-					 * Absorb incoming requests and check to see if a cancel
-					 * arrived for this relation fork.
+					 * Don't absorb if too many files are open. This pass will
+					 * soon close some, so check again later.
 					 */
-					AbsorbFsyncRequests();
-					absorb_counter = FSYNCS_PER_ABSORB; /* might as well... */
-
-					if (entry->canceled[forknum])
-						break;
-				}				/* end retry loop */
+					if (open_fsync_queue_files < ((max_safe_fds * 7) / 10))
+						AbsorbFsyncRequests();
+					absorb_counter = FSYNCS_PER_ABSORB;
+				}
 			}
-			bms_free(requests);
 		}
 
 		/*
-		 * We've finished everything that was requested before we started to
-		 * scan the entry.  If no new requests have been inserted meanwhile,
-		 * remove the entry.  Otherwise, update its cycle counter, as all the
-		 * requests now in it must have arrived during this cycle.
+		 * We've finished everything for the file that was requested before we
+		 * started to scan the entry.  If no new requests have been inserted
+		 * meanwhile, remove the entry.  Otherwise, update its cycle counter,
+		 * as all the requests now in it must have arrived during this cycle.
+		 *
+		 * This needs to be checked separately from the above for-each-fork
+		 * loop, as new requests for this relation could have been absorbed.
 		 */
+		has_remaining = false;
 		for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
 		{
-			if (entry->requests[forknum] != NULL)
-				break;
+			if (bms_is_empty(entry->requests[forknum]))
+			{
+				if (entry->syncfds[forknum])
+				{
+					pfree(entry->syncfds[forknum]);
+					entry->syncfds[forknum] = NULL;
+				}
+				bms_free(entry->requests[forknum]);
+				entry->requests[forknum] = NULL;
+			}
+			else
+				has_remaining = true;
 		}
-		if (forknum <= MAX_FORKNUM)
+		if (has_remaining)
 			entry->cycle_ctr = GetCheckpointSyncCycle();
 		else
 		{
@@ -1320,13 +1305,69 @@ mdsync(void)
 		}
 	}							/* end loop over hashtable entries */
 
-	/* Return sync performance metrics for report at checkpoint end */
+	/* Flag successful completion of mdsync */
+	mdsync_in_progress = false;
+
+	/* Maintain sync performance metrics for report at checkpoint end */
 	CheckpointStats.ckpt_sync_rels = processed;
 	CheckpointStats.ckpt_longest_sync = longest;
 	CheckpointStats.ckpt_agg_sync_time = total_elapsed;
+}
 
-	/* Flag successful completion of mdsync */
-	mdsync_in_progress = false;
+/*
+ *	mdsync() -- Sync previous writes to stable storage.
+ */
+void
+mdsync(void)
+{
+	/*
+	 * This is only called during checkpoints, and checkpoints should only
+	 * occur in processes that have created a pendingOpsTable.
+	 */
+	if (!pendingOpsTable)
+		elog(ERROR, "cannot sync without a pendingOpsTable");
+
+	/*
+	 * If we are in the checkpointer, the sync had better include all fsync
+	 * requests that were queued by backends up to this point.  The tightest
+	 * race condition that could occur is that a buffer that must be written
+	 * and fsync'd for the checkpoint could have been dumped by a backend just
+	 * before it was visited by BufferSync().  We know the backend will have
+	 * queued an fsync request before clearing the buffer's dirtybit, so we
+	 * are safe as long as we do an Absorb after completing BufferSync().
+	 */
+	AbsorbAllFsyncRequests();
+
+	mdsyncpass(false);
+}
+
+/*
+ * Flush the fsync request queue enough to make sure there's room for at least
+ * one more entry.
+ */
+bool
+FlushFsyncRequestQueueIfNecessary(void)
+{
+	if (mdsync_in_progress)
+		return false;
+
+	while (true)
+	{
+		if (open_fsync_queue_files >= ((max_safe_fds * 7) / 10))
+		{
+			elog(DEBUG1,
+				 "flush fsync request queue due to %u open files",
+				 open_fsync_queue_files);
+			mdsyncpass(true);
+			elog(DEBUG1,
+				 "flushed fsync request, now at %u open files",
+				 open_fsync_queue_files);
+		}
+		else
+			break;
+	}
+
+	return true;
 }
 
 /*
@@ -1411,12 +1452,38 @@ mdpostckpt(void)
 		 */
 		if (--absorb_counter <= 0)
 		{
-			AbsorbFsyncRequests();
+			/* XXX: Centralize this condition */
+			if (open_fsync_queue_files < ((max_safe_fds * 7) / 10))
+				AbsorbFsyncRequests();
 			absorb_counter = UNLINKS_PER_ABSORB;
 		}
 	}
 }
 
+
+/*
+ * Return the filename for the specified segment of the relation. The
+ * returned string is palloc'd.
+ */
+static char *
+mdpath(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
+{
+	char	   *path,
+			   *fullpath;
+
+	path = relpathperm(rnode, forknum);
+
+	if (segno > 0)
+	{
+		fullpath = psprintf("%s.%u", path, segno);
+		pfree(path);
+	}
+	else
+		fullpath = path;
+
+	return fullpath;
+}
+
 /*
  * register_dirty_segment() -- Mark a relation segment as needing fsync
  *
@@ -1437,6 +1504,13 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
 	pg_memory_barrier();
 	cycle = GetCheckpointSyncCycle();
 
+	/*
+	 * For historical reasons checkpointer keeps track of the number of times
+	 * backends perform writes themselves.
+	 */
+	if (!AmBackgroundWriterProcess())
+		CountBackendWrite();
+
 	/*
 	 * Don't repeatedly register the same segment as dirty.
 	 *
@@ -1449,27 +1523,23 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
 
 	if (pendingOpsTable)
 	{
-		/* push it into local pending-ops table */
-		RememberFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno);
-		seg->mdfd_dirtied_cycle = cycle;
+		int fd;
+
+		/*
+		 * Push it into local pending-ops table.
+		 *
+		 * Gotta duplicate the fd - we can't have fd.c close it behind our
+		 * back, as that'd lead to losing error reporting guarantees on
+		 * linux. RememberFsyncRequest() will manage the lifetime.
+		 */
+		ReleaseLruFiles();
+		fd = dup(FileGetRawDesc(seg->mdfd_vfd));
+		if (fd < 0)
+			elog(ERROR, "couldn't dup: %m");
+		RememberFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno, fd);
 	}
 	else
-	{
-		if (ForwardFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno))
-		{
-			seg->mdfd_dirtied_cycle = cycle;
-			return;				/* passed it off successfully */
-		}
-
-		ereport(DEBUG1,
-				(errmsg("could not forward fsync request because request queue is full")));
-
-		if (FileSync(seg->mdfd_vfd, WAIT_EVENT_DATA_FILE_SYNC) < 0)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not fsync file \"%s\": %m",
-							FilePathName(seg->mdfd_vfd))));
-	}
+		ForwardFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno, seg->mdfd_vfd);
 }
 
 /*
@@ -1491,21 +1561,14 @@ register_unlink(RelFileNodeBackend rnode)
 	{
 		/* push it into local pending-ops table */
 		RememberFsyncRequest(rnode.node, MAIN_FORKNUM,
-							 UNLINK_RELATION_REQUEST);
+							 UNLINK_RELATION_REQUEST,
+							 -1);
 	}
 	else
 	{
-		/*
-		 * Notify the checkpointer about it.  If we fail to queue the request
-		 * message, we have to sleep and try again, because we can't simply
-		 * delete the file now.  Ugly, but hopefully won't happen often.
-		 *
-		 * XXX should we just leave the file orphaned instead?
-		 */
+		/* Notify the checkpointer about it. */
 		Assert(IsUnderPostmaster);
-		while (!ForwardFsyncRequest(rnode.node, MAIN_FORKNUM,
-									UNLINK_RELATION_REQUEST))
-			pg_usleep(10000L);	/* 10 msec seems a good number */
+		ForwardFsyncRequest(rnode.node, MAIN_FORKNUM, UNLINK_RELATION_REQUEST, -1);
 	}
 }
 
@@ -1531,7 +1594,7 @@ register_unlink(RelFileNodeBackend rnode)
  * heavyweight operation anyhow, so we'll live with it.)
  */
 void
-RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
+RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno, int fd)
 {
 	Assert(pendingOpsTable);
 
@@ -1549,18 +1612,28 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
 			/*
 			 * We can't just delete the entry since mdsync could have an
 			 * active hashtable scan.  Instead we delete the bitmapsets; this
-			 * is safe because of the way mdsync is coded.  We also set the
-			 * "canceled" flags so that mdsync can tell that a cancel arrived
-			 * for the fork(s).
+			 * is safe because of the way mdsync is coded.
 			 */
 			if (forknum == InvalidForkNumber)
 			{
 				/* remove requests for all forks */
 				for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
 				{
+					int segno;
+
 					bms_free(entry->requests[forknum]);
 					entry->requests[forknum] = NULL;
-					entry->canceled[forknum] = true;
+
+					for (segno = 0; segno < entry->syncfd_len[forknum]; segno++)
+					{
+						if (entry->syncfds[forknum][segno] != -1)
+						{
+							open_fsync_queue_files--;
+							FileClose(entry->syncfds[forknum][segno]);
+							entry->syncfds[forknum][segno] = -1;
+						}
+					}
+
 				}
 			}
 			else
@@ -1568,7 +1641,16 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
 				/* remove requests for single fork */
 				bms_free(entry->requests[forknum]);
 				entry->requests[forknum] = NULL;
-				entry->canceled[forknum] = true;
+
+				for (segno = 0; segno < entry->syncfd_len[forknum]; segno++)
+				{
+					if (entry->syncfds[forknum][segno] != -1)
+					{
+						open_fsync_queue_files--;
+						FileClose(entry->syncfds[forknum][segno]);
+						entry->syncfds[forknum][segno] = -1;
+					}
+				}
 			}
 		}
 	}
@@ -1592,7 +1674,6 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
 				{
 					bms_free(entry->requests[forknum]);
 					entry->requests[forknum] = NULL;
-					entry->canceled[forknum] = true;
 				}
 			}
 		}
@@ -1646,7 +1727,8 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
 		{
 			entry->cycle_ctr = GetCheckpointSyncCycle();
 			MemSet(entry->requests, 0, sizeof(entry->requests));
-			MemSet(entry->canceled, 0, sizeof(entry->canceled));
+			MemSet(entry->syncfds, 0, sizeof(entry->syncfds));
+			MemSet(entry->syncfd_len, 0, sizeof(entry->syncfd_len));
 		}
 
 		/*
@@ -1658,6 +1740,57 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
 		entry->requests[forknum] = bms_add_member(entry->requests[forknum],
 												  (int) segno);
 
+		if (fd >= 0)
+		{
+			/* make space for entry */
+			if (entry->syncfds[forknum] == NULL)
+			{
+				int i;
+
+				entry->syncfds[forknum] = palloc(sizeof(File) * (segno + 1));
+				entry->syncfd_len[forknum] = segno + 1;
+
+				for (i = 0; i <= segno; i++)
+					entry->syncfds[forknum][i] = -1;
+			}
+			else  if (entry->syncfd_len[forknum] <= segno)
+			{
+				int i;
+
+				entry->syncfds[forknum] = repalloc(entry->syncfds[forknum],
+												   sizeof(File) * (segno + 1));
+
+				/* initialize newly created entries */
+				for (i = entry->syncfd_len[forknum]; i <= segno; i++)
+					entry->syncfds[forknum][i] = -1;
+
+				entry->syncfd_len[forknum] = segno + 1;
+			}
+
+			if (entry->syncfds[forknum][segno] == -1)
+			{
+				char *path = mdpath(entry->rnode, forknum, segno);
+				open_fsync_queue_files++;
+				/* caller must have reserved entry */
+				entry->syncfds[forknum][segno] =
+					FileOpenForFd(fd, path);
+				pfree(path);
+			}
+			else
+			{
+				/*
+				 * File is already open. Have to keep the older fd, errors
+				 * might only be reported to it, thus close the one we just
+				 * got.
+				 *
+				 * XXX: check for errors.
+				 */
+				close(fd);
+			}
+
+			FlushFsyncRequestQueueIfNecessary();
+		}
+
 		MemoryContextSwitchTo(oldcxt);
 	}
 }
@@ -1674,22 +1807,12 @@ ForgetRelationFsyncRequests(RelFileNode rnode, ForkNumber forknum)
 	if (pendingOpsTable)
 	{
 		/* standalone backend or startup process: fsync state is local */
-		RememberFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC);
+		RememberFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC, -1);
 	}
 	else if (IsUnderPostmaster)
 	{
-		/*
-		 * Notify the checkpointer about it.  If we fail to queue the cancel
-		 * message, we have to sleep and try again ... ugly, but hopefully
-		 * won't happen often.
-		 *
-		 * XXX should we CHECK_FOR_INTERRUPTS in this loop?  Escaping with an
-		 * error would leave the no-longer-used file still present on disk,
-		 * which would be bad, so I'm inclined to assume that the checkpointer
-		 * will always empty the queue soon.
-		 */
-		while (!ForwardFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC))
-			pg_usleep(10000L);	/* 10 msec seems a good number */
+		/* Notify the checkpointer about it. */
+		ForwardFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC, -1);
 
 		/*
 		 * Note we don't wait for the checkpointer to actually absorb the
@@ -1713,14 +1836,12 @@ ForgetDatabaseFsyncRequests(Oid dbid)
 	if (pendingOpsTable)
 	{
 		/* standalone backend or startup process: fsync state is local */
-		RememberFsyncRequest(rnode, InvalidForkNumber, FORGET_DATABASE_FSYNC);
+		RememberFsyncRequest(rnode, InvalidForkNumber, FORGET_DATABASE_FSYNC, -1);
 	}
 	else if (IsUnderPostmaster)
 	{
 		/* see notes in ForgetRelationFsyncRequests */
-		while (!ForwardFsyncRequest(rnode, InvalidForkNumber,
-									FORGET_DATABASE_FSYNC))
-			pg_usleep(10000L);	/* 10 msec seems a good number */
+		ForwardFsyncRequest(rnode, InvalidForkNumber, FORGET_DATABASE_FSYNC, -1);
 	}
 }
 
diff --git a/src/include/postmaster/bgwriter.h b/src/include/postmaster/bgwriter.h
index 87a5cfad415..58ba671a907 100644
--- a/src/include/postmaster/bgwriter.h
+++ b/src/include/postmaster/bgwriter.h
@@ -16,6 +16,7 @@
 #define _BGWRITER_H
 
 #include "storage/block.h"
+#include "storage/fd.h"
 #include "storage/relfilenode.h"
 
 
@@ -31,9 +32,10 @@ extern void CheckpointerMain(void) pg_attribute_noreturn();
 extern void RequestCheckpoint(int flags);
 extern void CheckpointWriteDelay(int flags, double progress);
 
-extern bool ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum,
-					BlockNumber segno);
+extern void ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum,
+								BlockNumber segno, File file);
 extern void AbsorbFsyncRequests(void);
+extern void AbsorbAllFsyncRequests(void);
 
 extern Size CheckpointerShmemSize(void);
 extern void CheckpointerShmemInit(void);
@@ -43,4 +45,6 @@ extern uint32 IncCheckpointSyncCycle(void);
 
 extern bool FirstCallSinceLastCheckpoint(void);
 
+extern void CountBackendWrite(void);
+
 #endif							/* _BGWRITER_H */
diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h
index 1877eef2391..e2ba64e8984 100644
--- a/src/include/postmaster/postmaster.h
+++ b/src/include/postmaster/postmaster.h
@@ -44,6 +44,11 @@ extern int	postmaster_alive_fds[2];
 #define POSTMASTER_FD_OWN		1	/* kept open by postmaster only */
 #endif
 
+#define FSYNC_FD_SUBMIT			0
+#define FSYNC_FD_PROCESS		1
+
+extern int	fsync_fds[2];
+
 extern PGDLLIMPORT const char *progname;
 
 extern void PostmasterMain(int argc, char *argv[]) pg_attribute_noreturn();
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 558e4d8518b..798a9652927 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -140,7 +140,8 @@ extern void mdpostckpt(void);
 
 extern void SetForwardFsyncRequests(void);
 extern void RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum,
-					 BlockNumber segno);
+					 BlockNumber segno, int fd);
+extern bool FlushFsyncRequestQueueIfNecessary(void);
 extern void ForgetRelationFsyncRequests(RelFileNode rnode, ForkNumber forknum);
 extern void ForgetDatabaseFsyncRequests(Oid dbid);
 
-- 
2.17.0.rc1.dirty

Reply via email to