From f385b89e938efaeab201abddf507fa47e5c43a1e Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Wed, 14 Oct 2020 08:28:35 +0530
Subject: [PATCH v4] Track statistics for streaming of changes from
 ReorderBuffer.

This adds the statistics about transactions streamed to the decoding
output plugin from ReorderBuffer. Users can query the
pg_stat_replication_slots view to check these stats and call
pg_stat_reset_replication_slot to reset the stats of a particular slot.
Users can pass NULL in pg_stat_reset_replication_slot to reset stats of
all the slots.

Commit 9868167500 has added the basic infrastructure to capture the stats
of slot and this commit extends the statistics collector to track
additional information about slots.

Author: Ajin Cherian and Amit Kapila
---
 doc/src/sgml/monitoring.sgml                  | 34 +++++++++++++++++++
 src/backend/catalog/system_views.sql          |  3 ++
 src/backend/postmaster/pgstat.c               | 11 +++++-
 src/backend/replication/logical/logical.c     | 19 +++++++----
 .../replication/logical/reorderbuffer.c       | 20 +++++++++++
 src/backend/replication/slot.c                |  2 +-
 src/backend/utils/adt/pgstatfuncs.c           |  9 +++--
 src/include/catalog/pg_proc.dat               |  6 ++--
 src/include/pgstat.h                          |  8 ++++-
 src/include/replication/reorderbuffer.h       |  3 ++
 src/test/regress/expected/rules.out           |  5 ++-
 11 files changed, 104 insertions(+), 16 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 66566765f0..5b5222e3fa 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2629,6 +2629,40 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>stream_txns</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Number of in-progress transactions streamed to the decoding output plugin
+        after memory used by logical decoding exceeds
+        <literal>logical_decoding_work_mem</literal>. Streaming only works with
+        toplevel transactions (subtransactions can't be streamed independently),
+        so the counter does not get incremented for subtransactions.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>stream_count</structfield><type>bigint</type>
+       </para>
+       <para>
+        Number of times in-progress transactions were streamed to subscriber.
+        Transactions may get streamed repeatedly, and this counter gets incremented
+        on every such invocation.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>stream_bytes</structfield><type>bigint</type>
+       </para>
+       <para>
+        Amount of decoded in-progress transaction data streamed to subscriber.
+       </para>
+      </entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
         <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index c29390760f..dd5584f1d0 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -802,6 +802,9 @@ CREATE VIEW pg_stat_replication_slots AS
             s.spill_txns,
             s.spill_count,
             s.spill_bytes,
+            s.stream_txns,
+            s.stream_count,
+            s.stream_bytes,
             s.stats_reset
     FROM pg_stat_get_replication_slots() AS s;
 
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 822f0ebc62..b3513f3943 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -1708,7 +1708,7 @@ pgstat_report_tempfile(size_t filesize)
  */
 void
 pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
-					   int spillbytes)
+					   int spillbytes, int streamtxns, int streamcount, int  streambytes)
 {
 	PgStat_MsgReplSlot msg;
 
@@ -1721,6 +1721,9 @@ pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
 	msg.m_spill_txns = spilltxns;
 	msg.m_spill_count = spillcount;
 	msg.m_spill_bytes = spillbytes;
+	msg.m_stream_txns = streamtxns;
+	msg.m_stream_count = streamcount;
+	msg.m_stream_bytes = streambytes;
 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
 
@@ -6892,6 +6895,9 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
 		replSlotStats[idx].spill_txns += msg->m_spill_txns;
 		replSlotStats[idx].spill_count += msg->m_spill_count;
 		replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
+		replSlotStats[idx].stream_txns += msg->m_stream_txns;
+		replSlotStats[idx].stream_count += msg->m_stream_count;
+		replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
 	}
 }
 
@@ -7125,6 +7131,9 @@ pgstat_reset_replslot(int i, TimestampTz ts)
 	replSlotStats[i].spill_txns = 0;
 	replSlotStats[i].spill_count = 0;
 	replSlotStats[i].spill_bytes = 0;
+	replSlotStats[i].stream_txns = 0;
+	replSlotStats[i].stream_count = 0;
+	replSlotStats[i].stream_bytes = 0;
 	replSlotStats[i].stat_reset_timestamp = ts;
 }
 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 8675832f4d..d5cfbeaa4a 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1471,21 +1471,28 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	ReorderBuffer *rb = ctx->reorder;
 
 	/*
-	 * Nothing to do if we haven't spilled anything since the last time the
-	 * stats has been sent.
+	 * Nothing to do if we haven't spilled or streamed anything since the last
+	 * time the stats has been sent.
 	 */
-	if (rb->spillBytes <= 0)
+	if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
 		return;
 
-	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld",
+	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
 		 rb,
 		 (long long) rb->spillTxns,
 		 (long long) rb->spillCount,
-		 (long long) rb->spillBytes);
+		 (long long) rb->spillBytes,
+		 (long long) rb->streamTxns,
+		 (long long) rb->streamCount,
+		 (long long) rb->streamBytes);
 
 	pgstat_report_replslot(NameStr(ctx->slot->data.name),
-						   rb->spillTxns, rb->spillCount, rb->spillBytes);
+						   rb->spillTxns, rb->spillCount, rb->spillBytes,
+						   rb->streamTxns, rb->streamCount, rb->streamBytes);
 	rb->spillTxns = 0;
 	rb->spillCount = 0;
 	rb->spillBytes = 0;
