From 65c54baa51a26a0189024b01cef21d495e5a49ea Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Thu, 8 Oct 2020 18:44:22 +0530
Subject: [PATCH v2] Track streaming stats.

---
 doc/src/sgml/monitoring.sgml                    | 34 +++++++++++++++++++++++++
 src/backend/catalog/system_views.sql            |  3 +++
 src/backend/postmaster/pgstat.c                 |  8 +++++-
 src/backend/replication/logical/logical.c       | 19 +++++++++-----
 src/backend/replication/logical/reorderbuffer.c | 10 ++++++++
 src/backend/replication/slot.c                  |  2 +-
 src/backend/utils/adt/pgstatfuncs.c             |  9 ++++---
 src/include/catalog/pg_proc.dat                 |  6 ++---
 src/include/pgstat.h                            |  8 +++++-
 src/include/replication/reorderbuffer.h         |  3 +++
 src/test/regress/expected/rules.out             |  5 +++-
 11 files changed, 91 insertions(+), 16 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 6656676..b182a9e 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2631,6 +2631,40 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>stream_txns</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Number of in-progress transactions streamed to subscriber after
+        memory used by logical decoding exceeds <literal>logical_decoding_work_mem</literal>.
+        Streaming only works with toplevel transactions (subtransactions can't
+        be streamed independently), so the counter does not get incremented for
+        subtransactions.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>stream_count</structfield><type>bigint</type>
+       </para>
+       <para>
+        Number of times in-progress transactions were streamed to subscriber.
+        Transactions may get streamed repeatedly, and this counter gets incremented
+        on every such invocation.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>stream_bytes</structfield><type>bigint</type>
+       </para>
+       <para>
+        Amount of decoded in-progress transaction data streamed to subscriber.
+       </para>
+      </entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
         <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
        </para>
        <para>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index c293907..dd5584f 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -802,6 +802,9 @@ CREATE VIEW pg_stat_replication_slots AS
             s.spill_txns,
             s.spill_count,
             s.spill_bytes,
+            s.stream_txns,
+            s.stream_count,
+            s.stream_bytes,
             s.stats_reset
     FROM pg_stat_get_replication_slots() AS s;
 
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 822f0eb..f7a9dac 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -1708,7 +1708,7 @@ pgstat_report_tempfile(size_t filesize)
  */
 void
 pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
-					   int spillbytes)
+					   int spillbytes, int streamtxns, int streamcount, int  streambytes)
 {
 	PgStat_MsgReplSlot msg;
 
@@ -1721,6 +1721,9 @@ pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
 	msg.m_spill_txns = spilltxns;
 	msg.m_spill_count = spillcount;
 	msg.m_spill_bytes = spillbytes;
+	msg.m_stream_txns = streamtxns;
+	msg.m_stream_count = streamcount;
+	msg.m_stream_bytes = streambytes;
 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
 
@@ -6892,6 +6895,9 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
 		replSlotStats[idx].spill_txns += msg->m_spill_txns;
 		replSlotStats[idx].spill_count += msg->m_spill_count;
 		replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
+		replSlotStats[idx].stream_txns += msg->m_stream_txns;
+		replSlotStats[idx].stream_count += msg->m_stream_count;
+		replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
 	}
 }
 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3346df3..0e5ede1 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1471,21 +1471,28 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	ReorderBuffer *rb = ctx->reorder;
 
 	/*
-	 * Nothing to do if we haven't spilled anything since the last time the
-	 * stats has been sent.
+	 * Nothing to do if we haven't spilled or streamed anything since the last
+	 * time the stats has been sent.
 	 */
-	if (rb->spillBytes <= 0)
+	if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
 		return;
 
-	elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld",
+	elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld %lld %lld %lld",
 		 rb,
 		 (long long) rb->spillTxns,
 		 (long long) rb->spillCount,
-		 (long long) rb->spillBytes);
+		 (long long) rb->spillBytes,
+		 (long long) rb->streamTxns,
+		 (long long) rb->streamCount,
+		 (long long) rb->streamBytes);
 
 	pgstat_report_replslot(NameStr(ctx->slot->data.name),
-						   rb->spillTxns, rb->spillCount, rb->spillBytes);
+						   rb->spillTxns, rb->spillCount, rb->spillBytes,
+						   rb->streamTxns, rb->streamCount, rb->streamBytes);
 	rb->spillTxns = 0;
 	rb->spillCount = 0;
 	rb->spillBytes = 0;
