From 0392f3e3f5c371e0b11c853a419f48b5e402660b Mon Sep 17 00:00:00 2001
From: Melih Mutlu <m.melihmutlu@gmail.com>
Date: Thu, 13 Oct 2022 17:05:45 +0300
Subject: [PATCH 1/2] Add replication protocol cmd to create a snapshot

Introduced REPLICATION_SLOT_SNAPSHOT to be able to create and use a
snapshot without creating a new replication slot, but by using an
existing slot.

REPLICATION_SLOT_SNAPSHOT simply does what CREATE_REPLICATION_SLOT does
without creating a new replication slot.

REPLICATION_SLOT_SNAPSHOT command imports the snapshot into the current
transaction and returns consistent_point. The changes earlier than the
consistent_point will be applied by importing the snapshot. All changes
later than the consistent_point will be available to be consumed from
the replication slot.

This is useful for reusing replication slots in logical replication.
Otherwise, tablesync workers cannot start from a consistent point to copy a relation and then apply changes by consuming from replication slot.
---
 doc/src/sgml/protocol.sgml                    | 32 ++++++
 .../libpqwalreceiver/libpqwalreceiver.c       | 69 ++++++++++++-
 src/backend/replication/logical/logical.c     | 39 +++++++-
 .../replication/logical/logicalfuncs.c        |  1 +
 src/backend/replication/repl_gram.y           | 18 +++-
 src/backend/replication/repl_scanner.l        |  2 +
 src/backend/replication/slotfuncs.c           |  1 +
 src/backend/replication/walsender.c           | 97 ++++++++++++++++++-
 src/include/nodes/replnodes.h                 | 11 +++
 src/include/replication/logical.h             |  1 +
 src/include/replication/walreceiver.h         | 13 +++
 src/tools/pgindent/typedefs.list              |  1 +
 12 files changed, 281 insertions(+), 4 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 93fc7167d4..93a3867996 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2613,6 +2613,38 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
      </listitem>
     </varlistentry>
 
+    <varlistentry id="protocol-replication-replication-slot-snapshot">
+     <term><literal>REPLICATION_SLOT_SNAPSHOT</literal> <replaceable class="parameter">slot_name</replaceable> [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
+      <indexterm><primary>REPLICATION_SLOT_SNAPSHOT</primary></indexterm>
+     </term>
+     <listitem>
+      <para>
+       Creates a snapshot including all the changes from the replication slot until
+       the point at which the replication slot becomes consistent. Then the snapshot
+       is used in the currenct transaction. This command is currently only supported
+       for logical replication.
+       slots.
+      </para>
+
+      <para>
+       In response to this command, the server will return a one-row result set,
+       containing the following field:
+       <variablelist>
+        <varlistentry>
+         <term><literal>consistent_point</literal> (<type>text</type>)</term>
+         <listitem>
+          <para>
+           The WAL location at which the slot became consistent.  This is the
+           earliest location from which streaming can start on this replication
+           slot.
+          </para>
+         </listitem>
+        </varlistentry>
+       </variablelist>
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry id="protocol-replication-base-backup" xreflabel="BASE_BACKUP">
      <term><literal>BASE_BACKUP</literal> [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
       <indexterm><primary>BASE_BACKUP</primary></indexterm>
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index c40c6220db..9213fd2b1b 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -80,6 +80,8 @@ static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
 									   const int nRetTypes,
 									   const Oid *retTypes);
 static void libpqrcv_disconnect(WalReceiverConn *conn);
+static void libpqrcv_slot_snapshot(WalReceiverConn *conn, char *slotname,
+								   const WalRcvStreamOptions *options, XLogRecPtr *lsn);
 
 static WalReceiverFunctionsType PQWalReceiverFunctions = {
 	.walrcv_connect = libpqrcv_connect,
@@ -96,7 +98,8 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 	.walrcv_create_slot = libpqrcv_create_slot,
 	.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
 	.walrcv_exec = libpqrcv_exec,
-	.walrcv_disconnect = libpqrcv_disconnect
+	.walrcv_disconnect = libpqrcv_disconnect,
+	.walrcv_slot_snapshot = libpqrcv_slot_snapshot
 };
 
 /* Prototypes for private functions */
@@ -968,6 +971,70 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 	return snapshot;
 }
 