+	rb->streamTxns = 0;
+	rb->streamCount = 0;
+	rb->streamBytes = 0;
 }
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 4cb27f2224..8585d1d6c7 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -346,6 +346,9 @@ ReorderBufferAllocate(void)
 	buffer->spillTxns = 0;
 	buffer->spillCount = 0;
 	buffer->spillBytes = 0;
+	buffer->streamTxns = 0;
+	buffer->streamCount = 0;
+	buffer->streamBytes = 0;
 
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
@@ -3440,6 +3443,8 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
 	Snapshot	snapshot_now;
 	CommandId	command_id;
+	Size		stream_bytes;
+	bool		txn_is_streamed;
 
 	/* We can never reach here for a subtransaction. */
 	Assert(txn->toptxn == NULL);
@@ -3520,10 +3525,25 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		txn->snapshot_now = NULL;
 	}
 
+	/*
+	 * Remember this information to be used later to update stats. We can't
+	 * update the stats here as an error while processing the changes would
+	 * lead to the accumulation of stats even though we haven't streamed all
+	 * the changes.
+	 */
+	txn_is_streamed = rbtxn_is_streamed(txn);
+	stream_bytes = txn->total_size;
+
 	/* Process and send the changes to output plugin. */
 	ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
 							command_id, true);
 
+	rb->streamCount += 1;
+	rb->streamBytes += stream_bytes;
+
+	/* Don't consider already streamed transaction. */
+	rb->streamTxns += (txn_is_streamed) ? 0 : 1;
+
 	Assert(dlist_is_empty(&txn->changes));
 	Assert(txn->nentries == 0);
 	Assert(txn->nentries_mem == 0);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 220b4cd6e9..09be1d8c48 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -320,7 +320,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	 * ReplicationSlotAllocationLock.
 	 */
 	if (SlotIsLogical(slot))
-		pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0);
+		pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0);
 
 	/*
 	 * Now that the slot has been marked as in_use and active, it's safe to
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 0d0d2e6d2b..ae87bc1953 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2153,7 +2153,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_REPLICATION_SLOT_CLOS 5
+#define PG_STAT_GET_REPLICATION_SLOT_CLOS 8
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -2201,11 +2201,14 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
 		values[1] = Int64GetDatum(s->spill_txns);
 		values[2] = Int64GetDatum(s->spill_count);
 		values[3] = Int64GetDatum(s->spill_bytes);
+		values[4] = Int64GetDatum(s->stream_txns);
+		values[5] = Int64GetDatum(s->stream_count);
+		values[6] = Int64GetDatum(s->stream_bytes);
 
 		if (s->stat_reset_timestamp == 0)
-			nulls[4] = true;
+			nulls[7] = true;
 		else
-			values[4] = TimestampTzGetDatum(s->stat_reset_timestamp);
+			values[7] = TimestampTzGetDatum(s->stat_reset_timestamp);
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 22340baf1c..1b64aa831e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5261,9 +5261,9 @@
   proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{text,int8,int8,int8,timestamptz}',
-  proargmodes => '{o,o,o,o,o}',
-  proargnames => '{name,spill_txns,spill_count,spill_bytes,stats_reset}',
+  proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{o,o,o,o,o,o,o,o}',
+  proargnames => '{name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slots' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index a821ff4f15..960533beb2 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -492,6 +492,9 @@ typedef struct PgStat_MsgReplSlot
 	PgStat_Counter m_spill_txns;
 	PgStat_Counter m_spill_count;
 	PgStat_Counter m_spill_bytes;
+	PgStat_Counter	m_stream_txns;
+	PgStat_Counter	m_stream_count;
+	PgStat_Counter	m_stream_bytes;
 } PgStat_MsgReplSlot;
 
 
@@ -823,6 +826,9 @@ typedef struct PgStat_ReplSlotStats
 	PgStat_Counter spill_txns;
 	PgStat_Counter spill_count;
 	PgStat_Counter spill_bytes;
+	PgStat_Counter stream_txns;
+	PgStat_Counter stream_count;
+	PgStat_Counter stream_bytes;
 	TimestampTz stat_reset_timestamp;
 } PgStat_ReplSlotStats;
 
@@ -1387,7 +1393,7 @@ extern void pgstat_report_deadlock(void);
 extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
 extern void pgstat_report_checksum_failure(void);
 extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
-								   int spillbytes);
+								   int spillbytes, int streamtxns, int streamcount, int streambytes);
 extern void pgstat_report_replslot_drop(const char *slotname);
 
 extern void pgstat_initialize(void);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0cc3aebb11..84d67db32e 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -543,6 +543,9 @@ struct ReorderBuffer
 	int64		spillTxns;		/* number of transactions spilled to disk */
 	int64		spillCount;		/* spill-to-disk invocation counter */
 	int64		spillBytes;		/* amount of data spilled to disk */
+	int64		streamTxns;		/* number of transactions streamed to the decoding output plugin */
+	int64		streamCount;	/* streaming invocation counter */
+	int64		streamBytes;	/* amount of data streamed to subscriber */
 };
 
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index cf2a9b4408..576166dac5 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2022,8 +2022,11 @@ pg_stat_replication_slots| SELECT s.name,
     s.spill_txns,
     s.spill_count,
     s.spill_bytes,
+    s.stream_txns,
+    s.stream_count,
+    s.stream_bytes,
     s.stats_reset
-   FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes, stats_reset);
+   FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset);
 pg_stat_slru| SELECT s.name,
     s.blks_zeroed,
     s.blks_hit,
-- 
2.28.0.windows.1

