On Fri, 25 Aug 2023 at 17:40, vignesh C <[email protected]> wrote:
>
> On Sat, 19 Aug 2023 at 11:53, Amit Kapila <[email protected]> wrote:
> >
> > It's entirely possible for a logical slot to have a confirmed_flush
> > LSN higher than the last value saved on disk while not being marked as
> > dirty. It's currently not a problem to lose that value during a clean
> > shutdown / restart cycle but to support the upgrade of logical slots
> > [1] (see latest patch at [2]), we seem to rely on that value being
> > properly persisted to disk. During the upgrade, we need to verify that
> > all the data prior to shutdown_checkpoint for the logical slots has
> > been consumed, otherwise, the downstream may miss some data. Now, to
> > ensure the same, we are planning to compare the confirmed_flush LSN
> > location with the latest shutdown_checkpoint location which means that
> > the confirmed_flush LSN should be up to date after a restart.
> >
> > I think this is inefficient even without an upgrade because, after the
> > restart, this may lead to decoding some data again. Say, we process
> > some transactions for which we didn't send anything downstream (the
> > changes got filtered) but the confirmed_flush LSN is updated due to
> > keepalives. As we don't flush the latest value of confirmed_flush LSN,
> > it may lead to processing the same changes again.
>
> I was able to test and verify that, with the patch, we do not process
> the same changes again.
> Note: 0001-Add-logs-to-skip-transaction-filter-insert-operation.patch
> adds a log when decoding of a transaction is skipped and another when
> an operation is filtered.
> The test.sh script has the following steps:
> a) set up logical replication for a table
> b) insert into a table that is published (this will be replicated to
> the subscriber)
> c) insert into a table that is not published (this insert will be
> filtered and not replicated)
> d) sleep for 5 seconds
> e) stop the server
> f) start the server
> To check the behaviour on HEAD:
> a) apply 0001-Add-logs-to-skip-transaction-filter-insert-operation.patch
> to HEAD and build the binaries
> b) execute test.sh
> c) check N1.log, where the following logs show that the insert
> operations were filtered again:
> LOG: Filter insert for table tbl2
> ...
> ===restart===
> ...
> LOG: Skipping transaction 0/156AD10 as start decode at is greater 0/156AE40
> ...
> LOG: Filter insert for table tbl2
>
> We can see that the insert operations on tbl2, which were filtered
> before the server was stopped, are filtered again after the restart on HEAD.
>
> Let's verify that the same changes are not processed again with the patch:
> a) apply v4-0001-Persist-to-disk-logical-slots-during-a-shutdown-c.patch
> from [1] along with
> 0001-Add-logs-to-skip-transaction-filter-insert-operation.patch and
> build the binaries
> b) execute test.sh
> c) check N1.log, where the following logs show that the insert
> operations were skipped after the server restart:
> LOG: Filter insert for table tbl2
> ...
> ===restart===
> ...
> Skipping transaction 0/156AD10 as start decode at is greater 0/156AFB0
> ...
> Skipping transaction 0/156AE80 as start decode at is greater 0/156AFB0
>
> We can see that the insert operations on tbl2 are not processed again
> after restart with the patch.
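The skip decision behind these logs can be modelled in a few lines of C. This is a minimal sketch, not the server's code. It assumes the LSN printed in each "Skipping transaction" line is that transaction's commit position, and that "start decode at" is the point where decoding resumes, derived from the slot's persisted confirmed_flush. txn_needs_skip() and count_redecoded() are hypothetical helpers. The comparison is roughly the one SnapBuildXactNeedsSkip() makes in the backend: a transaction committed before the start point is skipped.

```c
#include <stdbool.h>
#include <stdint.h>

/* A WAL position; "0/156AE40" denotes ((uint64_t) 0 << 32) | 0x156AE40. */
typedef uint64_t lsn_t;

/*
 * Hypothetical helper: a transaction whose commit LSN lies before the point
 * where decoding starts was already confirmed earlier, so it is skipped.
 */
static bool
txn_needs_skip(lsn_t commit_lsn, lsn_t start_decoding_at)
{
	return commit_lsn < start_decoding_at;
}

/* Hypothetical helper: count the transactions that would be decoded again. */
static int
count_redecoded(const lsn_t *commit_lsns, int n, lsn_t start_decoding_at)
{
	int			redecoded = 0;

	for (int i = 0; i < n; i++)
	{
		if (!txn_needs_skip(commit_lsns[i], start_decoding_at))
			redecoded++;
	}
	return redecoded;
}
```

The logs above give transactions at 0/156AD10 and 0/156AE80. On HEAD the start point is 0/156AE40, so count_redecoded() returns 1: the second transaction is decoded, and its tbl2 insert is filtered, again. With the patch the start point is 0/156AFB0, so it returns 0.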
Here is another way to test this, using the pg_replslotdata tool that was
proposed earlier at [1].
I have rebased it on top of HEAD; v5 is attached.
The same test.sh script shared at [2] can be used.
When run on HEAD, confirmed_flush points to a WAL location before both
transactions:
slot_name slot_type datoid persistency xmin catalog_xmin restart_lsn confirmed_flush two_phase_at two_phase plugin
--------- --------- ------ ----------- ---- ------------ ----------- --------------- ------------ --------- --------
sub       logical        5 persistent     0          735 0/1531E28   0/1531E60       0/0                  0 pgoutput
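pg_replslotdata builds each row above by reading pg_replslot/<slot_name>/state directly. It runs the same checks as the patch's read_and_display_repl_slot(): magic number, version, payload length, then a CRC-32C over everything after the checksum field.

The sketch below models those checks under stated assumptions:
- DemoSlotOnDisk is a hypothetical, simplified struct. It carries only confirmed_flush in place of the real ReplicationSlotPersistentData.
- A bitwise CRC-32C stands in for PostgreSQL's INIT/COMP/FIN_CRC32C macros.

It is not the tool's actual code.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define DEMO_SLOT_MAGIC		0x1051CA1	/* same value as SLOT_MAGIC */
#define DEMO_SLOT_VERSION	3			/* same value as SLOT_VERSION */

/* Hypothetical, simplified stand-in for ReplicationSlotOnDisk. */
typedef struct DemoSlotOnDisk
{
	uint32_t	magic;			/* not covered by the checksum */
	uint32_t	checksum;		/* not covered by the checksum */
	uint32_t	version;		/* checksummed from here on */
	uint32_t	length;			/* size of the version-dependent payload */
	uint64_t	confirmed_flush;	/* stand-in for the real slot data */
} DemoSlotOnDisk;

#define DEMO_NOT_CHECKSUMMED_SIZE offsetof(DemoSlotOnDisk, version)
#define DEMO_CONSTANT_SIZE offsetof(DemoSlotOnDisk, confirmed_flush)

/* Bitwise CRC-32C (Castagnoli, reflected polynomial 0x82F63B78). */
static uint32_t
crc32c(const void *data, size_t len)
{
	const unsigned char *p = data;
	uint32_t	crc = 0xFFFFFFFF;	/* initial value, as INIT_CRC32C */

	while (len--)
	{
		crc ^= *p++;
		for (int k = 0; k < 8; k++)
			crc = (crc >> 1) ^ (0x82F63B78 & (0u - (crc & 1)));
	}
	return crc ^ 0xFFFFFFFF;	/* final XOR, as FIN_CRC32C */
}

/* Checksum over everything from 'version' to the end of the struct. */
static uint32_t
demo_checksum(const DemoSlotOnDisk *s)
{
	return crc32c((const char *) s + DEMO_NOT_CHECKSUMMED_SIZE,
				  sizeof(*s) - DEMO_NOT_CHECKSUMMED_SIZE);
}

/* Fill in the header fields the way a writer would before saving. */
static void
demo_seal(DemoSlotOnDisk *s)
{
	s->magic = DEMO_SLOT_MAGIC;
	s->version = DEMO_SLOT_VERSION;
	s->length = (uint32_t) (sizeof(*s) - DEMO_CONSTANT_SIZE);
	s->checksum = demo_checksum(s);
}

/* Returns NULL if the state is valid, else the first failed check. */
static const char *
demo_validate(const DemoSlotOnDisk *s)
{
	if (s->magic != DEMO_SLOT_MAGIC)
		return "wrong magic number";
	if (s->version != DEMO_SLOT_VERSION)
		return "unsupported version";
	if (s->length != sizeof(*s) - DEMO_CONSTANT_SIZE)
		return "corrupted length";
	if (demo_checksum(s) != s->checksum)
		return "checksum mismatch";
	return NULL;
}
```

Flipping one bit of the payload after demo_seal() makes demo_validate() report a checksum mismatch. That is the condition pg_replslotdata reports as "checksum mismatch for replication slot file".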
WAL records at and after confirmed_flush (0/1531E60), as reported by
pg_walinspect:
 row_number | start_lsn |  end_lsn  | prev_lsn  | xid | resource_manager |     record_type     | record_length | main_data_length | fpi_length | description | block_ref
------------+-----------+-----------+-----------+-----+------------------+---------------------+---------------+------------------+------------+-------------+-----------
          1 | 0/1531E60 | 0/1531EA0 | 0/1531E28 |   0 | Heap2            | PRUNE               |            57 |                9 |          0 | snapshotConflictHorizon: 0, nredirected: 0, ndead: 1, nunused: 0, redirected: [], dead: [1], unused: [] | blkref #0: rel 1663/5/1255 fork main blk 58
          2 | 0/1531EA0 | 0/1531EE0 | 0/1531E60 | 735 | Heap             | INSERT+INIT         |            59 |                3 |          0 | off: 1, flags: 0x08 | blkref #0: rel 1663/5/16384 fork main blk 0
          3 | 0/1531EE0 | 0/1531F20 | 0/1531EA0 | 735 | Heap             | INSERT              |            59 |                3 |          0 | off: 2, flags: 0x08 | blkref #0: rel 1663/5/16384 fork main blk 0
          4 | 0/1531F20 | 0/1531F60 | 0/1531EE0 | 735 | Heap             | INSERT              |            59 |                3 |          0 | off: 3, flags: 0x08 | blkref #0: rel 1663/5/16384 fork main blk 0
          5 | 0/1531F60 | 0/1531FA0 | 0/1531F20 | 735 | Heap             | INSERT              |            59 |                3 |          0 | off: 4, flags: 0x08 | blkref #0: rel 1663/5/16384 fork main blk 0
          6 | 0/1531FA0 | 0/1531FE0 | 0/1531F60 | 735 | Heap             | INSERT              |            59 |                3 |          0 | off: 5, flags: 0x08 | blkref #0: rel 1663/5/16384 fork main blk 0
          7 | 0/1531FE0 | 0/1532028 | 0/1531FA0 | 735 | Transaction      | COMMIT              |            46 |               20 |          0 | 2023-08-27 23:22:17.161215+05:30 |
          8 | 0/1532028 | 0/1532068 | 0/1531FE0 | 736 | Heap             | INSERT+INIT         |            59 |                3 |          0 | off: 1, flags: 0x08 | blkref #0: rel 1663/5/16387 fork main blk 0
          9 | 0/1532068 | 0/15320A8 | 0/1532028 | 736 | Heap             | INSERT              |            59 |                3 |          0 | off: 2, flags: 0x08 | blkref #0: rel 1663/5/16387 fork main blk 0
         10 | 0/15320A8 | 0/15320E8 | 0/1532068 | 736 | Heap             | INSERT              |            59 |                3 |          0 | off: 3, flags: 0x08 | blkref #0: rel 1663/5/16387 fork main blk 0
         11 | 0/15320E8 | 0/1532128 | 0/15320A8 | 736 | Heap             | INSERT              |            59 |                3 |          0 | off: 4, flags: 0x08 | blkref #0: rel 1663/5/16387 fork main blk 0
         12 | 0/1532128 | 0/1532168 | 0/15320E8 | 736 | Heap             | INSERT              |            59 |                3 |          0 | off: 5, flags: 0x08 | blkref #0: rel 1663/5/16387 fork main blk 0
         13 | 0/1532168 | 0/1532198 | 0/1532128 | 736 | Transaction      | COMMIT              |            46 |               20 |          0 | 2023-08-27 23:22:17.174756+05:30 |
         14 | 0/1532198 | 0/1532210 | 0/1532168 |   0 | XLOG             | CHECKPOINT_SHUTDOWN |           114 |               88 |          0 | redo 0/1532198; tli 1; prev tli 1; fpw true; xid 0:737; oid 16399; multi 1; offset 0; oldest xid 723 in DB 1; oldest multi 1 in DB 1; oldest/newest commit timestamp xid: 0/0; oldest running xid 0; shutdown |
With the patch applied, the same test shows that confirmed_flush points to
the CHECKPOINT_SHUTDOWN record:
slot_name slot_type datoid persistency xmin catalog_xmin restart_lsn confirmed_flush two_phase_at two_phase plugin
--------- --------- ------ ----------- ---- ------------ ----------- --------------- ------------ --------- --------
sub       logical        5 persistent     0          735 0/1531E28   0/1532198       0/0                  0 pgoutput
WAL records at and after confirmed_flush (0/1532198), as reported by
pg_walinspect:
 row_number | start_lsn |  end_lsn  | prev_lsn  | xid | resource_manager |     record_type     | record_length | main_data_length | fpi_length | description | block_ref
------------+-----------+-----------+-----------+-----+------------------+---------------------+---------------+------------------+------------+-------------+-----------
          1 | 0/1532198 | 0/1532210 | 0/1532168 |   0 | XLOG             | CHECKPOINT_SHUTDOWN |           114 |               88 |          0 | redo 0/1532198; tli 1; prev tli 1; fpw true; xid 0:737; oid 16399; multi 1; offset 0; oldest xid 723 in DB 1; oldest multi 1 in DB 1; oldest/newest commit timestamp xid: 0/0; oldest running xid 0; shutdown |
(1 row)
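The two runs can be compared against the check proposed in the quoted discussion. After a clean shutdown, a logical slot's confirmed_flush should have reached the location of the shutdown checkpoint.

This is a minimal sketch under these assumptions:
- LSNs are in the "%X/%X" text form printed above.
- parse_lsn() and slot_caught_up() are hypothetical helpers.
- Strict equality simplifies whatever check the upgrade patch actually performs.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: parse an LSN printed as "%X/%X" (e.g. "0/1532198")
 * into a 64-bit value.  Returns false if the text is malformed.
 */
static bool
parse_lsn(const char *text, uint64_t *lsn)
{
	unsigned int hi;
	unsigned int lo;
	int			consumed = 0;

	if (sscanf(text, "%X/%X%n", &hi, &lo, &consumed) != 2 ||
		text[consumed] != '\0')
		return false;
	*lsn = ((uint64_t) hi << 32) | lo;
	return true;
}

/*
 * Hypothetical pre-upgrade check: the slot has consumed everything before
 * the shutdown checkpoint iff confirmed_flush equals the LSN of the
 * CHECKPOINT_SHUTDOWN record.
 */
static bool
slot_caught_up(const char *confirmed_flush, const char *shutdown_checkpoint)
{
	uint64_t	flush;
	uint64_t	ckpt;

	if (!parse_lsn(confirmed_flush, &flush) ||
		!parse_lsn(shutdown_checkpoint, &ckpt))
		return false;
	return flush == ckpt;
}
```

On HEAD, slot_caught_up("0/1531E60", "0/1532198") is false. There, confirmed_flush trails the checkpoint by 0x1532198 - 0x1531E60 = 0x338 (824) bytes. With the patch, slot_caught_up("0/1532198", "0/1532198") is true.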
[1] - https://www.postgresql.org/message-id/flat/CALj2ACW0rV5gWK8A3m6_X62qH%2BVfaq5hznC%3Di0R5Wojt5%2Byhyw%40mail.gmail.com
[2] - https://www.postgresql.org/message-id/CALDaNm2BboFuFVYxyzP4wkv7%3D8%2B_TwsD%2BugyGhtibTSF4m4XRg%40mail.gmail.com
Regards,
Vignesh
From 934f2d32bf6d3b3bb8ae0fcf334aa371fc95de19 Mon Sep 17 00:00:00 2001
From: Vignesh C <[email protected]>
Date: Sun, 27 Aug 2023 22:03:27 +0530
Subject: [PATCH v5] pg_replslotdata
TODO: Display invalidation information for replication slot.
---
src/backend/replication/slot.c | 39 ---
src/bin/Makefile | 1 +
src/bin/pg_replslotdata/.gitignore | 2 +
src/bin/pg_replslotdata/Makefile | 44 +++
src/bin/pg_replslotdata/nls.mk | 6 +
src/bin/pg_replslotdata/pg_replslotdata.c | 362 ++++++++++++++++++++++
src/bin/pg_replslotdata/t/001_basic.pl | 11 +
src/include/replication/slot.h | 97 +-----
src/include/replication/slot_common.h | 147 +++++++++
9 files changed, 574 insertions(+), 135 deletions(-)
create mode 100644 src/bin/pg_replslotdata/.gitignore
create mode 100644 src/bin/pg_replslotdata/Makefile
create mode 100644 src/bin/pg_replslotdata/nls.mk
create mode 100644 src/bin/pg_replslotdata/pg_replslotdata.c
create mode 100644 src/bin/pg_replslotdata/t/001_basic.pl
create mode 100644 src/include/replication/slot_common.h
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index bb09c4010f..567d61540a 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -53,45 +53,6 @@
#include "storage/procarray.h"
#include "utils/builtins.h"
-/*
- * Replication slot on-disk data structure.
- */
-typedef struct ReplicationSlotOnDisk
-{
- /* first part of this struct needs to be version independent */
-
- /* data not covered by checksum */
- uint32 magic;
- pg_crc32c checksum;
-
- /* data covered by checksum */
- uint32 version;
- uint32 length;
-
- /*
- * The actual data in the slot that follows can differ based on the above
- * 'version'.
- */
-
- ReplicationSlotPersistentData slotdata;
-} ReplicationSlotOnDisk;
-
-/* size of version independent data */
-#define ReplicationSlotOnDiskConstantSize \
- offsetof(ReplicationSlotOnDisk, slotdata)
-/* size of the part of the slot not covered by the checksum */
-#define ReplicationSlotOnDiskNotChecksummedSize \
- offsetof(ReplicationSlotOnDisk, version)
-/* size of the part covered by the checksum */
-#define ReplicationSlotOnDiskChecksummedSize \
- sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
-/* size of the slot data that is version dependent */
-#define ReplicationSlotOnDiskV2Size \
- sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
-
-#define SLOT_MAGIC 0x1051CA1 /* format identifier */
-#define SLOT_VERSION 3 /* version for new files */
-
/* Control array for replication slot management */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
diff --git a/src/bin/Makefile b/src/bin/Makefile
index 373077bf52..1db7dd00d4 100644
--- a/src/bin/Makefile
+++ b/src/bin/Makefile
@@ -23,6 +23,7 @@ SUBDIRS = \
pg_controldata \
pg_ctl \
pg_dump \
+ pg_replslotdata \
pg_resetwal \
pg_rewind \
pg_test_fsync \
diff --git a/src/bin/pg_replslotdata/.gitignore b/src/bin/pg_replslotdata/.gitignore
new file mode 100644
index 0000000000..13a4afb8ef
--- /dev/null
+++ b/src/bin/pg_replslotdata/.gitignore
@@ -0,0 +1,2 @@
+/pg_replslotdata
+/tmp_check/
diff --git a/src/bin/pg_replslotdata/Makefile b/src/bin/pg_replslotdata/Makefile
new file mode 100644
index 0000000000..69518ee53b
--- /dev/null
+++ b/src/bin/pg_replslotdata/Makefile
@@ -0,0 +1,44 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for src/bin/pg_replslotdata
+#
+# Copyright (c) 1998-2021, PostgreSQL Global Development Group
+#
+# src/bin/pg_replslotdata/Makefile
+#
+#-------------------------------------------------------------------------
+
+PGFILEDESC = "pg_replslotdata - provides information about the replication slots from $PGDATA/pg_replslot/<slot_name>"
+PGAPPICON=win32
+
+subdir = src/bin/pg_replslotdata
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = \
+ $(WIN32RES) \
+ pg_replslotdata.o
+
+all: pg_replslotdata
+
+pg_replslotdata: $(OBJS) | submake-libpgport
+ $(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
+install: all installdirs
+ $(INSTALL_PROGRAM) pg_replslotdata$(X) '$(DESTDIR)$(bindir)/pg_replslotdata$(X)'
+
+installdirs:
+ $(MKDIR_P) '$(DESTDIR)$(bindir)'
+
+uninstall:
+ rm -f '$(DESTDIR)$(bindir)/pg_replslotdata$(X)'
+
+clean distclean maintainer-clean:
+ rm -f pg_replslotdata$(X) $(OBJS)
+ rm -rf tmp_check
+
+check:
+ $(prove_check)
+
+installcheck:
+ $(prove_installcheck)
diff --git a/src/bin/pg_replslotdata/nls.mk b/src/bin/pg_replslotdata/nls.mk
new file mode 100644
index 0000000000..74bee593c9
--- /dev/null
+++ b/src/bin/pg_replslotdata/nls.mk
@@ -0,0 +1,6 @@
+# src/bin/pg_replslotdata/nls.mk
+CATALOG_NAME = pg_replslotdata
+AVAIL_LANGUAGES = cs de el es fr ja ko pl ru sv tr uk vi zh_CN
+GETTEXT_FILES = $(FRONTEND_COMMON_GETTEXT_FILES) pg_replslotdata.c
+GETTEXT_TRIGGERS = $(FRONTEND_COMMON_GETTEXT_TRIGGERS)
+GETTEXT_FLAGS = $(FRONTEND_COMMON_GETTEXT_FLAGS)
diff --git a/src/bin/pg_replslotdata/pg_replslotdata.c b/src/bin/pg_replslotdata/pg_replslotdata.c
new file mode 100644
index 0000000000..aed5d6750c
--- /dev/null
+++ b/src/bin/pg_replslotdata/pg_replslotdata.c
@@ -0,0 +1,362 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_replslotdata.c - provides information about the replication slots
+ * from $PGDATA/pg_replslot/<slot_name>.
+ *
+ * Copyright (c) 2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_replslotdata/pg_replslotdata.c
+ *-------------------------------------------------------------------------
+ */
+/*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need. But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+#define FRONTEND 1
+
+#include "postgres.h"
+
+#include <dirent.h>
+#include <sys/stat.h>
+
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "common/logging.h"
+#include "common/string.h"
+#include "getopt_long.h"
+#include "pg_getopt.h"
+#include "replication/slot_common.h"
+
+static bool verbose = false;
+
+static void process_replslots(void);
+static void read_and_display_repl_slot(const char *name);
+
+static void
+usage(const char *progname)
+{
+ printf(_("%s displays information about the replication slots in $PGDATA/pg_replslot.\n\n"), progname);
+ printf(_("Usage:\n"));
+ printf(_(" %s [OPTION] [DATADIR]\n"), progname);
+ printf(_("\nOptions:\n"));
+ printf(_(" [-D, --pgdata=]DATADIR data directory\n"));
+ printf(_(" -V, --version output version information, then exit\n"));
+ printf(_(" -v, --verbose write a lot of output\n"));
+ printf(_(" -?, --help show this help, then exit\n"));
+ printf(_("\nIf no data directory (DATADIR) is specified, "
+ "the environment variable PGDATA\nis used.\n\n"));
+ printf(_("Report bugs to <%s>.\n"), PACKAGE_BUGREPORT);
+ printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
+}
+
+static void
+process_replslots(void)
+{
+ DIR *rsdir;
+ struct dirent *rsde;
+ uint32 cnt = 0;
+
+ rsdir = opendir("pg_replslot");
+ if (rsdir == NULL)
+ {
+ pg_log_error("could not open directory \"%s\": %m", "pg_replslot");
+ exit(1);
+ }
+
+ /* Column widths must match the per-slot printf() in read_and_display_repl_slot(). */
+ printf("%-64s %9s %10s %11s %10s %12s %21s %21s %21s %10s %20s\n"
+ "%-64s %9s %10s %11s %10s %12s %21s %21s %21s %10s %20s\n",
+ "slot_name", "slot_type", "datoid", "persistency", "xmin", "catalog_xmin", "restart_lsn", "confirmed_flush", "two_phase_at", "two_phase", "plugin",
+ "---------", "---------", "------", "-----------", "----", "------------", "-----------", "---------------", "------------", "---------", "------");
+
+ while (errno = 0, (rsde = readdir(rsdir)) != NULL)
+ {
+ struct stat statbuf;
+ char path[MAXPGPATH];
+
+ if (strcmp(rsde->d_name, ".") == 0 ||
+ strcmp(rsde->d_name, "..") == 0)
+ continue;
+
+ snprintf(path, sizeof(path), "pg_replslot/%s", rsde->d_name);
+
+ /* slots are stored as directories; skip anything else */
+ if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
+ continue;
+
+ /* a ".tmp" directory means the server crashed while the slot was being set up or deleted */
+ if (pg_str_endswith(rsde->d_name, ".tmp"))
+ {
+ pg_log_warning("server crashed while the slot \"%s\" was being set up or deleted",
+ rsde->d_name);
+ continue;
+ }
+
+ /* looks like a slot in a normal state, display it */
+ read_and_display_repl_slot(rsde->d_name);
+ cnt++;
+ }
+
+ if (errno)
+ {
+ pg_log_error("could not read directory \"%s\": %m", "pg_replslot");
+ exit(1);
+ }
+
+ if (closedir(rsdir))
+ {
+ pg_log_error("could not close directory \"%s\": %m", "pg_replslot");
+ exit(1);
+ }
+
+ if (cnt == 0)
+ {
+ pg_log_info("no replication slots were found");
+ exit(0);
+ }
+}
+
+static void
+read_and_display_repl_slot(const char *name)
+{
+ ReplicationSlotOnDisk cp;
+ char slotdir[MAXPGPATH];
+ char path[MAXPGPATH];
+ char restart_lsn[NAMEDATALEN];
+ char confirmed_flush[NAMEDATALEN];
+ char two_phase_at[NAMEDATALEN];
+ char persistency[NAMEDATALEN];
+ int fd;
+ int readBytes;
+ pg_crc32c checksum;
+
+ /* refuse to proceed if a temporary state file exists */
+ snprintf(slotdir, sizeof(slotdir), "pg_replslot/%s", name);
+ snprintf(path, sizeof(path), "%s/state.tmp", slotdir);
+
+ fd = open(path, O_RDONLY | PG_BINARY, 0);
+
+ if (fd >= 0)
+ {
+ pg_log_error("found temporary state file \"%s\"", path);
+ exit(1);
+ }
+
+ snprintf(path, sizeof(path), "%s/state", slotdir);
+
+ if (verbose)
+ pg_log_info("reading replication slot from \"%s\"", path);
+
+ fd = open(path, O_RDONLY | PG_BINARY, 0);
+
+ /*
+ * We do not need to handle this as we are rename()ing the directory into
+ * place only after we fsync()ed the state file.
+ */
+ if (fd < 0)
+ {
+ pg_log_error("could not open file \"%s\": %m", path);
+ exit(1);
+ }
+
+ if (verbose)
+ pg_log_info("reading version independent replication slot state file");
+
+ /* read part of statefile that's guaranteed to be version independent */
+ readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
+ if (readBytes != ReplicationSlotOnDiskConstantSize)
+ {
+ if (readBytes < 0)
+ {
+ pg_log_error("could not read file \"%s\": %m", path);
+ exit(1);
+ }
+ else
+ {
+ pg_log_error("could not read file \"%s\": read %d of %zu",
+ path, readBytes,
+ (Size) ReplicationSlotOnDiskConstantSize);
+ exit(1);
+ }
+ }
+
+ /* verify magic */
+ if (cp.magic != SLOT_MAGIC)
+ {
+ pg_log_error("replication slot file \"%s\" has wrong magic number: %u instead of %u",
+ path, cp.magic, SLOT_MAGIC);
+ exit(1);
+ }
+
+ /* verify version */
+ if (cp.version != SLOT_VERSION)
+ {
+ pg_log_error("replication slot file \"%s\" has unsupported version %u",
+ path, cp.version);
+ exit(1);
+ }
+
+ /* boundary check on length */
+ if (cp.length != ReplicationSlotOnDiskV2Size)
+ {
+ pg_log_error("replication slot file \"%s\" has corrupted length %u",
+ path, cp.length);
+ exit(1);
+ }
+
+ if (verbose)
+ pg_log_info("reading the entire replication slot state file");
+
+ /* now that we know the size, read the entire file */
+ readBytes = read(fd,
+ (char *) &cp + ReplicationSlotOnDiskConstantSize,
+ cp.length);
+ if (readBytes != cp.length)
+ {
+ if (readBytes < 0)
+ {
+ pg_log_error("could not read file \"%s\": %m", path);
+ exit(1);
+ }
+ else
+ {
+ pg_log_error("could not read file \"%s\": read %d of %zu",
+ path, readBytes, (Size) cp.length);
+ exit(1);
+ }
+ }
+
+ if (close(fd) != 0)
+ {
+ pg_log_error("could not close file \"%s\": %m", path);
+ exit(1);
+ }
+
+ /* now verify the CRC */
+ INIT_CRC32C(checksum);
+ COMP_CRC32C(checksum,
+ (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
+ ReplicationSlotOnDiskChecksummedSize);
+ FIN_CRC32C(checksum);
+
+ if (!EQ_CRC32C(checksum, cp.checksum))
+ {
+ pg_log_error("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
+ path, checksum, cp.checksum);
+ exit(1);
+ }
+
+ sprintf(restart_lsn, "%X/%X", LSN_FORMAT_ARGS(cp.slotdata.restart_lsn));
+ sprintf(confirmed_flush, "%X/%X", LSN_FORMAT_ARGS(cp.slotdata.confirmed_flush));
+ sprintf(two_phase_at, "%X/%X", LSN_FORMAT_ARGS(cp.slotdata.two_phase_at));
+
+ if (cp.slotdata.persistency == RS_PERSISTENT)
+ sprintf(persistency, "persistent");
+ else if (cp.slotdata.persistency == RS_EPHEMERAL)
+ sprintf(persistency, "ephemeral");
+ else if (cp.slotdata.persistency == RS_TEMPORARY)
+ sprintf(persistency, "temporary");
+
+ /* display the slot information */
+ printf("%-64s %9s %10u %11s %10u %12u %21s %21s %21s %10d %20s\n",
+ NameStr(cp.slotdata.name),
+ cp.slotdata.database == InvalidOid ? "physical" : "logical",
+ cp.slotdata.database,
+ persistency,
+ cp.slotdata.xmin,
+ cp.slotdata.catalog_xmin,
+ restart_lsn,
+ confirmed_flush,
+ two_phase_at,
+ cp.slotdata.two_phase,
+ NameStr(cp.slotdata.plugin));
+}
+
+int
+main(int argc, char *argv[])
+{
+ static struct option long_options[] = {
+ {"pgdata", required_argument, NULL, 'D'},
+ {"verbose", no_argument, NULL, 'v'},
+ {NULL, 0, NULL, 0}
+ };
+
+ char *DataDir = NULL;
+ const char *progname;
+ int c;
+
+ pg_logging_init(argv[0]);
+ set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_replslotdata"));
+ progname = get_progname(argv[0]);
+
+ if (argc > 1)
+ {
+ if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+ {
+ usage(progname);
+ exit(0);
+ }
+ if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
+ {
+ puts("pg_replslotdata (PostgreSQL) " PG_VERSION);
+ exit(0);
+ }
+ }
+
+ while ((c = getopt_long(argc, argv, "D:v", long_options, NULL)) != -1)
+ {
+ switch (c)
+ {
+ case 'D':
+ DataDir = optarg;
+ break;
+ case 'v':
+ verbose = true;
+ break;
+ default:
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
+ exit(1);
+ }
+ }
+
+ if (DataDir == NULL)
+ {
+ if (optind < argc)
+ DataDir = argv[optind++];
+ else
+ DataDir = getenv("PGDATA");
+ }
+
+ /* complain if any arguments remain */
+ if (optind < argc)
+ {
+ pg_log_error("too many command-line arguments (first is \"%s\")",
+ argv[optind]);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
+ if (DataDir == NULL)
+ {
+ pg_log_error("no data directory specified");
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
+ exit(1);
+ }
+
+ if (verbose)
+ pg_log_info("data directory: \"%s\"", DataDir);
+
+ if (chdir(DataDir) < 0)
+ {
+ pg_log_error("could not change directory to \"%s\": %m",
+ DataDir);
+ exit(1);
+ }
+
+ process_replslots();
+
+ return 0;
+}
diff --git a/src/bin/pg_replslotdata/t/001_basic.pl b/src/bin/pg_replslotdata/t/001_basic.pl
new file mode 100644
index 0000000000..d6830dc2ac
--- /dev/null
+++ b/src/bin/pg_replslotdata/t/001_basic.pl
@@ -0,0 +1,11 @@
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+program_help_ok('pg_replslotdata');
+program_version_ok('pg_replslotdata');
+program_options_handling_ok('pg_replslotdata');
+done_testing();
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a8a89dc784..a7d16a37a3 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -15,104 +15,9 @@
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "storage/spin.h"
+#include "replication/slot_common.h"
#include "replication/walreceiver.h"
-/*
- * Behaviour of replication slots, upon release or crash.
- *
- * Slots marked as PERSISTENT are crash-safe and will not be dropped when
- * released. Slots marked as EPHEMERAL will be dropped when released or after
- * restarts. Slots marked TEMPORARY will be dropped at the end of a session
- * or on error.
- *
- * EPHEMERAL is used as a not-quite-ready state when creating persistent
- * slots. EPHEMERAL slots can be made PERSISTENT by calling
- * ReplicationSlotPersist(). For a slot that goes away at the end of a
- * session, TEMPORARY is the appropriate choice.
- */
-typedef enum ReplicationSlotPersistency
-{
- RS_PERSISTENT,
- RS_EPHEMERAL,
- RS_TEMPORARY
-} ReplicationSlotPersistency;
-
-/*
- * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
- * 'invalidated' field is set to a value other than _NONE.
- */
-typedef enum ReplicationSlotInvalidationCause
-{
- RS_INVAL_NONE,
- /* required WAL has been removed */
- RS_INVAL_WAL_REMOVED,
- /* required rows have been removed */
- RS_INVAL_HORIZON,
- /* wal_level insufficient for slot */
- RS_INVAL_WAL_LEVEL,
-} ReplicationSlotInvalidationCause;
-
-/*
- * On-Disk data of a replication slot, preserved across restarts.
- */
-typedef struct ReplicationSlotPersistentData
-{
- /* The slot's identifier */
- NameData name;
-
- /* database the slot is active on */
- Oid database;
-
- /*
- * The slot's behaviour when being dropped (or restored after a crash).
- */
- ReplicationSlotPersistency persistency;
-
- /*
- * xmin horizon for data
- *
- * NB: This may represent a value that hasn't been written to disk yet;
- * see notes for effective_xmin, below.
- */
- TransactionId xmin;
-
- /*
- * xmin horizon for catalog tuples
- *
- * NB: This may represent a value that hasn't been written to disk yet;
- * see notes for effective_xmin, below.
- */
- TransactionId catalog_xmin;
-
- /* oldest LSN that might be required by this replication slot */
- XLogRecPtr restart_lsn;
-
- /* RS_INVAL_NONE if valid, or the reason for having been invalidated */
- ReplicationSlotInvalidationCause invalidated;
-
- /*
- * Oldest LSN that the client has acked receipt for. This is used as the
- * start_lsn point in case the client doesn't specify one, and also as a
- * safety measure to jump forwards in case the client specifies a
- * start_lsn that's further in the past than this value.
- */
- XLogRecPtr confirmed_flush;
-
- /*
- * LSN at which we enabled two_phase commit for this slot or LSN at which
- * we found a consistent point at the time of slot creation.
- */
- XLogRecPtr two_phase_at;
-
- /*
- * Allow decoding of prepared transactions?
- */
- bool two_phase;
-
- /* plugin name */
- NameData plugin;
-} ReplicationSlotPersistentData;
-
/*
* Shared memory state of a single replication slot.
*
diff --git a/src/include/replication/slot_common.h b/src/include/replication/slot_common.h
new file mode 100644
index 0000000000..ff4556ff22
--- /dev/null
+++ b/src/include/replication/slot_common.h
@@ -0,0 +1,147 @@
+/*-------------------------------------------------------------------------
+ * slot_common.h
+ * Replication slot definitions shared by backend and frontend code.
+ *
+ * Copyright (c) 2021, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SLOT_COMMON_H
+#define SLOT_COMMON_H
+
+/*
+ * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
+ * 'invalidated' field is set to a value other than _NONE.
+ */
+typedef enum ReplicationSlotInvalidationCause
+{
+ RS_INVAL_NONE,
+ /* required WAL has been removed */
+ RS_INVAL_WAL_REMOVED,
+ /* required rows have been removed */
+ RS_INVAL_HORIZON,
+ /* wal_level insufficient for slot */
+ RS_INVAL_WAL_LEVEL,
+} ReplicationSlotInvalidationCause;
+
+/*
+ * Behaviour of replication slots, upon release or crash.
+ *
+ * Slots marked as PERSISTENT are crash-safe and will not be dropped when
+ * released. Slots marked as EPHEMERAL will be dropped when released or after
+ * restarts. Slots marked TEMPORARY will be dropped at the end of a session
+ * or on error.
+ *
+ * EPHEMERAL is used as a not-quite-ready state when creating persistent
+ * slots. EPHEMERAL slots can be made PERSISTENT by calling
+ * ReplicationSlotPersist(). For a slot that goes away at the end of a
+ * session, TEMPORARY is the appropriate choice.
+ */
+typedef enum ReplicationSlotPersistency
+{
+ RS_PERSISTENT,
+ RS_EPHEMERAL,
+ RS_TEMPORARY
+} ReplicationSlotPersistency;
+
+/*
+ * On-Disk data of a replication slot, preserved across restarts.
+ */
+typedef struct ReplicationSlotPersistentData
+{
+ /* The slot's identifier */
+ NameData name;
+
+ /* database the slot is active on */
+ Oid database;
+
+ /*
+ * The slot's behaviour when being dropped (or restored after a crash).
+ */
+ ReplicationSlotPersistency persistency;
+
+ /*
+ * xmin horizon for data
+ *
+ * NB: This may represent a value that hasn't been written to disk yet;
+ * see notes for effective_xmin, below.
+ */
+ TransactionId xmin;
+
+ /*
+ * xmin horizon for catalog tuples
+ *
+ * NB: This may represent a value that hasn't been written to disk yet;
+ * see notes for effective_xmin, below.
+ */
+ TransactionId catalog_xmin;
+
+ /* oldest LSN that might be required by this replication slot */
+ XLogRecPtr restart_lsn;
+
+ /* RS_INVAL_NONE if valid, or the reason for having been invalidated */
+ ReplicationSlotInvalidationCause invalidated;
+
+ /*
+ * Oldest LSN that the client has acked receipt for. This is used as the
+ * start_lsn point in case the client doesn't specify one, and also as a
+ * safety measure to jump forwards in case the client specifies a
+ * start_lsn that's further in the past than this value.
+ */
+ XLogRecPtr confirmed_flush;
+
+ /*
+ * LSN at which we enabled two_phase commit for this slot or LSN at which
+ * we found a consistent point at the time of slot creation.
+ */
+ XLogRecPtr two_phase_at;
+
+ /*
+ * Allow decoding of prepared transactions?
+ */
+ bool two_phase;
+
+ /* plugin name */
+ NameData plugin;
+} ReplicationSlotPersistentData;
+
+/*
+ * Replication slot on-disk data structure.
+ */
+typedef struct ReplicationSlotOnDisk
+{
+ /* first part of this struct needs to be version independent */
+
+ /* data not covered by checksum */
+ uint32 magic;
+ pg_crc32c checksum;
+
+ /* data covered by checksum */
+ uint32 version;
+ uint32 length;
+
+ /*
+ * The actual data in the slot that follows can differ based on the above
+ * 'version'.
+ */
+
+ ReplicationSlotPersistentData slotdata;
+} ReplicationSlotOnDisk;
+
+/* size of version independent data */
+#define ReplicationSlotOnDiskConstantSize \
+ offsetof(ReplicationSlotOnDisk, slotdata)
+/* size of the part of the slot not covered by the checksum */
+#define ReplicationSlotOnDiskNotChecksummedSize \
+ offsetof(ReplicationSlotOnDisk, version)
+/* size of the part covered by the checksum */
+#define ReplicationSlotOnDiskChecksummedSize \
+ sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
+/* size of the slot data that is version dependent */
+#define ReplicationSlotOnDiskV2Size \
+ sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
+
+#define SLOT_MAGIC 0x1051CA1 /* format identifier */
+#define SLOT_VERSION 3 /* version for new files */
+
+#endif /* SLOT_COMMON_H */
--
2.34.1