+/*
+ * TODO
+ */
+static void
+libpqrcv_slot_snapshot(WalReceiverConn *conn,
+					   char *slotname,
+					   const WalRcvStreamOptions *options,
+					   XLogRecPtr *lsn)
+{
+	StringInfoData cmd;
+	PGresult   *res;
+	char	   *pubnames_str;
+	List	   *pubnames;
+	char	   *pubnames_literal;
+
+	initStringInfo(&cmd);
+
+	/* Build the command. */
+	appendStringInfo(&cmd, "REPLICATION_SLOT_SNAPSHOT \"%s\"", slotname);
+	appendStringInfoString(&cmd, " (");
+	appendStringInfo(&cmd, " proto_version '%u'",
+					 options->proto.logical.proto_version);
+
+	/* Add publication names. */
+	pubnames = options->proto.logical.publication_names;
+	pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
+	if (!pubnames_str)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),	/* likely guess */
+				 errmsg("could not start WAL streaming: %s",
+						pchomp(PQerrorMessage(conn->streamConn)))));
+	pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
+									   strlen(pubnames_str));
+	if (!pubnames_literal)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),	/* likely guess */
+				 errmsg("could not start WAL streaming: %s",
+						pchomp(PQerrorMessage(conn->streamConn)))));
+	appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
+	PQfreemem(pubnames_literal);
+	pfree(pubnames_str);
+
+	appendStringInfoString(&cmd, " )");
+
+	/* Execute the command. */
+	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	pfree(cmd.data);
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		PQclear(res);
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("Could not create a snapshot by replication slot \"%s\": %s",
+						slotname, pchomp(PQerrorMessage(conn->streamConn)))));
+	}
+
+	if (lsn)
+		*lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
+												   CStringGetDatum(PQgetvalue(res, 0, 0))));
+
+	PQclear(res);
+}
+
 /*
  * Return PID of remote backend process.
  */
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 52d1fe6269..b62babe359 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -461,6 +461,10 @@ CreateInitDecodingContext(const char *plugin,
  * fast_forward
  *		bypass the generation of logical changes.
  *
+ * need_full_snapshot
+ * 		if true, create a snapshot able to read all tables,
+ * 		otherwise do not create any snapshot.
+ *
  * xl_routine
  *		XLogReaderRoutine used by underlying xlogreader
  *
@@ -479,6 +483,7 @@ LogicalDecodingContext *
 CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  bool fast_forward,
+					  bool need_full_snapshot,
 					  XLogReaderRoutine *xl_routine,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
@@ -487,6 +492,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
 	MemoryContext old_context;
+	TransactionId xmin_horizon = InvalidTransactionId;
 
 	/* shorter lines... */
 	slot = MyReplicationSlot;
@@ -533,8 +539,39 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 		start_lsn = slot->data.confirmed_flush;
 	}
 