+	rb->streamTxns = 0;
+	rb->streamCount = 0;
+	rb->streamBytes = 0;
 }
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 189641b..28d3980 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -346,6 +346,9 @@ ReorderBufferAllocate(void)
 	buffer->spillTxns = 0;
 	buffer->spillCount = 0;
 	buffer->spillBytes = 0;
+	buffer->streamTxns = 0;
+	buffer->streamCount = 0;
+	buffer->streamBytes = 0;
 
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
@@ -3520,6 +3523,13 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		txn->snapshot_now = NULL;
 	}
 
+
+	rb->streamCount += 1;
+	rb->streamBytes += txn->total_size;
+
+	/* Don't consider already streamed transaction. */
+	rb->streamTxns += (rbtxn_is_streamed(txn)) ? 0 : 1;
+
 	/* Process and send the changes to output plugin. */
 	ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
 							command_id, true);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 220b4cd..09be1d8 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -320,7 +320,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	 * ReplicationSlotAllocationLock.
 	 */
 	if (SlotIsLogical(slot))
-		pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0);
+		pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0);
 
 	/*
 	 * Now that the slot has been marked as in_use and active, it's safe to
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 0d0d2e6..ae87bc1 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2153,7 +2153,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_REPLICATION_SLOT_CLOS 5
+#define PG_STAT_GET_REPLICATION_SLOT_CLOS 8
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -2201,11 +2201,14 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
 		values[1] = Int64GetDatum(s->spill_txns);
 		values[2] = Int64GetDatum(s->spill_count);
 		values[3] = Int64GetDatum(s->spill_bytes);
+		values[4] = Int64GetDatum(s->stream_txns);
+		values[5] = Int64GetDatum(s->stream_count);
+		values[6] = Int64GetDatum(s->stream_bytes);
 
 		if (s->stat_reset_timestamp == 0)
-			nulls[4] = true;
+			nulls[7] = true;
 		else
-			values[4] = TimestampTzGetDatum(s->stat_reset_timestamp);
+			values[7] = TimestampTzGetDatum(s->stat_reset_timestamp);
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 22340ba..1b64aa8 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5261,9 +5261,9 @@
   proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{text,int8,int8,int8,timestamptz}',
-  proargmodes => '{o,o,o,o,o}',
-  proargnames => '{name,spill_txns,spill_count,spill_bytes,stats_reset}',
+  proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{o,o,o,o,o,o,o,o}',
+  proargnames => '{name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slots' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index a821ff4..960533b 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -492,6 +492,9 @@ typedef struct PgStat_MsgReplSlot
 	PgStat_Counter m_spill_txns;
 	PgStat_Counter m_spill_count;
 	PgStat_Counter m_spill_bytes;
+	PgStat_Counter	m_stream_txns;
+	PgStat_Counter	m_stream_count;
+	PgStat_Counter	m_stream_bytes;
 } PgStat_MsgReplSlot;
 
 
@@ -823,6 +826,9 @@ typedef struct PgStat_ReplSlotStats
 	PgStat_Counter spill_txns;
 	PgStat_Counter spill_count;
 	PgStat_Counter spill_bytes;
+	PgStat_Counter stream_txns;
+	PgStat_Counter stream_count;
+	PgStat_Counter stream_bytes;
 	TimestampTz stat_reset_timestamp;
 } PgStat_ReplSlotStats;
 
@@ -1387,7 +1393,7 @@ extern void pgstat_report_deadlock(void);
 extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
 extern void pgstat_report_checksum_failure(void);
 extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
-								   int spillbytes);
+								   int spillbytes, int streamtxns, int streamcount, int streambytes);
 extern void pgstat_report_replslot_drop(const char *slotname);
 
 extern void pgstat_initialize(void);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0cc3aeb..018ad4c 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -543,6 +543,9 @@ struct ReorderBuffer
 	int64		spillTxns;		/* number of transactions spilled to disk */
 	int64		spillCount;		/* spill-to-disk invocation counter */
 	int64		spillBytes;		/* amount of data spilled to disk */
+	int64		streamTxns;		/* number of transactions spilled to disk */
+	int64		streamCount;	/* streaming invocation counter */
+	int64		streamBytes;	/* amount of data streamed to subscriber */
 };
 
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index cf2a9b4..576166d 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2022,8 +2022,11 @@ pg_stat_replication_slots| SELECT s.name,
     s.spill_txns,
     s.spill_count,
     s.spill_bytes,
+    s.stream_txns,
+    s.stream_count,
+    s.stream_bytes,
     s.stats_reset
-   FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes, stats_reset);
+   FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset);
 pg_stat_slru| SELECT s.name,
     s.blks_zeroed,
     s.blks_hit,
-- 
1.8.3.1

