On Fri, Apr 2, 2021 at 9:29 AM Masahiko Sawada <sawada.m...@gmail.com> wrote:
>
> On Fri, Apr 2, 2021 at 1:55 AM vignesh C <vignes...@gmail.com> wrote:
> >
> > On Thu, Apr 1, 2021 at 5:58 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > >
> > > On Thu, Apr 1, 2021 at 3:43 PM vignesh C <vignes...@gmail.com> wrote:
> > > >
> > > > On Wed, Mar 31, 2021 at 11:32 AM vignesh C <vignes...@gmail.com> wrote:
> > > > >
> > > > > On Tue, Mar 30, 2021 at 11:00 AM Andres Freund <and...@anarazel.de> 
> > > > > wrote:
> > > > > >
> > > > > > Hi,
> > > > > >
> > > > > > On 2021-03-30 10:13:29 +0530, vignesh C wrote:
> > > > > > > On Tue, Mar 30, 2021 at 6:28 AM Andres Freund 
> > > > > > > <and...@anarazel.de> wrote:
> > > > > > > > Any chance you could write a tap test exercising a few of these 
> > > > > > > > cases?
> > > > > > >
> > > > > > > I can try to write a patch for this if nobody objects.
> > > > > >
> > > > > > Cool!
> > > > > >
> > > > >
> > > > > Attached a patch which has the test for the first scenario.
> > > > >
> > > > > > > > E.g. things like:
> > > > > > > >
> > > > > > > > - create a few slots, drop one of them, shut down, start up, 
> > > > > > > > verify
> > > > > > > >   stats are still sane
> > > > > > > > - create a few slots, shut down, manually remove a slot, lower
> > > > > > > >   max_replication_slots, start up
> > > > > > >
> > > > > > > Here by "manually remove a slot", do you mean to remove the slot
> > > > > > > manually from the pg_replslot folder?
> > > > > >
> > > > > > Yep - thereby allowing max_replication_slots after the 
> > > > > > shutdown/start to
> > > > > > be lower than the number of slots-stats objects.
> > > > >
> > > > > I have not included the 2nd test in the patch as the test fails with
> > > > > following warnings and also displays the statistics of the removed
> > > > > slot:
> > > > > WARNING:  problem in alloc set Statistics snapshot: detected write
> > > > > past chunk end in block 0x55d038b8e410, chunk 0x55d038b8e438
> > > > > WARNING:  problem in alloc set Statistics snapshot: detected write
> > > > > past chunk end in block 0x55d038b8e410, chunk 0x55d038b8e438
> > > > >
> > > > > This happens because the statistics file has an additional slot
> > > > > present even though the replication slot was removed.  I felt this
> > > > > issue should be fixed. I will try to fix this issue and send the
> > > > > second test along with the fix.
> > > >
> > > > I felt from the statistics collector process, there is no way in which
> > > > we can identify if the replication slot is present or not because the
> > > > statistic collector process does not have access to shared memory.
> > > > Anything that the statistic collector process does independently by
> > > > traversing and removing the statistics of the replication slot
> > > > exceeding the max_replication_slot has its drawback of removing some
> > > > valid replication slot's statistics data.
> > > > Any thoughts on how we can identify the replication slot which has been 
> > > > dropped?
> > > > Can someone point me to the shared stats patch link with which message
> > > > loss can be avoided. I wanted to see a scenario where something like
> > > > the slot is dropped but the statistics are not updated because of an
> > > > immediate shutdown or server going down abruptly can occur or not with
> > > > the shared stats patch.
> > > >
> > >
> > > I don't think it is easy to simulate a scenario where the 'drop'
> > > message is dropped and I think that is why the test contains the step
> > > to manually remove the slot. At this stage, you can probably provide a
> > > test patch and a code-fix patch where it just drops the extra slots
> > > from the stats file. That will allow us to test it with a shared
> > > memory stats patch on which Andres and Horiguchi-San are working. If
> > > we still continue to pursue with current approach then as Andres
> > > suggested we might send additional information from
> > > RestoreSlotFromDisk to keep it in sync.
> >
> > Thanks for your comments, Attached patch has the fix for the same.
> > Also attached a couple of more patches which addresses the comments
> > which Andres had listed i.e changing char to NameData type and also to
> > display the unspilled/unstreamed transaction information in the
> > replication statistics.
> > Thoughts?
>
> Thank you for the patches!
>
> I've looked at those patches and here are some comments on 0001, 0002,
> and 0003 patch:

Thanks for the comments.

> 0001 patch:
>
> -       values[0] = PointerGetDatum(cstring_to_text(s->slotname));
> +       values[0] = PointerGetDatum(cstring_to_text(s->slotname.data));
>
> We can use NameGetDatum() instead.

I felt we cannot use NameGetDatum() here: it only wraps a pointer to the
NameData value, so the caller must ensure the pointed-to value has adequate
lifetime, and this function cannot guarantee that throughout the loop.
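
To illustrate the difference, here is a minimal sketch of the two options
inside the pg_stat_get_replication_slots() loop, where "s" points at the
PgStat_ReplSlotStats entry being emitted (based on the attached v2 patch):

    /*
     * NameGetDatum() only wraps a pointer to the NameData, so the Datum
     * stays valid only as long as s->slotname itself does.
     */
    values[0] = NameGetDatum(&s->slotname);

    /*
     * CStringGetTextDatum() builds a fresh palloc'd text value via
     * cstring_to_text(), so the Datum no longer depends on the lifetime
     * of s->slotname, and it matches the text type of the slot_name
     * output column.  This is what the v2 patch uses.
     */
    values[0] = CStringGetTextDatum(NameStr(s->slotname));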

> ---
> 0002 patch:
>
> The patch uses logical replication to test replication slots
> statistics but I think it's not necessarily necessary. It would be
> simpler to use logical decoding. Maybe we can add TAP tests to
> contrib/test_decoding.
>

I will try to change it to use test_decoding if feasible and post it in the
next version.

> ---
> 0003 patch:
>
>  void
>  pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
> -                      int spillbytes, int streamtxns, int
> streamcount, int streambytes)
> +                      int spillbytes, int streamtxns, int streamcount,
> +                      int streambytes, int totaltxns, int totalbytes)
>  {
>
> As Andreas pointed out, we should use a struct of stats updates rather
> than adding more arguments to pgstat_report_replslot().
>

Modified as suggested.
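
For reference, with the struct-based interface the reporting in
UpdateDecodingStats() now looks roughly like this (abbreviated from the
attached v2 patch; the resetting of the ReorderBuffer counters that follows
is omitted):

    PgStat_ReplSlotStats repSlotStat;

    namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name));
    repSlotStat.spill_txns = rb->spillTxns;
    repSlotStat.spill_count = rb->spillCount;
    repSlotStat.spill_bytes = rb->spillBytes;
    repSlotStat.stream_txns = rb->streamTxns;
    repSlotStat.stream_count = rb->streamCount;
    repSlotStat.stream_bytes = rb->streamBytes;
    repSlotStat.total_txns = rb->totalTxns;
    repSlotStat.total_bytes = rb->totalBytes;

    pgstat_report_replslot(&repSlotStat);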

> ---
> +     <row>
> +      <entry role="catalog_table_entry"><para role="column_definition">
> +        <structfield>total_bytes</structfield><type>bigint</type>
> +       </para>
> +       <para>
> +        Amount of decoded in-progress transaction data replicated to
> the decoding
> +        output plugin while decoding changes from WAL for this slot.
> This and other
> +        counters for this slot can be used to gauge the network I/O
> which occurred
> +        during logical decoding and allow tuning
> <literal>logical_decoding_work_mem</literal>.
> +       </para>
> +      </entry>
> +     </row>
>
> As I mentioned in another reply, I think users should not gauge the
> network I/O which occurred during logical decoding using by those
> counters since the actual amount of network I/O is affected by table
> filtering and row filtering discussed on another thread[1]. Also,
> since this is total bytes I'm not sure how users can use this value to
> tune logical_decoding_work_mem. I agree to track both the total bytes
> and the total number of transactions passed to the decoding plugin but
> I think the description needs to be updated. How about the following
> description for example?
>
> Amount of decoded transaction data sent to the decoding output plugin
> while decoding changes from WAL for this slot. This and total_txn for
> this slot can be used to gauge the total amount of data during logical
> decoding.
>

Modified as suggested.

> ---
> I think we can merge 0001 and 0003 patches.

I have merged them.
The attached v2 patches include the fixes for the above.
Thoughts?

Regards,
Vignesh
From 846b0fea8b1c88d3b818f37cd382cc8a2e3fb57d Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Thu, 1 Apr 2021 20:50:10 +0530
Subject: [PATCH v2 3/3] Handle the replication slot statistics overwrite
 issue.

There is a remote scenario where a replication slot is dropped but the drop
message never reaches the statistics collector process.  If
max_replication_slots is then reduced to the number of replication slots
actually in use and the publisher is restarted, the statistics collector is
not aware of the dropped slot and would write beyond the available slot
statistics entries.  Fix this by skipping any replication slot statistics
entries beyond max_replication_slots.
---
 src/backend/postmaster/pgstat.c           | 20 ++++++++
 src/test/subscription/t/020_repl_stats.pl | 57 +++++++++++++++++++++--
 2 files changed, 74 insertions(+), 3 deletions(-)

diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 1a54477c83..e27cd4d339 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -5122,6 +5122,26 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 					memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
 					goto done;
 				}
+
+				/*
+				 * There is a remote scenario where a replication slot is
+				 * dropped but the drop message never reaches the statistics
+				 * collector process.  If max_replication_slots is then
+				 * reduced to the number of replication slots actually in use
+				 * and the publisher is restarted, the collector is not aware
+				 * of the dropped slot and would write beyond the available
+				 * entries.  Skip such replication slot statistics entries to
+				 * avoid writing beyond max_replication_slots.
+				 */
+				if (max_replication_slots == nReplSlotStats)
+				{
+					ereport(pgStatRunningInCollector ? LOG : WARNING,
+							(errmsg("skipping \"%s\" replication slot statistics as pg_stat_replication_slots does not have enough slots",
+									NameStr(replSlotStats[nReplSlotStats].slotname))));
+					memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
+					goto done;
+				}
+
 				nReplSlotStats++;
 				break;
 
diff --git a/src/test/subscription/t/020_repl_stats.pl b/src/test/subscription/t/020_repl_stats.pl
index 4be58d417a..b0674f2d41 100644
--- a/src/test/subscription/t/020_repl_stats.pl
+++ b/src/test/subscription/t/020_repl_stats.pl
@@ -2,9 +2,10 @@
 # view for logical replication.
 use strict;
 use warnings;
+use File::Path qw(rmtree);
 use PostgresNode;
 use TestLib;
-use Test::More tests => 5;
+use Test::More tests => 7;
 
 # Create publisher node.
 my $node_publisher = get_new_node('publisher');
@@ -25,6 +26,10 @@ my $node_subscriber3 = get_new_node('subscriber3');
 $node_subscriber3->init(allows_streaming => 'logical');
 $node_subscriber3->start;
 
+my $node_subscriber4 = get_new_node('subscriber4');
+$node_subscriber4->init(allows_streaming => 'logical');
+$node_subscriber4->start;
+
 # Create table on publisher.
 $node_publisher->safe_psql('postgres',
         "CREATE TABLE test_tab (a int primary key, b varchar)");
@@ -33,6 +38,7 @@ $node_publisher->safe_psql('postgres',
 $node_subscriber1->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
 $node_subscriber2->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
 $node_subscriber3->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
+$node_subscriber4->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
 
 # Setup logical replication.
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
@@ -62,9 +68,18 @@ $node_subscriber3->safe_psql('postgres',
 	PUBLICATION tap_pub WITH (streaming = off)"
 );
 
+my $appname4 = 'tap_sub4';
+$node_subscriber4->safe_psql('postgres',
+        "CREATE SUBSCRIPTION tap_sub4
+        CONNECTION '$publisher_connstr
+        application_name=$appname4'
+        PUBLICATION tap_pub WITH (streaming = off)"
+);
+
 $node_publisher->wait_for_catchup($appname1);
 $node_publisher->wait_for_catchup($appname2);
 $node_publisher->wait_for_catchup($appname3);
+$node_publisher->wait_for_catchup($appname4);
 
 # Interleave a pair of transactions, each exceeding the 64kB limit.
 my $in  = '';
@@ -100,6 +115,7 @@ $h->finish;    # errors make the next test fail, so ignore them here
 $node_publisher->wait_for_catchup($appname1);
 $node_publisher->wait_for_catchup($appname2);
 $node_publisher->wait_for_catchup($appname3);
+$node_publisher->wait_for_catchup($appname4);
 
 # Verify data is replicated to the subscribers.
 my $result =
@@ -114,6 +130,10 @@ $result =
   $node_subscriber3->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
 is($result, qq(3332|3332|3332), 'check publisher data is replicated to the subscriber');
 
+$result =
+  $node_subscriber4->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3332|3332|3332), 'check publisher data is replicated to the subscriber');
+
 # Test to verify replication statistics data is updated in
 # pg_stat_replication_slots statistics view.
 $result = $node_publisher->safe_psql('postgres',
@@ -129,13 +149,43 @@ $result = $node_publisher->safe_psql('postgres',
 );
 is($result, qq(tap_sub1|f|f|f|t|t|t|
 tap_sub2|t|t|t|f|f|f|
-tap_sub3|t|t|t|f|f|f|), 'check replication statistics are updated');
+tap_sub3|t|t|t|f|f|f|
+tap_sub4|t|t|t|f|f|f|), 'check replication statistics are updated');
 
 # Test to drop one of the subscribers and verify replication statistics data is
 # fine after publisher is restarted.
-$node_subscriber3->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub3;");
+$node_subscriber4->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub4;");
+
+$node_publisher->stop;
+$node_publisher->start;
+
+# Verify statistics data present in pg_stat_replication_slots are sane after
+# publisher is restarted
+$result = $node_publisher->safe_psql('postgres',
+        "SELECT slot_name,
+                spill_txns > 0 AS spill_txns,
+                spill_count > 0 AS spill_count,
+                spill_bytes > 0 AS spill_bytes,
+                stream_txns > 0 AS stream_txns,
+                stream_count > 0 AS stream_count,
+                stream_bytes > 0 AS stream_bytes,
+                stats_reset
+        FROM pg_stat_replication_slots ORDER BY slot_name"
+);
+is($result, qq(tap_sub1|f|f|f|t|t|t|
+tap_sub2|t|t|t|f|f|f|
+tap_sub3|t|t|t|f|f|f|), 'check replication statistics are updated');
 
+# Test to remove one of the replication slots, adjust max_replication_slots to
+# match the number of remaining slots, and verify the replication statistics
+# data is fine after the publisher is restarted.
 $node_publisher->stop;
+my $publisher_data = $node_publisher->data_dir;
+my $subscriber3_replslotdir = "$publisher_data/pg_replslot/tap_sub3";
+
+rmtree($subscriber3_replslotdir);
+
+$node_publisher->append_conf('postgresql.conf', 'max_replication_slots = 2');
 $node_publisher->start;
 
 # Verify statistics data present in pg_stat_replication_slots are sane after
@@ -158,4 +208,5 @@ tap_sub2|t|t|t|f|f|f|), 'check replication statistics are updated');
 $node_subscriber1->stop;
 $node_subscriber2->stop;
 $node_subscriber3->stop;
+$node_subscriber4->stop;
 $node_publisher->stop;
-- 
2.25.1

From 0db04f815adcdda32355c71a8ea03c71e1665e87 Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Thu, 1 Apr 2021 19:47:01 +0530
Subject: [PATCH v2 2/3] Added tests for verification of logical replication
 statistics.

Added tests to verify logical replication statistics after a restart of
the server.
---
 src/test/subscription/t/020_repl_stats.pl | 161 ++++++++++++++++++++++
 1 file changed, 161 insertions(+)
 create mode 100644 src/test/subscription/t/020_repl_stats.pl

diff --git a/src/test/subscription/t/020_repl_stats.pl b/src/test/subscription/t/020_repl_stats.pl
new file mode 100644
index 0000000000..4be58d417a
--- /dev/null
+++ b/src/test/subscription/t/020_repl_stats.pl
@@ -0,0 +1,161 @@
+# Test replication statistics data gets updated in pg_stat_replication_slots
+# view for logical replication.
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 5;
+
+# Create publisher node.
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber nodes.
+my $node_subscriber1 = get_new_node('subscriber1');
+$node_subscriber1->init(allows_streaming => 'logical');
+$node_subscriber1->start;
+
+my $node_subscriber2 = get_new_node('subscriber2');
+$node_subscriber2->init(allows_streaming => 'logical');
+$node_subscriber2->start;
+
+my $node_subscriber3 = get_new_node('subscriber3');
+$node_subscriber3->init(allows_streaming => 'logical');
+$node_subscriber3->start;
+
+# Create table on publisher.
+$node_publisher->safe_psql('postgres',
+        "CREATE TABLE test_tab (a int primary key, b varchar)");
+
+# Create table on subscribers.
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
+$node_subscriber2->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
+$node_subscriber3->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
+
+# Setup logical replication.
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname1 = 'tap_sub1';
+$node_subscriber1->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub1
+	CONNECTION '$publisher_connstr
+	application_name=$appname1'
+	PUBLICATION tap_pub WITH (streaming = on)"
+);
+
+my $appname2 = 'tap_sub2';
+$node_subscriber2->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub2
+	CONNECTION '$publisher_connstr
+	application_name=$appname2'
+	PUBLICATION tap_pub WITH (streaming = off)"
+);
+
+my $appname3 = 'tap_sub3';
+$node_subscriber3->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub3
+	CONNECTION '$publisher_connstr
+	application_name=$appname3'
+	PUBLICATION tap_pub WITH (streaming = off)"
+);
+
+$node_publisher->wait_for_catchup($appname1);
+$node_publisher->wait_for_catchup($appname2);
+$node_publisher->wait_for_catchup($appname3);
+
+# Interleave a pair of transactions, each exceeding the 64kB limit.
+my $in  = '';
+my $out = '';
+
+my $timer = IPC::Run::timeout(180);
+
+my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer,
+        on_error_stop => 0);
+
+$in .= q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+};
+$h->pump_nb;
+
+$node_publisher->safe_psql(
+        'postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 9999) s(i);
+DELETE FROM test_tab WHERE a > 5000;
+COMMIT;
+});
+
+$in .= q{
+COMMIT;
+\q
+};
+$h->finish;    # errors make the next test fail, so ignore them here
+
+$node_publisher->wait_for_catchup($appname1);
+$node_publisher->wait_for_catchup($appname2);
+$node_publisher->wait_for_catchup($appname3);
+
+# Verify data is replicated to the subscribers.
+my $result =
+  $node_subscriber1->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3332|3332|3332), 'check publisher data is replicated to the subscriber');
+
+$result =
+  $node_subscriber2->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3332|3332|3332), 'check publisher data is replicated to the subscriber');
+
+$result =
+  $node_subscriber3->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3332|3332|3332), 'check publisher data is replicated to the subscriber');
+
+# Test to verify replication statistics data is updated in
+# pg_stat_replication_slots statistics view.
+$result = $node_publisher->safe_psql('postgres',
+	"SELECT slot_name,
+		spill_txns > 0 AS spill_txns,
+		spill_count > 0 AS spill_count,
+		spill_bytes > 0 AS spill_bytes,
+		stream_txns > 0 AS stream_txns,
+		stream_count > 0 AS stream_count,
+		stream_bytes > 0 AS stream_bytes,
+		stats_reset
+	FROM pg_stat_replication_slots ORDER BY slot_name"
+);
+is($result, qq(tap_sub1|f|f|f|t|t|t|
+tap_sub2|t|t|t|f|f|f|
+tap_sub3|t|t|t|f|f|f|), 'check replication statistics are updated');
+
+# Test to drop one of the subscribers and verify replication statistics data is
+# fine after publisher is restarted.
+$node_subscriber3->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub3;");
+
+$node_publisher->stop;
+$node_publisher->start;
+
+# Verify statistics data present in pg_stat_replication_slots are sane after
+# publisher is restarted
+$result = $node_publisher->safe_psql('postgres',
+        "SELECT slot_name,
+                spill_txns > 0 AS spill_txns,
+                spill_count > 0 AS spill_count,
+                spill_bytes > 0 AS spill_bytes,
+                stream_txns > 0 AS stream_txns,
+                stream_count > 0 AS stream_count,
+                stream_bytes > 0 AS stream_bytes,
+                stats_reset
+        FROM pg_stat_replication_slots ORDER BY slot_name"
+);
+is($result, qq(tap_sub1|f|f|f|t|t|t|
+tap_sub2|t|t|t|f|f|f|), 'check replication statistics are updated');
+
+# shutdown
+$node_subscriber1->stop;
+$node_subscriber2->stop;
+$node_subscriber3->stop;
+$node_publisher->stop;
-- 
2.25.1

From ed3af8884463d6858f8b880661d38dfc574a6514 Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Thu, 1 Apr 2021 21:15:47 +0530
Subject: [PATCH v2 1/3] Added total txns and total txn bytes to replication
 statistics.

This adds statistics for the total number of transactions and the total amount
of transaction data sent to the decoding output plugin from ReorderBuffer.
Users can query the pg_stat_replication_slots view to check these stats.
This also changes the slotname datatype from a char array to NameData.
---
 doc/src/sgml/monitoring.sgml                  | 24 ++++++++++++
 src/backend/catalog/system_views.sql          |  2 +
 src/backend/postmaster/pgstat.c               | 38 ++++++++++---------
 src/backend/replication/logical/logical.c     | 30 ++++++++++-----
 .../replication/logical/reorderbuffer.c       |  5 +++
 src/backend/replication/slot.c                |  7 +++-
 src/backend/utils/adt/pgstatfuncs.c           | 10 +++--
 src/include/catalog/pg_proc.dat               |  6 +--
 src/include/pgstat.h                          | 15 ++++----
 src/include/replication/reorderbuffer.h       |  4 ++
 src/test/regress/expected/rules.out           |  4 +-
 11 files changed, 103 insertions(+), 42 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 56018745c8..25975d2228 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2689,6 +2689,30 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
       </entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>total_txns</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Number of decoded transactions sent to the decoding output plugin for
+        this slot. This counter only includes top-level transactions, so it is
+        not incremented for subtransactions.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>total_bytes</structfield><type>bigint</type>
+       </para>
+       <para>
+        Amount of decoded transaction data sent to the decoding output plugin
+        while decoding changes from WAL for this slot. This and total_txns
+        for this slot can be used to gauge the total amount of data during
+        logical decoding.
+       </para>
+      </entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
         <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 5f2541d316..e3991eb6f6 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -874,6 +874,8 @@ CREATE VIEW pg_stat_replication_slots AS
             s.stream_txns,
             s.stream_count,
             s.stream_bytes,
+            s.total_txns,
+            s.total_bytes,
             s.stats_reset
     FROM pg_stat_get_replication_slots() AS s;
 
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 498d6ee123..1a54477c83 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -65,6 +65,7 @@
 #include "storage/procsignal.h"
 #include "storage/sinvaladt.h"
 #include "utils/ascii.h"
+#include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
@@ -1550,7 +1551,7 @@ pgstat_reset_replslot_counter(const char *name)
 		if (SlotIsPhysical(slot))
 			return;
 
-		strlcpy(msg.m_slotname, name, NAMEDATALEN);
+		namestrcpy(&msg.m_slotname, name);
 		msg.clearall = false;
 	}
 	else
@@ -1768,10 +1769,7 @@ pgstat_report_tempfile(size_t filesize)
  * ----------
  */
 void
-pgstat_report_replslot(const char *slotname, PgStat_Counter spilltxns,
-					   PgStat_Counter spillcount, PgStat_Counter spillbytes,
-					   PgStat_Counter streamtxns, PgStat_Counter streamcount,
-					   PgStat_Counter streambytes)
+pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
 {
 	PgStat_MsgReplSlot msg;
 
@@ -1779,14 +1777,16 @@ pgstat_report_replslot(const char *slotname, PgStat_Counter spilltxns,
 	 * Prepare and send the message
 	 */
 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
-	strlcpy(msg.m_slotname, slotname, NAMEDATALEN);
+	namestrcpy(&msg.m_slotname, NameStr(repSlotStat->slotname));
 	msg.m_drop = false;
-	msg.m_spill_txns = spilltxns;
-	msg.m_spill_count = spillcount;
-	msg.m_spill_bytes = spillbytes;
-	msg.m_stream_txns = streamtxns;
-	msg.m_stream_count = streamcount;
-	msg.m_stream_bytes = streambytes;
+	msg.m_spill_txns = repSlotStat->spill_txns;
+	msg.m_spill_count = repSlotStat->spill_count;
+	msg.m_spill_bytes = repSlotStat->spill_bytes;
+	msg.m_stream_txns = repSlotStat->stream_txns;
+	msg.m_stream_count = repSlotStat->stream_count;
+	msg.m_stream_bytes = repSlotStat->stream_bytes;
+	msg.m_total_txns = repSlotStat->total_txns;
+	msg.m_total_bytes = repSlotStat->total_bytes;
 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
 
@@ -1802,7 +1802,7 @@ pgstat_report_replslot_drop(const char *slotname)
 	PgStat_MsgReplSlot msg;
 
 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
-	strlcpy(msg.m_slotname, slotname, NAMEDATALEN);
+	namestrcpy(&msg.m_slotname, slotname);
 	msg.m_drop = true;
 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
@@ -6088,7 +6088,7 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
 	else
 	{
 		/* Get the index of replication slot statistics to reset */
-		idx = pgstat_replslot_index(msg->m_slotname, false);
+		idx = pgstat_replslot_index(NameStr(msg->m_slotname), false);
 
 		/*
 		 * Nothing to do if the given slot entry is not found.  This could
@@ -6386,7 +6386,7 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
 	 * Get the index of replication slot statistics.  On dropping, we don't
 	 * create the new statistics.
 	 */
-	idx = pgstat_replslot_index(msg->m_slotname, !msg->m_drop);
+	idx = pgstat_replslot_index(NameStr(msg->m_slotname), !msg->m_drop);
 
 	/*
 	 * The slot entry is not found or there is no space to accommodate the new
@@ -6418,6 +6418,8 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
 		replSlotStats[idx].stream_txns += msg->m_stream_txns;
 		replSlotStats[idx].stream_count += msg->m_stream_count;
 		replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
+		replSlotStats[idx].total_txns += msg->m_total_txns;
+		replSlotStats[idx].total_bytes += msg->m_total_bytes;
 	}
 }
 
@@ -6655,7 +6657,7 @@ pgstat_replslot_index(const char *name, bool create_it)
 	Assert(nReplSlotStats <= max_replication_slots);
 	for (i = 0; i < nReplSlotStats; i++)
 	{
-		if (strcmp(replSlotStats[i].slotname, name) == 0)
+		if (namestrcmp(&replSlotStats[i].slotname, name) == 0)
 			return i;			/* found */
 	}
 
@@ -6668,7 +6670,7 @@ pgstat_replslot_index(const char *name, bool create_it)
 
 	/* Register new slot */
 	memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
-	strlcpy(replSlotStats[nReplSlotStats].slotname, name, NAMEDATALEN);
+	namestrcpy(&replSlotStats[nReplSlotStats].slotname, name);
 
 	return nReplSlotStats++;
 }
@@ -6689,6 +6691,8 @@ pgstat_reset_replslot(int i, TimestampTz ts)
 	replSlotStats[i].stream_txns = 0;
 	replSlotStats[i].stream_count = 0;
 	replSlotStats[i].stream_bytes = 0;
+	replSlotStats[i].total_txns = 0;
+	replSlotStats[i].total_bytes = 0;
 	replSlotStats[i].stat_reset_timestamp = ts;
 }
 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2f6803637b..d0ad694477 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1763,30 +1763,42 @@ void
 UpdateDecodingStats(LogicalDecodingContext *ctx)
 {
 	ReorderBuffer *rb = ctx->reorder;
+	PgStat_ReplSlotStats repSlotStat;
 
 	/*
-	 * Nothing to do if we haven't spilled or streamed anything since the last
-	 * time the stats has been sent.
+	 * Nothing to do if we don't have any replication stats to be sent.
 	 */
-	if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
+	if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
 		return;
 
-	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
+	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld",
 		 rb,
 		 (long long) rb->spillTxns,
 		 (long long) rb->spillCount,
 		 (long long) rb->spillBytes,
 		 (long long) rb->streamTxns,
 		 (long long) rb->streamCount,
-		 (long long) rb->streamBytes);
-
-	pgstat_report_replslot(NameStr(ctx->slot->data.name),
-						   rb->spillTxns, rb->spillCount, rb->spillBytes,
-						   rb->streamTxns, rb->streamCount, rb->streamBytes);
+		 (long long) rb->streamBytes,
+		 (long long) rb->totalTxns,
+		 (long long) rb->totalBytes);
+
+	namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name));
+	repSlotStat.spill_txns = rb->spillTxns;
+	repSlotStat.spill_count = rb->spillCount;
+	repSlotStat.spill_bytes = rb->spillBytes;
+	repSlotStat.stream_txns = rb->streamTxns;
+	repSlotStat.stream_count = rb->streamCount;
+	repSlotStat.stream_bytes = rb->streamBytes;
+	repSlotStat.total_txns = rb->totalTxns;
+	repSlotStat.total_bytes = rb->totalBytes;
+
+	pgstat_report_replslot(&repSlotStat);
 	rb->spillTxns = 0;
 	rb->spillCount = 0;
 	rb->spillBytes = 0;
 	rb->streamTxns = 0;
 	rb->streamCount = 0;
 	rb->streamBytes = 0;
+	rb->totalTxns = 0;
+	rb->totalBytes = 0;
 }
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 52d06285a2..5bc0b05a0e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -350,6 +350,8 @@ ReorderBufferAllocate(void)
 	buffer->streamTxns = 0;
 	buffer->streamCount = 0;
 	buffer->streamBytes = 0;
+	buffer->totalTxns = 0;
+	buffer->totalBytes = 0;
 
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
@@ -659,6 +661,8 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
 			dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
 			AssertTXNLsnOrder(rb);
 		}
+
+		rb->totalTxns++;
 	}
 	else
 		txn = NULL;				/* not found and not asked to create */
@@ -3078,6 +3082,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 	{
 		txn->size += sz;
 		rb->size += sz;
+		rb->totalBytes += sz;
 
 		/* Update the total size in the top transaction. */
 		if (toptxn)
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 75a087c2f9..f61b163f78 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -328,7 +328,12 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	 * ReplicationSlotAllocationLock.
 	 */
 	if (SlotIsLogical(slot))
-		pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0);
+	{
+		PgStat_ReplSlotStats repSlotStat;
+		MemSet(&repSlotStat, 0, sizeof(PgStat_ReplSlotStats));
+		namestrcpy(&repSlotStat.slotname, NameStr(slot->data.name));
+		pgstat_report_replslot(&repSlotStat);
+	}
 
 	/*
 	 * Now that the slot has been marked as in_use and active, it's safe to
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 9ffbca685c..54eb3d1538 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2279,7 +2279,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 8
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 10
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -2323,18 +2323,20 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
 		MemSet(values, 0, sizeof(values));
 		MemSet(nulls, 0, sizeof(nulls));
 
-		values[0] = PointerGetDatum(cstring_to_text(s->slotname));
+		values[0] = CStringGetTextDatum(NameStr(s->slotname));
 		values[1] = Int64GetDatum(s->spill_txns);
 		values[2] = Int64GetDatum(s->spill_count);
 		values[3] = Int64GetDatum(s->spill_bytes);
 		values[4] = Int64GetDatum(s->stream_txns);
 		values[5] = Int64GetDatum(s->stream_count);
 		values[6] = Int64GetDatum(s->stream_bytes);
+		values[7] = Int64GetDatum(s->total_txns);
+		values[8] = Int64GetDatum(s->total_bytes);
 
 		if (s->stat_reset_timestamp == 0)
-			nulls[7] = true;
+			nulls[9] = true;
 		else
-			values[7] = TimestampTzGetDatum(s->stat_reset_timestamp);
+			values[9] = TimestampTzGetDatum(s->stat_reset_timestamp);
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 69ffd0c3f4..8b2071c77b 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5296,9 +5296,9 @@
   proname => 'pg_stat_get_replication_slots', prorows => '10',
   proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
+  proallargtypes => '{text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slots' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 3247c7b8ad..f794dadffc 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -379,7 +379,7 @@ typedef struct PgStat_MsgResetslrucounter
 typedef struct PgStat_MsgResetreplslotcounter
 {
 	PgStat_MsgHdr m_hdr;
-	char		m_slotname[NAMEDATALEN];
+	NameData      m_slotname;
 	bool		clearall;
 } PgStat_MsgResetreplslotcounter;
 
@@ -507,7 +507,7 @@ typedef struct PgStat_MsgSLRU
 typedef struct PgStat_MsgReplSlot
 {
 	PgStat_MsgHdr m_hdr;
-	char		m_slotname[NAMEDATALEN];
+	NameData	m_slotname;
 	bool		m_drop;
 	PgStat_Counter m_spill_txns;
 	PgStat_Counter m_spill_count;
@@ -515,6 +515,8 @@ typedef struct PgStat_MsgReplSlot
 	PgStat_Counter m_stream_txns;
 	PgStat_Counter m_stream_count;
 	PgStat_Counter m_stream_bytes;
+	PgStat_Counter m_total_txns;
+	PgStat_Counter m_total_bytes;
 } PgStat_MsgReplSlot;
 
 
@@ -872,13 +874,15 @@ typedef struct PgStat_SLRUStats
  */
 typedef struct PgStat_ReplSlotStats
 {
-	char		slotname[NAMEDATALEN];
+	NameData	slotname;
 	PgStat_Counter spill_txns;
 	PgStat_Counter spill_count;
 	PgStat_Counter spill_bytes;
 	PgStat_Counter stream_txns;
 	PgStat_Counter stream_count;
 	PgStat_Counter stream_bytes;
+	PgStat_Counter total_txns;
+	PgStat_Counter total_bytes;
 	TimestampTz stat_reset_timestamp;
 } PgStat_ReplSlotStats;
 
@@ -1234,10 +1238,7 @@ extern void pgstat_report_recovery_conflict(int reason);
 extern void pgstat_report_deadlock(void);
 extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
 extern void pgstat_report_checksum_failure(void);
-extern void pgstat_report_replslot(const char *slotname, PgStat_Counter spilltxns,
-								   PgStat_Counter spillcount, PgStat_Counter spillbytes,
-								   PgStat_Counter streamtxns, PgStat_Counter streamcount,
-								   PgStat_Counter streambytes);
+extern void pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat);
 extern void pgstat_report_replslot_drop(const char *slotname);
 
 extern void pgstat_initialize(void);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 565a961d6a..a372b70b7d 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -618,6 +618,10 @@ struct ReorderBuffer
 	int64		streamTxns;		/* number of transactions streamed */
 	int64		streamCount;	/* streaming invocation counter */
 	int64		streamBytes;	/* amount of data streamed */
+
+	/* Statistics about all the replicated transactions */
+	int64		totalTxns;		/* total number of transactions replicated */
+	int64		totalBytes;		/* total amount of data replicated */
 };
 
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 9b59a7b4a5..f0eea8b18f 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2056,8 +2056,10 @@ pg_stat_replication_slots| SELECT s.slot_name,
     s.stream_txns,
     s.stream_count,
     s.stream_bytes,
+    s.total_txns,
+    s.total_bytes,
     s.stats_reset
-   FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset);
+   FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset);
 pg_stat_slru| SELECT s.name,
     s.blks_zeroed,
     s.blks_hit,
-- 
2.25.1
