On Sun, Dec 25, 2022 at 4:55 PM Dilip Kumar <dilipbal...@gmail.com> wrote:
>
> > > This adds copying of the whole page (at least) at every WAL *record*
> > > read,
> >
> > In the worst case yes, but that may not always be true. On a typical
> > production server with decent write traffic, it happens that the
> > callers of WALRead() read a full WAL page of size XLOG_BLCKSZ bytes or
> > MAX_SEND_SIZE bytes.
>
> I agree with this.
>
> > > This patch copies the bleeding edge WAL page without recording the
> > > (next) insertion point nor checking whether all in-progress insertion
> > > behind the target LSN have finished. Thus the copied page may have
> > > holes.  That being said, the sequential-reading nature and the fact
> > > that WAL buffers are zero-initialized may make it work for recovery,
> > > but I don't think this also works for replication.
> >
> > WALRead() callers are smart enough to take the flushed bytes only.
> > Although they read the whole WAL page, they calculate the valid bytes.
>
> Right
>
> On first read the patch looks good, although it needs some more
> thoughts on 'XXX' comments in the patch.

Thanks a lot for reviewing.

Here are some open points that I mentioned in v1 patch:

1.
+     * XXX: Perhaps, measuring the immediate lock availability and its impact
+     * on concurrent WAL writers is a good idea here.

My testing upthread [1] showed that the patch does no harm in this
regard. It would be great if others could test it in their own
environments and use cases.

2.
+     * XXX: Perhaps, returning if lock is not immediately available a good idea
+     * here. The caller can then go ahead with reading WAL from WAL file.

After thinking a bit more about this, ISTM that returning immediately
is the right way to avoid adding contention when the lock is busy.
I've done so in the v2 patch.
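
To make (2) concrete, here is a minimal standalone sketch of the "try
the lock, otherwise fall back" pattern. It is not code from the patch:
the DemoWalBuffers struct and the demo_* functions are hypothetical,
and a C11 atomic_flag stands in for a conditional acquire of
WALBufMappingLock.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <string.h>

#define DEMO_BUF_SIZE 64

/* Hypothetical stand-in for the shared WAL buffers and their mapping lock. */
typedef struct DemoWalBuffers
{
	atomic_flag	busy;					/* set while someone holds the "lock" */
	char		data[DEMO_BUF_SIZE];
	size_t		valid;					/* number of valid bytes in data */
} DemoWalBuffers;

static void
demo_init(DemoWalBuffers *bufs, const char *src, size_t len)
{
	assert(len <= DEMO_BUF_SIZE);
	atomic_flag_clear(&bufs->busy);
	memcpy(bufs->data, src, len);
	bufs->valid = len;
}

/* Returns nonzero if the lock was free and is now held by the caller. */
static int
demo_try_lock(DemoWalBuffers *bufs)
{
	return !atomic_flag_test_and_set_explicit(&bufs->busy,
											  memory_order_acquire);
}

static void
demo_unlock(DemoWalBuffers *bufs)
{
	atomic_flag_clear_explicit(&bufs->busy, memory_order_release);
}

/*
 * Copy up to count bytes out of the buffers without ever waiting.
 * Returns the number of bytes copied; 0 means "lock busy or nothing
 * there, read from the WAL file instead".
 */
static size_t
demo_read_from_buffers(DemoWalBuffers *bufs, char *dst, size_t count)
{
	size_t		n;

	/* Lock is busy: back off right away rather than queue up. */
	if (!demo_try_lock(bufs))
		return 0;

	n = (count < bufs->valid) ? count : bufs->valid;
	memcpy(dst, bufs->data, n);
	demo_unlock(bufs);
	return n;
}
```

A return value of 0 tells the caller to go read the WAL file, so a
reader never waits behind WAL writers.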

3.
+     * XXX: Perhaps, quickly finding if the given WAL record is in WAL buffers
+     * a good idea here. This avoids unnecessary lock acquire-release cycles.
+     * One way to do that is by maintaining oldest WAL record that's currently
+     * present in WAL buffers.

I think doing the above might create a new point of contention,
because the shared variables tracking the min and max available LSNs
in the WAL buffers would need to be protected against all concurrent
writers. Also, with the change made in (2) above, that is, exiting
quickly if the lock is busy, this comment no longer seems worth
worrying about. Hence, I decided to leave it as it is.

4.
+         * XXX: Perhaps, we can further go and validate the found page header,
+         * record header and record at least in assert builds, something like
+         * the xlogreader.c does and return if any of those validity checks
+         * fail. Having said that, we stick to the minimal checks for now.

I was being over-cautious initially. The fact that we acquire
WALBufMappingLock while reading the needed WAL buffer page itself
guarantees that no one else initializes it/makes it ready for next use
in AdvanceXLInsertBuffer(). The checks that we have for page header
(xlp_magic, xlp_pageaddr and xlp_tli) in the patch are enough for us
to ensure that we're not reading a page that got just initialized. The
callers will anyway perform extensive checks on page and record in
XLogReaderValidatePageHeader() and ValidXLogRecordHeader()
respectively. If any such failures occur after reading WAL from WAL
buffers, then that must be treated as a bug IMO. Hence, I don't think
we need to do the above.
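
For reference, the three header checks mentioned in (4) amount to
something like the sketch below. It is only an illustration: the
struct is a simplified stand-in for the real page header,
DEMO_PAGE_MAGIC is a made-up value (the real magic changes between
releases), and demo_page_is_expected() is a hypothetical name.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Made-up magic value, used only for this illustration. */
#define DEMO_PAGE_MAGIC 0xD110

/* Simplified stand-in holding just the three fields that get checked. */
typedef struct DemoPageHeader
{
	uint16_t	xlp_magic;		/* magic value for correctness checks */
	uint32_t	xlp_tli;		/* timeline the page belongs to */
	uint64_t	xlp_pageaddr;	/* WAL address of the start of this page */
} DemoPageHeader;

/*
 * Returns true only if the buffer page is the one the reader asked for.
 * A page that was just re-initialized for a later LSN has a different
 * xlp_pageaddr, and a zeroed page fails the magic check.
 */
static bool
demo_page_is_expected(const DemoPageHeader *hdr,
					  uint64_t expected_pageaddr,
					  uint32_t expected_tli)
{
	assert(hdr != NULL);

	return hdr->xlp_magic == DEMO_PAGE_MAGIC &&
		hdr->xlp_pageaddr == expected_pageaddr &&
		hdr->xlp_tli == expected_tli;
}
```

If any of the three comparisons fails, the reader treats the page as
absent from WAL buffers and reads it from the WAL file instead.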

> And also I do not like that XLogReadFromBuffers() is using 3 bools
> hit/partial hit/miss, instead of this we can use an enum or some
> tristate variable, I think that will be cleaner.

Yeah, the three bools were more verbose than necessary. All of that
information can be deduced from the requested bytes and the read
bytes. I've done so in the v2 patch.
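
As a rough illustration of that deduction (the enum and function names
below are hypothetical, not taken from the patch), hit, partial hit
and miss all fall out of comparing the two byte counts:

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical names, used only to label the three outcomes. */
typedef enum WalBufReadResult
{
	WALBUF_MISS,			/* nothing was read from WAL buffers */
	WALBUF_PARTIAL_HIT,		/* some, but not all, requested bytes were read */
	WALBUF_HIT				/* every requested byte was read */
} WalBufReadResult;

static WalBufReadResult
classify_walbuf_read(size_t requested, size_t read_bytes)
{
	/* A reader never returns more than it was asked for. */
	assert(read_bytes <= requested);

	if (read_bytes == 0)
		return WALBUF_MISS;
	if (read_bytes < requested)
		return WALBUF_PARTIAL_HIT;
	return WALBUF_HIT;
}

/* Bytes the caller still has to fetch from the WAL file. */
static size_t
remaining_from_file(size_t requested, size_t read_bytes)
{
	assert(read_bytes <= requested);
	return requested - read_bytes;
}
```

On a partial hit or a miss, the caller simply continues with the
remaining bytes from the WAL file, so no separate flags are needed.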

Please review the attached v2 patch further.

I'm also attaching two helper patches (as .txt files) for testing.
They add WAL read stats:

- USE-ON-HEAD-Collect-WAL-read-from-file-stats.txt: apply on HEAD and
  monitor pg_stat_replication for per-walsender stats on WAL read from
  WAL files.
- USE-ON-PATCH-Collect-WAL-read-from-buffers-and-file-stats.txt: apply
  on the v2 patch and monitor pg_stat_replication for per-walsender
  stats on WAL read from both WAL buffers and WAL files.

[1] 
https://www.postgresql.org/message-id/CALj2ACXUbvON86vgwTkum8ab3bf1%3DHkMxQ5hZJZS3ZcJn8NEXQ%40mail.gmail.com

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 6517e50f482f88ea5185609ff4dcf0e0256475d5 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostg...@gmail.com>
Date: Mon, 26 Dec 2022 08:14:11 +0000
Subject: [PATCH v2] Collect WAL read from file stats for WAL senders

---
 doc/src/sgml/monitoring.sgml                | 31 +++++++++++
 src/backend/access/transam/xlogreader.c     | 33 ++++++++++--
 src/backend/access/transam/xlogutils.c      |  2 +-
 src/backend/catalog/system_views.sql        |  5 +-
 src/backend/replication/walsender.c         | 58 +++++++++++++++++++--
 src/bin/pg_waldump/pg_waldump.c             |  2 +-
 src/include/access/xlogreader.h             | 21 ++++++--
 src/include/catalog/pg_proc.dat             |  6 +--
 src/include/replication/walsender_private.h |  4 ++
 src/test/regress/expected/rules.out         |  7 ++-
 10 files changed, 151 insertions(+), 18 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 363b183e5f..fdf4c7d774 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2615,6 +2615,37 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        Send time of last reply message received from standby server
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>wal_read</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times WAL data is read from disk
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>wal_read_bytes</structfield> <type>numeric</type>
+      </para>
+      <para>
+       Total amount of WAL read from disk in bytes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>wal_read_time</structfield> <type>double precision</type>
+      </para>
+      <para>
+       Total amount of time spent reading WAL from disk via
+       <function>WALRead</function> request, in milliseconds
+       (if <xref linkend="guc-track-wal-io-timing"/> is enabled,
+       otherwise zero).
+      </para></entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index a38a80e049..7453724a07 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -31,6 +31,7 @@
 #include "access/xlogrecord.h"
 #include "catalog/pg_control.h"
 #include "common/pg_lzcompress.h"
+#include "portability/instr_time.h"
 #include "replication/origin.h"
 
 #ifndef FRONTEND
@@ -1489,9 +1490,9 @@ err:
  * WAL buffers when possible.
  */
 bool
-WALRead(XLogReaderState *state,
-               char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
-               WALReadError *errinfo)
+WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count,
+               TimeLineID tli, WALReadError *errinfo, WALReadStats *stats,
+               bool capture_wal_io_timing)
 {
        char       *p;
        XLogRecPtr      recptr;
@@ -1506,6 +1507,7 @@ WALRead(XLogReaderState *state,
                uint32          startoff;
                int                     segbytes;
                int                     readbytes;
+               instr_time      start;
 
                startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
 
@@ -1540,6 +1542,10 @@ WALRead(XLogReaderState *state,
                else
                        segbytes = nbytes;
 
+               /* Measure I/O timing to read WAL data if requested by the caller. */
+               if (stats != NULL && capture_wal_io_timing)
+                       INSTR_TIME_SET_CURRENT(start);
+
 #ifndef FRONTEND
                pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
 #endif
@@ -1552,6 +1558,27 @@ WALRead(XLogReaderState *state,
                pgstat_report_wait_end();
 #endif
 
+               /* Collect I/O stats if requested by the caller. */
+               if (stats != NULL)
+               {
+                       /* Increment the number of times WAL is read from disk. */
+                       stats->wal_read++;
+
+                       /* Collect bytes read. */
+                       if (readbytes > 0)
+                               stats->wal_read_bytes += readbytes;
+
+                       /* Increment the I/O timing. */
+                       if (capture_wal_io_timing)
+                       {
+                               instr_time      duration;
+
+                               INSTR_TIME_SET_CURRENT(duration);
+                               INSTR_TIME_SUBTRACT(duration, start);
+                               stats->wal_read_time += INSTR_TIME_GET_MICROSEC(duration);
+                       }
+               }
+
                if (readbytes <= 0)
                {
                        errinfo->wre_errno = errno;
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 563cba258d..372de2c7d8 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -1027,7 +1027,7 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
         * zero-padded up to the page boundary if it's incomplete.
         */
        if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
-                                &errinfo))
+                                &errinfo, NULL, false))
                WALReadRaiseError(&errinfo);
 
        /* number of valid bytes in the buffer */
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 2d8104b090..b47f44a852 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -892,7 +892,10 @@ CREATE VIEW pg_stat_replication AS
             W.replay_lag,
             W.sync_priority,
             W.sync_state,
-            W.reply_time
+            W.reply_time,
+            W.wal_read,
+            W.wal_read_bytes,
+            W.wal_read_time
     FROM pg_stat_get_activity(NULL) AS S
         JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c11bb3716f..fa02e327f2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -259,7 +259,7 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
 static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
                                                          TimeLineID *tli_p);
-
+static void WalSndAccumulateWalReadStats(WALReadStats *stats);
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -907,6 +907,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
        WALReadError errinfo;
        XLogSegNo       segno;
        TimeLineID      currTLI = GetWALInsertionTimeLine();
+       WALReadStats    stats;
 
        /*
         * Since logical decoding is only permitted on a primary server, we know
@@ -932,6 +933,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
        else
                count = flushptr - targetPagePtr;       /* part of the page available */
 
+       MemSet(&stats, 0, sizeof(WALReadStats));
+
        /* now actually read the data, we know it's there */
        if (!WALRead(state,
                                 cur_page,
@@ -940,9 +943,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
                                 state->seg.ws_tli, /* Pass the current TLI because only
                                                                         * WalSndSegmentOpen controls whether new
                                                                         * TLI is needed. */
-                                &errinfo))
+                                &errinfo,
+                                &stats,
+                                track_wal_io_timing))
                WALReadRaiseError(&errinfo);
 
+       WalSndAccumulateWalReadStats(&stats);
+
        /*
         * After reading into the buffer, check that what we read was valid. We do
         * this after reading, because even though the segment was present when we
@@ -2610,6 +2617,9 @@ InitWalSenderSlot(void)
                        walsnd->sync_standby_priority = 0;
                        walsnd->latch = &MyProc->procLatch;
                        walsnd->replyTime = 0;
+                       walsnd->wal_read_stats.wal_read = 0;
+                       walsnd->wal_read_stats.wal_read_bytes = 0;
+                       walsnd->wal_read_stats.wal_read_time = 0;
                        SpinLockRelease(&walsnd->mutex);
                        /* don't need the lock anymore */
                        MyWalSnd = (WalSnd *) walsnd;
@@ -2730,6 +2740,7 @@ XLogSendPhysical(void)
        Size            nbytes;
        XLogSegNo       segno;
        WALReadError errinfo;
+       WALReadStats stats;
 
        /* If requested switch the WAL sender to the stopping state. */
        if (got_STOPPING)
@@ -2945,6 +2956,8 @@ XLogSendPhysical(void)
        enlargeStringInfo(&output_message, nbytes);
 
 retry:
+       MemSet(&stats, 0, sizeof(WALReadStats));
+
        if (!WALRead(xlogreader,
                                 &output_message.data[output_message.len],
                                 startptr,
@@ -2952,9 +2965,13 @@ retry:
                                 xlogreader->seg.ws_tli,        /* Pass the current TLI because
                                                                                         * only WalSndSegmentOpen controls
                                                                                         * whether new TLI is needed. */
-                                &errinfo))
+                                &errinfo,
+                                &stats,
+                                track_wal_io_timing))
                WALReadRaiseError(&errinfo);
 
+       WalSndAccumulateWalReadStats(&stats);
+
        /* See logical_read_xlog_page(). */
        XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
        CheckXLogRemoved(segno, xlogreader->seg.ws_tli);
@@ -3458,7 +3475,7 @@ offset_to_interval(TimeOffset offset)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS   12
+#define PG_STAT_GET_WAL_SENDERS_COLS   15
        ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
        SyncRepStandbyData *sync_standbys;
        int                     num_standbys;
@@ -3487,9 +3504,13 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                WalSndState state;
                TimestampTz replyTime;
                bool            is_sync_standby;
+               int64           wal_read;
+               uint64          wal_read_bytes;
+               int64           wal_read_time;
                Datum           values[PG_STAT_GET_WAL_SENDERS_COLS];
                bool            nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
                int                     j;
+               char            buf[256];
 
                /* Collect data from shared memory */
                SpinLockAcquire(&walsnd->mutex);
@@ -3509,6 +3530,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                applyLag = walsnd->applyLag;
                priority = walsnd->sync_standby_priority;
                replyTime = walsnd->replyTime;
+               wal_read = walsnd->wal_read_stats.wal_read;
+               wal_read_bytes = walsnd->wal_read_stats.wal_read_bytes;
+               wal_read_time = walsnd->wal_read_stats.wal_read_time;
                SpinLockRelease(&walsnd->mutex);
 
                /*
@@ -3605,6 +3629,18 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                                nulls[11] = true;
                        else
                                values[11] = TimestampTzGetDatum(replyTime);
+
+                       values[12] = Int64GetDatum(wal_read);
+
+                       /* Convert to numeric. */
+                       snprintf(buf, sizeof buf, UINT64_FORMAT, wal_read_bytes);
+                       values[13] = DirectFunctionCall3(numeric_in,
+                                                                                        CStringGetDatum(buf),
+                                                                                        ObjectIdGetDatum(0),
+                                                                                        Int32GetDatum(-1));
+
+                       /* Convert counter from microsec to millisec for display. */
+                       values[14] = Float8GetDatum(((double) wal_read_time) / 1000.0);
                }
 
                tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
@@ -3849,3 +3885,17 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
        Assert(time != 0);
        return now - time;
 }
+
+/*
+ * Function to accumulate WAL Read stats for WAL sender.
+ */
+static void
+WalSndAccumulateWalReadStats(WALReadStats *stats)
+{
+       /* Collect I/O stats for walsender. */
+       SpinLockAcquire(&MyWalSnd->mutex);
+       MyWalSnd->wal_read_stats.wal_read += stats->wal_read;
+       MyWalSnd->wal_read_stats.wal_read_bytes += stats->wal_read_bytes;
+       MyWalSnd->wal_read_stats.wal_read_time += stats->wal_read_time;
+       SpinLockRelease(&MyWalSnd->mutex);
+}
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 9993378ca5..698ce1e9f7 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -364,7 +364,7 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
        }
 
        if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline,
-                                &errinfo))
+                                &errinfo, NULL, false))
        {
                WALOpenSegment *seg = &errinfo.wre_seg;
                char            fname[MAXPGPATH];
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index e87f91316a..26a2c975de 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -389,9 +389,24 @@ typedef struct WALReadError
        WALOpenSegment wre_seg;         /* Segment we tried to read from. */
 } WALReadError;
 
-extern bool WALRead(XLogReaderState *state,
-                                       char *buf, XLogRecPtr startptr, Size count,
-                                       TimeLineID tli, WALReadError *errinfo);
+/*
+ * WAL read stats from WALRead that the callers can use.
+ */
+typedef struct WALReadStats
+{
+       /* Number of times WAL read from disk. */
+       int64   wal_read;
+
+       /* Total amount of WAL read from disk in bytes. */
+       uint64  wal_read_bytes;
+
+       /* Total amount of time spent reading WAL from disk. */
+       int64   wal_read_time;
+} WALReadStats;
+
+extern bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr,
+                                       Size count, TimeLineID tli, WALReadError *errinfo,
+                                       WALReadStats *stats, bool capture_wal_io_timing);
 
 /* Functions for decoding an XLogRecord */
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 7056c95371..18320cf846 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5391,9 +5391,9 @@
   proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}',
+  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz,int8,numeric,float8}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time,wal_read,wal_read_bytes,wal_read_time}',
   prosrc => 'pg_stat_get_wal_senders' },
 { oid => '3317', descr => 'statistics: information about WAL receiver',
   proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's',
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 7897c74589..35413ea0d2 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -13,6 +13,7 @@
 #define _WALSENDER_PRIVATE_H
 
 #include "access/xlog.h"
+#include "access/xlogreader.h"
 #include "nodes/nodes.h"
 #include "replication/syncrep.h"
 #include "storage/latch.h"
@@ -78,6 +79,9 @@ typedef struct WalSnd
         * Timestamp of the last message received from standby.
         */
        TimestampTz replyTime;
+
+       /* WAL read stats for walsender. */
+       WALReadStats wal_read_stats;
 } WalSnd;
 
 extern PGDLLIMPORT WalSnd *MyWalSnd;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index fb9f936d43..fd9d298e79 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2054,9 +2054,12 @@ pg_stat_replication| SELECT s.pid,
     w.replay_lag,
     w.sync_priority,
     w.sync_state,
-    w.reply_time
+    w.reply_time,
+    w.wal_read,
+    w.wal_read_bytes,
+    w.wal_read_time
    FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id)
-     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
+     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time, wal_read, wal_read_bytes, wal_read_time) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_replication_slots| SELECT s.slot_name,
     s.spill_txns,
-- 
2.34.1

Attachment: v2-0001-Improve-WALRead-to-suck-data-directly-from-WAL-bu.patch
Description: Binary data

From f90dfcbd1968280feec6d116568697225854ac40 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostg...@gmail.com>
Date: Mon, 26 Dec 2022 08:13:07 +0000
Subject: [PATCH v2] Collect WAL read from buffers and file stats for WAL
 senders

---
 doc/src/sgml/monitoring.sgml                | 61 +++++++++++++++
 src/backend/access/transam/xlogreader.c     | 56 +++++++++++++-
 src/backend/access/transam/xlogutils.c      |  2 +-
 src/backend/catalog/system_views.sql        |  8 +-
 src/backend/replication/walsender.c         | 85 ++++++++++++++++++++-
 src/bin/pg_waldump/pg_waldump.c             |  2 +-
 src/include/access/xlogreader.h             | 30 +++++++-
 src/include/catalog/pg_proc.dat             |  6 +-
 src/include/replication/walsender_private.h |  4 +
 src/test/regress/expected/rules.out         | 10 ++-
 10 files changed, 246 insertions(+), 18 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 363b183e5f..239e0b0db9 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2615,6 +2615,67 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        Send time of last reply message received from standby server
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>wal_read</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times WAL data is read from disk
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>wal_read_bytes</structfield> <type>numeric</type>
+      </para>
+      <para>
+       Total amount of WAL read from disk in bytes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>wal_read_time</structfield> <type>double precision</type>
+      </para>
+      <para>
+       Total amount of time spent reading WAL from disk via
+       <function>WALRead</function> request, in milliseconds
+       (if <xref linkend="guc-track-wal-io-timing"/> is enabled,
+       otherwise zero).
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>wal_read_buffers</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times WAL data is read from WAL buffers
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>wal_read_bytes_buffers</structfield> <type>numeric</type>
+      </para>
+      <para>
+       Total amount of WAL read from WAL buffers in bytes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>wal_read_time_buffers</structfield> <type>double precision</type>
+      </para>
+      <para>
+       Total amount of time spent reading WAL from WAL buffers via
+       <function>WALRead</function> request, in milliseconds
+       (if <xref linkend="guc-track-wal-io-timing"/> is enabled,
+       otherwise zero).
+      </para></entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 4a2e7af169..b9dfd4fde7 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -31,6 +31,7 @@
 #include "access/xlogrecord.h"
 #include "catalog/pg_control.h"
 #include "common/pg_lzcompress.h"
+#include "portability/instr_time.h"
 #include "replication/origin.h"
 
 #ifndef FRONTEND
@@ -1488,9 +1489,9 @@ err:
  * When possible, this function reads data directly from WAL buffers.
  */
 bool
-WALRead(XLogReaderState *state,
-               char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
-               WALReadError *errinfo)
+WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count,
+               TimeLineID tli, WALReadError *errinfo, WALReadStats *stats,
+               bool capture_wal_io_timing)
 {
        char       *p;
        XLogRecPtr      recptr;
@@ -1510,10 +1511,33 @@ WALRead(XLogReaderState *state,
        if (!RecoveryInProgress() &&
                tli == GetWALInsertionTimeLine())
        {
+               instr_time      start;
+
+               /* Measure I/O timing to read WAL data if requested by the caller. */
+               if (stats != NULL && capture_wal_io_timing)
+                       INSTR_TIME_SET_CURRENT(start);
+
                pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
                XLogReadFromBuffers(startptr, tli, count, buf, &read_bytes);
                pgstat_report_wait_end();
 
+               /* Collect I/O stats if requested by the caller. */
+               if (stats != NULL && read_bytes > 0)
+               {
+                       stats->wal_read_buffers++;
+                       stats->wal_read_bytes_buffers += read_bytes;
+
+                       /* Increment the I/O timing. */
+                       if (capture_wal_io_timing)
+                       {
+                               instr_time      duration;
+
+                               INSTR_TIME_SET_CURRENT(duration);
+                               INSTR_TIME_SUBTRACT(duration, start);
+                               stats->wal_read_time_buffers += INSTR_TIME_GET_MICROSEC(duration);
+                       }
+               }
+
                /*
                 * Check if we have read fully (hit), partially (partial hit) or
                 * nothing (miss) from WAL buffers. If we have read either partially or
@@ -1549,6 +1573,7 @@ WALRead(XLogReaderState *state,
                uint32          startoff;
                int                     segbytes;
                int                     readbytes;
+               instr_time      start;
 
                startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
 
@@ -1583,6 +1608,10 @@ WALRead(XLogReaderState *state,
                else
                        segbytes = nbytes;
 
+               /* Measure I/O timing to read WAL data if requested by the caller. */
+               if (stats != NULL && capture_wal_io_timing)
+                       INSTR_TIME_SET_CURRENT(start);
+
 #ifndef FRONTEND
                pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
 #endif
@@ -1595,6 +1624,27 @@ WALRead(XLogReaderState *state,
                pgstat_report_wait_end();
 #endif
 
+               /* Collect I/O stats if requested by the caller. */
+               if (stats != NULL)
+               {
+                       /* Increment the number of times WAL is read from disk. */
+                       stats->wal_read++;
+
+                       /* Collect bytes read. */
+                       if (readbytes > 0)
+                               stats->wal_read_bytes += readbytes;
+
+                       /* Increment the I/O timing. */
+                       if (capture_wal_io_timing)
+                       {
+                               instr_time      duration;
+
+                               INSTR_TIME_SET_CURRENT(duration);
+                               INSTR_TIME_SUBTRACT(duration, start);
+                               stats->wal_read_time += INSTR_TIME_GET_MICROSEC(duration);
+                       }
+               }
+
                if (readbytes <= 0)
                {
                        errinfo->wre_errno = errno;
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 563cba258d..372de2c7d8 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -1027,7 +1027,7 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
         * zero-padded up to the page boundary if it's incomplete.
         */
        if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
-                                &errinfo))
+                                &errinfo, NULL, false))
                WALReadRaiseError(&errinfo);
 
        /* number of valid bytes in the buffer */
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 2d8104b090..bf6315df27 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -892,7 +892,13 @@ CREATE VIEW pg_stat_replication AS
             W.replay_lag,
             W.sync_priority,
             W.sync_state,
-            W.reply_time
+            W.reply_time,
+            W.wal_read,
+            W.wal_read_bytes,
+            W.wal_read_time,
+            W.wal_read_buffers,
+            W.wal_read_bytes_buffers,
+            W.wal_read_time_buffers
     FROM pg_stat_get_activity(NULL) AS S
         JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c11bb3716f..d3393b2b63 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -259,7 +259,7 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
 static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
                                                          TimeLineID *tli_p);
-
+static void WalSndAccumulateWalReadStats(WALReadStats *stats);
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -907,6 +907,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
        WALReadError errinfo;
        XLogSegNo       segno;
        TimeLineID      currTLI = GetWALInsertionTimeLine();
+       WALReadStats    stats;
 
        /*
         * Since logical decoding is only permitted on a primary server, we know
@@ -932,6 +933,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
        else
                count = flushptr - targetPagePtr;       /* part of the page available */
 
+       MemSet(&stats, 0, sizeof(WALReadStats));
+
        /* now actually read the data, we know it's there */
        if (!WALRead(state,
                                 cur_page,
@@ -940,9 +943,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
                                 state->seg.ws_tli, /* Pass the current TLI because only
                                                                         * WalSndSegmentOpen controls whether new
                                                                         * TLI is needed. */
-                                &errinfo))
+                                &errinfo,
+                                &stats,
+                                track_wal_io_timing))
                WALReadRaiseError(&errinfo);
 
+       WalSndAccumulateWalReadStats(&stats);
+
        /*
         * After reading into the buffer, check that what we read was valid. We do
         * this after reading, because even though the segment was present when we
@@ -2610,6 +2617,12 @@ InitWalSenderSlot(void)
                        walsnd->sync_standby_priority = 0;
                        walsnd->latch = &MyProc->procLatch;
                        walsnd->replyTime = 0;
+                       walsnd->wal_read_stats.wal_read = 0;
+                       walsnd->wal_read_stats.wal_read_bytes = 0;
+                       walsnd->wal_read_stats.wal_read_time = 0;
+                       walsnd->wal_read_stats.wal_read_buffers = 0;
+                       walsnd->wal_read_stats.wal_read_bytes_buffers = 0;
+                       walsnd->wal_read_stats.wal_read_time_buffers = 0;
                        SpinLockRelease(&walsnd->mutex);
                        /* don't need the lock anymore */
                        MyWalSnd = (WalSnd *) walsnd;
@@ -2730,6 +2743,7 @@ XLogSendPhysical(void)
        Size            nbytes;
        XLogSegNo       segno;
        WALReadError errinfo;
+       WALReadStats stats;
 
        /* If requested switch the WAL sender to the stopping state. */
        if (got_STOPPING)
@@ -2945,6 +2959,8 @@ XLogSendPhysical(void)
        enlargeStringInfo(&output_message, nbytes);
 
 retry:
+       MemSet(&stats, 0, sizeof(WALReadStats));
+
        if (!WALRead(xlogreader,
                                 &output_message.data[output_message.len],
                                 startptr,
@@ -2952,9 +2968,13 @@ retry:
                                 xlogreader->seg.ws_tli,        /* Pass the current TLI because
                                                                                         * only WalSndSegmentOpen controls
                                                                                         * whether new TLI is needed. */
-                                &errinfo))
+                                &errinfo,
+                                &stats,
+                                track_wal_io_timing))
                WALReadRaiseError(&errinfo);
 
+       WalSndAccumulateWalReadStats(&stats);
+
        /* See logical_read_xlog_page(). */
        XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
        CheckXLogRemoved(segno, xlogreader->seg.ws_tli);
@@ -3458,7 +3478,7 @@ offset_to_interval(TimeOffset offset)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS   12
+#define PG_STAT_GET_WAL_SENDERS_COLS   18
        ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
        SyncRepStandbyData *sync_standbys;
        int                     num_standbys;
@@ -3487,9 +3507,16 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                WalSndState state;
                TimestampTz replyTime;
                bool            is_sync_standby;
+               int64           wal_read;
+               uint64          wal_read_bytes;
+               int64           wal_read_time;
+               int64           wal_read_buffers;
+               uint64          wal_read_bytes_buffers;
+               int64           wal_read_time_buffers;
                Datum           values[PG_STAT_GET_WAL_SENDERS_COLS];
                bool            nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
                int                     j;
+               char            buf[256];
 
                /* Collect data from shared memory */
                SpinLockAcquire(&walsnd->mutex);
@@ -3509,6 +3536,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                applyLag = walsnd->applyLag;
                priority = walsnd->sync_standby_priority;
                replyTime = walsnd->replyTime;
+               wal_read = walsnd->wal_read_stats.wal_read;
+               wal_read_bytes = walsnd->wal_read_stats.wal_read_bytes;
+               wal_read_time = walsnd->wal_read_stats.wal_read_time;
+               wal_read_buffers = walsnd->wal_read_stats.wal_read_buffers;
+               wal_read_bytes_buffers = walsnd->wal_read_stats.wal_read_bytes_buffers;
+               wal_read_time_buffers = walsnd->wal_read_stats.wal_read_time_buffers;
                SpinLockRelease(&walsnd->mutex);
 
                /*
@@ -3605,6 +3638,31 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                                nulls[11] = true;
                        else
                                values[11] = TimestampTzGetDatum(replyTime);
+
+                       values[12] = Int64GetDatum(wal_read);
+
+                       /* Convert to numeric. */
+                       snprintf(buf, sizeof buf, UINT64_FORMAT, wal_read_bytes);
+                       values[13] = DirectFunctionCall3(numeric_in,
+                                                                                        CStringGetDatum(buf),
+                                                                                        ObjectIdGetDatum(0),
+                                                                                        Int32GetDatum(-1));
+
+                       /* Convert counter from microsec to millisec for display. */
+                       values[14] = Float8GetDatum(((double) wal_read_time) / 1000.0);
+
+                       values[15] = Int64GetDatum(wal_read_buffers);
+
+                       /* Convert to numeric. */
+                       MemSet(buf, '\0', sizeof buf);
+                       snprintf(buf, sizeof buf, UINT64_FORMAT, wal_read_bytes_buffers);
+                       values[16] = DirectFunctionCall3(numeric_in,
+                                                                                        CStringGetDatum(buf),
+                                                                                        ObjectIdGetDatum(0),
+                                                                                        Int32GetDatum(-1));
+
+                       /* Convert counter from microsec to millisec for display. */
+                       values[17] = Float8GetDatum(((double) wal_read_time_buffers) / 1000.0);
                }
 
                tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
@@ -3849,3 +3907,22 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
        Assert(time != 0);
        return now - time;
 }
+
+/*
+ * Function to accumulate WAL Read stats for WAL sender.
+ */
+static void
+WalSndAccumulateWalReadStats(WALReadStats *stats)
+{
+       /* Collect I/O stats for walsender. */
+       SpinLockAcquire(&MyWalSnd->mutex);
+       MyWalSnd->wal_read_stats.wal_read += stats->wal_read;
+       MyWalSnd->wal_read_stats.wal_read_bytes += stats->wal_read_bytes;
+       MyWalSnd->wal_read_stats.wal_read_time += stats->wal_read_time;
+       MyWalSnd->wal_read_stats.wal_read_buffers += stats->wal_read_buffers;
+       MyWalSnd->wal_read_stats.wal_read_bytes_buffers +=
+                                                                       stats->wal_read_bytes_buffers;
+       MyWalSnd->wal_read_stats.wal_read_time_buffers +=
+                                                                       stats->wal_read_time_buffers;
+       SpinLockRelease(&MyWalSnd->mutex);
+}
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 9993378ca5..698ce1e9f7 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -364,7 +364,7 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
        }
 
        if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline,
-                                &errinfo))
+                                &errinfo, NULL, false))
        {
                WALOpenSegment *seg = &errinfo.wre_seg;
                char            fname[MAXPGPATH];
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index e87f91316a..9287114779 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -389,9 +389,33 @@ typedef struct WALReadError
        WALOpenSegment wre_seg;         /* Segment we tried to read from. */
 } WALReadError;
 
-extern bool WALRead(XLogReaderState *state,
-                                       char *buf, XLogRecPtr startptr, Size count,
-                                       TimeLineID tli, WALReadError *errinfo);
+/*
+ * WAL read stats from WALRead that the callers can use.
+ */
+typedef struct WALReadStats
+{
+       /* Number of times WAL read from disk. */
+       int64   wal_read;
+
+       /* Total amount of WAL read from disk in bytes. */
+       uint64  wal_read_bytes;
+
+       /* Total amount of time spent reading WAL from disk. */
+       int64   wal_read_time;
+
+       /* Number of times WAL read from WAL buffers. */
+       int64   wal_read_buffers;
+
+       /* Total amount of WAL read from WAL buffers in bytes. */
+       uint64  wal_read_bytes_buffers;
+
+       /* Total amount of time spent reading WAL from WAL buffers. */
+       int64   wal_read_time_buffers;
+} WALReadStats;
+
+extern bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr,
+                                       Size count, TimeLineID tli, WALReadError *errinfo,
+                                       WALReadStats *stats, bool capture_wal_io_timing);
 
 /* Functions for decoding an XLogRecord */
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 7056c95371..706a005c2b 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5391,9 +5391,9 @@
   proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}',
+  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz,int8,numeric,float8,int8,numeric,float8}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time,wal_read,wal_read_bytes,wal_read_time,wal_read_buffers,wal_read_bytes_buffers,wal_read_time_buffers}',
   prosrc => 'pg_stat_get_wal_senders' },
 { oid => '3317', descr => 'statistics: information about WAL receiver',
   proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's',
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 7897c74589..35413ea0d2 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -13,6 +13,7 @@
 #define _WALSENDER_PRIVATE_H
 
 #include "access/xlog.h"
+#include "access/xlogreader.h"
 #include "nodes/nodes.h"
 #include "replication/syncrep.h"
 #include "storage/latch.h"
@@ -78,6 +79,9 @@ typedef struct WalSnd
         * Timestamp of the last message received from standby.
         */
        TimestampTz replyTime;
+
+       /* WAL read stats for walsender. */
+       WALReadStats wal_read_stats;
 } WalSnd;
 
 extern PGDLLIMPORT WalSnd *MyWalSnd;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index fb9f936d43..6ae65981c2 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2054,9 +2054,15 @@ pg_stat_replication| SELECT s.pid,
     w.replay_lag,
     w.sync_priority,
     w.sync_state,
-    w.reply_time
+    w.reply_time,
+    w.wal_read,
+    w.wal_read_bytes,
+    w.wal_read_time,
+    w.wal_read_buffers,
+    w.wal_read_bytes_buffers,
+    w.wal_read_time_buffers
    FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id)
-     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
+     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time, wal_read, wal_read_bytes, wal_read_time, wal_read_buffers, wal_read_bytes_buffers, wal_read_time_buffers) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_replication_slots| SELECT s.slot_name,
     s.spill_txns,
-- 
2.34.1
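
For readers skimming the patch, the stats flow it uses in the walsender (zero a local stats struct, pass it to the reader, then fold it into the long-lived totals) can be sketched outside PostgreSQL roughly as below. This is only an illustration under stated assumptions: DemoReadStats, demo_read(), demo_accumulate(), demo_send() and demo_totals are hypothetical stand-ins for WALReadStats, WALRead(), WalSndAccumulateWalReadStats(), the walsender call sites and MyWalSnd->wal_read_stats; the spinlock and the timing capture are left out.

```c
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for the patch's WALReadStats (disk counters only). */
typedef struct DemoReadStats
{
    int64_t     reads;          /* number of read calls */
    uint64_t    read_bytes;     /* total bytes read */
    int64_t     read_time;      /* microseconds; never set in this sketch */
} DemoReadStats;

/* Hypothetical long-lived totals, standing in for MyWalSnd->wal_read_stats. */
static DemoReadStats demo_totals;

/*
 * Hypothetical reader: pretends to read nbytes and, like WALRead() in the
 * patch, fills in *stats only when the caller passed a non-NULL pointer.
 */
static int
demo_read(size_t nbytes, DemoReadStats *stats)
{
    if (stats != NULL)
    {
        stats->reads++;
        stats->read_bytes += nbytes;
    }
    return 1;                   /* always "succeeds" in this sketch */
}

/* Fold one call's stats into the totals (the patch does this under a spinlock). */
static void
demo_accumulate(const DemoReadStats *stats)
{
    demo_totals.reads += stats->reads;
    demo_totals.read_bytes += stats->read_bytes;
    demo_totals.read_time += stats->read_time;
}

/* Caller side: zero the local struct before every read, then accumulate. */
static void
demo_send(size_t nbytes)
{
    DemoReadStats stats;

    memset(&stats, 0, sizeof(stats));
    if (demo_read(nbytes, &stats))
        demo_accumulate(&stats);
}
```

Callers that do not want stats, such as the pg_waldump and xlogutils.c call sites in the patch, simply pass NULL, so their reads never touch the totals.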
