On Fri, 25 Aug 2023 at 17:40, vignesh C <vignes...@gmail.com> wrote:
>
> On Sat, 19 Aug 2023 at 11:53, Amit Kapila <amit.kapil...@gmail.com> wrote:
> >
> > It's entirely possible for a logical slot to have a confirmed_flush
> > LSN higher than the last value saved on disk while not being marked as
> > dirty.  It's currently not a problem to lose that value during a clean
> > shutdown / restart cycle but to support the upgrade of logical slots
> > [1] (see latest patch at [2]), we seem to rely on that value being
> > properly persisted to disk. During the upgrade, we need to verify that
> > all the data prior to shutdown_checkpoint for the logical slots has
> > been consumed, otherwise, the downstream may miss some data. Now, to
> > ensure the same, we are planning to compare the confirmed_flush LSN
> > location with the latest shutdown_checkpoint location which means that
> > the confirmed_flush LSN should be updated after restart.
> >
> > I think this is inefficient even without an upgrade because, after the
> > restart, this may lead to decoding some data again. Say, we process
> > some transactions for which we didn't send anything downstream (the
> > changes got filtered) but the confirmed_flush LSN is updated due to
> > keepalives. As we don't flush the latest value of confirmed_flush LSN,
> > it may lead to processing the same changes again.
>
> I was able to test and verify that we were not processing the same
> changes again.
> Note: The 0001-Add-logs-to-skip-transaction-filter-insert-operation.patch
> adds a log message that is printed when decoding of a transaction is
> skipped, and another that is printed when an operation is filtered.
> The test.sh script has the steps for:
> a) setting up logical replication for a table
> b) performing an insert on a table that needs to be published (this
>    will be replicated to the subscriber)
> c) performing an insert on a table that will not be published (this
>    insert will be filtered, it will not be replicated)
> d) sleeping for 5 seconds
> e) stopping the server
> f) starting the server
> I used the following steps on HEAD:
> a) Apply the 0001-Add-logs-to-skip-transaction-filter-insert-operation.patch
> patch on HEAD and build the binaries
> b) Execute test.sh
> c) View the N1.log file to see that the insert operations were filtered
> again, as shown by the following logs:
> LOG:  Filter insert for table tbl2
> ...
> ===restart===
> ...
> LOG:  Skipping transaction 0/156AD10 as start decode at is greater 0/156AE40
> ...
> LOG:  Filter insert for table tbl2
>
> We can see that the insert operations on tbl2, which were filtered
> before the server was stopped, are filtered again after restart in HEAD.
>
> Let's verify that the same changes are not processed again with the patch:
> a) Apply v4-0001-Persist-to-disk-logical-slots-during-a-shutdown-c.patch
> from [1] and also apply the
> 0001-Add-logs-to-skip-transaction-filter-insert-operation.patch patch,
> then build the binaries
> b) Execute test.sh
> c) View the N1.log file to see that the insert operations were skipped
> after the server restart, as shown by the following logs:
> LOG:  Filter insert for table tbl2
> ...
> ===restart===
> ...
> Skipping transaction 0/156AD10 as start decode at is greater 0/156AFB0
> ...
> Skipping transaction 0/156AE80 as start decode at is greater 0/156AFB0
>
> We can see that the insert operations on tbl2 are not processed again
> after restart with the patch.
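
The skip decisions in the logs quoted above come down to a single LSN
comparison. Below is a minimal sketch in C, not the server code: the type
Lsn and the helpers parse_lsn() and xact_is_skipped() are hypothetical
names, and it assumes that the LSN printed in the log is the one compared
against the decoding start point, and that the second transaction sits at
0/156AE80 in both runs (the HEAD log does not print it).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t Lsn;           /* stand-in for PostgreSQL's XLogRecPtr */

/*
 * Parse the "hi/lo" hexadecimal text form of an LSN, e.g. "0/156AE40".
 * Returns false if the text is malformed or has trailing characters.
 */
static bool
parse_lsn(const char *text, Lsn *out)
{
    unsigned int hi;
    unsigned int lo;
    char         extra;

    assert(text != NULL && out != NULL);

    /* exactly two conversions must succeed; a third means trailing junk */
    if (sscanf(text, "%X/%X%c", &hi, &lo, &extra) != 2)
        return false;

    *out = ((Lsn) hi << 32) | lo;
    return true;
}

/*
 * A transaction is skipped when its LSN lies before the point at which
 * decoding starts.  Malformed input is reported as "not skipped".
 */
static bool
xact_is_skipped(const char *xact_lsn, const char *start_decoding_at)
{
    Lsn xact;
    Lsn start;

    if (!parse_lsn(xact_lsn, &xact) || !parse_lsn(start_decoding_at, &start))
        return false;

    return xact < start;
}
```

Under those assumptions, xact_is_skipped("0/156AE80", "0/156AE40") is false
(the HEAD run decodes and filters the tbl2 inserts again), while
xact_is_skipped("0/156AE80", "0/156AFB0") is true (the patched run skips
them).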

Here is another way to test this, using the pg_replslotdata approach that
was proposed earlier at [1].
I have rebased it on top of HEAD, and the v5 version is attached.

We can use the same test.sh that was shared at [2].
When executed on HEAD, confirmed_flush points to a WAL location before
both transactions:
slot_name  slot_type  datoid  persistency  xmin  catalog_xmin  restart_lsn  confirmed_flush  two_phase_at  two_phase  plugin
---------  ---------  ------  -----------  ----  ------------  -----------  ---------------  ------------  ---------  ------
sub        logical    5       persistent   0     735           0/1531E28    0/1531E60        0/0           0          pgoutput

WAL record information generated using pg_walinspect for various
records at and after confirmed_flush WAL 0/1531E60:
 row_number | start_lsn |  end_lsn  | prev_lsn  | xid | resource_manager |     record_type     | record_length | main_data_length | fpi_length |                                               description                                               |                  block_ref
------------+-----------+-----------+-----------+-----+------------------+---------------------+---------------+------------------+------------+---------------------------------------------------------------------------------------------------------+---------------------------------------------
          1 | 0/1531E60 | 0/1531EA0 | 0/1531E28 |   0 | Heap2            | PRUNE               |            57 |                9 |          0 | snapshotConflictHorizon: 0, nredirected: 0, ndead: 1, nunused: 0, redirected: [], dead: [1], unused: [] | blkref #0: rel 1663/5/1255 fork main blk 58
          2 | 0/1531EA0 | 0/1531EE0 | 0/1531E60 | 735 | Heap             | INSERT+INIT         |            59 |                3 |          0 | off: 1, flags: 0x08                                                                                     | blkref #0: rel 1663/5/16384 fork main blk 0
          3 | 0/1531EE0 | 0/1531F20 | 0/1531EA0 | 735 | Heap             | INSERT              |            59 |                3 |          0 | off: 2, flags: 0x08                                                                                     | blkref #0: rel 1663/5/16384 fork main blk 0
          4 | 0/1531F20 | 0/1531F60 | 0/1531EE0 | 735 | Heap             | INSERT              |            59 |                3 |          0 | off: 3, flags: 0x08                                                                                     | blkref #0: rel 1663/5/16384 fork main blk 0
          5 | 0/1531F60 | 0/1531FA0 | 0/1531F20 | 735 | Heap             | INSERT              |            59 |                3 |          0 | off: 4, flags: 0x08                                                                                     | blkref #0: rel 1663/5/16384 fork main blk 0
          6 | 0/1531FA0 | 0/1531FE0 | 0/1531F60 | 735 | Heap             | INSERT              |            59 |                3 |          0 | off: 5, flags: 0x08                                                                                     | blkref #0: rel 1663/5/16384 fork main blk 0
          7 | 0/1531FE0 | 0/1532028 | 0/1531FA0 | 735 | Transaction      | COMMIT              |            46 |               20 |          0 | 2023-08-27 23:22:17.161215+05:30                                                                        |
          8 | 0/1532028 | 0/1532068 | 0/1531FE0 | 736 | Heap             | INSERT+INIT         |            59 |                3 |          0 | off: 1, flags: 0x08                                                                                     | blkref #0: rel 1663/5/16387 fork main blk 0
          9 | 0/1532068 | 0/15320A8 | 0/1532028 | 736 | Heap             | INSERT              |            59 |                3 |          0 | off: 2, flags: 0x08                                                                                     | blkref #0: rel 1663/5/16387 fork main blk 0
         10 | 0/15320A8 | 0/15320E8 | 0/1532068 | 736 | Heap             | INSERT              |            59 |                3 |          0 | off: 3, flags: 0x08                                                                                     | blkref #0: rel 1663/5/16387 fork main blk 0
         11 | 0/15320E8 | 0/1532128 | 0/15320A8 | 736 | Heap             | INSERT              |            59 |                3 |          0 | off: 4, flags: 0x08                                                                                     | blkref #0: rel 1663/5/16387 fork main blk 0
         12 | 0/1532128 | 0/1532168 | 0/15320E8 | 736 | Heap             | INSERT              |            59 |                3 |          0 | off: 5, flags: 0x08                                                                                     | blkref #0: rel 1663/5/16387 fork main blk 0
         13 | 0/1532168 | 0/1532198 | 0/1532128 | 736 | Transaction      | COMMIT              |            46 |               20 |          0 | 2023-08-27 23:22:17.174756+05:30                                                                        |
         14 | 0/1532198 | 0/1532210 | 0/1532168 |   0 | XLOG             | CHECKPOINT_SHUTDOWN |           114 |               88 |          0 | redo 0/1532198; tli 1; prev tli 1; fpw true; xid 0:737; oid 16399; multi 1; offset 0; oldest xid 723 in DB 1; oldest multi 1 in DB 1; oldest/newest commit timestamp xid: 0/0; oldest running xid 0; shutdown |

Whereas the same test executed with the patch applied shows that
confirmed_flush points to the CHECKPOINT_SHUTDOWN record:
slot_name  slot_type  datoid  persistency  xmin  catalog_xmin  restart_lsn  confirmed_flush  two_phase_at  two_phase  plugin
---------  ---------  ------  -----------  ----  ------------  -----------  ---------------  ------------  ---------  ------
sub        logical    5       persistent   0     735           0/1531E28    0/1532198        0/0           0          pgoutput

WAL record information generated using pg_walinspect for various
records at and after confirmed_flush WAL 0/1532198:
 row_number | start_lsn |  end_lsn  | prev_lsn  | xid | resource_manager |     record_type     | record_length | main_data_length | fpi_length |                                                                                                  description                                                                                                  | block_ref
------------+-----------+-----------+-----------+-----+------------------+---------------------+---------------+------------------+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------
          1 | 0/1532198 | 0/1532210 | 0/1532168 |   0 | XLOG             | CHECKPOINT_SHUTDOWN |           114 |               88 |          0 | redo 0/1532198; tli 1; prev tli 1; fpw true; xid 0:737; oid 16399; multi 1; offset 0; oldest xid 723 in DB 1; oldest multi 1 in DB 1; oldest/newest commit timestamp xid: 0/0; oldest running xid 0; shutdown |
(1 row)
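
The difference between the two runs can be expressed as the amount of WAL
that lies between the slot's confirmed_flush and the shutdown checkpoint
record. The following is only an illustrative sketch in C: the type Lsn and
the helpers make_lsn(), format_lsn(), unconfirmed_wal_bytes() and
slot_caught_up() are hypothetical names, not part of the attached patch or
of PostgreSQL.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t Lsn;           /* stand-in for PostgreSQL's XLogRecPtr */

/* Combine the two halves of the "hi/lo" text form into one 64-bit position. */
static Lsn
make_lsn(uint32_t hi, uint32_t lo)
{
    return ((Lsn) hi << 32) | lo;
}

/* Format an LSN in the same "hi/lo" hexadecimal style used in the output above. */
static void
format_lsn(Lsn lsn, char *buf, size_t len)
{
    assert(buf != NULL && len > 0);
    snprintf(buf, len, "%X/%X",
             (unsigned int) (lsn >> 32), (unsigned int) lsn);
}

/*
 * Bytes of WAL between confirmed_flush and the start of the shutdown
 * checkpoint record; 0 if confirmed_flush has already reached it.
 */
static uint64_t
unconfirmed_wal_bytes(Lsn confirmed_flush, Lsn shutdown_checkpoint)
{
    if (confirmed_flush >= shutdown_checkpoint)
        return 0;
    return shutdown_checkpoint - confirmed_flush;
}

/* True when nothing before the shutdown checkpoint is left unconfirmed. */
static bool
slot_caught_up(Lsn confirmed_flush, Lsn shutdown_checkpoint)
{
    return unconfirmed_wal_bytes(confirmed_flush, shutdown_checkpoint) == 0;
}
```

With the values shown above, the HEAD run (confirmed_flush 0/1531E60,
checkpoint at 0/1532198) leaves 0x338 = 824 bytes unconfirmed, which is
exactly the span of records 1 to 13 in the first pg_walinspect listing. The
patched run (confirmed_flush 0/1532198) leaves 0 bytes.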

[1] - https://www.postgresql.org/message-id/flat/CALj2ACW0rV5gWK8A3m6_X62qH%2BVfaq5hznC%3Di0R5Wojt5%2Byhyw%40mail.gmail.com
[2] - https://www.postgresql.org/message-id/CALDaNm2BboFuFVYxyzP4wkv7%3D8%2B_TwsD%2BugyGhtibTSF4m4XRg%40mail.gmail.com

Regards,
Vignesh
From 934f2d32bf6d3b3bb8ae0fcf334aa371fc95de19 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Sun, 27 Aug 2023 22:03:27 +0530
Subject: [PATCH v5] pg_replslotdata

TODO: Display invalidation information for replication slot.
---
 src/backend/replication/slot.c            |  39 ---
 src/bin/Makefile                          |   1 +
 src/bin/pg_replslotdata/.gitignore        |   2 +
 src/bin/pg_replslotdata/Makefile          |  44 +++
 src/bin/pg_replslotdata/nls.mk            |   6 +
 src/bin/pg_replslotdata/pg_replslotdata.c | 362 ++++++++++++++++++++++
 src/bin/pg_replslotdata/t/001_basic.pl    |  11 +
 src/include/replication/slot.h            |  97 +-----
 src/include/replication/slot_common.h     | 147 +++++++++
 9 files changed, 574 insertions(+), 135 deletions(-)
 create mode 100644 src/bin/pg_replslotdata/.gitignore
 create mode 100644 src/bin/pg_replslotdata/Makefile
 create mode 100644 src/bin/pg_replslotdata/nls.mk
 create mode 100644 src/bin/pg_replslotdata/pg_replslotdata.c
 create mode 100644 src/bin/pg_replslotdata/t/001_basic.pl
 create mode 100644 src/include/replication/slot_common.h

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index bb09c4010f..567d61540a 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -53,45 +53,6 @@
 #include "storage/procarray.h"
 #include "utils/builtins.h"
 
-/*
- * Replication slot on-disk data structure.
- */
-typedef struct ReplicationSlotOnDisk
-{
-	/* first part of this struct needs to be version independent */
-
-	/* data not covered by checksum */
-	uint32		magic;
-	pg_crc32c	checksum;
-
-	/* data covered by checksum */
-	uint32		version;
-	uint32		length;
-
-	/*
-	 * The actual data in the slot that follows can differ based on the above
-	 * 'version'.
-	 */
-
-	ReplicationSlotPersistentData slotdata;
-} ReplicationSlotOnDisk;
-
-/* size of version independent data */
-#define ReplicationSlotOnDiskConstantSize \
-	offsetof(ReplicationSlotOnDisk, slotdata)
-/* size of the part of the slot not covered by the checksum */
-#define ReplicationSlotOnDiskNotChecksummedSize  \
-	offsetof(ReplicationSlotOnDisk, version)
-/* size of the part covered by the checksum */
-#define ReplicationSlotOnDiskChecksummedSize \
-	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
-/* size of the slot data that is version dependent */
-#define ReplicationSlotOnDiskV2Size \
-	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
-
-#define SLOT_MAGIC		0x1051CA1	/* format identifier */
-#define SLOT_VERSION	3		/* version for new files */
-
 /* Control array for replication slot management */
 ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
 
diff --git a/src/bin/Makefile b/src/bin/Makefile
index 373077bf52..1db7dd00d4 100644
--- a/src/bin/Makefile
+++ b/src/bin/Makefile
@@ -23,6 +23,7 @@ SUBDIRS = \
 	pg_controldata \
 	pg_ctl \
 	pg_dump \
+	pg_replslotdata \
 	pg_resetwal \
 	pg_rewind \
 	pg_test_fsync \
diff --git a/src/bin/pg_replslotdata/.gitignore b/src/bin/pg_replslotdata/.gitignore
new file mode 100644
index 0000000000..13a4afb8ef
--- /dev/null
+++ b/src/bin/pg_replslotdata/.gitignore
@@ -0,0 +1,2 @@
+/pg_replslotdata
+/tmp_check/
diff --git a/src/bin/pg_replslotdata/Makefile b/src/bin/pg_replslotdata/Makefile
new file mode 100644
index 0000000000..69518ee53b
--- /dev/null
+++ b/src/bin/pg_replslotdata/Makefile
@@ -0,0 +1,44 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for src/bin/pg_replslotdata
+#
+# Copyright (c) 1998-2021, PostgreSQL Global Development Group
+#
+# src/bin/pg_replslotdata/Makefile
+#
+#-------------------------------------------------------------------------
+
+PGFILEDESC = "pg_replslotdata - provides information about the replication slots from $PGDATA/pg_replslot/<slot_name>"
+PGAPPICON=win32
+
+subdir = src/bin/pg_replslotdata
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = \
+	$(WIN32RES) \
+	pg_replslotdata.o
+
+all: pg_replslotdata
+
+pg_replslotdata: $(OBJS) | submake-libpgport
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
+install: all installdirs
+	$(INSTALL_PROGRAM) pg_replslotdata$(X) '$(DESTDIR)$(bindir)/pg_replslotdata$(X)'
+
+installdirs:
+	$(MKDIR_P) '$(DESTDIR)$(bindir)'
+
+uninstall:
+	rm -f '$(DESTDIR)$(bindir)/pg_replslotdata$(X)'
+
+clean distclean maintainer-clean:
+	rm -f pg_replslotdata$(X) $(OBJS)
+	rm -rf tmp_check
+
+check:
+	$(prove_check)
+
+installcheck:
+	$(prove_installcheck)
diff --git a/src/bin/pg_replslotdata/nls.mk b/src/bin/pg_replslotdata/nls.mk
new file mode 100644
index 0000000000..74bee593c9
--- /dev/null
+++ b/src/bin/pg_replslotdata/nls.mk
@@ -0,0 +1,6 @@
+# src/bin/pg_replslotdata/nls.mk
+CATALOG_NAME     = pg_replslotdata
+AVAIL_LANGUAGES  = cs de el es fr ja ko pl ru sv tr uk vi zh_CN
+GETTEXT_FILES    = $(FRONTEND_COMMON_GETTEXT_FILES) pg_replslotdata.c
+GETTEXT_TRIGGERS = $(FRONTEND_COMMON_GETTEXT_TRIGGERS)
+GETTEXT_FLAGS    = $(FRONTEND_COMMON_GETTEXT_FLAGS)
diff --git a/src/bin/pg_replslotdata/pg_replslotdata.c b/src/bin/pg_replslotdata/pg_replslotdata.c
new file mode 100644
index 0000000000..aed5d6750c
--- /dev/null
+++ b/src/bin/pg_replslotdata/pg_replslotdata.c
@@ -0,0 +1,362 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_replslotdata.c - provides information about the replication slots
+ * from $PGDATA/pg_replslot/<slot_name>.
+ *
+ * Copyright (c) 2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_replslotdata/pg_replslotdata.c
+ *-------------------------------------------------------------------------
+ */
+/*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need.  But we need a
+ * frontend-ish environment otherwise.  Hence this ugly hack.
+ */
+#define FRONTEND 1
+
+#include "postgres.h"
+
+#include <dirent.h>
+#include <sys/stat.h>
+
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "common/logging.h"
+#include "common/string.h"
+#include "getopt_long.h"
+#include "pg_getopt.h"
+#include "replication/slot_common.h"
+
+static bool verbose = false;
+
+static void process_replslots(void);
+static void read_and_display_repl_slot(const char *name);
+
+static void
+usage(const char *progname)
+{
+	printf(_("%s displays information about the replication slots from $PGDATA/pg_replslot/<slot_name>.\n\n"), progname);
+	printf(_("Usage:\n"));
+	printf(_("  %s [OPTION] [DATADIR]\n"), progname);
+	printf(_("\nOptions:\n"));
+	printf(_(" [-D, --pgdata=]DATADIR  data directory\n"));
+	printf(_("  -V, --version          output version information, then exit\n"));
+	printf(_("  -v, --verbose          write a lot of output\n"));
+	printf(_("  -?, --help             show this help, then exit\n"));
+	printf(_("\nIf no data directory (DATADIR) is specified, "
+			 "the environment variable PGDATA\nis used.\n\n"));
+	printf(_("Report bugs to <%s>.\n"), PACKAGE_BUGREPORT);
+	printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
+}
+
+static void
+process_replslots(void)
+{
+	DIR		   *rsdir;
+	struct dirent *rsde;
+	uint32		cnt = 0;
+
+	rsdir = opendir("pg_replslot");
+	if (rsdir == NULL)
+	{
+		pg_log_error("could not open directory \"%s\": %m", "pg_replslot");
+		exit(1);
+	}
+
+	/* XXX: comment here about the format specifiers */
+	printf("%-64s %9s %10s %11s %10s %12s %21s %21s %21s %10s %20s\n"
+		   "%-64s %9s %10s %11s %10s %12s %21s %21s %21s %10s %20s\n",
+		   "slot_name", "slot_type", "datoid", "persistency", "xmin", "catalog_xmin", "restart_lsn", "confirmed_flush", "two_phase_at", "two_phase", "plugin",
+		   "---------", "---------", "------", "-----------", "----", "------------", "-----------", "---------------", "------------", "---------", "------");
+
+	while (errno = 0, (rsde = readdir(rsdir)) != NULL)
+	{
+		struct stat statbuf;
+		char		path[MAXPGPATH];
+
+		if (strcmp(rsde->d_name, ".") == 0 ||
+			strcmp(rsde->d_name, "..") == 0)
+			continue;
+
+		snprintf(path, sizeof(path), "pg_replslot/%s", rsde->d_name);
+
+		/* slots are stored as directories, skip anything that is not one */
+		if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
+			continue;
+
+		/* a crash happened while a slot was being set up or deleted; warn and skip */
+		if (pg_str_endswith(rsde->d_name, ".tmp"))
+		{
+			pg_log_warning("server crashed while the slot \"%s\" was being set up or deleted",
+						   rsde->d_name);
+			continue;
+		}
+
+		/* looks like a slot in a normal state, display it */
+		read_and_display_repl_slot(rsde->d_name);
+		cnt++;
+	}
+
+	if (errno)
+	{
+		pg_log_error("could not read directory \"%s\": %m", "pg_replslot");
+		exit(1);
+	}
+
+	if (cnt == 0)
+	{
+		pg_log_info("no replication slots were found");
+		exit(0);
+	}
+
+	if (closedir(rsdir))
+	{
+		pg_log_error("could not close directory \"%s\": %m", "pg_replslot");
+		exit(1);
+	}
+}
+
+static void
+read_and_display_repl_slot(const char *name)
+{
+	ReplicationSlotOnDisk cp;
+	char		slotdir[MAXPGPATH];
+	char		path[MAXPGPATH];
+	char		restart_lsn[NAMEDATALEN];
+	char		confirmed_flush[NAMEDATALEN];
+	char		two_phase_at[NAMEDATALEN];
+	char		persistency[NAMEDATALEN] = "unknown";
+	int			fd;
+	int			readBytes;
+	pg_crc32c	checksum;
+
+	/* a leftover temporary state file means a slot update was interrupted */
+	sprintf(slotdir, "pg_replslot/%s", name);
+	sprintf(path, "%s/state.tmp", slotdir);
+
+	fd = open(path, O_RDONLY | PG_BINARY, 0);
+
+	if (fd >= 0)
+	{
+		pg_log_error("found temporary state file \"%s\"", path);
+		exit(1);
+	}
+
+	sprintf(path, "%s/state", slotdir);
+
+	if (verbose)
+		pg_log_info("reading replication slot from \"%s\"", path);
+
+	fd = open(path, O_RDONLY | PG_BINARY, 0);
+
+	/*
+	 * We do not need to handle this as we are rename()ing the directory into
+	 * place only after we fsync()ed the state file.
+	 */
+	if (fd < 0)
+	{
+		pg_log_error("could not open file \"%s\": %m", path);
+		exit(1);
+	}
+
+	if (verbose)
+		pg_log_info("reading version independent replication slot state file");
+
+	/* read part of statefile that's guaranteed to be version independent */
+	readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
+	if (readBytes != ReplicationSlotOnDiskConstantSize)
+	{
+		if (readBytes < 0)
+		{
+			pg_log_error("could not read file \"%s\": %m", path);
+			exit(1);
+		}
+		else
+		{
+			pg_log_error("could not read file \"%s\": read %d of %zu",
+						 path, readBytes,
+						 (Size) ReplicationSlotOnDiskConstantSize);
+			exit(1);
+		}
+	}
+
+	/* verify magic */
+	if (cp.magic != SLOT_MAGIC)
+	{
+		pg_log_error("replication slot file \"%s\" has wrong magic number: %u instead of %u",
+					 path, cp.magic, SLOT_MAGIC);
+		exit(1);
+	}
+
+	/* verify version */
+	if (cp.version != SLOT_VERSION)
+	{
+		pg_log_error("replication slot file \"%s\" has unsupported version %u",
+					 path, cp.version);
+		exit(1);
+	}
+
+	/* boundary check on length */
+	if (cp.length != ReplicationSlotOnDiskV2Size)
+	{
+		pg_log_error("replication slot file \"%s\" has corrupted length %u",
+					 path, cp.length);
+		exit(1);
+	}
+
+	if (verbose)
+		pg_log_info("reading the entire replication slot state file");
+
+	/* now that we know the size, read the entire file */
+	readBytes = read(fd,
+					 (char *) &cp + ReplicationSlotOnDiskConstantSize,
+					 cp.length);
+	if (readBytes != cp.length)
+	{
+		if (readBytes < 0)
+		{
+			pg_log_error("could not read file \"%s\": %m", path);
+			exit(1);
+		}
+		else
+		{
+			pg_log_error("could not read file \"%s\": read %d of %zu",
+						 path, readBytes, (Size) cp.length);
+			exit(1);
+		}
+	}
+
+	if (close(fd) != 0)
+	{
+		pg_log_error("could not close file \"%s\": %m", path);
+		exit(1);
+	}
+
+	/* now verify the CRC */
+	INIT_CRC32C(checksum);
+	COMP_CRC32C(checksum,
+				(char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
+				ReplicationSlotOnDiskChecksummedSize);
+	FIN_CRC32C(checksum);
+
+	if (!EQ_CRC32C(checksum, cp.checksum))
+	{
+		pg_log_error("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
+					 path, checksum, cp.checksum);
+		exit(1);
+	}
+
+	sprintf(restart_lsn, "%X/%X", LSN_FORMAT_ARGS(cp.slotdata.restart_lsn));
+	sprintf(confirmed_flush, "%X/%X", LSN_FORMAT_ARGS(cp.slotdata.confirmed_flush));
+	sprintf(two_phase_at, "%X/%X", LSN_FORMAT_ARGS(cp.slotdata.two_phase_at));
+
+	if (cp.slotdata.persistency == RS_PERSISTENT)
+		sprintf(persistency, "persistent");
+	else if (cp.slotdata.persistency == RS_EPHEMERAL)
+		sprintf(persistency, "ephemeral");
+	else if (cp.slotdata.persistency == RS_TEMPORARY)
+		sprintf(persistency, "temporary");
+
+	/* display the slot information */
+	printf("%-64s %9s %10u %11s %10u %12u %21s %21s %21s %10d %20s\n",
+		   NameStr(cp.slotdata.name),
+		   cp.slotdata.database == InvalidOid ? "physical" : "logical",
+		   cp.slotdata.database,
+		   persistency,
+		   cp.slotdata.xmin,
+		   cp.slotdata.catalog_xmin,
+		   restart_lsn,
+		   confirmed_flush,
+		   two_phase_at,
+		   cp.slotdata.two_phase,
+		   NameStr(cp.slotdata.plugin));
+}
+
+int
+main(int argc, char *argv[])
+{
+	static struct option long_options[] = {
+		{"pgdata", required_argument, NULL, 'D'},
+		{"verbose", no_argument, NULL, 'v'},
+		{NULL, 0, NULL, 0}
+	};
+
+	char	   *DataDir = NULL;
+	const char *progname;
+	int			c;
+
+	pg_logging_init(argv[0]);
+	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_replslotdata"));
+	progname = get_progname(argv[0]);
+
+	if (argc > 1)
+	{
+		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+		{
+			usage(progname);
+			exit(0);
+		}
+		if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
+		{
+			puts("pg_replslotdata (PostgreSQL) " PG_VERSION);
+			exit(0);
+		}
+	}
+
+	while ((c = getopt_long(argc, argv, "D:v", long_options, NULL)) != -1)
+	{
+		switch (c)
+		{
+			case 'D':
+				DataDir = optarg;
+				break;
+			case 'v':
+				verbose = true;
+				break;
+			default:
+				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
+				exit(1);
+		}
+	}
+
+	if (DataDir == NULL)
+	{
+		if (optind < argc)
+			DataDir = argv[optind++];
+		else
+			DataDir = getenv("PGDATA");
+	}
+
+	/* complain if any arguments remain */
+	if (optind < argc)
+	{
+		pg_log_error("too many command-line arguments (first is \"%s\")",
+					 argv[optind]);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	if (DataDir == NULL)
+	{
+		pg_log_error("no data directory specified");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
+		exit(1);
+	}
+
+	if (verbose)
+		pg_log_info("data directory: \"%s\"", DataDir);
+
+	if (chdir(DataDir) < 0)
+	{
+		pg_log_error("could not change directory to \"%s\": %m",
+					 DataDir);
+		exit(1);
+	}
+
+	process_replslots();
+
+	return 0;
+}
diff --git a/src/bin/pg_replslotdata/t/001_basic.pl b/src/bin/pg_replslotdata/t/001_basic.pl
new file mode 100644
index 0000000000..d6830dc2ac
--- /dev/null
+++ b/src/bin/pg_replslotdata/t/001_basic.pl
@@ -0,0 +1,11 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 8;
+
+program_help_ok('pg_replslotdata');
+program_version_ok('pg_replslotdata');
+program_options_handling_ok('pg_replslotdata');
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a8a89dc784..a7d16a37a3 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -15,104 +15,9 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
+#include "replication/slot_common.h"
 #include "replication/walreceiver.h"
 
-/*
- * Behaviour of replication slots, upon release or crash.
- *
- * Slots marked as PERSISTENT are crash-safe and will not be dropped when
- * released. Slots marked as EPHEMERAL will be dropped when released or after
- * restarts.  Slots marked TEMPORARY will be dropped at the end of a session
- * or on error.
- *
- * EPHEMERAL is used as a not-quite-ready state when creating persistent
- * slots.  EPHEMERAL slots can be made PERSISTENT by calling
- * ReplicationSlotPersist().  For a slot that goes away at the end of a
- * session, TEMPORARY is the appropriate choice.
- */
-typedef enum ReplicationSlotPersistency
-{
-	RS_PERSISTENT,
-	RS_EPHEMERAL,
-	RS_TEMPORARY
-} ReplicationSlotPersistency;
-
-/*
- * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
- * 'invalidated' field is set to a value other than _NONE.
- */
-typedef enum ReplicationSlotInvalidationCause
-{
-	RS_INVAL_NONE,
-	/* required WAL has been removed */
-	RS_INVAL_WAL_REMOVED,
-	/* required rows have been removed */
-	RS_INVAL_HORIZON,
-	/* wal_level insufficient for slot */
-	RS_INVAL_WAL_LEVEL,
-} ReplicationSlotInvalidationCause;
-
-/*
- * On-Disk data of a replication slot, preserved across restarts.
- */
-typedef struct ReplicationSlotPersistentData
-{
-	/* The slot's identifier */
-	NameData	name;
-
-	/* database the slot is active on */
-	Oid			database;
-
-	/*
-	 * The slot's behaviour when being dropped (or restored after a crash).
-	 */
-	ReplicationSlotPersistency persistency;
-
-	/*
-	 * xmin horizon for data
-	 *
-	 * NB: This may represent a value that hasn't been written to disk yet;
-	 * see notes for effective_xmin, below.
-	 */
-	TransactionId xmin;
-
-	/*
-	 * xmin horizon for catalog tuples
-	 *
-	 * NB: This may represent a value that hasn't been written to disk yet;
-	 * see notes for effective_xmin, below.
-	 */
-	TransactionId catalog_xmin;
-
-	/* oldest LSN that might be required by this replication slot */
-	XLogRecPtr	restart_lsn;
-
-	/* RS_INVAL_NONE if valid, or the reason for having been invalidated */
-	ReplicationSlotInvalidationCause invalidated;
-
-	/*
-	 * Oldest LSN that the client has acked receipt for.  This is used as the
-	 * start_lsn point in case the client doesn't specify one, and also as a
-	 * safety measure to jump forwards in case the client specifies a
-	 * start_lsn that's further in the past than this value.
-	 */
-	XLogRecPtr	confirmed_flush;
-
-	/*
-	 * LSN at which we enabled two_phase commit for this slot or LSN at which
-	 * we found a consistent point at the time of slot creation.
-	 */
-	XLogRecPtr	two_phase_at;
-
-	/*
-	 * Allow decoding of prepared transactions?
-	 */
-	bool		two_phase;
-
-	/* plugin name */
-	NameData	plugin;
-} ReplicationSlotPersistentData;
-
 /*
  * Shared memory state of a single replication slot.
  *
diff --git a/src/include/replication/slot_common.h b/src/include/replication/slot_common.h
new file mode 100644
index 0000000000..ff4556ff22
--- /dev/null
+++ b/src/include/replication/slot_common.h
@@ -0,0 +1,147 @@
+/*-------------------------------------------------------------------------
+ * slot_common.h
+ *	   Replication slot management.
+ *
+ * Copyright (c) 2021, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SLOT_COMMON_H
+#define SLOT_COMMON_H
+
+/*
+ * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
+ * 'invalidated' field is set to a value other than _NONE.
+ */
+typedef enum ReplicationSlotInvalidationCause
+{
+	RS_INVAL_NONE,
+	/* required WAL has been removed */
+	RS_INVAL_WAL_REMOVED,
+	/* required rows have been removed */
+	RS_INVAL_HORIZON,
+	/* wal_level insufficient for slot */
+	RS_INVAL_WAL_LEVEL,
+} ReplicationSlotInvalidationCause;
+
+/*
+ * Behaviour of replication slots, upon release or crash.
+ *
+ * Slots marked as PERSISTENT are crash-safe and will not be dropped when
+ * released. Slots marked as EPHEMERAL will be dropped when released or after
+ * restarts.  Slots marked TEMPORARY will be dropped at the end of a session
+ * or on error.
+ *
+ * EPHEMERAL is used as a not-quite-ready state when creating persistent
+ * slots.  EPHEMERAL slots can be made PERSISTENT by calling
+ * ReplicationSlotPersist().  For a slot that goes away at the end of a
+ * session, TEMPORARY is the appropriate choice.
+ */
+typedef enum ReplicationSlotPersistency
+{
+	RS_PERSISTENT,
+	RS_EPHEMERAL,
+	RS_TEMPORARY
+} ReplicationSlotPersistency;
+
+/*
+ * On-Disk data of a replication slot, preserved across restarts.
+ */
+typedef struct ReplicationSlotPersistentData
+{
+	/* The slot's identifier */
+	NameData	name;
+
+	/* database the slot is active on */
+	Oid			database;
+
+	/*
+	 * The slot's behaviour when being dropped (or restored after a crash).
+	 */
+	ReplicationSlotPersistency persistency;
+
+	/*
+	 * xmin horizon for data
+	 *
+	 * NB: This may represent a value that hasn't been written to disk yet;
+	 * see notes for effective_xmin, below.
+	 */
+	TransactionId xmin;
+
+	/*
+	 * xmin horizon for catalog tuples
+	 *
+	 * NB: This may represent a value that hasn't been written to disk yet;
+	 * see notes for effective_xmin, below.
+	 */
+	TransactionId catalog_xmin;
+
+	/* oldest LSN that might be required by this replication slot */
+	XLogRecPtr	restart_lsn;
+
+	/* RS_INVAL_NONE if valid, or the reason for having been invalidated */
+	ReplicationSlotInvalidationCause invalidated;
+
+	/*
+	 * Oldest LSN that the client has acked receipt for.  This is used as the
+	 * start_lsn point in case the client doesn't specify one, and also as a
+	 * safety measure to jump forwards in case the client specifies a
+	 * start_lsn that's further in the past than this value.
+	 */
+	XLogRecPtr	confirmed_flush;
+
+	/*
+	 * LSN at which we enabled two_phase commit for this slot or LSN at which
+	 * we found a consistent point at the time of slot creation.
+	 */
+	XLogRecPtr	two_phase_at;
+
+	/*
+	 * Allow decoding of prepared transactions?
+	 */
+	bool		two_phase;
+
+	/* plugin name */
+	NameData	plugin;
+} ReplicationSlotPersistentData;
+
+/*
+ * Replication slot on-disk data structure.
+ */
+typedef struct ReplicationSlotOnDisk
+{
+	/* first part of this struct needs to be version independent */
+
+	/* data not covered by checksum */
+	uint32		magic;
+	pg_crc32c	checksum;
+
+	/* data covered by checksum */
+	uint32		version;
+	uint32		length;
+
+	/*
+	 * The actual data in the slot that follows can differ based on the above
+	 * 'version'.
+	 */
+
+	ReplicationSlotPersistentData slotdata;
+} ReplicationSlotOnDisk;
+
+/* size of version independent data */
+#define ReplicationSlotOnDiskConstantSize \
+	offsetof(ReplicationSlotOnDisk, slotdata)
+/* size of the part of the slot not covered by the checksum */
+#define ReplicationSlotOnDiskNotChecksummedSize \
+	offsetof(ReplicationSlotOnDisk, version)
+/* size of the part covered by the checksum */
+#define ReplicationSlotOnDiskChecksummedSize \
+	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
+/* size of the slot data that is version dependent */
+#define ReplicationSlotOnDiskV2Size \
+	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
+
+#define SLOT_MAGIC		0x1051CA1	/* format identifier */
+#define SLOT_VERSION	3		/* version for new files */
+
+#endif							/* SLOT_COMMON_H */
-- 
2.34.1
