On 22 December 2016 at 14:21, Craig Ringer <cr...@2ndquadrant.com> wrote:

changes-in-0001-v2.diff shows the changes to PostgresNode.pm per
Michael's comments, and applies on top of 0001.

I also attach a patch to add a new CREATE_REPLICATION_SLOT option per
Petr's suggestion, so you can request a slot be created
WITHOUT_SNAPSHOT. This replaces the patch series's behaviour of
silently suppressing snapshot export when a slot was created on a
replica. It'll conflict (easily resolved) if applied on top of the
current series.

I have more to do before re-posting the full series, so I'm marking this
Waiting on Author for now. The PostgresNode changes likely break later
tests; I'm just posting them so there's some progress here and so I
don't forget them over the next few days' distractions.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 28e9f0b..64a4633 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -93,7 +93,6 @@ use RecursiveCopy;
 use Socket;
 use Test::More;
 use TestLib ();
-use pg_lsn qw(parse_lsn);
 use Scalar::Util qw(blessed);
 
 our @EXPORT = qw(
@@ -1325,38 +1324,62 @@ sub run_log
        TestLib::run_log(@_);
 }
 
-=pod $node->lsn
+=pod $node->lsn(mode)
 
-Return pg_current_xlog_insert_location() or, on a replica,
-pg_last_xlog_replay_location().
+Look up xlog positions on the server:
+
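+* insert position (master only, error on replica)
+* write position (master only, error on replica)
+* flush position (master only, error on replica)
+* receive position (usually undef on a master)
+* replay position (usually undef on a master)
+
+mode must be one of 'insert', 'write', 'flush', 'receive' or 'replay'.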
 
 =cut
 
 sub lsn
 {
-       my $self = shift;
-       return $self->safe_psql('postgres', 'select case when 
pg_is_in_recovery() then pg_last_xlog_replay_location() else 
pg_current_xlog_insert_location() end as lsn;');
+       my ($self, $mode) = @_;
+       my %modes = ('insert' => 'pg_current_xlog_insert_location()',
+                                'flush' => 'pg_current_xlog_flush_location()',
+                                'write' => 'pg_current_xlog_location()',
+                                'receive' => 'pg_last_xlog_receive_location()',
+                                'replay' => 'pg_last_xlog_replay_location()');
+
+       $mode = '<undef>' if !defined($mode);
+       die "unknown mode for 'lsn': '$mode', valid modes are " . join(', ', sort keys %modes)
+               if !defined($modes{$mode});
+
+       my $result = $self->safe_psql('postgres', "SELECT $modes{$mode}");
+       chomp($result);
+       if ($result eq '')
+       {
+               return undef;
+       }
+       else
+       {
+               return $result;
+       }
 }
 
 =pod $node->wait_for_catchup(standby_name, mode, target_lsn)
 
 Wait for the node with application_name standby_name (usually from node->name)
-until its replication equals or passes the upstream's xlog insert point at the
-time this function is called. By default the replay_location is waited for,
-but 'mode' may be specified to wait for any of sent|write|flush|replay.
+until its replication position in this node's pg_stat_replication equals or
+passes target_lsn (there is no default; it must be passed explicitly). By
+default the replay_location is waited for, but 'mode' may be specified to wait
+for any of sent|write|flush|replay.
 
 If there is no active replication connection from this peer, waits until
 poll_query_until timeout.
 
 Requires that the 'postgres' db exists and is accessible.
 
-If pos is passed, use that xlog position instead of the server's current
-xlog insert position.
+target_lsn may be any arbitrary lsn, but is typically 
$master_node->lsn('insert').
 
 This is not a test. It die()s on failure.
 
-Returns the LSN caught up to.
-
 =cut
 
 sub wait_for_catchup
@@ -1364,24 +1387,25 @@ sub wait_for_catchup
        my ($self, $standby_name, $mode, $target_lsn) = @_;
        $mode = defined($mode) ? $mode : 'replay';
        my %valid_modes = ( 'sent' => 1, 'write' => 1, 'flush' => 1, 'replay' 
=> 1 );
-       die "valid modes are " . join(', ', keys(%valid_modes)) unless 
exists($valid_modes{$mode});
-       if ( blessed( $standby_name ) && $standby_name->isa("PostgresNode") ) {
+       die "unknown mode $mode for 'wait_for_catchup', valid modes are " . 
join(', ', keys(%valid_modes)) unless exists($valid_modes{$mode});
+       # Allow passing of a PostgresNode instance as shorthand
+       if ( blessed( $standby_name ) && $standby_name->isa("PostgresNode") )
+       {
                $standby_name = $standby_name->name;
        }
-       if (!defined($target_lsn)) {
-               $target_lsn = $self->lsn;
-       }
-       $self->poll_query_until('postgres', qq[SELECT '$target_lsn' <= 
${mode}_location FROM pg_catalog.pg_stat_replication WHERE application_name = 
'$standby_name';])
-               or die "timed out waiting for catchup";
-       return $target_lsn;
+       die 'target_lsn must be specified' unless defined($target_lsn);
+       print "Waiting for replication conn " . $standby_name . "'s " . $mode . "_location to pass " . $target_lsn . " on " . $self->name . "\n";
+       my $query = qq[SELECT '$target_lsn' <= ${mode}_location FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
+       $self->poll_query_until('postgres', $query)
+               or die "timed out waiting for catchup, current position is " . ($self->safe_psql('postgres', qq[SELECT ${mode}_location FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';]) || '(unknown)');
+       print "done\n";
 }
 
 =pod $node->wait_for_slot_catchup(slot_name, mode, target_lsn)
 
-Wait for the named replication slot to equal or pass the xlog position of the
-server, or the supplied target_lsn if given. The position used is the
-restart_lsn unless mode is given, in which case it may be 'restart' or
-'confirmed_flush'.
+Wait for the named replication slot to equal or pass the supplied target_lsn.
+The position used is the restart_lsn unless mode is given, in which case it may
+be 'restart' or 'confirmed_flush'.
 
 Requires that the 'postgres' db exists and is accessible.
 
@@ -1389,9 +1413,9 @@ This is not a test. It die()s on failure.
 
 If the slot is not active, will time out after poll_query_until's timeout.
 
-Note that for logical slots, restart_lsn is held down by the oldest in 
progress tx.
+target_lsn may be any arbitrary lsn, but is typically 
$master_node->lsn('insert').
 
-Returns the LSN caught up to.
+Note that for logical slots, restart_lsn is held down by the oldest 
in-progress tx.
 
 =cut
 
@@ -1399,15 +1423,55 @@ sub wait_for_slot_catchup
 {
        my ($self, $slot_name, $mode, $target_lsn) = @_;
        $mode = defined($mode) ? $mode : 'restart';
-       if (!($mode eq 'restart' || $mode eq 'confirmed_flush')) {
+       if (!($mode eq 'restart' || $mode eq 'confirmed_flush'))
+       {
                die "valid modes are restart, confirmed_flush";
        }
-       if (!defined($target_lsn)) {
-               $target_lsn = $self->lsn;
-       }
-       $self->poll_query_until('postgres', qq[SELECT '$target_lsn' <= 
${mode}_lsn FROM pg_catalog.pg_replication_slots WHERE slot_name = 
'$slot_name';])
-               or die "timed out waiting for catchup";
-       return $target_lsn;
+       die 'target lsn must be specified' unless defined($target_lsn);
+       print "Waiting for replication slot " . $slot_name . "'s " . $mode . "_lsn to pass " . $target_lsn . " on " . $self->name . "\n";
+       my $query = qq[SELECT '$target_lsn' <= ${mode}_lsn FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name';];
+       $self->poll_query_until('postgres', $query)
+               or die "timed out waiting for catchup, current position is " . ($self->safe_psql('postgres', qq[SELECT ${mode}_lsn FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name';]) || '(unknown)');
+       print "done\n";
+}
+
+=pod $node->query_hash($dbname, $query, @columns)
+
+Execute $query on $dbname, replacing any appearance of the string __COLUMNS__
+within the query with a comma-separated list of @columns.
+
+If __COLUMNS__ does not appear in the query, its result columns must EXACTLY
+match the order and number (but not necessarily alias) of supplied @columns.
+
+The query must return zero or one rows.
+
+Return a hash-ref representation of the results of the query, with any empty
+or null results as defined keys with an empty-string value. There is no way
+to differentiate between null and empty-string result fields.
+
+If the query returns zero rows, return a hash with all columns empty. There
+is no way to differentiate between zero rows returned and a row with only
+null columns.
+
+=cut
+
+sub query_hash
+{
+       my ($self, $dbname, $query, @columns) = @_;
+       die 'calls in array context for multi-row results not supported yet' if (wantarray);
+       # Replace every appearance of __COLUMNS__ with the column list
+       my $column_list = join(', ', @columns);
+       $query =~ s/__COLUMNS__/$column_list/g;
+       my $result = $self->safe_psql($dbname, $query);
+       # Empty output means zero rows; $result stays defined so 'ne' won't warn
+       # hash slice, see http://stackoverflow.com/a/16755894/398670 .
+       # Fills the hash with empty strings produced by x-operator element
+       # duplication if result is an empty row. The -1 limit stops split
+       # from discarding trailing empty fields (NULL/empty last columns).
+       #
+       my %val;
+       @val{@columns} = $result ne '' ? split(qr/\|/, $result, -1) : ('',) x scalar(@columns);
+       return \%val;
 }
 
 =pod $node->slot(slot_name)
@@ -1426,19 +1490,8 @@ either.
 sub slot
 {
        my ($self, $slot_name) = @_;
-       my @fields = ('plugin', 'slot_type', 'datoid', 'database', 'active', 
'active_pid', 'xmin', 'catalog_xmin', 'restart_lsn');
-       my $result = $self->safe_psql('postgres', 'SELECT ' . join(', ', 
@fields) . " FROM pg_catalog.pg_replication_slots WHERE slot_name = 
'$slot_name'");
-       $result = undef if $result eq '';
-       # hash slice, see http://stackoverflow.com/a/16755894/398670 .
-       #
-       # Fills the hash with empty strings produced by x-operator element
-       # duplication if result is an empty row
-       #
-       my %val;
-       @val{@fields} = $result ne '' ? split(qr/\|/, $result) : ('',) x 
scalar(@fields);
-       $val{'restart_lsn_arr'} = parse_lsn($val{'restart_lsn'});
-       $val{'confirmed_flush_lsn_arr'} = 
parse_lsn($val{'confirmed_flush_lsn'});
-       return \%val;
+       my @columns = ('plugin', 'slot_type', 'datoid', 'database', 'active', 
'active_pid', 'xmin', 'catalog_xmin', 'restart_lsn');
+       return $self->query_hash('postgres', "SELECT __COLUMNS__ FROM 
pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'", @columns);
 }
 
 =pod
diff --git a/src/test/recovery/t/001_stream_rep.pl 
b/src/test/recovery/t/001_stream_rep.pl
index 5ce69bb..ba1da8c 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -40,8 +40,8 @@ $node_master->safe_psql('postgres',
        "CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
 
 # Wait for standbys to catch up
-$node_master->wait_for_catchup($node_standby_1);
-$node_standby_1->wait_for_catchup($node_standby_2);
+$node_master->wait_for_catchup($node_standby_1, 'replay', 
$node_master->lsn('insert'));
+$node_standby_1->wait_for_catchup($node_standby_2, 'replay', 
$node_standby_1->lsn('replay'));
 
 my $result =
   $node_standby_1->safe_psql('postgres', "SELECT count(*) FROM tab_int");
diff --git a/src/test/recovery/t/004_timeline_switch.pl 
b/src/test/recovery/t/004_timeline_switch.pl
index 5f3b2fe..7c6587a 100644
--- a/src/test/recovery/t/004_timeline_switch.pl
+++ b/src/test/recovery/t/004_timeline_switch.pl
@@ -32,14 +32,9 @@ $node_standby_2->start;
 # Create some content on master
 $node_master->safe_psql('postgres',
        "CREATE TABLE tab_int AS SELECT generate_series(1,1000) AS a");
-my $until_lsn =
-  $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location();");
 
 # Wait until standby has replayed enough data on standby 1
-my $caughtup_query =
-  "SELECT '$until_lsn'::pg_lsn <= pg_last_xlog_replay_location()";
-$node_standby_1->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby to catch up";
+$node_master->wait_for_catchup($node_standby_1, 'replay', 
$node_master->lsn('write'));
 
 # Stop and remove master, and promote standby 1, switching it to a new timeline
 $node_master->teardown_node;
@@ -50,7 +45,7 @@ rmtree($node_standby_2->data_dir . '/recovery.conf');
 my $connstr_1 = $node_standby_1->connstr;
 $node_standby_2->append_conf(
        'recovery.conf', qq(
-primary_conninfo='$connstr_1'
+primary_conninfo='$connstr_1 application_name=@{[$node_standby_2->name]}'
 standby_mode=on
 recovery_target_timeline='latest'
 ));
@@ -60,12 +55,7 @@ $node_standby_2->restart;
 # to ensure that the timeline switch has been done.
 $node_standby_1->safe_psql('postgres',
        "INSERT INTO tab_int VALUES (generate_series(1001,2000))");
-$until_lsn = $node_standby_1->safe_psql('postgres',
-       "SELECT pg_current_xlog_location();");
-$caughtup_query =
-  "SELECT '$until_lsn'::pg_lsn <= pg_last_xlog_replay_location()";
-$node_standby_2->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby to catch up";
+$node_standby_1->wait_for_catchup($node_standby_2, 'replay', 
$node_standby_1->lsn('write'));
 
 my $result =
   $node_standby_2->safe_psql('postgres', "SELECT count(*) FROM tab_int");
From 9dce1252641cde15a33198e7c117bc6138a94103 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 21 Dec 2016 11:21:46 +0800
Subject: [PATCH] Make snapshot export on logical slot creation optional

Allow logical decoding slot creation via the walsender protocol's
CREATE_REPLICATION_SLOT command to optionally suppress exporting of
a snapshot when the WITHOUT_SNAPSHOT option is passed.

This means that when we allow creation of replication slots on standbys, which
cannot export snapshots, we don't have to silently omit the snapshot creation.
It also allows clients like pg_recvlogical, which neither need nor can use the
exported snapshot, to suppress its creation. Since snapshot exporting can fail,
this improves reliability.
---
 doc/src/sgml/logicaldecoding.sgml      | 13 ++++++++++---
 doc/src/sgml/protocol.sgml             | 17 +++++++++++++++--
 src/backend/replication/repl_gram.y    | 10 +++++++++-
 src/backend/replication/repl_scanner.l |  1 +
 src/backend/replication/walsender.c    |  9 ++++++++-
 src/bin/pg_basebackup/streamutil.c     |  5 +++++
 src/include/nodes/replnodes.h          |  1 +
 7 files changed, 49 insertions(+), 7 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 484915d..c0b6987 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -268,11 +268,12 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
     </para>
    </sect2>
 
-   <sect2>
+   <sect2 id="logicaldecoding-snapshot-export" xreflabel="Exported Snapshots (Logical Decoding)">
     <title>Exported Snapshots</title>
     <para>
-     When a new replication slot is created using the streaming replication interface,
-     a snapshot is exported
+     When <link linkend="protocol-replication-create-slot">a new replication
+     slot is created using the streaming replication interface</>, a snapshot
+     is exported
      (see <xref linkend="functions-snapshot-synchronization">), which will show
      exactly the state of the database after which all changes will be
      included in the change stream. This can be used to create a new replica by
@@ -282,6 +283,12 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
      database's state at that point in time, which afterwards can be updated
      using the slot's contents without losing any changes.
     </para>
+    <para>
+     Creation of a snapshot is not always possible; in particular, it will
+     fail when connected to a hot standby. Applications that do not require
+     snapshot export may suppress it with the <literal>WITHOUT_SNAPSHOT</>
+     option.
+    </para>
    </sect2>
   </sect1>
 
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 50cf527..c3e5c58 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1433,8 +1433,8 @@ The commands accepted in walsender mode are:
     </listitem>
   </varlistentry>
 
-  <varlistentry>
-   <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> }
+  <varlistentry id="protocol-replication-create-slot" xreflabel="CREATE_REPLICATION_SLOT">
+   <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> [ <literal>WITHOUT_SNAPSHOT</> ] }
      <indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
     </term>
     <listitem>
@@ -1474,6 +1474,19 @@ The commands accepted in walsender mode are:
         </para>
        </listitem>
       </varlistentry>
+
+      <varlistentry>
+       <term><literal>WITHOUT_SNAPSHOT</></term>
+       <listitem>
+        <para>
+         By default, logical replication slot creation exports a snapshot for
+         use in initialization; see <xref linkend="logicaldecoding-snapshot-export">.
+         Because not all clients need an exported snapshot, its creation can
+         be suppressed with <literal>WITHOUT_SNAPSHOT</>. This option applies
+         only to logical slots.
+        </para>
+       </listitem>
+      </varlistentry>
      </variablelist>
     </listitem>
   </varlistentry>
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index fd0fa6d..85091bd 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -77,6 +77,7 @@ Node *replication_parse_result;
 %token K_LOGICAL
 %token K_SLOT
 %token K_RESERVE_WAL
+%token K_WITHOUT_SNAPSHOT
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
@@ -90,6 +91,7 @@ Node *replication_parse_result;
 %type <node>	plugin_opt_arg
 %type <str>		opt_slot
 %type <boolval>	opt_reserve_wal
+%type <boolval> opt_without_snapshot
 
 %%
 
@@ -194,13 +196,14 @@ create_replication_slot:
 					$$ = (Node *) cmd;
 				}
 			/* CREATE_REPLICATION_SLOT slot LOGICAL plugin */
-			| K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT
+			| K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT opt_without_snapshot
 				{
 					CreateReplicationSlotCmd *cmd;
 					cmd = makeNode(CreateReplicationSlotCmd);
 					cmd->kind = REPLICATION_KIND_LOGICAL;
 					cmd->slotname = $2;
 					cmd->plugin = $4;
+					cmd->without_snapshot = $5;
 					$$ = (Node *) cmd;
 				}
 			;
@@ -276,6 +279,11 @@ opt_reserve_wal:
 			| /* EMPTY */					{ $$ = false; }
 			;
 
+opt_without_snapshot:
+			K_WITHOUT_SNAPSHOT				{ $$ = true; }
+			| /* EMPTY */					{ $$ = false; }
+			;
+
 opt_slot:
 			K_SLOT IDENT
 				{ $$ = $2; }
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index f83ec53..ae2784f 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -96,6 +96,7 @@ DROP_REPLICATION_SLOT		{ return K_DROP_REPLICATION_SLOT; }
 TIMELINE_HISTORY	{ return K_TIMELINE_HISTORY; }
 PHYSICAL			{ return K_PHYSICAL; }
 RESERVE_WAL			{ return K_RESERVE_WAL; }
+WITHOUT_SNAPSHOT	{ return K_WITHOUT_SNAPSHOT; }
 LOGICAL				{ return K_LOGICAL; }
 SLOT				{ return K_SLOT; }
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 8b145e0..f7448a6 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -843,7 +843,14 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 * Export a plain (not of the snapbuild.c type) snapshot to the user
 		 * that can be imported into another session.
 		 */
-		snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
+		if (cmd->without_snapshot)
+			snapshot_name = NULL;	/* reported to the client as a NULL column */
+		else if (RecoveryInProgress())
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("cannot export a snapshot from a standby")));
+		else
+			snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
 
 		/* don't need the decoding context anymore */
 		FreeDecodingContext(ctx);
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index 595eaff..7b1b2ee 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -346,8 +346,13 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
 		appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
 						  slot_name);
 	else
+	{
 		appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
 						  slot_name, plugin);
+		if (PQserverVersion(conn) >= 100000)
+			/* pg_recvlogical doesn't use an exported snapshot, so suppress */
+			appendPQExpBuffer(query, " WITHOUT_SNAPSHOT");
+	}
 
 	res = PQexec(conn, query->data);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index d2f1edb..9864594 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -56,6 +56,7 @@ typedef struct CreateReplicationSlotCmd
 	ReplicationKind kind;
 	char	   *plugin;
 	bool		reserve_wal;
+	bool		without_snapshot;
 } CreateReplicationSlotCmd;
 
 
-- 
2.5.5

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to