On 11 March 2017 at 00:33, Petr Jelinek <petr.jeli...@2ndquadrant.com> wrote:
> On 09/03/17 18:48, Peter Eisentraut wrote:
>> On 3/6/17 05:27, Petr Jelinek wrote:
>>> And lastly I changed the automagic around exporting, not exporting and
>>> using the snapshot produced by CREATE_REPLICATION_SLOT into explicit
>>> parameter for the CREATE_REPLICATION_SLOT command (and added simple
>>> framework for adding more of those if needed in the future).
>>
>> It might be useful to make this into a separate patch, for clarity.
>>
>
> Okay here it is with this part split out. The first patch also helps with
> Craig's logical decoding on standby so it definitely makes sense to be
> split.

Greatly appreciated.

Committing this in chunks makes sense anyway.

I've attached a slightly modified version that makes pg_recvlogical skip
snapshot export. The second patch is unchanged; use the copy from the
immediately prior mail.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 02af255a84736ac2783705055d6e998e476359af Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Thu, 9 Mar 2017 14:20:28 +0100
Subject: [PATCH 1/4] Add option to control snapshot export to
 CREATE_REPLICATION_SLOT

We used to export snapshots unconditionally in CREATE_REPLICATION_SLOT
in the replication protocol, but several upcoming patches want more control
over what happens with the slot.

This means that when we allow creation of replication slots on standbys, which
cannot export snapshots because they cannot allocate new XIDs, we don't have to
silently omit the snapshot creation.

It also allows clients like pg_recvlogical, which neither need nor can use the
exported snapshot, to suppress its creation. Since snapshot exporting can fail
this improves reliability.
---
 doc/src/sgml/logicaldecoding.sgml                  | 13 +++--
 doc/src/sgml/protocol.sgml                         | 16 +++++-
 src/backend/commands/subscriptioncmds.c            |  6 ++-
 .../libpqwalreceiver/libpqwalreceiver.c            | 15 ++++--
 src/backend/replication/repl_gram.y                | 43 ++++++++++++----
 src/backend/replication/repl_scanner.l             |  2 +
 src/backend/replication/walsender.c                | 58 ++++++++++++++++++++--
 src/bin/pg_basebackup/streamutil.c                 |  5 ++
 src/include/nodes/replnodes.h                      |  2 +-
 src/include/replication/walreceiver.h              |  6 +--
 10 files changed, 140 insertions(+), 26 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 03c2c69..2b7d6e9 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -268,11 +268,12 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
     </para>
    </sect2>
 
-   <sect2>
+   <sect2 id="logicaldecoding-snapshot-export" xreflabel="Exported Snapshots (Logical Decoding)">
     <title>Exported Snapshots</title>
     <para>
-     When a new replication slot is created using the streaming replication interface,
-     a snapshot is exported
+     When <link linkend="protocol-replication-create-slot">a new replication
+     slot is created using the streaming replication interface</>, a snapshot
+     is exported
      (see <xref linkend="functions-snapshot-synchronization">), which will show
      exactly the state of the database after which all changes will be
      included in the change stream. This can be used to create a new replica by
@@ -282,6 +283,12 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
      database's state at that point in time, which afterwards can be updated
      using the slot's contents without losing any changes.
     </para>
+    <para>
+     Creation of a snapshot is not always possible - in particular, it will
+     fail when connected to a hot standby. Applications that do not require
+     snapshot export may suppress it with the <literal>NOEXPORT_SNAPSHOT</>
+     option.
+    </para>
    </sect2>
   </sect1>
 
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 3d6e8ee..95603d3 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1487,7 +1487,7 @@ The commands accepted in walsender mode are:
   </varlistentry>
 
   <varlistentry>
-   <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> [ <literal>TEMPORARY</> ] { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> }
+   <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> [ <literal>TEMPORARY</> ] { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> [ <literal>EXPORT_SNAPSHOT</> | <literal>NOEXPORT_SNAPSHOT</> ] }
      <indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
     </term>
     <listitem>
@@ -1538,6 +1538,20 @@ The commands accepted in walsender mode are:
         </para>
        </listitem>
       </varlistentry>