+
+	/*
+	 * We need to determine a safe xmin horizon to start decoding from if we
+	 * want to create a snapshot too. Otherwise we would end up with a
+	 * snapshot that cannot be imported since xmin value from the snapshot may
+	 * be less than the oldest safe xmin. To avoid this call
+	 * GetOldestSafeDecodingTransactionId() to return a safe xmin value, which
+	 * can be used while exporting/importing the snapshot.
+	 *
+	 * So we have to acquire the ProcArrayLock to prevent computation of new
+	 * xmin horizons by other backends, get the safe decoding xid, and inform
+	 * the slot machinery about the new limit. Once that's done the
+	 * ProcArrayLock can be released as the slot machinery now is protecting
+	 * against vacuum.
+	 */
+	if (need_full_snapshot)
+	{
+		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+		SpinLockAcquire(&slot->mutex);
+		slot->effective_catalog_xmin = xmin_horizon;
+		slot->data.catalog_xmin = xmin_horizon;
+		slot->effective_xmin = xmin_horizon;
+		SpinLockRelease(&slot->mutex);
+
+		xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
+		ReplicationSlotsComputeRequiredXmin(true);
+
+		LWLockRelease(ProcArrayLock);
+	}
+
 	ctx = StartupDecodingContext(output_plugin_options,
-								 start_lsn, InvalidTransactionId, false,
+								 start_lsn, xmin_horizon, need_full_snapshot,
 								 fast_forward, xl_routine, prepare_write,
 								 do_write, update_progress);
 
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index fa1b641a2b..1191c70eb0 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -208,6 +208,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 		ctx = CreateDecodingContext(InvalidXLogRecPtr,
 									options,
 									false,
+									false,
 									XL_ROUTINE(.page_read = read_local_xlog_page,
 											   .segment_open = wal_segment_open,
 											   .segment_close = wal_segment_close),
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 0c874e33cf..e5f0235d1e 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -65,6 +65,7 @@ Node *replication_parse_result;
 %token K_CREATE_REPLICATION_SLOT
 %token K_DROP_REPLICATION_SLOT
 %token K_TIMELINE_HISTORY
+%token K_REPLICATION_SLOT_SNAPSHOT
 %token K_WAIT
 %token K_TIMELINE
 %token K_PHYSICAL
@@ -80,7 +81,7 @@ Node *replication_parse_result;
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
 				create_replication_slot drop_replication_slot identify_system
-				read_replication_slot timeline_history show
+				read_replication_slot timeline_history show replication_slot_snapshot
 %type <list>	generic_option_list
 %type <defelt>	generic_option
 %type <uintval>	opt_timeline
@@ -114,6 +115,7 @@ command:
 			| read_replication_slot
 			| timeline_history
 			| show
+			| replication_slot_snapshot
 			;
 
 /*
@@ -307,6 +309,19 @@ timeline_history:
 				}
 			;
 
+/*
+ * REPLICATION_SLOT_SNAPSHOT %s options
+ */
+replication_slot_snapshot:
+			K_REPLICATION_SLOT_SNAPSHOT var_name plugin_options
+				{
+					ReplicationSlotSnapshotCmd *n = makeNode(ReplicationSlotSnapshotCmd);
+					n->slotname = $2;
+					n->options = $3;
+					$$ = (Node *) n;
+				}
+			;
+
 opt_physical:
 			K_PHYSICAL
 			| /* EMPTY */
@@ -400,6 +415,7 @@ ident_or_keyword:
 			| K_CREATE_REPLICATION_SLOT	{ $$ = "create_replication_slot"; }
 			| K_DROP_REPLICATION_SLOT		{ $$ = "drop_replication_slot"; }
 			| K_TIMELINE_HISTORY			{ $$ = "timeline_history"; }
+			| K_REPLICATION_SLOT_SNAPSHOT	{ $$ = "replication_slot_snapshot"; }
 			| K_WAIT						{ $$ = "wait"; }
 			| K_TIMELINE					{ $$ = "timeline"; }
 			| K_PHYSICAL					{ $$ = "physical"; }
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index cb467ca46f..1988a6203b 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -126,6 +126,7 @@ START_REPLICATION	{ return K_START_REPLICATION; }
 CREATE_REPLICATION_SLOT		{ return K_CREATE_REPLICATION_SLOT; }
 DROP_REPLICATION_SLOT		{ return K_DROP_REPLICATION_SLOT; }
 TIMELINE_HISTORY	{ return K_TIMELINE_HISTORY; }
+REPLICATION_SLOT_SNAPSHOT	{ return K_REPLICATION_SLOT_SNAPSHOT; }
 PHYSICAL			{ return K_PHYSICAL; }
 RESERVE_WAL			{ return K_RESERVE_WAL; }
 LOGICAL				{ return K_LOGICAL; }
@@ -303,6 +304,7 @@ replication_scanner_is_replication_command(void)
 		case K_DROP_REPLICATION_SLOT:
 		case K_READ_REPLICATION_SLOT:
 		case K_TIMELINE_HISTORY:
+		case K_REPLICATION_SLOT_SNAPSHOT:
 		case K_SHOW:
 			/* Yes; push back the first token so we can parse later. */
 			repl_pushed_back_token = first_token;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2f3c964824..b3ae11b2c8 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -478,6 +478,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 		ctx = CreateDecodingContext(InvalidXLogRecPtr,
 									NIL,
 									true,	/* fast_forward */
+									false,
 									XL_ROUTINE(.page_read = read_local_xlog_page,
 											   .segment_open = wal_segment_open,
 											   .segment_close = wal_segment_close),
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 015ae2995d..70d926f4f0 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -238,6 +238,7 @@ static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
 static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
 static void StartReplication(StartReplicationCmd *cmd);
 static void StartLogicalReplication(StartReplicationCmd *cmd);
+static void ReplicationSlotSnapshot(ReplicationSlotSnapshotCmd *cmd);
 static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
 static void ProcessStandbyHSFeedbackMessage(void);
@@ -1280,7 +1281,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 * are reported early.
 	 */
 	logical_decoding_ctx =
-		CreateDecodingContext(cmd->startpoint, cmd->options, false,
+		CreateDecodingContext(cmd->startpoint, cmd->options, false, false,
 							  XL_ROUTINE(.page_read = logical_read_xlog_page,
 										 .segment_open = WalSndSegmentOpen,
 										 .segment_close = wal_segment_close),
@@ -1332,6 +1333,91 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	EndCommand(&qc, DestRemote, false);
 }
 
+/*
+ * Create a snapshot from an existing replication slot.
+ */
+static void
+ReplicationSlotSnapshot(ReplicationSlotSnapshotCmd *cmd)
+{
+	Snapshot	snap;
+	LogicalDecodingContext *ctx;
+	char		xloc[MAXFNAMELEN];
+	DestReceiver *dest;
+	TupOutputState *tstate;
+	TupleDesc	tupdesc;
+	Datum		values[1];
+	bool		nulls[1] = {0};
+
+	Assert(!MyReplicationSlot);
+
+	if (!IsTransactionBlock())
+		ereport(ERROR,
+				(errmsg("%s must be called inside a transaction",
+						"REPLICATION_SLOT_SNAPSHOT ...")));
+
+	if (XactIsoLevel != XACT_REPEATABLE_READ)
+		ereport(ERROR,
+				(errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
+						"REPLICATION_SLOT_SNAPSHOT ...")));
+
+	if (FirstSnapshotSet)
+		ereport(ERROR,
+				(errmsg("%s must be called before any query",
+						"REPLICATION_SLOT_SNAPSHOT ...")));
+
+	if (IsSubTransaction())
+		ereport(ERROR,
+				(errmsg("%s must not be called in a subtransaction",
+						"REPLICATION_SLOT_SNAPSHOT ...")));
+
+	ReplicationSlotAcquire(cmd->slotname, false);
+
+	ctx = CreateDecodingContext(MyReplicationSlot->data.restart_lsn,
+								cmd->options,
+								false,
+								true,
+								XL_ROUTINE(.page_read = logical_read_xlog_page,
+										   .segment_open = WalSndSegmentOpen,
+										   .segment_close = wal_segment_close),
+								WalSndPrepareWrite, WalSndWriteData,
+								WalSndUpdateProgress);
+
+	/*
+	 * Signal that we don't need the timeout mechanism. We're just creating
+	 * the replication slot and don't yet accept feedback messages or send
+	 * keepalives. As we possibly need to wait for further WAL the walsender
+	 * would otherwise possibly be killed too soon.
+	 */
+	last_reply_timestamp = 0;
+
+	/* build initial snapshot, might take a while */
+	DecodingContextFindStartpoint(ctx);
+
+	snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
+	RestoreTransactionSnapshot(snap, MyProc);
+
+	/* Don't need the decoding context anymore */
+	FreeDecodingContext(ctx);
+
+	/* Create a tuple to send consisten WAL location */
+	snprintf(xloc, sizeof(xloc), "%X/%X",
+			 LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush));
+
+	dest = CreateDestReceiver(DestRemoteSimple);
+	tupdesc = CreateTemplateTupleDesc(1);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "consistent_point",
+							  TEXTOID, -1, 0);
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+
+	/* consistent wal location */
+	values[0] = CStringGetTextDatum(xloc);
+
+	do_tup_output(tstate, values, nulls);
+	end_tup_output(tstate);
+
+	ReplicationSlotRelease();
+}
+
 /*
  * LogicalDecodingContext 'prepare_write' callback.
  *
@@ -1860,6 +1946,15 @@ exec_replication_command(const char *cmd_string)
 			}
 			break;
 
+		case T_ReplicationSlotSnapshotCmd:
+			{
+				cmdtag = "REPLICATION_SLOT_SNAPSHOT";
+				set_ps_display(cmdtag);
+				ReplicationSlotSnapshot((ReplicationSlotSnapshotCmd *) cmd_node);
+				EndReplicationCommand(cmdtag);
+				break;
+			}
+
 		default:
 			elog(ERROR, "unrecognized replication command node tag: %u",
 				 cmd_node->type);
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 4321ba8f86..44a4580671 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -108,4 +108,15 @@ typedef struct TimeLineHistoryCmd
 	TimeLineID	timeline;
 } TimeLineHistoryCmd;
 
+/* ----------------------
+ *		REPLICATION_SLOT_SNAPSHOT command
+ * ----------------------
+ */
+typedef struct ReplicationSlotSnapshotCmd
+{
+	NodeTag		type;
+	char	   *slotname;
+	List	   *options;
+} ReplicationSlotSnapshotCmd;
+
 #endif							/* REPLNODES_H */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 5f49554ea0..6535786a0e 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -125,6 +125,7 @@ extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin,
 extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
 													 List *output_plugin_options,
 													 bool fast_forward,
