On 2015-09-10 17:15:26 +0200, Fabien COELHO wrote:
> Here is a v13, which is just a rebase after 1aba62ec.

And here's v14. It's not entirely ready yet. A lot of details have
changed, and I unfortunately don't remember them all. But there are more
important things than the details of the patch.

I've played *a lot* with this patch. I found a bunch of issues:

1) The FileFlushContext infrastructure isn't actually correct. There
   are two problems: First, using the actual 'fd' number to reference a
   to-be-flushed file isn't meaningful. If there are lots of files
   open, fds get reused within fd.c. That part is easily fixed by
   referencing the File instead of the fd. The bigger problem is that
   the infrastructure doesn't deal with files being closed. There can
   be smgr invalidations (not that hard to trigger) that cause the smgr
   handle, and thus the file, to be closed.

   I think this means that the entire flushing infrastructure actually
   needs to be hoisted up, onto the smgr/md level. (See the first
   sketch after this list.)

2) I noticed that sync_file_range() blocked far more often than I'd
   expected. Reading the kernel code, that turned out to be caused by a
   pessimization introduced into the kernel years ago - in many
   situations SFR_WRITE waited for the writes. A fix for this will be
   in the 4.4 kernel.

3) I found that latency wasn't improved much for workloads that are
   significantly bigger than shared buffers. The problem here is that
   neither bgwriter nor the backends have, so far, done
   sync_file_range() calls. That means the old problem of having
   gigabytes of dirty data that periodically get flushed out still
   exists. Having these processes do flushes as well mostly attacks
   that problem. (See the second sketch after this list.)


Benchmarking revealed that for workloads where the hot data set mostly
fits into shared buffers, flushing and sorting are anywhere from a small
to a massive improvement, both in throughput and latency - even without
the fix from 2), although fixing that improves things further.



What I did not expect, and what confounded me for a long while, is that
for workloads where the hot data set does *NOT* fit into shared buffers,
sorting often led to a noticeable reduction in throughput - up to
30%. The performance was still much more regular than before, i.e. no
more multi-second periods without any transactions happening.

By now I think I know what's going on: before the sorting portion of the
patch, the write loop in BufferSync() started at the current clock hand,
by using StrategySyncStart(). But after the sorting that obviously
doesn't happen anymore - buffers are accessed in their sort order. By
starting at the current clock hand and moving on from there, the
checkpointer basically makes it less likely that victim buffers need to
be written either by the backends themselves or by bgwriter. That means
that the sorted checkpoint writes can, indirectly, increase the number
of unsorted writes by other processes :(
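
For reference, this is (abridged) the pre-sorting write loop that the
patch removes from BufferSync(); starting at the clock hand is exactly
the property that is lost:

    /* start at the current clock sweep point ... */
    buf_id = StrategySyncStart(NULL, NULL);
    num_to_scan = NBuffers;
    while (num_to_scan-- > 0)
    {
        volatile BufferDesc *bufHdr = GetBufferDescriptor(buf_id);

        if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
        {
            if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN)
                num_written++;
        }

        /* ... so soon-to-be-victim buffers get cleaned first */
        if (++buf_id >= NBuffers)
            buf_id = 0;
    }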

My benchmarking suggests that this effect gets larger the shorter the
checkpoint timeout is. That seems to make intuitive sense, given the
above explanation attempt: if the checkpoint takes longer, the clock
hand will almost certainly soon overtake the checkpoint's 'implicit'
hand.

I'm not sure if we can really do anything about this problem. While I'm
pretty jet lagged, I still spent a fair amount of time thinking about
it. This seems to suggest that we need to bring back the setting to
enable/disable sorting :(


What I think needs to happen next with the patch is:
1) Hoist up the FileFlushContext stuff into the smgr layer. Carefully
   handling the issue of smgr invalidations.
2) Replace the boolean checkpoint_flush_to_disk GUC with a list GUC
   that can later contain multiple elements like checkpoint, bgwriter,
   backends, ddl, bulk-writes. That seems better than adding separate
   GUCs for each of these. Then make the flush locations in the patch
   configurable using that. (A hypothetical sketch follows this list.)
3) I think we should remove the sort timing from the checkpoint logging
   before commit. It'll always be pretty short.
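
To illustrate what I mean in 2), a check hook for such a list GUC could
look roughly like this (the GUC name and the accepted values are just
placeholders, this is only a sketch):

    /* hypothetical: flush_on_write = 'checkpoint, bgwriter' */
    static bool
    check_flush_on_write(char **newval, void **extra, GucSource source)
    {
        char       *rawstring = pstrdup(*newval);
        List       *elemlist;
        ListCell   *l;

        if (!SplitIdentifierString(rawstring, ',', &elemlist))
        {
            GUC_check_errdetail("List syntax is invalid.");
            pfree(rawstring);
            list_free(elemlist);
            return false;
        }

        foreach(l, elemlist)
        {
            char       *item = (char *) lfirst(l);

            /* accept only known flush sites */
            if (strcmp(item, "checkpoint") != 0 &&
                strcmp(item, "bgwriter") != 0 &&
                strcmp(item, "backends") != 0 &&
                strcmp(item, "ddl") != 0 &&
                strcmp(item, "bulk-writes") != 0)
            {
                GUC_check_errdetail("Unrecognized element: \"%s\".", item);
                pfree(rawstring);
                list_free(elemlist);
                return false;
            }
        }

        pfree(rawstring);
        list_free(elemlist);
        return true;
    }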


Greetings,

Andres Freund
From dd0868d2c714bf18d34f82db40669b435d4b2ba2 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Fri, 23 Oct 2015 15:22:04 +0200
Subject: [PATCH] ckpt-14-andres

---
 doc/src/sgml/config.sgml                      |  18 ++
 doc/src/sgml/wal.sgml                         |  12 +
 src/backend/access/heap/rewriteheap.c         |   2 +-
 src/backend/access/nbtree/nbtree.c            |   2 +-
 src/backend/access/nbtree/nbtsort.c           |   2 +-
 src/backend/access/spgist/spginsert.c         |   6 +-
 src/backend/access/transam/xlog.c             |  11 +-
 src/backend/storage/buffer/README             |   5 -
 src/backend/storage/buffer/buf_init.c         |  24 +-
 src/backend/storage/buffer/bufmgr.c           | 365 ++++++++++++++++++++++----
 src/backend/storage/buffer/freelist.c         |   6 +-
 src/backend/storage/buffer/localbuf.c         |   3 +-
 src/backend/storage/file/buffile.c            |   3 +-
 src/backend/storage/file/copydir.c            |   4 +-
 src/backend/storage/file/fd.c                 | 242 +++++++++++++++--
 src/backend/storage/smgr/md.c                 |   6 +-
 src/backend/storage/smgr/smgr.c               |   8 +-
 src/backend/utils/misc/guc.c                  |  12 +
 src/backend/utils/misc/postgresql.conf.sample |   2 +
 src/include/access/xlog.h                     |   2 +
 src/include/storage/buf_internals.h           |  18 ++
 src/include/storage/bufmgr.h                  |   8 +
 src/include/storage/fd.h                      |  37 ++-
 src/include/storage/smgr.h                    |   7 +-
 src/tools/pgindent/typedefs.list              |   1 +
 25 files changed, 705 insertions(+), 101 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 5549de7..7db7ae7 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2452,6 +2452,24 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-checkpoint-flush-to-disk" xreflabel="checkpoint_flush_to_disk">
+      <term><varname>checkpoint_flush_to_disk</varname> (<type>bool</type>)
+      <indexterm>
+       <primary><varname>checkpoint_flush_to_disk</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        When writing data for a checkpoint, hint the underlying OS that the
+        data must be sent to disk as soon as possible.  This may help smooth
+        disk I/O and avoid a stall when fsync is issued at the end of
+        the checkpoint, but it may also reduce average performance.
+        This setting may have no effect on some platforms.
+        The default is <literal>on</> on Linux, <literal>off</> otherwise.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-checkpoint-warning" xreflabel="checkpoint_warning">
       <term><varname>checkpoint_warning</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/wal.sgml b/doc/src/sgml/wal.sgml
index e3941c9..a4b8d91 100644
--- a/doc/src/sgml/wal.sgml
+++ b/doc/src/sgml/wal.sgml
@@ -546,6 +546,18 @@
   </para>
 
   <para>
+   On Linux and POSIX platforms, <xref linkend="guc-checkpoint-flush-to-disk">
+   makes it possible to hint the OS that pages written on checkpoints must be
+   flushed to disk quickly.  Otherwise, these pages may be kept in cache for
+   some time, inducing a stall later when <literal>fsync</> is called to
+   actually complete the checkpoint.  This setting helps to reduce transaction
+   latency, but it may also have a small adverse effect on the average
+   transaction rate at maximum throughput on some OSes.  It should be
+   beneficial for high write loads on HDDs; it probably brings no benefit on
+   SSDs, where I/O write latency is small, so it may be disabled there.
+  </para>
+
+  <para>
    The number of WAL segment files in <filename>pg_xlog</> directory depends on
    <varname>min_wal_size</>, <varname>max_wal_size</> and
    the amount of WAL generated in previous checkpoint cycles. When old log
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 6a6fc3b..95f086d 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -918,7 +918,7 @@ logical_heap_rewrite_flush_mappings(RewriteState state)
 		 * Note that we deviate from the usual WAL coding practices here,
 		 * check the above "Logical rewrite support" comment for reasoning.
 		 */
-		written = FileWrite(src->vfd, waldata_start, len);
+		written = FileWrite(src->vfd, waldata_start, len, NULL);
 		if (written != len)
 			ereport(ERROR,
 					(errcode_for_file_access(),
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index cf4a6dc..efb3338 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -203,7 +203,7 @@ btbuildempty(PG_FUNCTION_ARGS)
 	/* Write the page.  If archiving/streaming, XLOG it. */
 	PageSetChecksumInplace(metapage, BTREE_METAPAGE);
 	smgrwrite(index->rd_smgr, INIT_FORKNUM, BTREE_METAPAGE,
-			  (char *) metapage, true);
+			  (char *) metapage, true, NULL);
 	if (XLogIsNeeded())
 		log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM,
 					BTREE_METAPAGE, metapage, false);
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index f95f67a..f8976f1 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -315,7 +315,7 @@ _bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
 	{
 		/* overwriting a block we zero-filled before */
 		smgrwrite(wstate->index->rd_smgr, MAIN_FORKNUM, blkno,
-				  (char *) page, true);
+				  (char *) page, true, NULL);
 	}
 
 	pfree(page);
diff --git a/src/backend/access/spgist/spginsert.c b/src/backend/access/spgist/spginsert.c
index bceee8d..149c1c4 100644
--- a/src/backend/access/spgist/spginsert.c
+++ b/src/backend/access/spgist/spginsert.c
@@ -170,7 +170,7 @@ spgbuildempty(PG_FUNCTION_ARGS)
 	/* Write the page.  If archiving/streaming, XLOG it. */
 	PageSetChecksumInplace(page, SPGIST_METAPAGE_BLKNO);
 	smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_METAPAGE_BLKNO,
-			  (char *) page, true);
+			  (char *) page, true, NULL);
 	if (XLogIsNeeded())
 		log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM,
 					SPGIST_METAPAGE_BLKNO, page, false);
@@ -180,7 +180,7 @@ spgbuildempty(PG_FUNCTION_ARGS)
 
 	PageSetChecksumInplace(page, SPGIST_ROOT_BLKNO);
 	smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_ROOT_BLKNO,
-			  (char *) page, true);
+			  (char *) page, true, NULL);
 	if (XLogIsNeeded())
 		log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM,
 					SPGIST_ROOT_BLKNO, page, true);
@@ -190,7 +190,7 @@ spgbuildempty(PG_FUNCTION_ARGS)
 
 	PageSetChecksumInplace(page, SPGIST_NULL_BLKNO);
 	smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_NULL_BLKNO,
-			  (char *) page, true);
+			  (char *) page, true, NULL);
 	if (XLogIsNeeded())
 		log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM,
 					SPGIST_NULL_BLKNO, page, true);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 08d1682..a40f7d5 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7980,11 +7980,13 @@ LogCheckpointEnd(bool restartpoint)
 				sync_secs,
 				total_secs,
 				longest_secs,
+				sort_secs,
 				average_secs;
 	int			write_usecs,
 				sync_usecs,
 				total_usecs,
 				longest_usecs,
+				sort_usecs,
 				average_usecs;
 	uint64		average_sync_time;
 
@@ -8015,6 +8017,10 @@ LogCheckpointEnd(bool restartpoint)
 						CheckpointStats.ckpt_end_t,
 						&total_secs, &total_usecs);
 
+	TimestampDifference(CheckpointStats.ckpt_sort_t,
+						CheckpointStats.ckpt_sort_end_t,
+						&sort_secs, &sort_usecs);
+
 	/*
 	 * Timing values returned from CheckpointStats are in microseconds.
 	 * Convert to the second plus microsecond form that TimestampDifference
@@ -8033,8 +8039,8 @@ LogCheckpointEnd(bool restartpoint)
 
 	elog(LOG, "%s complete: wrote %d buffers (%.1f%%); "
 		 "%d transaction log file(s) added, %d removed, %d recycled; "
-		 "write=%ld.%03d s, sync=%ld.%03d s, total=%ld.%03d s; "
-		 "sync files=%d, longest=%ld.%03d s, average=%ld.%03d s; "
+		 "sort=%ld.%03d s, write=%ld.%03d s, sync=%ld.%03d s, total=%ld.%03d s;"
+		 " sync files=%d, longest=%ld.%03d s, average=%ld.%03d s; "
 		 "distance=%d kB, estimate=%d kB",
 		 restartpoint ? "restartpoint" : "checkpoint",
 		 CheckpointStats.ckpt_bufs_written,
@@ -8042,6 +8048,7 @@ LogCheckpointEnd(bool restartpoint)
 		 CheckpointStats.ckpt_segs_added,
 		 CheckpointStats.ckpt_segs_removed,
 		 CheckpointStats.ckpt_segs_recycled,
+		 sort_secs, sort_usecs / 1000,
 		 write_secs, write_usecs / 1000,
 		 sync_secs, sync_usecs / 1000,
 		 total_secs, total_usecs / 1000,
diff --git a/src/backend/storage/buffer/README b/src/backend/storage/buffer/README
index 45c5c83..e33e2ba 100644
--- a/src/backend/storage/buffer/README
+++ b/src/backend/storage/buffer/README
@@ -265,11 +265,6 @@ only needs to take the lock long enough to read the variable value, not
 while scanning the buffers.  (This is a very substantial improvement in
 the contention cost of the writer compared to PG 8.0.)
 
-During a checkpoint, the writer's strategy must be to write every dirty
-buffer (pinned or not!).  We may as well make it start this scan from
-nextVictimBuffer, however, so that the first-to-be-written pages are the
-ones that backends might otherwise have to write for themselves soon.
-
 The background writer takes shared content lock on a buffer while writing it
 out (and anyone else who flushes buffer contents to disk must do so too).
 This ensures that the page image transferred to disk is reasonably consistent.
diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c
index 3ae2848..c6a3be8 100644
--- a/src/backend/storage/buffer/buf_init.c
+++ b/src/backend/storage/buffer/buf_init.c
@@ -20,6 +20,7 @@
 
 BufferDescPadded *BufferDescriptors;
 char	   *BufferBlocks;
+CkptSortItem *CkptBufferIds;
 
 
 /*
@@ -65,7 +66,8 @@ void
 InitBufferPool(void)
 {
 	bool		foundBufs,
-				foundDescs;
+				foundDescs,
+				foundBufCkpt;
 
 	/* Align descriptors to a cacheline boundary. */
 	BufferDescriptors = (BufferDescPadded *) CACHELINEALIGN(
@@ -77,10 +79,21 @@ InitBufferPool(void)
 		ShmemInitStruct("Buffer Blocks",
 						NBuffers * (Size) BLCKSZ, &foundBufs);
 
-	if (foundDescs || foundBufs)
+	/*
+	 * The array used to sort to-be-checkpointed buffer ids is located in
+	 * shared memory, to avoid having to allocate significant amounts of
+	 * memory at runtime. As that'd be in the middle of a checkpoint, or when
+	 * the checkpointer is restarted, memory allocation failures would be
+	 * painful.
+	 */
+	CkptBufferIds = (CkptSortItem *)
+		ShmemInitStruct("Checkpoint BufferIds",
+						NBuffers * sizeof(CkptSortItem), &foundBufCkpt);
+
+	if (foundDescs || foundBufs || foundBufCkpt)
 	{
-		/* both should be present or neither */
-		Assert(foundDescs && foundBufs);
+		/* all should be present or none */
+		Assert(foundDescs && foundBufs && foundBufCkpt);
 		/* note: this path is only taken in EXEC_BACKEND case */
 	}
 	else
@@ -144,5 +157,8 @@ BufferShmemSize(void)
 	/* size of stuff controlled by freelist.c */
 	size = add_size(size, StrategyShmemSize());
 
+	/* size of checkpoint sort array in bufmgr.c */
+	size = add_size(size, mul_size(NBuffers, sizeof(CkptSortItem)));
+
 	return size;
 }
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 8c0358e..5fb09c8 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -37,6 +37,7 @@
 #include "catalog/catalog.h"
 #include "catalog/storage.h"
 #include "executor/instrument.h"
+#include "lib/binaryheap.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "pgstat.h"
@@ -47,6 +48,7 @@
 #include "storage/proc.h"
 #include "storage/smgr.h"
 #include "storage/standby.h"
+#include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/resowner_private.h"
 #include "utils/timestamp.h"
@@ -75,6 +77,36 @@ typedef struct PrivateRefCountEntry
 /* 64 bytes, about the size of a cache line on common systems */
 #define REFCOUNT_ARRAY_ENTRIES 8
 
+/*
+ * Status of buffers to checkpoint for a particular tablespace, used
+ * internally in BufferSync.
+ */
+typedef struct CkptTsStatus
+{
+	/* oid of the tablespace */
+	Oid			tsId;
+
+	/*
+	 * Checkpoint progress for this tablespace. To make progress comparable
+	 * between tablespaces the progress is, for each tablespace, measured as a
+	 * number between 0 and the total number of to-be-checkpointed pages. Each
+	 * page checkpointed in this tablespace increments this space's progress
+	 * by progress_slice.
+	 */
+	float8		progress;
+	float8		progress_slice;
+
+	/* number of to-be checkpointed pages in this tablespace */
+	int			num_to_scan;
+	/* already processed pages in this tablespace */
+	int			num_scanned;
+
+	/* current offset in CkptBufferIds for this tablespace */
+	int			index;
+
+	FileFlushContext flushContext;
+}	CkptTsStatus;
+
 /* GUC variables */
 bool		zero_damaged_pages = false;
 int			bgwriter_lru_maxpages = 100;
@@ -82,6 +114,9 @@ double		bgwriter_lru_multiplier = 2.0;
 bool		track_io_timing = false;
 int			effective_io_concurrency = 0;
 
+/* hint to move writes to high priority */
+bool		checkpoint_flush_to_disk = DEFAULT_CHECKPOINT_FLUSH_TO_DISK;
+
 /*
  * How many buffers PrefetchBuffer callers should try to stay ahead of their
  * ReadBuffer calls by.  This is maintained by the assign hook for
@@ -399,7 +434,8 @@ static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
 static void PinBuffer_Locked(volatile BufferDesc *buf);
 static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
 static void BufferSync(int flags);
-static int	SyncOneBuffer(int buf_id, bool skip_recently_used);
+static int SyncOneBuffer(int buf_id, bool skip_recently_used,
+			  FileFlushContext *flush_context);
 static void WaitIO(volatile BufferDesc *buf);
 static bool StartBufferIO(volatile BufferDesc *buf, bool forInput);
 static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
@@ -412,10 +448,13 @@ static volatile BufferDesc *BufferAlloc(SMgrRelation smgr,
 			BlockNumber blockNum,
 			BufferAccessStrategy strategy,
 			bool *foundPtr);
-static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln);
+static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln,
+			FileFlushContext *flush_context);
 static void AtProcExit_Buffers(int code, Datum arg);
 static void CheckForBufferLeaks(void);
 static int	rnode_comparator(const void *p1, const void *p2);
+static int	ckpt_buforder_comparator(const void *pa, const void *pb);
+static int	ts_ckpt_progress_comparator(Datum a, Datum b, void *arg);
 
 
 /*
@@ -943,6 +982,14 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	int			buf_id;
 	volatile BufferDesc *buf;
 	bool		valid;
+	static FileFlushContext *context = NULL;
+
+	/* XXX: Should probably rather be in buf_init() */
+	if (context == NULL)
+	{
+		context = MemoryContextAlloc(TopMemoryContext, sizeof(*context));
+		FlushContextInit(context, FLUSH_CONTEXT_DEFAULT_MAX_COALESCE);
+	}
 
 	/* create a tag so we can lookup the buffer */
 	INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
@@ -1078,8 +1125,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 											   smgr->smgr_rnode.node.spcNode,
 												smgr->smgr_rnode.node.dbNode,
 											  smgr->smgr_rnode.node.relNode);
-
-				FlushBuffer(buf, NULL);
+				/* FIXME: configurable */
+				FlushBuffer(buf, NULL, context);
 				LWLockRelease(buf->content_lock);
 
 				TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
@@ -1637,10 +1684,16 @@ BufferSync(int flags)
 {
 	int			buf_id;
 	int			num_to_scan;
-	int			num_to_write;
+	int			num_spaces;
+	int			num_processed;
 	int			num_written;
+	CkptTsStatus *per_ts_stat = NULL;
+	Oid			last_tsid;
+	binaryheap *ts_heap;
+	int			i;
 	int			mask = BM_DIRTY;
 
+
 	/* Make sure we can handle the pin inside SyncOneBuffer */
 	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
 
@@ -1655,7 +1708,7 @@ BufferSync(int flags)
 
 	/*
 	 * Loop over all buffers, and mark the ones that need to be written with
-	 * BM_CHECKPOINT_NEEDED.  Count them as we go (num_to_write), so that we
+	 * BM_CHECKPOINT_NEEDED.  Count them as we go (num_to_scan), so that we
 	 * can estimate how much work needs to be done.
 	 *
 	 * This allows us to write only those pages that were dirty when the
@@ -1669,7 +1722,7 @@ BufferSync(int flags)
 	 * BM_CHECKPOINT_NEEDED still set.  This is OK since any such buffer would
 	 * certainly need to be written for the next checkpoint attempt, too.
 	 */
-	num_to_write = 0;
+	num_to_scan = 0;
 	for (buf_id = 0; buf_id < NBuffers; buf_id++)
 	{
 		volatile BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
@@ -1682,32 +1735,144 @@ BufferSync(int flags)
 
 		if ((bufHdr->flags & mask) == mask)
 		{
+			CkptSortItem *item;
+
 			bufHdr->flags |= BM_CHECKPOINT_NEEDED;
-			num_to_write++;
+
+			item = &CkptBufferIds[num_to_scan++];
+			item->buf_id = buf_id;
+			item->tsId = bufHdr->tag.rnode.spcNode;
+			item->relNode = bufHdr->tag.rnode.relNode;
+			item->forkNum = bufHdr->tag.forkNum;
+			item->blockNum = bufHdr->tag.blockNum;
 		}
 
 		UnlockBufHdr(bufHdr);
 	}
 
-	if (num_to_write == 0)
+	if (num_to_scan == 0)
 		return;					/* nothing to do */
 
-	TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write);
+	TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan);
 
 	/*
-	 * Loop over all buffers again, and write the ones (still) marked with
-	 * BM_CHECKPOINT_NEEDED.  In this loop, we start at the clock sweep point
-	 * since we might as well dump soon-to-be-recycled buffers first.
-	 *
-	 * Note that we don't read the buffer alloc count here --- that should be
-	 * left untouched till the next BgBufferSync() call.
+	 * Sort buffers that need to be written to reduce the likelihood of random
+	 * IO. The sorting is also important for the implementation of balancing
+	 * writes between tablespaces. Without balancing writes we'd potentially
+	 * end up writing to the tablespaces one-by-one; possibly overloading the
+	 * underlying system.
+	 */
+	CheckpointStats.ckpt_sort_t = GetCurrentTimestamp();
+	qsort(CkptBufferIds, num_to_scan, sizeof(CkptSortItem),
+		  ckpt_buforder_comparator);
+	CheckpointStats.ckpt_sort_end_t = GetCurrentTimestamp();
+
+	num_spaces = 0;
+
+	/*
+	 * Allocate progress status for each tablespace with buffers that need to
+	 * be flushed. This requires the to-be-flushed array to be sorted.
+	 */
+	last_tsid = InvalidOid;
+	for (i = 0; i < num_to_scan; i++)
+	{
+		CkptTsStatus *s;
+		Oid			cur_tsid;
+
+		cur_tsid = CkptBufferIds[i].tsId;
+
+		/*
+		 * Grow array of per-tablespace status structs, every time a new
+		 * tablespace is found.
+		 */
+		if (last_tsid == InvalidOid || last_tsid != cur_tsid)
+		{
+			Size		sz;
+
+			num_spaces++;
+
+			/*
+			 * Not worth adding grow-by-power-of-2 logic here - even with a
+			 * few hundred tablespaces this will be fine.
+			 */
+			sz = sizeof(CkptTsStatus) * num_spaces;
+
+			if (per_ts_stat == NULL)
+				per_ts_stat = (CkptTsStatus *) palloc(sz);
+			else
+				per_ts_stat = (CkptTsStatus *) repalloc(per_ts_stat, sz);
+
+			s = &per_ts_stat[num_spaces - 1];
+			memset(s, 0, sizeof(*s));
+			s->tsId = cur_tsid;
+
+			/*
+			 * The first buffer in this tablespace. As CkptBufferIds is sorted
+			 * by tablespace, all (s->num_to_scan) buffers in this tablespace
+			 * will follow afterwards.
+			 */
+			s->index = i;
+
+			/*
+			 * The progress_slice will be computed once we know how many
+			 * buffers are in this tablespace, i.e. after this loop.
+			 */
+
+			last_tsid = cur_tsid;
+		}
+		else
+		{
+			s = &per_ts_stat[num_spaces - 1];
+		}
+
+		s->num_to_scan++;
+	}
+
+	Assert(num_spaces > 0);
+
+	/*
+	 * Build a min-heap over the write-progress in the individual tablespaces,
+	 * and compute how large a portion of the total progress a single
+	 * processed buffer is.
 	 */
-	buf_id = StrategySyncStart(NULL, NULL);
-	num_to_scan = NBuffers;
+	ts_heap = binaryheap_allocate(num_spaces,
+								  ts_ckpt_progress_comparator,
+								  NULL);
+
+	for (i = 0; i < num_spaces; i++)
+	{
+		CkptTsStatus *ts_stat = &per_ts_stat[i];
+
+		ts_stat->progress_slice = (float8) num_to_scan / ts_stat->num_to_scan;
+
+		FlushContextInit(&ts_stat->flushContext,
+						 FLUSH_CONTEXT_DEFAULT_MAX_COALESCE);
+
+		binaryheap_add_unordered(ts_heap, PointerGetDatum(ts_stat));
+	}
+
+	binaryheap_build(ts_heap);
+
+	/*
+	 * Iterate through to-be-checkpointed buffers and write the ones (still)
+	 * marked with BM_CHECKPOINT_NEEDED. The writes are balanced between
+	 * tablespaces.
+	 */
+	num_processed = 0;
 	num_written = 0;
-	while (num_to_scan-- > 0)
+	while (!binaryheap_empty(ts_heap))
 	{
-		volatile BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
+		volatile BufferDesc *bufHdr = NULL;
+		CkptTsStatus *ts_stat = (CkptTsStatus *)
+		DatumGetPointer(binaryheap_first(ts_heap));
+
+		buf_id = CkptBufferIds[ts_stat->index].buf_id;
+		Assert(buf_id != -1);
+
+		bufHdr = GetBufferDescriptor(buf_id);
+		Assert(bufHdr->tag.rnode.spcNode == ts_stat->tsId);
+
+		num_processed++;
 
 		/*
 		 * We don't need to acquire the lock here, because we're only looking
@@ -1723,44 +1888,69 @@ BufferSync(int flags)
 		 */
 		if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
 		{
-			if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN)
+			FileFlushContext *context;
+
+			if (checkpoint_flush_to_disk)
+				context = &ts_stat->flushContext;
+			else
+				context = NULL;
+
+			if (SyncOneBuffer(buf_id, false, context) & BUF_WRITTEN)
 			{
 				TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
 				BgWriterStats.m_buf_written_checkpoints++;
 				num_written++;
+			}
+		}
 
-				/*
-				 * We know there are at most num_to_write buffers with
-				 * BM_CHECKPOINT_NEEDED set; so we can stop scanning if
-				 * num_written reaches num_to_write.
-				 *
-				 * Note that num_written doesn't include buffers written by
-				 * other backends, or by the bgwriter cleaning scan. That
-				 * means that the estimate of how much progress we've made is
-				 * conservative, and also that this test will often fail to
-				 * trigger.  But it seems worth making anyway.
-				 */
-				if (num_written >= num_to_write)
-					break;
+		/*
+		 * Measure progress independently of actually having to flush the
+		 * buffer - otherwise writes become unbalanced.
+		 */
+		ts_stat->progress += ts_stat->progress_slice;
+		ts_stat->num_scanned++;
+		ts_stat->index++;
 
-				/*
-				 * Sleep to throttle our I/O rate.
-				 */
-				CheckpointWriteDelay(flags, (double) num_written / num_to_write);
-			}
+		/* Have all the buffers from the tablespace been processed? */
+		if (ts_stat->num_scanned == ts_stat->num_to_scan)
+		{
+			/*
+			 * If there's a pending flush, perform that now, we're finished
+			 * with the tablespace.
+			 */
+			FlushContextIssuePending(&ts_stat->flushContext);
+
+			binaryheap_remove_first(ts_heap);
+		}
+		else
+		{
+			/* update heap with the new progress */
+			binaryheap_replace_first(ts_heap, PointerGetDatum(ts_stat));
 		}
 
-		if (++buf_id >= NBuffers)
-			buf_id = 0;
+		/*
+		 * Sleep to throttle our I/O rate.
+		 */
+		CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
+#ifdef CHECKPOINTER_DEBUG
+		/* delete current content of the line, print progress */
+		fprintf(stderr, "\33[2K\rto_scan: %d, scanned: %d, %%processed: %.2f, %%writeouts: %.2f",
+				num_to_scan, num_processed,
+				(((double) num_processed) / num_to_scan) * 100,
+				((double) num_written / num_processed) * 100);
+#endif
 	}
 
+	pfree(per_ts_stat);
+	per_ts_stat = NULL;
+
 	/*
 	 * Update checkpoint statistics. As noted above, this doesn't include
 	 * buffers written by other backends or bgwriter scan.
 	 */
 	CheckpointStats.ckpt_bufs_written += num_written;
 
-	TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_write);
+	TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_scan);
 }
 
 /*
@@ -1818,6 +2008,10 @@ BgBufferSync(void)
 	long		new_strategy_delta;
 	uint32		new_recent_alloc;
 
+	FileFlushContext context;
+
+	FlushContextInit(&context, FLUSH_CONTEXT_DEFAULT_MAX_COALESCE);
+
 	/*
 	 * Find out where the freelist clock sweep currently is, and how many
 	 * buffer allocations have happened since our last call.
@@ -2000,7 +2194,15 @@ BgBufferSync(void)
 	/* Execute the LRU scan */
 	while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
 	{
-		int			buffer_state = SyncOneBuffer(next_to_clean, true);
+		int			buffer_state;
+
+		/*
+		 * FIXME: flushing should be configurable.
+		 *
+		 * Flushing here is important for latency, but also not unproblematic,
+		 * because the buffers are written out entirely unsorted.
+		 */
+		buffer_state = SyncOneBuffer(next_to_clean, true, &context);
 
 		if (++next_to_clean >= NBuffers)
 		{
@@ -2077,7 +2279,8 @@ BgBufferSync(void)
  * Note: caller must have done ResourceOwnerEnlargeBuffers.
  */
 static int
-SyncOneBuffer(int buf_id, bool skip_recently_used)
+SyncOneBuffer(int buf_id, bool skip_recently_used,
+			  FileFlushContext *flush_context)
 {
 	volatile BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
 	int			result = 0;
@@ -2118,7 +2321,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used)
 	PinBuffer_Locked(bufHdr);
 	LWLockAcquire(bufHdr->content_lock, LW_SHARED);
 
-	FlushBuffer(bufHdr, NULL);
+	FlushBuffer(bufHdr, NULL, flush_context);
 
 	LWLockRelease(bufHdr->content_lock);
 	UnpinBuffer(bufHdr, true);
@@ -2380,9 +2583,16 @@ BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum,
  *
  * If the caller has an smgr reference for the buffer's relation, pass it
  * as the second parameter.  If not, pass NULL.
+ *
 * The third parameter holds the current flush context, which accumulates
 * flush requests so they can be issued to the OS in one call, instead of on
 * a buffer-by-buffer basis.  Passing a context hints the OS that these
 * writes should be flushed to disk soon.
  */
 static void
-FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
+FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln,
+			FileFlushContext *flush_context)
 {
 	XLogRecPtr	recptr;
 	ErrorContextCallback errcallback;
@@ -2471,7 +2681,8 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
 			  buf->tag.forkNum,
 			  buf->tag.blockNum,
 			  bufToWrite,
-			  false);
+			  false,
+			  flush_context);
 
 	if (track_io_timing)
 	{
@@ -2893,7 +3104,8 @@ FlushRelationBuffers(Relation rel)
 						  bufHdr->tag.forkNum,
 						  bufHdr->tag.blockNum,
 						  localpage,
-						  false);
+						  false,
+						  NULL);
 
 				bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
 
@@ -2927,7 +3139,7 @@ FlushRelationBuffers(Relation rel)
 		{
 			PinBuffer_Locked(bufHdr);
 			LWLockAcquire(bufHdr->content_lock, LW_SHARED);
-			FlushBuffer(bufHdr, rel->rd_smgr);
+			FlushBuffer(bufHdr, rel->rd_smgr, NULL);
 			LWLockRelease(bufHdr->content_lock);
 			UnpinBuffer(bufHdr, true);
 		}
@@ -2979,7 +3191,7 @@ FlushDatabaseBuffers(Oid dbid)
 		{
 			PinBuffer_Locked(bufHdr);
 			LWLockAcquire(bufHdr->content_lock, LW_SHARED);
-			FlushBuffer(bufHdr, NULL);
+			FlushBuffer(bufHdr, NULL, NULL);
 			LWLockRelease(bufHdr->content_lock);
 			UnpinBuffer(bufHdr, true);
 		}
@@ -3701,3 +3913,56 @@ rnode_comparator(const void *p1, const void *p2)
 	else
 		return 0;
 }
+
+/*
+ * Comparator determining the writeout order in a checkpoint.
+ *
+ * It is important that tablespaces are compared first as the logic balancing
+ * writes between tablespaces relies on it.
+ */
+static int
+ckpt_buforder_comparator(const void *pa, const void *pb)
+{
+	const CkptSortItem *a = (CkptSortItem *) pa;
+	const CkptSortItem *b = (CkptSortItem *) pb;
+
+	/* compare tablespace */
+	if (a->tsId < b->tsId)
+		return -1;
+	else if (a->tsId > b->tsId)
+		return 1;
+	/* compare relation */
+	if (a->relNode < b->relNode)
+		return -1;
+	else if (a->relNode > b->relNode)
+		return 1;
+	/* compare fork */
+	else if (a->forkNum < b->forkNum)
+		return -1;
+	else if (a->forkNum > b->forkNum)
+		return 1;
+	/* compare block number */
+	else if (a->blockNum < b->blockNum)
+		return -1;
+	else	/* should not be the same block anyway... */
+		return 1;
+}
+
+/*
+ * Comparator for a Min-Heap over the, per-tablespace, checkpoint completion
+ * progress.
+ */
+static int
+ts_ckpt_progress_comparator(Datum a, Datum b, void *arg)
+{
+	CkptTsStatus *sa = (CkptTsStatus *) a;
+	CkptTsStatus *sb = (CkptTsStatus *) b;
+
+	/* we want a min-heap, so return 1 when a < b */
+	if (sa->progress < sb->progress)
+		return 1;
+	else if (sa->progress == sb->progress)
+		return 0;
+	else
+		return -1;
+}
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index bc2c773..18e4397 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -358,10 +358,10 @@ StrategyFreeBuffer(volatile BufferDesc *buf)
 }
 
 /*
- * StrategySyncStart -- tell BufferSync where to start syncing
+ * StrategySyncStart -- tell BgBufferSync where to start syncing
  *
- * The result is the buffer index of the best buffer to sync first.
- * BufferSync() will proceed circularly around the buffer array from there.
+ * The result is the buffer index below the current clock-hand. BgBufferSync()
+ * will proceed circularly around the buffer array from there.
  *
  * In addition, we return the completed-pass count (which is effectively
  * the higher-order bits of nextVictimBuffer) and the count of recent buffer
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 3144afe..c508fc6 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -208,7 +208,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 				  bufHdr->tag.forkNum,
 				  bufHdr->tag.blockNum,
 				  localpage,
-				  false);
+				  false,
+				  NULL);
 
 		/* Mark not-dirty now in case we error out below */
 		bufHdr->flags &= ~BM_DIRTY;
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index ea4d689..f2913df 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -317,7 +317,8 @@ BufFileDumpBuffer(BufFile *file)
 				return;			/* seek failed, give up */
 			file->offsets[file->curFile] = file->curOffset;
 		}
-		bytestowrite = FileWrite(thisfile, file->buffer + wpos, bytestowrite);
+		bytestowrite = FileWrite(thisfile, file->buffer + wpos, bytestowrite,
+								 NULL);
 		if (bytestowrite <= 0)
 			return;				/* failed to write */
 		file->offsets[file->curFile] += bytestowrite;
diff --git a/src/backend/storage/file/copydir.c b/src/backend/storage/file/copydir.c
index 41b2c62..81c9754 100644
--- a/src/backend/storage/file/copydir.c
+++ b/src/backend/storage/file/copydir.c
@@ -190,9 +190,9 @@ copy_file(char *fromfile, char *tofile)
 		/*
 		 * We fsync the files later but first flush them to avoid spamming the
 		 * cache and hopefully get the kernel to start writing them out before
-		 * the fsync comes.  Ignore any error, since it's only a hint.
+		 * the fsync comes.
 		 */
-		(void) pg_flush_data(dstfd, offset, nbytes);
+		pg_flush_data(dstfd, offset, nbytes);
 	}
 
 	if (CloseTransientFile(dstfd))
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 1ba4946..2974c2b 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -61,6 +61,9 @@
 #include <sys/file.h>
 #include <sys/param.h>
 #include <sys/stat.h>
+#ifndef WIN32
+#include <sys/mman.h>
+#endif
 #include <unistd.h>
 #include <fcntl.h>
 #ifdef HAVE_SYS_RESOURCE_H
@@ -82,6 +85,8 @@
 /* Define PG_FLUSH_DATA_WORKS if we have an implementation for pg_flush_data */
 #if defined(HAVE_SYNC_FILE_RANGE)
 #define PG_FLUSH_DATA_WORKS 1
+#elif !defined(WIN32) && defined(MS_ASYNC)
+#define PG_FLUSH_DATA_WORKS 1
 #elif defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
 #define PG_FLUSH_DATA_WORKS 1
 #endif
@@ -380,29 +385,128 @@ pg_fdatasync(int fd)
 }
 
 /*
- * pg_flush_data --- advise OS that the data described won't be needed soon
+ * pg_flush_data --- advise OS that the described dirty data should be flushed
  *
- * Not all platforms have sync_file_range or posix_fadvise; treat as no-op
- * if not available.  Also, treat as no-op if enableFsync is off; this is
- * because the call isn't free, and some platforms such as Linux will actually
- * block the requestor until the write is scheduled.
+ * An offset of 0 with nbytes of 0 means that the entire file should be
+ * flushed.
  */
-int
-pg_flush_data(int fd, off_t offset, off_t amount)
+void
+pg_flush_data(int fd, off_t offset, off_t nbytes)
 {
 #ifdef PG_FLUSH_DATA_WORKS
-	if (enableFsync)
-	{
+
+	/*
+	 * Right now file flushing is primarily used to avoid making later
+	 * fsync()/fdatasync() calls have a significant impact. Thus don't trigger
+	 * flushes if fsyncs are disabled - that's a decision we might want to
+	 * make configurable at some point.
+	 */
+	if (!enableFsync)
+		return;
+
 #if defined(HAVE_SYNC_FILE_RANGE)
-		return sync_file_range(fd, offset, amount, SYNC_FILE_RANGE_WRITE);
+	{
+		int			rc = 0;
+
+		/*
+		 * sync_file_range(2), currently linux specific, with
+		 * SYNC_FILE_RANGE_WRITE as a parameter tells the OS that writeback
+		 * for the passed in blocks should be started, but that we don't want
+		 * to wait for completion.  Note that this call might block if too
+		 * much dirty data exists in the range.  This is the preferable
+		 * method on OSes supporting it, as it works reliably when available
+		 * (in contrast to msync()) and doesn't flush out clean data (like
+		 * FADV_DONTNEED).
+		 */
+		rc = sync_file_range(fd, offset, nbytes,
+							 SYNC_FILE_RANGE_WRITE);
+
+		/* don't error out, this is just a performance optimization */
+		if (rc != 0)
+		{
+			ereport(WARNING,
+					(errcode_for_file_access(),
+					 errmsg("could not flush dirty data: %m")));
+		}
+	}
+#elif !defined(WIN32) && defined(MS_ASYNC)
+	{
+		int			rc = 0;
+		void	   *p;
+
+		/*
+		 * On many OSes msync() on a mmap'ed file triggers writeback. On Linux
+		 * it only does so when MS_SYNC is specified, but then it does the
+		 * writeback in the foreground. Luckily all common Linux systems have
+		 * sync_file_range().  This is preferable to FADV_DONTNEED because
+		 * it doesn't flush out clean data.
+		 *
+		 * We map the file (mmap()), tell the kernel to sync back the contents
+		 * (msync()), and then remove the mapping again (munmap()).
+		 */
+
+		p = mmap(NULL, nbytes,
+				 PROT_READ | PROT_WRITE, MAP_SHARED,
+				 fd, offset);
+		if (p == MAP_FAILED)
+		{
+			ereport(WARNING,
+					(errcode_for_file_access(),
+					 errmsg("could not mmap while flushing dirty data: %m")));
+			return;
+		}
+
+		rc = msync(p, nbytes, MS_ASYNC);
+		if (rc != 0)
+		{
+			ereport(WARNING,
+					(errcode_for_file_access(),
+					 errmsg("could not flush dirty data: %m")));
+			/* NB: need to fall through to munmap()! */
+		}
+
+		rc = munmap(p, nbytes);
+		if (rc != 0)
+		{
+			/* FATAL error because the mapping would otherwise remain */
+			ereport(FATAL,
+					(errcode_for_file_access(),
+					 errmsg("could not munmap while flushing dirty data: %m")));
+		}
+	}
 #elif defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
-		return posix_fadvise(fd, offset, amount, POSIX_FADV_DONTNEED);
+	{
+		int			rc = 0;
+
+		/*
+		 * Signal the kernel that the passed in range should not be cached
+		 * anymore. This has the desired side effect of writing out dirty
+		 * data, and the undesired side effect of likely discarding useful
+		 * clean cached blocks.  For the latter reason this is the least
+		 * preferable method.
+		 */
+
+		rc = posix_fadvise(fd, offset, nbytes,
+						   POSIX_FADV_DONTNEED);
+
+		/* don't error out, this is just a performance optimization */
+		if (rc != 0)
+		{
+			ereport(WARNING,
+					(errcode_for_file_access(),
+					 errmsg("could not flush dirty data: %m")));
+			return;
+		}
+	}
 #else
 #error PG_FLUSH_DATA_WORKS should not have been defined
 #endif
-	}
-#endif
-	return 0;
+
+#endif /* PG_FLUSH_DATA_WORKS */
 }
 
 
@@ -1345,7 +1449,8 @@ retry:
 }
 
 int
-FileWrite(File file, char *buffer, int amount)
+FileWrite(File file, char *buffer, int amount,
+		  FileFlushContext *flush_context)
 {
 	int			returnCode;
 
@@ -1408,6 +1513,11 @@ retry:
 				VfdCache[file].fileSize = newPos;
 			}
 		}
+
+		/* update bulk flush state */
+		if (flush_context != NULL)
+			FlushContextSchedule(flush_context, file,
+								 VfdCache[file].seekPos, amount);
 	}
 	else
 	{
@@ -1579,6 +1689,103 @@ FilePathName(File file)
 
 
 /*
+ * Initialize a FileFlushContext, discarding potential previous state in
+ * context.
+ *
+ * max_coalesce is the maximum number of flush requests that will be coalesced
+ * into a bigger one; 0 means there is no limit.
+ */
+void
+FlushContextInit(FileFlushContext *context, int max_coalesce)
+{
+	context->max_coalesce = max_coalesce;
+	context->file = -1;
+	context->ncalls = 0;
+	context->offset = 0;
+	context->nbytes = 0;
+}
+
+/*
+ * Schedule writeout of a range of bytes in a file.
+ */
+void
+FlushContextSchedule(FileFlushContext *context,
+					 File file, off_t offset, off_t nbytes)
+{
+	/*
+	 * If the new range of blocks is in the same file as a previous request,
+	 * try to coalesce with previous requests. That increases the chance that
+	 * these writeouts can be coalesced in the OS's IO layer and decreases the
+	 * number of syscalls.  If there are a lot of outstanding flush requests,
+	 * immediately trigger writeout of the previously scheduled blocks to
+	 * avoid overflowing request queues and the like, which would cause
+	 * latency spikes.
+	 */
+	if (context->file == file && context->ncalls != 0)
+	{
+		int64	startoff;
+		int64	endoff;
+
+		/* merge current flush with previous ones */
+		startoff = Min(context->offset, offset);
+		endoff = Max(context->offset + context->nbytes, offset + nbytes);
+
+		context->offset = startoff;
+		context->nbytes = endoff - startoff;
+		context->ncalls++;
+
+		/*
+		 * Accumulated enough dirty ranges - flush now. XXX: It might be
+		 * worthwhile to count actual bytes that we've been asked to flush,
+		 * and to have additional limits; but that's for another day.
+		 */
+		if (context->max_coalesce > 0 &&
+			context->ncalls >= context->max_coalesce)
+			FlushContextIssuePending(context);
+	}
+	else
+	{
+		/* flush previous file & reset flush accumulator */
+		FlushContextIssuePending(context);
+
+		context->file = file;
+		context->ncalls = 1;
+		context->offset = offset;
+		context->nbytes = nbytes;
+	}
+}
+
+/*
+ * Issue all pending flush requests previously scheduled with
+ * FlushContextSchedule to the OS.
+ *
+ * Because this is, currently, only used to improve the OS's IO scheduling,
+ * we try hard to never error out - it's just a hint.
+ */
+void
+FlushContextIssuePending(FileFlushContext *context)
+{
+	int			rc;
+
+	if (context->ncalls == 0)
+		return;
+
+	rc = FileAccess(context->file);
+	if (rc < 0)
+		return;
+
+	pg_flush_data(VfdCache[context->file].fd,
+				  context->offset, context->nbytes);
+
+	context->file = -1;
+	context->ncalls = 0;
+	context->offset = 0;
+	context->nbytes = 0;
+}
+
+
+/*
  * Make room for another allocatedDescs[] array entry if needed and possible.
  * Returns true if an array element is available.
  */
@@ -2655,9 +2862,10 @@ pre_sync_fname(const char *fname, bool isdir, int elevel)
 	}
 
 	/*
-	 * We ignore errors from pg_flush_data() because this is only a hint.
+	 * pg_flush_data() ignores errors, which is ok because this is only a
+	 * hint.
 	 */
-	(void) pg_flush_data(fd, 0, 0);
+	pg_flush_data(fd, 0, 0);
 
 	(void) CloseTransientFile(fd);
 }
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 42a43bb..eeaac07 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -531,7 +531,7 @@ mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 				 errmsg("could not seek to block %u in file \"%s\": %m",
 						blocknum, FilePathName(v->mdfd_vfd))));
 
-	if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ)) != BLCKSZ)
+	if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, NULL)) != BLCKSZ)
 	{
 		if (nbytes < 0)
 			ereport(ERROR,
@@ -738,7 +738,7 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
  */
 void
 mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		char *buffer, bool skipFsync)
+		char *buffer, bool skipFsync, FileFlushContext *flush_context)
 {
 	off_t		seekpos;
 	int			nbytes;
@@ -767,7 +767,7 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 				 errmsg("could not seek to block %u in file \"%s\": %m",
 						blocknum, FilePathName(v->mdfd_vfd))));
 
-	nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ);
+	nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, flush_context);
 
 	TRACE_POSTGRESQL_SMGR_MD_WRITE_DONE(forknum, blocknum,
 										reln->smgr_rnode.node.spcNode,
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 244b4ea..31c15a6 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -52,7 +52,8 @@ typedef struct f_smgr
 	void		(*smgr_read) (SMgrRelation reln, ForkNumber forknum,
 										  BlockNumber blocknum, char *buffer);
 	void		(*smgr_write) (SMgrRelation reln, ForkNumber forknum,
-						 BlockNumber blocknum, char *buffer, bool skipFsync);
+						  BlockNumber blocknum, char *buffer, bool skipFsync,
+										   FileFlushContext *flush_context);
 	BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
 	void		(*smgr_truncate) (SMgrRelation reln, ForkNumber forknum,
 											  BlockNumber nblocks);
@@ -643,10 +644,11 @@ smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
  */
 void
 smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		  char *buffer, bool skipFsync)
+		  char *buffer, bool skipFsync, FileFlushContext *flush_context)
 {
 	(*(smgrsw[reln->smgr_which].smgr_write)) (reln, forknum, blocknum,
-											  buffer, skipFsync);
+											  buffer, skipFsync,
+											  flush_context);
 }
 
 /*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index fda0fb9..b72f782 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1004,6 +1004,18 @@ static struct config_bool ConfigureNamesBool[] =
 		false,
 		NULL, NULL, NULL
 	},
+
+	{
+		{"checkpoint_flush_to_disk", PGC_SIGHUP, WAL_CHECKPOINTS,
+			gettext_noop("Hint that checkpoint's writes are high priority."),
+			NULL
+		},
+		&checkpoint_flush_to_disk,
+		/* see bufmgr.h: true on Linux, false otherwise */
+		DEFAULT_CHECKPOINT_FLUSH_TO_DISK,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"log_connections", PGC_SU_BACKEND, LOGGING_WHAT,
 			gettext_noop("Logs each successful connection."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index dcf929f..20726dc 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -202,6 +202,8 @@
 #max_wal_size = 1GB
 #min_wal_size = 80MB
 #checkpoint_completion_target = 0.5	# checkpoint target duration, 0.0 - 1.0
+#checkpoint_flush_to_disk = ?		# send buffers to disk on checkpoint
+					# default is on if Linux, off otherwise
 #checkpoint_warning = 30s		# 0 disables
 
 # - Archiving -
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 790ca66..11815a8 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -186,6 +186,8 @@ extern bool XLOG_DEBUG;
 typedef struct CheckpointStatsData
 {
 	TimestampTz ckpt_start_t;	/* start of checkpoint */
+	TimestampTz ckpt_sort_t;	/* start of buffer sorting */
+	TimestampTz ckpt_sort_end_t;	/* end of buffer sorting */
 	TimestampTz ckpt_write_t;	/* start of flushing buffers */
 	TimestampTz ckpt_sync_t;	/* start of fsyncs */
 	TimestampTz ckpt_sync_end_t;	/* end of fsyncs */
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 521ee1c..1628154 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -210,6 +210,24 @@ extern PGDLLIMPORT BufferDescPadded *BufferDescriptors;
 /* in localbuf.c */
 extern BufferDesc *LocalBufferDescriptors;
 
+/* in bufmgr.c */
+
+/*
+ * Structure to sort buffers per file on checkpoints.
+ *
+ * This structure is allocated per buffer in shared memory, so it should be
+ * kept as small as possible.
+ */
+typedef struct CkptSortItem
+{
+	Oid			tsId;
+	Oid			relNode;
+	ForkNumber	forkNum;
+	BlockNumber blockNum;
+	int			buf_id;
+}	CkptSortItem;
+
+extern CkptSortItem *CkptBufferIds;
 
 /*
  * Internal routines: only called by bufmgr
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 0f59201..28a3deb 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -55,6 +55,14 @@ extern double bgwriter_lru_multiplier;
 extern bool track_io_timing;
 extern int	target_prefetch_pages;
 
+#ifdef HAVE_SYNC_FILE_RANGE
+#define DEFAULT_CHECKPOINT_FLUSH_TO_DISK true
+#else
+#define DEFAULT_CHECKPOINT_FLUSH_TO_DISK false
+#endif   /* HAVE_SYNC_FILE_RANGE */
+
+extern bool checkpoint_flush_to_disk;
+
 /* in buf_init.c */
 extern PGDLLIMPORT char *BufferBlocks;
 
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 7eabe09..a05500a 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -59,6 +59,34 @@ extern int	max_files_per_process;
  */
 extern int	max_safe_fds;
 
+/*
+ * FlushContext structure - This is used to accumulate several flush requests
+ * made by one callsite into a larger flush request.
+ */
+typedef struct FileFlushContext
+{
+	/* max number of flush requests to coalesce */
+	int			max_coalesce;
+	/* VFD of the last file processed or -1 */
+	File		file;
+	/* number of flush requests merged together */
+	int			ncalls;
+	/* offset to start flushing (minimum of all offsets) */
+	int64		offset;
+
+	/*
+	 * Size (minimum extent to cover all flushed data). If 0 but ncalls > 0,
+	 * the whole file should be flushed.
+	 */
+	int64		nbytes;
+}	FileFlushContext;
+
+/*
+ * By default coalesce up to 64 flush requests to the same file. As flush
+ * requests usually are BLCKSZ large, that amounts to about the size of common
+ * IO request queues.
+ */
+#define FLUSH_CONTEXT_DEFAULT_MAX_COALESCE 64
 
 /*
  * prototypes for functions in fd.c
@@ -70,11 +98,16 @@ extern File OpenTemporaryFile(bool interXact);
 extern void FileClose(File file);
 extern int	FilePrefetch(File file, off_t offset, int amount);
 extern int	FileRead(File file, char *buffer, int amount);
-extern int	FileWrite(File file, char *buffer, int amount);
+extern int FileWrite(File file, char *buffer, int amount,
+		  FileFlushContext *flush_context);
 extern int	FileSync(File file);
 extern off_t FileSeek(File file, off_t offset, int whence);
 extern int	FileTruncate(File file, off_t offset);
 extern char *FilePathName(File file);
+extern void FlushContextInit(FileFlushContext *context, int max_coalesce);
+extern void FlushContextIssuePending(FileFlushContext *context);
+extern void FlushContextSchedule(FileFlushContext *context, File file,
+								 off_t offset, off_t nbytes);
 
 /* Operations that allow use of regular stdio --- USE WITH CAUTION */
 extern FILE *AllocateFile(const char *name, const char *mode);
@@ -112,7 +145,7 @@ extern int	pg_fsync(int fd);
 extern int	pg_fsync_no_writethrough(int fd);
 extern int	pg_fsync_writethrough(int fd);
 extern int	pg_fdatasync(int fd);
-extern int	pg_flush_data(int fd, off_t offset, off_t amount);
+extern void	pg_flush_data(int fd, off_t offset, off_t amount);
 extern void fsync_fname(char *fname, bool isdir);
 extern void SyncDataDirectory(void);
 
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 69a624f..e95b859 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -16,6 +16,7 @@
 
 #include "fmgr.h"
 #include "storage/block.h"
+#include "storage/fd.h"
 #include "storage/relfilenode.h"
 
 
@@ -95,7 +96,8 @@ extern void smgrprefetch(SMgrRelation reln, ForkNumber forknum,
 extern void smgrread(SMgrRelation reln, ForkNumber forknum,
 		 BlockNumber blocknum, char *buffer);
 extern void smgrwrite(SMgrRelation reln, ForkNumber forknum,
-		  BlockNumber blocknum, char *buffer, bool skipFsync);
+		  BlockNumber blocknum, char *buffer, bool skipFsync,
+		  FileFlushContext *flush_context);
 extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
 extern void smgrtruncate(SMgrRelation reln, ForkNumber forknum,
 			 BlockNumber nblocks);
@@ -121,7 +123,8 @@ extern void mdprefetch(SMgrRelation reln, ForkNumber forknum,
 extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	   char *buffer);
 extern void mdwrite(SMgrRelation reln, ForkNumber forknum,
-		BlockNumber blocknum, char *buffer, bool skipFsync);
+		BlockNumber blocknum, char *buffer, bool skipFsync,
+		FileFlushContext *flush_context);
 extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
 extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
 		   BlockNumber nblocks);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 03e1d2c..2f00050 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -576,6 +576,7 @@ FileNameMap
 FindSplitData
 FixedParallelState
 FixedParamState
+FileFlushContext
 FmgrBuiltin
 FmgrHookEventType
 FmgrInfo
-- 
2.6.0.rc3