+      <varlistentry>
+       <term><literal>EXPORT_SNAPSHOT</></term>
+       <term><literal>NOEXPORT_SNAPSHOT</></term>
+       <listitem>
+        <para>
+         Decides what to do with the snapshot created during logical slot
+         initialization. <literal>EXPORT_SNAPSHOT</>, which is the
+         default, will export the snapshot for use in other sessions. This
+         option can't be used inside a transaction. The
+         <literal>NOEXPORT_SNAPSHOT</> will just use the snapshot for logical
+         decoding as normal but won't do anything else with it.
+        </para>
+       </listitem>
+      </varlistentry>
      </variablelist>
 
      <para>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 0036d99..33ccc08 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -314,7 +314,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 
 		PG_TRY();
 		{
-			walrcv_create_slot(wrconn, slotname, false, &lsn);
+			/*
+			 * Create permanent slot for the subscription, we won't use
+			 * the initial snapshot for anything so no need to export it.
+			 */
+			walrcv_create_slot(wrconn, slotname, false, false, &lsn);
 			ereport(NOTICE,
 					(errmsg("created replication slot \"%s\" on publisher",
 							slotname)));
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index ebadf36..cd2e578 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -68,6 +68,7 @@ static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
 static char *libpqrcv_create_slot(WalReceiverConn *conn,
 								  const char *slotname,
 								  bool temporary,
+								  bool export_snapshot,
 								  XLogRecPtr *lsn);
 static bool libpqrcv_command(WalReceiverConn *conn,
 							 const char *cmd, char **err);
@@ -720,7 +721,7 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
  */
 static char *
 libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
-					 bool temporary, XLogRecPtr *lsn)
+					 bool temporary, bool export_snapshot, XLogRecPtr *lsn)
 {
 	PGresult	   *res;
 	StringInfoData	cmd;
@@ -728,13 +729,19 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 
 	initStringInfo(&cmd);
 
-	appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\" ", slotname);
+	appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
 
 	if (temporary)
-		appendStringInfo(&cmd, "TEMPORARY ");
+		appendStringInfo(&cmd, " TEMPORARY");
 
 	if (conn->logical)
-		appendStringInfo(&cmd, "LOGICAL pgoutput");
+	{
+		appendStringInfo(&cmd, " LOGICAL pgoutput");
+		if (export_snapshot)
+			appendStringInfo(&cmd, " EXPORT_SNAPSHOT");
+		else
+			appendStringInfo(&cmd, " NOEXPORT_SNAPSHOT");
+	}
 
 	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
 	pfree(cmd.data);
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index b35d0f0..f1e43bc 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -79,6 +79,8 @@ Node *replication_parse_result;
 %token K_SLOT
 %token K_RESERVE_WAL
 %token K_TEMPORARY
+%token K_EXPORT_SNAPSHOT
+%token K_NOEXPORT_SNAPSHOT
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
@@ -91,7 +93,9 @@ Node *replication_parse_result;
 %type <defelt>	plugin_opt_elem
 %type <node>	plugin_opt_arg
 %type <str>		opt_slot var_name
-%type <boolval>	opt_reserve_wal opt_temporary
+%type <boolval>	opt_temporary
+%type <list>	create_slot_opt_list
+%type <defelt>	create_slot_opt
 
 %%
 
@@ -202,18 +206,18 @@ base_backup_opt:
 
 create_replication_slot:
 			/* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
-			K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL opt_reserve_wal
+			K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list
 				{
 					CreateReplicationSlotCmd *cmd;
 					cmd = makeNode(CreateReplicationSlotCmd);
 					cmd->kind = REPLICATION_KIND_PHYSICAL;
 					cmd->slotname = $2;
 					cmd->temporary = $3;
-					cmd->reserve_wal = $5;
+					cmd->options = $5;
 					$$ = (Node *) cmd;
 				}
 			/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
-			| K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT
+			| K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list
 				{
 					CreateReplicationSlotCmd *cmd;
 					cmd = makeNode(CreateReplicationSlotCmd);
@@ -221,10 +225,36 @@ create_replication_slot:
 					cmd->slotname = $2;
 					cmd->temporary = $3;
 					cmd->plugin = $5;
+					cmd->options = $6;
 					$$ = (Node *) cmd;
 				}
 			;
 
+create_slot_opt_list:
+			create_slot_opt_list create_slot_opt
+				{ $$ = lappend($1, $2); }
+			| /* EMPTY */
+				{ $$ = NIL; }
+			;
+
+create_slot_opt:
+			K_EXPORT_SNAPSHOT
+				{
+				  $$ = makeDefElem("export_snapshot",
+								   (Node *)makeInteger(TRUE), -1);
+				}
+			| K_NOEXPORT_SNAPSHOT
+				{
+				  $$ = makeDefElem("export_snapshot",
+								   (Node *)makeInteger(FALSE), -1);
+				}
+			| K_RESERVE_WAL
+				{
+				  $$ = makeDefElem("reserve_wal",
+								   (Node *)makeInteger(TRUE), -1);
+				}
+			;
+
 /* DROP_REPLICATION_SLOT slot */
 drop_replication_slot:
 			K_DROP_REPLICATION_SLOT IDENT
@@ -291,11 +321,6 @@ opt_physical:
 			| /* EMPTY */
 			;
 