+													 bool need_full_snapshot,
 													 XLogReaderRoutine *xl_routine,
 													 LogicalOutputPluginWriterPrepareWrite prepare_write,
 													 LogicalOutputPluginWriterWrite do_write,
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index decffe352d..bd11f9f31e 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -384,6 +384,16 @@ typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
  */
 typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
 
+/*
+ * walrcv_slot_snapshot_fn
+ *
+ * Create a snapshot by an existing replication slot
+ */
+typedef void (*walrcv_slot_snapshot_fn) (WalReceiverConn *conn,
+										 char *slotname,
+										 const WalRcvStreamOptions *options,
+										 XLogRecPtr *lsn);
+
 typedef struct WalReceiverFunctionsType
 {
 	walrcv_connect_fn walrcv_connect;
@@ -401,6 +411,7 @@ typedef struct WalReceiverFunctionsType
 	walrcv_get_backend_pid_fn walrcv_get_backend_pid;
 	walrcv_exec_fn walrcv_exec;
 	walrcv_disconnect_fn walrcv_disconnect;
+	walrcv_slot_snapshot_fn walrcv_slot_snapshot;
 } WalReceiverFunctionsType;
 
 extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
@@ -435,6 +446,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes)
 #define walrcv_disconnect(conn) \
 	WalReceiverFunctions->walrcv_disconnect(conn)
+#define walrcv_slot_snapshot(conn, slotname, options, lsn) \
+	WalReceiverFunctions->walrcv_slot_snapshot(conn, slotname, options, lsn)
 
 static inline void
 walrcv_clear_result(WalRcvExecResult *walres)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 23bafec5f7..209918c380 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2322,6 +2322,7 @@ ReplicationSlotCtlData
 ReplicationSlotOnDisk
 ReplicationSlotPersistency
 ReplicationSlotPersistentData
+ReplicationSlotSnapshotCmd
 ReplicationState
 ReplicationStateCtl
 ReplicationStateOnDisk
-- 
2.25.1

