From 87d430aa6cc2e86e263bf45bf305158ec8caa459 Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Fri, 3 Apr 2020 18:21:05 +0530
Subject: [PATCH v14 1/4] Add infrastructure to track WAL usage.

This allows gathering WAL generation statistics for each statement
execution.  The three statistics we collect are the number of WAL records,
the number of full page writes, and the amount of WAL bytes generated.

This helps users with write-intensive workloads see the impact of WAL
I/O.  It also shows approximately what percentage of the overall WAL is
due to full page writes.
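
For illustration, that percentage could be approximated from the new
counters as in the sketch below (assuming each full page image takes
about BLCKSZ bytes; hole elimination and compression make the true
figure somewhat smaller, hence "approximately"):

    /* Sketch only: rough share of WAL volume due to full page writes. */
    static double
    approx_fpw_share(const WalUsage *usage)
    {
        if (usage->wal_bytes == 0)
            return 0.0;
        return (double) (usage->wal_num_fpw * BLCKSZ) /
            (double) usage->wal_bytes;
    }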

In the future, we can extend this functionality to compute the exact
amount of WAL data due to full page writes.

By itself, this patch only adds the infrastructure to compute WAL usage
data.  Upcoming patches will expose this data via EXPLAIN, auto_explain,
pg_stat_statements, and verbose (auto)vacuum output.
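
As a sketch of how the new counters are meant to be consumed (this
mirrors what InstrStartNode/InstrStopNode do with walusage_start; the
snippet itself is illustrative, not part of the patch):

    WalUsage    start = pgWalUsage;     /* snapshot before the work */
    WalUsage    delta = {0};            /* AccumDiff adds into dst */

    /* ... run the statement or maintenance command ... */

    WalUsageAccumDiff(&delta, &pgWalUsage, &start);
    /* delta.wal_records, delta.wal_num_fpw and delta.wal_bytes now
     * hold the WAL usage of the section above. */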

Author: Kirill Bychik, Julien Rouhaud
Reviewed-by: Dilip Kumar, Fujii Masao and Amit Kapila
Discussion: https://postgr.es/m/CAB-hujrP8ZfUkvL5OYETipQwA=e3n7oqHFU=4ZLxWS_Cza3kQQ@mail.gmail.com
---
 src/backend/access/heap/vacuumlazy.c    | 37 +++++++++++++++++------
 src/backend/access/nbtree/nbtsort.c     | 40 +++++++++++++++++++++++++
 src/backend/access/transam/xlog.c       | 12 +++++++-
 src/backend/access/transam/xloginsert.c | 13 +++++---
 src/backend/executor/execParallel.c     | 36 ++++++++++++++++------
 src/backend/executor/instrument.c       | 53 +++++++++++++++++++++++++++++----
 src/include/access/xlog.h               |  3 +-
 src/include/executor/execParallel.h     |  1 +
 src/include/executor/instrument.h       | 18 +++++++++--
 src/tools/pgindent/typedefs.list        |  1 +
 10 files changed, 182 insertions(+), 32 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 9f9596c..3ca7f5d 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -139,6 +139,7 @@
 #define PARALLEL_VACUUM_KEY_DEAD_TUPLES		2
 #define PARALLEL_VACUUM_KEY_QUERY_TEXT		3
 #define PARALLEL_VACUUM_KEY_BUFFER_USAGE	4
+#define PARALLEL_VACUUM_KEY_WAL_USAGE		5
 
 /*
  * Macro to check if we are in a parallel vacuum.  If true, we are in the
@@ -275,6 +276,9 @@ typedef struct LVParallelState
 	/* Points to buffer usage area in DSM */
 	BufferUsage *buffer_usage;
 
+	/* Points to WAL usage area in DSM */
+	WalUsage   *wal_usage;
+
 	/*
 	 * The number of indexes that support parallel index bulk-deletion and
 	 * parallel index cleanup respectively.
@@ -2143,8 +2147,8 @@ lazy_parallel_vacuum_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
 						  vacrelstats->dead_tuples, nindexes, vacrelstats);
 
 	/*
-	 * Next, accumulate buffer usage.  (This must wait for the workers to
-	 * finish, or we might get incomplete data.)
+	 * Next, accumulate buffer and WAL usage.  (This must wait for the workers
+	 * to finish, or we might get incomplete data.)
 	 */
 	if (nworkers > 0)
 	{
@@ -2154,7 +2158,7 @@ lazy_parallel_vacuum_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
 		WaitForParallelWorkersToFinish(lps->pcxt);
 
 		for (i = 0; i < lps->pcxt->nworkers_launched; i++)
-			InstrAccumParallelQuery(&lps->buffer_usage[i]);
+			InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
 	}
 
 	/*
@@ -3171,6 +3175,7 @@ begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
 	LVShared   *shared;
 	LVDeadTuples *dead_tuples;
 	BufferUsage *buffer_usage;
+	WalUsage   *wal_usage;
 	bool	   *can_parallel_vacuum;
 	long		maxtuples;
 	char	   *sharedquery;
@@ -3255,15 +3260,19 @@ begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
 	/*
-	 * Estimate space for BufferUsage -- PARALLEL_VACUUM_KEY_BUFFER_USAGE.
+	 * Estimate space for BufferUsage and WalUsage --
+	 * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
 	 *
 	 * If there are no extensions loaded that care, we could skip this.  We
-	 * have no way of knowing whether anyone's looking at pgBufferUsage, so do
-	 * it unconditionally.
+	 * have no way of knowing whether anyone's looking at pgBufferUsage or
+	 * pgWalUsage, so do it unconditionally.
 	 */
 	shm_toc_estimate_chunk(&pcxt->estimator,
 						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
 	/* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
 	querylen = strlen(debug_query_string);
@@ -3299,11 +3308,18 @@ begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
 	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, dead_tuples);
 	vacrelstats->dead_tuples = dead_tuples;
 
-	/* Allocate space for each worker's BufferUsage; no need to initialize */
+	/*
+	 * Allocate space for each worker's BufferUsage and WalUsage; no need to
+	 * initialize
+	 */
 	buffer_usage = shm_toc_allocate(pcxt->toc,
 									mul_size(sizeof(BufferUsage), pcxt->nworkers));
 	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
 	lps->buffer_usage = buffer_usage;
+	wal_usage = shm_toc_allocate(pcxt->toc,
+								 mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
+	lps->wal_usage = wal_usage;
 
 	/* Store query string for workers */
 	sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
@@ -3435,6 +3451,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	LVShared   *lvshared;
 	LVDeadTuples *dead_tuples;
 	BufferUsage *buffer_usage;
+	WalUsage   *wal_usage;
 	int			nindexes;
 	char	   *sharedquery;
 	IndexBulkDeleteResult **stats;
@@ -3511,9 +3528,11 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	parallel_vacuum_index(indrels, stats, lvshared, dead_tuples, nindexes,
 						  &vacrelstats);
 
-	/* Report buffer usage during parallel execution */
+	/* Report buffer/WAL usage during parallel execution */
 	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
-	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
+	wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
+	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
+						  &wal_usage[ParallelWorkerNumber]);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 3924945..4a85865 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -67,6 +67,7 @@
 #include "access/xloginsert.h"
 #include "catalog/index.h"
 #include "commands/progress.h"
+#include "executor/instrument.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "storage/smgr.h"
@@ -81,6 +82,7 @@
 #define PARALLEL_KEY_TUPLESORT			UINT64CONST(0xA000000000000002)
 #define PARALLEL_KEY_TUPLESORT_SPOOL2	UINT64CONST(0xA000000000000003)
 #define PARALLEL_KEY_QUERY_TEXT			UINT64CONST(0xA000000000000004)
+#define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xA000000000000005)
 
 /*
  * DISABLE_LEADER_PARTICIPATION disables the leader's participation in
@@ -203,6 +205,7 @@ typedef struct BTLeader
 	Sharedsort *sharedsort;
 	Sharedsort *sharedsort2;
 	Snapshot	snapshot;
+	WalUsage   *walusage;
 } BTLeader;
 
 /*
@@ -1476,6 +1479,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	Sharedsort *sharedsort2;
 	BTSpool    *btspool = buildstate->spool;
 	BTLeader   *btleader = (BTLeader *) palloc0(sizeof(BTLeader));
+	WalUsage   *walusage;
 	bool		leaderparticipates = true;
 	char	   *sharedquery;
 	int			querylen;
@@ -1528,6 +1532,18 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 		shm_toc_estimate_keys(&pcxt->estimator, 3);
 	}
 
+	/*
+	 * Estimate space for WalUsage -- PARALLEL_KEY_WAL_USAGE
+	 *
+	 * WalUsage during the execution of a maintenance command can be used by
+	 * an extension that reports the WAL usage, such as pg_stat_statements.
+	 * We have no way of knowing whether anyone's looking at pgWalUsage, so
+	 * do it unconditionally.
+	 */
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
 	/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
 	querylen = strlen(debug_query_string);
 	shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
@@ -1599,6 +1615,11 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	memcpy(sharedquery, debug_query_string, querylen + 1);
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
 
+	/* Allocate space for each worker's WalUsage; no need to initialize */
+	walusage = shm_toc_allocate(pcxt->toc,
+								mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
+
 	/* Launch workers, saving status for leader/caller */
 	LaunchParallelWorkers(pcxt);
 	btleader->pcxt = pcxt;
@@ -1609,6 +1630,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	btleader->sharedsort = sharedsort;
 	btleader->sharedsort2 = sharedsort2;
 	btleader->snapshot = snapshot;
+	btleader->walusage = walusage;
 
 	/* If no workers were successfully launched, back out (do serial build) */
 	if (pcxt->nworkers_launched == 0)
@@ -1637,8 +1659,18 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 static void
 _bt_end_parallel(BTLeader *btleader)
 {
+	int			i;
+
 	/* Shutdown worker processes */
 	WaitForParallelWorkersToFinish(btleader->pcxt);
+
+	/*
+	 * Next, accumulate WAL usage.  (This must wait for the workers to finish,
+	 * or we might get incomplete data.)
+	 */
+	for (i = 0; i < btleader->pcxt->nworkers_launched; i++)
+		InstrAccumParallelQuery(NULL, &btleader->walusage[i]);
+
 	/* Free last reference to MVCC snapshot, if one was used */
 	if (IsMVCCSnapshot(btleader->snapshot))
 		UnregisterSnapshot(btleader->snapshot);
@@ -1769,6 +1801,7 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 	Relation	indexRel;
 	LOCKMODE	heapLockmode;
 	LOCKMODE	indexLockmode;
+	WalUsage   *walusage;
 	int			sortmem;
 
 #ifdef BTREE_BUILD_STATS
@@ -1830,11 +1863,18 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 		tuplesort_attach_shared(sharedsort2, seg);
 	}
 
+	/* Prepare to track WAL usage during parallel execution */
+	InstrStartParallelQuery();
+
 	/* Perform sorting of spool, and possibly a spool2 */
 	sortmem = maintenance_work_mem / btshared->scantuplesortstates;
 	_bt_parallel_scan_and_sort(btspool, btspool2, btshared, sharedsort,
 							   sharedsort2, sortmem, false);
 
+	/* Report WAL usage during parallel execution */
+	walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+	InstrEndParallelQuery(NULL, &walusage[ParallelWorkerNumber]);
+
 #ifdef BTREE_BUILD_STATS
 	if (log_btree_build_stats)
 	{
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 977d448..50b78f3 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -43,6 +43,7 @@
 #include "commands/progress.h"
 #include "commands/tablespace.h"
 #include "common/controldata_utils.h"
+#include "executor/instrument.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "pgstat.h"
@@ -996,7 +997,8 @@ static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
 XLogRecPtr
 XLogInsertRecord(XLogRecData *rdata,
 				 XLogRecPtr fpw_lsn,
-				 uint8 flags)
+				 uint8 flags,
+				 int num_fpw)
 {
 	XLogCtlInsert *Insert = &XLogCtl->Insert;
 	pg_crc32c	rdata_crc;
@@ -1252,6 +1254,14 @@ XLogInsertRecord(XLogRecData *rdata,
 	ProcLastRecPtr = StartPos;
 	XactLastRecEnd = EndPos;
 
+	/* Report WAL traffic to the instrumentation. */
+	if (inserted)
+	{
+		pgWalUsage.wal_bytes += rechdr->xl_tot_len;
+		pgWalUsage.wal_records++;
+		pgWalUsage.wal_num_fpw += num_fpw;
+	}
+
 	return EndPos;
 }
 
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index a618dec..5e032e7 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -25,6 +25,7 @@
 #include "access/xloginsert.h"
 #include "catalog/pg_control.h"
 #include "common/pg_lzcompress.h"
+#include "executor/instrument.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "replication/origin.h"
@@ -108,7 +109,7 @@ static MemoryContext xloginsert_cxt;
 
 static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
 									   XLogRecPtr RedoRecPtr, bool doPageWrites,
-									   XLogRecPtr *fpw_lsn);
+									   XLogRecPtr *fpw_lsn, int *num_fpw);
 static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
 									uint16 hole_length, char *dest, uint16 *dlen);
 
@@ -448,6 +449,7 @@ XLogInsert(RmgrId rmid, uint8 info)
 		bool		doPageWrites;
 		XLogRecPtr	fpw_lsn;
 		XLogRecData *rdt;
+		int			num_fpw = 0;
 
 		/*
 		 * Get values needed to decide whether to do full-page writes. Since
@@ -457,9 +459,9 @@ XLogInsert(RmgrId rmid, uint8 info)
 		GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
 
 		rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
-								 &fpw_lsn);
+								 &fpw_lsn, &num_fpw);
 
-		EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);
+		EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpw);
 	} while (EndPos == InvalidXLogRecPtr);
 
 	XLogResetInsertion();
@@ -482,7 +484,7 @@ XLogInsert(RmgrId rmid, uint8 info)
 static XLogRecData *
 XLogRecordAssemble(RmgrId rmid, uint8 info,
 				   XLogRecPtr RedoRecPtr, bool doPageWrites,
-				   XLogRecPtr *fpw_lsn)
+				   XLogRecPtr *fpw_lsn, int *num_fpw)
 {
 	XLogRecData *rdt;
 	uint32		total_len = 0;
@@ -635,6 +637,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			 */
 			bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
 
+			/* Count a full page image constructed for the WAL record */
+			*num_fpw += 1;
+
 			/*
 			 * Construct XLogRecData entries for the page content.
 			 */
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index a753d6e..b7d0719 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -12,7 +12,7 @@
  * workers and ensuring that their state generally matches that of the
  * leader; see src/backend/access/transam/README.parallel for details.
  * However, we must save and restore relevant executor state, such as
- * any ParamListInfo associated with the query, buffer usage info, and
+ * any ParamListInfo associated with the query, buffer/WAL usage info, and
  * the actual plan to be passed down to the worker.
  *
  * IDENTIFICATION
@@ -62,6 +62,7 @@
 #define PARALLEL_KEY_DSA				UINT64CONST(0xE000000000000007)
 #define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
 #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
+#define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xE00000000000000A)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
 
@@ -573,6 +574,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	char	   *pstmt_space;
 	char	   *paramlistinfo_space;
 	BufferUsage *bufusage_space;
+	WalUsage   *walusage_space;
 	SharedExecutorInstrumentation *instrumentation = NULL;
 	SharedJitInstrumentation *jit_instrumentation = NULL;
 	int			pstmt_len;
@@ -646,6 +648,13 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
+	/*
+	 * Same thing for WalUsage.
+	 */
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
 	/* Estimate space for tuple queues. */
 	shm_toc_estimate_chunk(&pcxt->estimator,
 						   mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
@@ -728,6 +737,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
 	pei->buffer_usage = bufusage_space;
 
+	/* Same for WalUsage. */
+	walusage_space = shm_toc_allocate(pcxt->toc,
+									  mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
+	pei->wal_usage = walusage_space;
+
 	/* Set up the tuple queues that the workers will write into. */
 	pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
 
@@ -1069,7 +1084,7 @@ ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
 
 /*
  * Finish parallel execution.  We wait for parallel workers to finish, and
- * accumulate their buffer usage.
+ * accumulate their buffer/WAL usage.
  */
 void
 ExecParallelFinish(ParallelExecutorInfo *pei)
@@ -1109,11 +1124,11 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 	WaitForParallelWorkersToFinish(pei->pcxt);
 
 	/*
-	 * Next, accumulate buffer usage.  (This must wait for the workers to
+	 * Next, accumulate buffer/WAL usage.  (This must wait for the workers to
 	 * finish, or we might get incomplete data.)
 	 */
 	for (i = 0; i < nworkers; i++)
-		InstrAccumParallelQuery(&pei->buffer_usage[i]);
+		InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
 
 	pei->finished = true;
 }
@@ -1333,6 +1348,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 {
 	FixedParallelExecutorState *fpes;
 	BufferUsage *buffer_usage;
+	WalUsage   *wal_usage;
 	DestReceiver *receiver;
 	QueryDesc  *queryDesc;
 	SharedExecutorInstrumentation *instrumentation;
@@ -1386,11 +1402,11 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
 
 	/*
-	 * Prepare to track buffer usage during query execution.
+	 * Prepare to track buffer/WAL usage during query execution.
 	 *
 	 * We do this after starting up the executor to match what happens in the
-	 * leader, which also doesn't count buffer accesses that occur during
-	 * executor startup.
+	 * leader, which also doesn't count buffer accesses and WAL activity that
+	 * occur during executor startup.
 	 */
 	InstrStartParallelQuery();
 
@@ -1406,9 +1422,11 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	/* Shut down the executor */
 	ExecutorFinish(queryDesc);
 
-	/* Report buffer usage during parallel execution. */
+	/* Report buffer/WAL usage during parallel execution. */
 	buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
-	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
+	wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
+						  &wal_usage[ParallelWorkerNumber]);
 
 	/* Report instrumentation data if any instrumentation options are set. */
 	if (instrumentation != NULL)
diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c
index 042e10f..74ee480 100644
--- a/src/backend/executor/instrument.c
+++ b/src/backend/executor/instrument.c
@@ -19,8 +19,11 @@
 
 BufferUsage pgBufferUsage;
 static BufferUsage save_pgBufferUsage;
+WalUsage	pgWalUsage;
+static WalUsage save_pgWalUsage;
 
 static void BufferUsageAdd(BufferUsage *dst, const BufferUsage *add);
+static void WalUsageAdd(WalUsage *dst, WalUsage *add);
 
 
 /* Allocate new instrumentation structure(s) */
@@ -31,15 +34,17 @@ InstrAlloc(int n, int instrument_options)
 
 	/* initialize all fields to zeroes, then modify as needed */
 	instr = palloc0(n * sizeof(Instrumentation));
-	if (instrument_options & (INSTRUMENT_BUFFERS | INSTRUMENT_TIMER))
+	if (instrument_options & (INSTRUMENT_BUFFERS | INSTRUMENT_TIMER | INSTRUMENT_WAL))
 	{
 		bool		need_buffers = (instrument_options & INSTRUMENT_BUFFERS) != 0;
+		bool		need_wal = (instrument_options & INSTRUMENT_WAL) != 0;
 		bool		need_timer = (instrument_options & INSTRUMENT_TIMER) != 0;
 		int			i;
 
 		for (i = 0; i < n; i++)
 		{
 			instr[i].need_bufusage = need_buffers;
+			instr[i].need_walusage = need_wal;
 			instr[i].need_timer = need_timer;
 		}
 	}
@@ -53,6 +58,7 @@ InstrInit(Instrumentation *instr, int instrument_options)
 {
 	memset(instr, 0, sizeof(Instrumentation));
 	instr->need_bufusage = (instrument_options & INSTRUMENT_BUFFERS) != 0;
+	instr->need_walusage = (instrument_options & INSTRUMENT_WAL) != 0;
 	instr->need_timer = (instrument_options & INSTRUMENT_TIMER) != 0;
 }
 
@@ -67,6 +73,9 @@ InstrStartNode(Instrumentation *instr)
 	/* save buffer usage totals at node entry, if needed */
 	if (instr->need_bufusage)
 		instr->bufusage_start = pgBufferUsage;
+
+	if (instr->need_walusage)
+		instr->walusage_start = pgWalUsage;
 }
 
 /* Exit from a plan node */
@@ -95,6 +104,10 @@ InstrStopNode(Instrumentation *instr, double nTuples)
 		BufferUsageAccumDiff(&instr->bufusage,
 							 &pgBufferUsage, &instr->bufusage_start);
 
+	if (instr->need_walusage)
+		WalUsageAccumDiff(&instr->walusage,
+						  &pgWalUsage, &instr->walusage_start);
+
 	/* Is this the first tuple of this cycle? */
 	if (!instr->running)
 	{
@@ -158,6 +171,9 @@ InstrAggNode(Instrumentation *dst, Instrumentation *add)
 	/* Add delta of buffer usage since entry to node's totals */
 	if (dst->need_bufusage)
 		BufferUsageAdd(&dst->bufusage, &add->bufusage);
+
+	if (dst->need_walusage)
+		WalUsageAdd(&dst->walusage, &add->walusage);
 }
 
 /* note current values during parallel executor startup */
@@ -165,21 +181,29 @@ void
 InstrStartParallelQuery(void)
 {
 	save_pgBufferUsage = pgBufferUsage;
+	save_pgWalUsage = pgWalUsage;
 }
 
 /* report usage after parallel executor shutdown */
 void
-InstrEndParallelQuery(BufferUsage *result)
+InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
 {
-	memset(result, 0, sizeof(BufferUsage));
-	BufferUsageAccumDiff(result, &pgBufferUsage, &save_pgBufferUsage);
+	if (bufusage)
+	{
+		memset(bufusage, 0, sizeof(BufferUsage));
+		BufferUsageAccumDiff(bufusage, &pgBufferUsage, &save_pgBufferUsage);
+	}
+	memset(walusage, 0, sizeof(WalUsage));
+	WalUsageAccumDiff(walusage, &pgWalUsage, &save_pgWalUsage);
 }
 
 /* accumulate work done by workers in leader's stats */
 void
-InstrAccumParallelQuery(BufferUsage *result)
+InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
 {
-	BufferUsageAdd(&pgBufferUsage, result);
+	if (bufusage)
+		BufferUsageAdd(&pgBufferUsage, bufusage);
+	WalUsageAdd(&pgWalUsage, walusage);
 }
 
 /* dst += add */
@@ -221,3 +245,20 @@ BufferUsageAccumDiff(BufferUsage *dst,
 	INSTR_TIME_ACCUM_DIFF(dst->blk_write_time,
 						  add->blk_write_time, sub->blk_write_time);
 }
+
+/* helper functions for WAL usage accumulation */
+static void
+WalUsageAdd(WalUsage *dst, WalUsage *add)
+{
+	dst->wal_bytes += add->wal_bytes;
+	dst->wal_records += add->wal_records;
+	dst->wal_num_fpw += add->wal_num_fpw;
+}
+
+void
+WalUsageAccumDiff(WalUsage *dst, const WalUsage *add, const WalUsage *sub)
+{
+	dst->wal_bytes += add->wal_bytes - sub->wal_bytes;
+	dst->wal_records += add->wal_records - sub->wal_records;
+	dst->wal_num_fpw += add->wal_num_fpw - sub->wal_num_fpw;
+}
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 9ec7b31..b91e724 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -259,7 +259,8 @@ struct XLogRecData;
 
 extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata,
 								   XLogRecPtr fpw_lsn,
-								   uint8 flags);
+								   uint8 flags,
+								   int num_fpw);
 extern void XLogFlush(XLogRecPtr RecPtr);
 extern bool XLogBackgroundFlush(void);
 extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 17d07cf..5a39a5b 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -26,6 +26,7 @@ typedef struct ParallelExecutorInfo
 	PlanState  *planstate;		/* plan subtree we're running in parallel */
 	ParallelContext *pcxt;		/* parallel context we're using */
 	BufferUsage *buffer_usage;	/* points to bufusage area in DSM */
+	WalUsage   *wal_usage;		/* points to walusage area in DSM */
 	SharedExecutorInstrumentation *instrumentation; /* optional */
 	struct SharedJitInstrumentation *jit_instrumentation;	/* optional */
 	dsa_area   *area;			/* points to DSA area in DSM */
diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h
index 3825a5a..64439c6 100644
--- a/src/include/executor/instrument.h
+++ b/src/include/executor/instrument.h
@@ -32,12 +32,20 @@ typedef struct BufferUsage
 	instr_time	blk_write_time; /* time spent writing */
 } BufferUsage;
 
+typedef struct WalUsage
+{
+	long		wal_records;	/* # of WAL records produced */
+	long		wal_num_fpw;	/* # of WAL full page image writes produced */
+	uint64		wal_bytes;		/* size of WAL records produced */
+} WalUsage;
+
 /* Flag bits included in InstrAlloc's instrument_options bitmask */
 typedef enum InstrumentOption
 {
 	INSTRUMENT_TIMER = 1 << 0,	/* needs timer (and row counts) */
 	INSTRUMENT_BUFFERS = 1 << 1,	/* needs buffer usage */
 	INSTRUMENT_ROWS = 1 << 2,	/* needs row count */
+	INSTRUMENT_WAL = 1 << 3,	/* needs WAL usage */
 	INSTRUMENT_ALL = PG_INT32_MAX
 } InstrumentOption;
 
@@ -46,6 +54,7 @@ typedef struct Instrumentation
 	/* Parameters set at node creation: */
 	bool		need_timer;		/* true if we need timer data */
 	bool		need_bufusage;	/* true if we need buffer usage data */
+	bool		need_walusage;	/* true if we need WAL usage data */
 	/* Info about current plan cycle: */
 	bool		running;		/* true if we've completed first tuple */
 	instr_time	starttime;		/* start time of current iteration of node */
@@ -53,6 +62,7 @@ typedef struct Instrumentation
 	double		firsttuple;		/* time for first tuple of this cycle */
 	double		tuplecount;		/* # of tuples emitted so far this cycle */
 	BufferUsage bufusage_start; /* buffer usage at start */
+	WalUsage	walusage_start; /* WAL usage at start */
 	/* Accumulated statistics across all completed cycles: */
 	double		startup;		/* total startup time (in seconds) */
 	double		total;			/* total time (in seconds) */
@@ -62,6 +72,7 @@ typedef struct Instrumentation
 	double		nfiltered1;		/* # of tuples removed by scanqual or joinqual */
 	double		nfiltered2;		/* # of tuples removed by "other" quals */
 	BufferUsage bufusage;		/* total buffer usage */
+	WalUsage	walusage;		/* total WAL usage */
 } Instrumentation;
 
 typedef struct WorkerInstrumentation
@@ -71,6 +82,7 @@ typedef struct WorkerInstrumentation
 } WorkerInstrumentation;
 
 extern PGDLLIMPORT BufferUsage pgBufferUsage;
+extern PGDLLIMPORT WalUsage pgWalUsage;
 
 extern Instrumentation *InstrAlloc(int n, int instrument_options);
 extern void InstrInit(Instrumentation *instr, int instrument_options);
@@ -79,9 +91,11 @@ extern void InstrStopNode(Instrumentation *instr, double nTuples);
 extern void InstrEndLoop(Instrumentation *instr);
 extern void InstrAggNode(Instrumentation *dst, Instrumentation *add);
 extern void InstrStartParallelQuery(void);
-extern void InstrEndParallelQuery(BufferUsage *result);
-extern void InstrAccumParallelQuery(BufferUsage *result);
+extern void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage);
+extern void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage);
 extern void BufferUsageAccumDiff(BufferUsage *dst,
 								 const BufferUsage *add, const BufferUsage *sub);
+extern void WalUsageAccumDiff(WalUsage *dst, const WalUsage *add,
+							  const WalUsage *sub);
 
 #endif							/* INSTRUMENT_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 939de98..3462352 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2643,6 +2643,7 @@ WalSndCtlData
 WalSndSendDataCallback
 WalSndState
 WalTimeSample
+WalUsage
 WalWriteMethod
 Walfile
 WindowAgg
-- 
1.8.3.1