-opt_reserve_wal:
-			K_RESERVE_WAL					{ $$ = true; }
-			| /* EMPTY */					{ $$ = false; }
-			;
-
 opt_temporary:
 			K_TEMPORARY						{ $$ = true; }
 			| /* EMPTY */					{ $$ = false; }
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 37f8579..f56d41d 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -100,6 +100,8 @@ RESERVE_WAL			{ return K_RESERVE_WAL; }
 LOGICAL				{ return K_LOGICAL; }
 SLOT				{ return K_SLOT; }
 TEMPORARY			{ return K_TEMPORARY; }
+EXPORT_SNAPSHOT		{ return K_EXPORT_SNAPSHOT; }
+NOEXPORT_SNAPSHOT	{ return K_NOEXPORT_SNAPSHOT; }
 
 ","				{ return ','; }
 ";"				{ return ';'; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index dd3a936..127efec 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -51,6 +51,7 @@
 
 #include "catalog/pg_type.h"
 #include "commands/dbcommands.h"
+#include "commands/defrem.h"
 #include "funcapi.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -738,6 +739,48 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 }
 
 /*
+ * Process extra options given to CREATE_REPLICATION_SLOT.
+ */
+static void
+parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
+						   bool *reserve_wal,
+						   bool *export_snapshot)
+{
+	ListCell   *lc;
+	bool		snapshot_action_given = false;
+	bool		reserve_wal_given = false;
+
+	/* Parse options */
+	foreach (lc, cmd->options)
+	{
+		DefElem    *defel = (DefElem *) lfirst(lc);
+
+		if (strcmp(defel->defname, "export_snapshot") == 0)
+		{
+			if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+
+			snapshot_action_given = true;
+			*export_snapshot = defGetBoolean(defel);
+		}
+		else if (strcmp(defel->defname, "reserve_wal") == 0)
+		{
+			if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+
+			reserve_wal_given = true;
+			*reserve_wal = true;
+		}
+		else
+			elog(ERROR, "unrecognized option: %s", defel->defname);
+	}
+}
+
+/*
  * Create a new replication slot.
  */
 static void
@@ -746,6 +789,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	const char *snapshot_name = NULL;
 	char		xpos[MAXFNAMELEN];
 	char	   *slot_name;
+	bool		reserve_wal = false;
+	bool		export_snapshot = true;
 	DestReceiver *dest;
 	TupOutputState *tstate;
 	TupleDesc	tupdesc;
@@ -754,6 +799,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
+	parseCreateReplSlotOptions(cmd, &reserve_wal, &export_snapshot);
+
 	/* setup state for XLogReadPage */
 	sendTimeLineIsHistoric = false;
 	sendTimeLine = ThisTimeLineID;
@@ -799,10 +846,13 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		DecodingContextFindStartpoint(ctx);
 
 		/*
-		 * Export a plain (not of the snapbuild.c type) snapshot to the user
-		 * that can be imported into another session.
+		 * Export the snapshot if we've been asked to do so.
+		 *
+		 * NB. We will convert the snapbuild.c kind of snapshot to normal
+		 * snapshot when doing this.
 		 */
-		snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
+		if (export_snapshot)
+			snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
 
 		/* don't need the decoding context anymore */
 		FreeDecodingContext(ctx);
@@ -810,7 +860,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		if (!cmd->temporary)
 			ReplicationSlotPersist();
 	}
-	else if (cmd->kind == REPLICATION_KIND_PHYSICAL && cmd->reserve_wal)
+	else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
 	{
 		ReplicationSlotReserveWal();
 
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index 1fe42ef..507da5e 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -338,8 +338,13 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
 		appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
 						  slot_name);
 	else
+	{
 		appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
 						  slot_name, plugin);
+		if (PQserverVersion(conn) >= 100000)
+			/* pg_recvlogical doesn't use an exported snapshot, so suppress */
+			appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT");
+	}
 
 	res = PQexec(conn, query->data);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index f27354f..996da3c 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -56,7 +56,7 @@ typedef struct CreateReplicationSlotCmd
 	ReplicationKind kind;
 	char	   *plugin;
 	bool		temporary;
-	bool		reserve_wal;
+	List	   *options;
 } CreateReplicationSlotCmd;
 
 
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 0857bdc..78e577c 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -183,7 +183,7 @@ typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer,
 								int nbytes);
 typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
 										const char *slotname, bool temporary,
-										XLogRecPtr *lsn);
+										bool export_snapshot, XLogRecPtr *lsn);
 typedef bool (*walrcv_command_fn) (WalReceiverConn *conn, const char *cmd,
 								   char **err);
 typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
@@ -224,8 +224,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
 #define walrcv_send(conn, buffer, nbytes) \
 	WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
-#define walrcv_create_slot(conn, slotname, temporary, lsn) \
-	WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, lsn)
+#define walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn) \
+	WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn)
 #define walrcv_command(conn, cmd, err) \
 	WalReceiverFunctions->walrcv_command(conn, cmd, err)
 #define walrcv_disconnect(conn) \
-- 
2.5.5
