From c1c3b700679a5376362120488cd68b0d0c3033e5 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Wed, 24 Nov 2021 10:32:56 -0500
Subject: [PATCH v21 3/8] Add IO operation counters to PgBackendStatus

Add an array of counters to PgBackendStatus which count the buffers
allocated, extended, fsynced, and written by a given backend.

Each "IO Op" (alloc, fsync, extend, write) is counted per "IO Path"
(direct, local, shared, or strategy).

"local" and "shared" IO Path counters count operations on local and
shared buffers.

The "strategy" IO Path counts buffers alloc'd/written/read/fsync'd as
part of a BufferAccessStrategy.

The "direct" IO Path counts blocks of IO which are read, written, or
fsync'd using smgrwrite/extend/immedsync directly (as opposed to through
[Local]BufferAlloc()).
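
For illustration, these counters form a small per-backend matrix: the
fields of an IOOpCounters struct are the IO Ops, and the IO Path selects
which struct in the array to bump. A rough sketch of the addressing
(the real definitions are in backend_status.h further down; beentry is
the backend's own PgBackendStatus entry, MyBEEntry):

    /* in PgBackendStatus: one IOOpCounters struct per IO Path */
    IOOpCounters io_path_stats[IOPATH_NUM_TYPES];

    /* e.g. shared buffers written so far by this backend */
    pg_atomic_read_u64(&beentry->io_path_stats[IOPATH_SHARED].writes);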

With this commit, a backend increments the relevant counter in its
PgBackendStatus whenever it performs an IO operation.
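
For example (simplified from the FlushBuffer() changes below), writing
out a dirty victim buffer bumps either the shared or the strategy write
counter, depending on where the buffer came from:

    iopath = from_ring ? IOPATH_STRATEGY : IOPATH_SHARED;
    FlushBuffer(buf, NULL, iopath);
    /* ...and inside FlushBuffer(), right after smgrwrite(): */
    pgstat_inc_ioop(IOOP_WRITE, iopath);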

Future patches will persist the IO stat counters from a backend's
PgBackendStatus upon backend exit and use the counters to provide
observability of database IO operations.
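
The counters are pg_atomic_uint64s so that the owning backend can bump
them cheaply while another process reads them without locking. A minimal
sketch of such a reader (the actual reporting interface is left to the
later patches; beentry here is some backend's PgBackendStatus entry):

    uint64      writes;

    for (int io_path = 0; io_path < IOPATH_NUM_TYPES; io_path++)
    {
        IOOpCounters *io_ops = &beentry->io_path_stats[io_path];

        writes = pg_atomic_read_u64(&io_ops->writes);
        /* ...likewise for allocs, extends and fsyncs... */
    }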

Note that this commit does not add code to increment the "direct" path.
A future patch adding wrappers for smgrwrite(), smgrextend(), and
smgrimmedsync() would provide a good location to call pgstat_inc_ioop()
for unbuffered IO and avoid regressions for future users of these
functions.
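
As a rough sketch of that idea (not part of this patch; the wrapper name
is made up), such a wrapper could count unbuffered extends like so:

    static void
    smgrextend_counted(SMgrRelation reln, ForkNumber forknum,
                       BlockNumber blocknum, char *buffer, bool skipFsync)
    {
        smgrextend(reln, forknum, blocknum, buffer, skipFsync);
        pgstat_inc_ioop(IOOP_EXTEND, IOPATH_DIRECT);
    }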

Author: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Justin Pryzby <pryzby@telsasoft.com>
Discussion: https://www.postgresql.org/message-id/flat/20200124195226.lth52iydq2n2uilq%40alap3.anarazel.de
---
 src/backend/postmaster/checkpointer.c       |  1 +
 src/backend/storage/buffer/bufmgr.c         | 47 +++++++++++---
 src/backend/storage/buffer/freelist.c       | 22 ++++++-
 src/backend/storage/buffer/localbuf.c       |  3 +
 src/backend/storage/sync/sync.c             |  1 +
 src/backend/utils/activity/backend_status.c | 10 +++
 src/include/storage/buf_internals.h         |  4 +-
 src/include/utils/backend_status.h          | 68 +++++++++++++++++++++
 8 files changed, 141 insertions(+), 15 deletions(-)

diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 4488e3a443..4e88327425 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -1106,6 +1106,7 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
 		if (!AmBackgroundWriterProcess())
 			CheckpointerShmem->num_backend_fsync++;
 		LWLockRelease(CheckpointerCommLock);
+		pgstat_inc_ioop(IOOP_FSYNC, IOPATH_SHARED);
 		return false;
 	}
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f5459c68f8..a6e446f29a 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -481,7 +481,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr,
 							   BlockNumber blockNum,
 							   BufferAccessStrategy strategy,
 							   bool *foundPtr);
-static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
+static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOPath iopath);
 static void FindAndDropRelFileNodeBuffers(RelFileNode rnode,
 										  ForkNumber forkNum,
 										  BlockNumber nForkBlock,
@@ -973,6 +973,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 
 	if (isExtend)
 	{
+		pgstat_inc_ioop(IOOP_EXTEND, IOPATH_SHARED);
 		/* new buffers are zero-filled */
 		MemSet((char *) bufBlock, 0, BLCKSZ);
 		/* don't set checksum for all-zero page */
@@ -1173,6 +1174,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	/* Loop here in case we have to try another victim buffer */
 	for (;;)
 	{
+		bool		from_ring;
+
 		/*
 		 * Ensure, while the spinlock's not yet held, that there's a free
 		 * refcount entry.
@@ -1183,7 +1186,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		 * Select a victim buffer.  The buffer is returned with its header
 		 * spinlock still held!
 		 */
-		buf = StrategyGetBuffer(strategy, &buf_state);
+		buf = StrategyGetBuffer(strategy, &buf_state, &from_ring);
 
 		Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0);
 
@@ -1220,6 +1223,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 			if (LWLockConditionalAcquire(BufferDescriptorGetContentLock(buf),
 										 LW_SHARED))
 			{
+				IOPath		iopath;
+
 				/*
 				 * If using a nondefault strategy, and writing the buffer
 				 * would require a WAL flush, let the strategy decide whether
@@ -1237,7 +1242,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 					UnlockBufHdr(buf, buf_state);
 
 					if (XLogNeedsFlush(lsn) &&
-						StrategyRejectBuffer(strategy, buf))
+						StrategyRejectBuffer(strategy, buf, &from_ring))
 					{
 						/* Drop lock/pin and loop around for another buffer */
 						LWLockRelease(BufferDescriptorGetContentLock(buf));
@@ -1246,13 +1251,27 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 					}
 				}
 
+				/*
+				 * When a strategy is in use, if the dirty buffer was selected
+				 * from the strategy ring and we did not bother checking the
+				 * freelist or doing a clock sweep to look for a clean shared
+				 * buffer to use, the write will be counted as a strategy
+				 * write. However, if the dirty buffer was obtained from the
+				 * freelist or a clock sweep, it is counted as a regular write.
+				 *
+				 * When a strategy is not in use, at this point the write can
+				 * only be a "regular" write of a dirty buffer.
+				 */
+
+				iopath = from_ring ? IOPATH_STRATEGY : IOPATH_SHARED;
+
 				/* OK, do the I/O */
 				TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_START(forkNum, blockNum,
 														  smgr->smgr_rnode.node.spcNode,
 														  smgr->smgr_rnode.node.dbNode,
 														  smgr->smgr_rnode.node.relNode);
 
-				FlushBuffer(buf, NULL);
+				FlushBuffer(buf, NULL, iopath);
 				LWLockRelease(BufferDescriptorGetContentLock(buf));
 
 				ScheduleBufferTagForWriteback(&BackendWritebackContext,
@@ -2553,10 +2572,11 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 	 * Pin it, share-lock it, write it.  (FlushBuffer will do nothing if the
 	 * buffer is clean by the time we've locked it.)
 	 */
+
 	PinBuffer_Locked(bufHdr);
 	LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
 
-	FlushBuffer(bufHdr, NULL);
+	FlushBuffer(bufHdr, NULL, IOPATH_SHARED);
 
 	LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
 
@@ -2804,9 +2824,12 @@ BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum,
  *
  * If the caller has an smgr reference for the buffer's relation, pass it
  * as the second parameter.  If not, pass NULL.
+ *
+ * IOPath will always be IOPATH_SHARED except when a buffer access strategy is
+ * used and the buffer being flushed is a buffer from the strategy ring.
  */
 static void
-FlushBuffer(BufferDesc *buf, SMgrRelation reln)
+FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOPath iopath)
 {
 	XLogRecPtr	recptr;
 	ErrorContextCallback errcallback;
@@ -2898,6 +2921,8 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
 			  bufToWrite,
 			  false);
 
+	pgstat_inc_ioop(IOOP_WRITE, iopath);
+
 	if (track_io_timing)
 	{
 		INSTR_TIME_SET_CURRENT(io_time);
@@ -3534,6 +3559,8 @@ FlushRelationBuffers(Relation rel)
 						  localpage,
 						  false);
 
+				pgstat_inc_ioop(IOOP_WRITE, IOPATH_LOCAL);
+
 				buf_state &= ~(BM_DIRTY | BM_JUST_DIRTIED);
 				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
 
@@ -3569,7 +3596,7 @@ FlushRelationBuffers(Relation rel)
 		{
 			PinBuffer_Locked(bufHdr);
 			LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
-			FlushBuffer(bufHdr, RelationGetSmgr(rel));
+			FlushBuffer(bufHdr, RelationGetSmgr(rel), IOPATH_SHARED);
 			LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
 			UnpinBuffer(bufHdr, true);
 		}
@@ -3665,7 +3692,7 @@ FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels)
 		{
 			PinBuffer_Locked(bufHdr);
 			LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
-			FlushBuffer(bufHdr, srelent->srel);
+			FlushBuffer(bufHdr, srelent->srel, IOPATH_SHARED);
 			LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
 			UnpinBuffer(bufHdr, true);
 		}
@@ -3721,7 +3748,7 @@ FlushDatabaseBuffers(Oid dbid)
 		{
 			PinBuffer_Locked(bufHdr);
 			LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
-			FlushBuffer(bufHdr, NULL);
+			FlushBuffer(bufHdr, NULL, IOPATH_SHARED);
 			LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
 			UnpinBuffer(bufHdr, true);
 		}
@@ -3748,7 +3775,7 @@ FlushOneBuffer(Buffer buffer)
 
 	Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr)));
 
-	FlushBuffer(bufHdr, NULL);
+	FlushBuffer(bufHdr, NULL, IOPATH_SHARED);
 }
 
 /*
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index 3b98e68d50..15a9de4c9d 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -19,6 +19,7 @@
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/proc.h"
+#include "utils/backend_status.h"
 
 #define INT_ACCESS_ONCE(var)	((int)(*((volatile int *)&(var))))
 
@@ -198,7 +199,7 @@ have_free_buffer(void)
  *	return the buffer with the buffer header spinlock still held.
  */
 BufferDesc *
-StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state)
+StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_ring)
 {
 	BufferDesc *buf;
 	int			bgwprocno;
@@ -212,7 +213,8 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state)
 	if (strategy != NULL)
 	{
 		buf = GetBufferFromRing(strategy, buf_state);
-		if (buf != NULL)
+		*from_ring = buf != NULL;
+		if (*from_ring)
 			return buf;
 	}
 
@@ -247,6 +249,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state)
 	 * the rate of buffer consumption.  Note that buffers recycled by a
 	 * strategy object are intentionally not counted here.
 	 */
+	pgstat_inc_ioop(IOOP_ALLOC, IOPATH_SHARED);
 	pg_atomic_fetch_add_u32(&StrategyControl->numBufferAllocs, 1);
 
 	/*
@@ -683,8 +686,14 @@ AddBufferToRing(BufferAccessStrategy strategy, BufferDesc *buf)
  * if this buffer should be written and re-used.
  */
 bool
-StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf)
+StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf, bool *from_ring)
 {
+	/*
+	 * Start by assuming that we will use the dirty buffer selected by
+	 * StrategyGetBuffer().
+	 */
+	*from_ring = true;
+
 	/* We only do this in bulkread mode */
 	if (strategy->btype != BAS_BULKREAD)
 		return false;
@@ -700,5 +709,12 @@ StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf)
 	 */
 	strategy->buffers[strategy->current] = InvalidBuffer;
 
+	/*
+	 * Since we will not be writing out a dirty buffer from the ring, set
+	 * from_ring to false so that the caller does not count this write as a
+	 * "strategy write" and can do proper bookkeeping.
+	 */
+	*from_ring = false;
+
 	return true;
 }
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index e71f95ac1f..864b4db20e 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -20,6 +20,7 @@
 #include "executor/instrument.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
+#include "utils/backend_status.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
 #include "utils/resowner_private.h"
@@ -226,6 +227,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 				  localpage,
 				  false);
 
+		pgstat_inc_ioop(IOOP_WRITE, IOPATH_LOCAL);
+
 		/* Mark not-dirty now in case we error out below */
 		buf_state &= ~BM_DIRTY;
 		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
diff --git a/src/backend/storage/sync/sync.c b/src/backend/storage/sync/sync.c
index e161d57761..266fb9ca0a 100644
--- a/src/backend/storage/sync/sync.c
+++ b/src/backend/storage/sync/sync.c
@@ -420,6 +420,7 @@ ProcessSyncRequests(void)
 					total_elapsed += elapsed;
 					processed++;
 
+					pgstat_inc_ioop(IOOP_FSYNC, IOPATH_SHARED);
 					if (log_checkpoints)
 						elog(DEBUG1, "checkpoint sync: number=%d file=%s time=%.3f ms",
 							 processed,
diff --git a/src/backend/utils/activity/backend_status.c b/src/backend/utils/activity/backend_status.c
index 079321599d..79410e0b2c 100644
--- a/src/backend/utils/activity/backend_status.c
+++ b/src/backend/utils/activity/backend_status.c
@@ -405,6 +405,16 @@ pgstat_bestart(void)
 	lbeentry.st_progress_command_target = InvalidOid;
 	lbeentry.st_query_id = UINT64CONST(0);
 
+	for (int i = 0; i < IOPATH_NUM_TYPES; i++)
+	{
+		IOOpCounters *io_ops = &lbeentry.io_path_stats[i];
+
+		pg_atomic_init_u64(&io_ops->allocs, 0);
+		pg_atomic_init_u64(&io_ops->extends, 0);
+		pg_atomic_init_u64(&io_ops->fsyncs, 0);
+		pg_atomic_init_u64(&io_ops->writes, 0);
+	}
+
 	/*
 	 * we don't zero st_progress_param here to save cycles; nobody should
 	 * examine it until st_progress_command has been set to something other
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index b903d2bcaf..64f4fc3b96 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -310,10 +310,10 @@ extern void ScheduleBufferTagForWriteback(WritebackContext *context, BufferTag *
 
 /* freelist.c */
 extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy,
-									 uint32 *buf_state);
+									 uint32 *buf_state, bool *from_ring);
 extern void StrategyFreeBuffer(BufferDesc *buf);
 extern bool StrategyRejectBuffer(BufferAccessStrategy strategy,
-								 BufferDesc *buf);
+								 BufferDesc *buf, bool *from_ring);
 
 extern int	StrategySyncStart(uint32 *complete_passes, uint32 *num_buf_alloc);
 extern void StrategyNotifyBgWriter(int bgwprocno);
diff --git a/src/include/utils/backend_status.h b/src/include/utils/backend_status.h
index 8217d0cb6b..950b7396a5 100644
--- a/src/include/utils/backend_status.h
+++ b/src/include/utils/backend_status.h
@@ -13,6 +13,7 @@
 #include "datatype/timestamp.h"
 #include "libpq/pqcomm.h"
 #include "miscadmin.h"			/* for BackendType */
+#include "port/atomics.h"
 #include "utils/backend_progress.h"
 
 
@@ -31,12 +32,47 @@ typedef enum BackendState
 	STATE_DISABLED
 } BackendState;
 
+/* ----------
+ * IO Stats reporting utility types
+ * ----------
+ */
+
+typedef enum IOOp
+{
+	IOOP_ALLOC,
+	IOOP_EXTEND,
+	IOOP_FSYNC,
+	IOOP_WRITE,
+} IOOp;
+
+#define IOOP_NUM_TYPES (IOOP_WRITE + 1)
+
+typedef enum IOPath
+{
+	IOPATH_DIRECT,
+	IOPATH_LOCAL,
+	IOPATH_SHARED,
+	IOPATH_STRATEGY,
+} IOPath;
+
+#define IOPATH_NUM_TYPES (IOPATH_STRATEGY + 1)
 
 /* ----------
  * Shared-memory data structures
  * ----------
  */
 
+/*
+ * Structure for counting all types of IOOp for a live backend.
+ */
+typedef struct IOOpCounters
+{
+	pg_atomic_uint64 allocs;
+	pg_atomic_uint64 extends;
+	pg_atomic_uint64 fsyncs;
+	pg_atomic_uint64 writes;
+} IOOpCounters;
+
 /*
  * PgBackendSSLStatus
  *
@@ -168,6 +204,12 @@ typedef struct PgBackendStatus
 
 	/* query identifier, optionally computed using post_parse_analyze_hook */
 	uint64		st_query_id;
+
+	/*
+	 * Stats on all IOOps for all IOPaths for this backend. These should be
+	 * incremented whenever an IO Operation is performed.
+	 */
+	IOOpCounters	io_path_stats[IOPATH_NUM_TYPES];
 } PgBackendStatus;
 
 
@@ -296,6 +338,32 @@ extern void pgstat_bestart(void);
 extern void pgstat_clear_backend_activity_snapshot(void);
 
 /* Activity reporting functions */
+
+static inline void
+pgstat_inc_ioop(IOOp io_op, IOPath io_path)
+{
+	IOOpCounters *io_ops;
+	PgBackendStatus *beentry = MyBEEntry;
+
+	Assert(beentry);
+
+	io_ops = &beentry->io_path_stats[io_path];
+	switch (io_op)
+	{
+		case IOOP_ALLOC:
+			pg_atomic_unlocked_inc_counter(&io_ops->allocs);
+			break;
+		case IOOP_EXTEND:
+			pg_atomic_unlocked_inc_counter(&io_ops->extends);
+			break;
+		case IOOP_FSYNC:
+			pg_atomic_unlocked_inc_counter(&io_ops->fsyncs);
+			break;
+		case IOOP_WRITE:
+			pg_atomic_unlocked_inc_counter(&io_ops->writes);
+			break;
+	}
+}
 extern void pgstat_report_activity(BackendState state, const char *cmd_str);
 extern void pgstat_report_query_id(uint64 query_id, bool force);
 extern void pgstat_report_tempfile(size_t filesize);
-- 
2.32.0

