On 20/01/17 22:30, Petr Jelinek wrote:
> Since it's not exactly straightforward to find when these need to be
> initialized based on commands, I decided to move the initialization code
> to exec_replication_command(), since that's always called before anything
> else, which makes it much less error-prone (patch 0003).
> 
> The 0003 should be backpatched all the way to 9.4 where multiple
> commands started using those buffers.
> 

Actually, there is a better place: InitWalSender().

Just to make it easier for PeterE (or whichever committer picks this up)
I attached all the logical replication followup fix/polish patches:

0001 - Changes libpqrcv_connect to use the asynchronous libpq API so that
it won't get stuck forever if the connection attempt hangs. This is a
preexisting bug that also affects walreceiver, but it's less visible there
as there is no SQL interface to initiate a connection there.

0002 - Closes the replication connection when CREATE SUBSCRIPTION gets
canceled (otherwise the walsender on the other side may stay in the
"idle in transaction" state).

0003 - Fixes buffer initialization in walsender, a problem I found when
testing the above two. This one should be back-patched to 9.4, since it
has been broken since then.

0004 - Fixes the foreign key issue reported by Thom Brown and also adds
tests for FK and trigger handling.

0005 - Adds support for renaming publications and subscriptions.

All rebased on top of current master (90992e0).

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
From e2c30b258e97fbba51c8d0f6e12ac557cafb129a Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Fri, 20 Jan 2017 21:20:56 +0100
Subject: [PATCH 1/5] Use asynchronous connect API in libpqwalreceiver

---
 src/backend/postmaster/pgstat.c                    |  4 +-
 .../libpqwalreceiver/libpqwalreceiver.c            | 58 +++++++++++++++++++++-
 src/include/pgstat.h                               |  2 +-
 3 files changed, 59 insertions(+), 5 deletions(-)

diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 7176cf1..19ad6b5 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3340,8 +3340,8 @@ pgstat_get_wait_client(WaitEventClient w)
 		case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
 			event_name = "WalReceiverWaitStart";
 			break;
-		case WAIT_EVENT_LIBPQWALRECEIVER_READ:
-			event_name = "LibPQWalReceiverRead";
+		case WAIT_EVENT_LIBPQWALRECEIVER:
+			event_name = "LibPQWalReceiver";
 			break;
 		case WAIT_EVENT_WAL_SENDER_WAIT_WAL:
 			event_name = "WalSenderWaitForWAL";
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 7df3698..8dc51d4 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -112,6 +112,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 				 char **err)
 {
 	WalReceiverConn *conn;
+	PostgresPollingStatusType status;
 	const char *keys[5];
 	const char *vals[5];
 	int			i = 0;
@@ -138,7 +139,60 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 	vals[i] = NULL;
 
 	conn = palloc0(sizeof(WalReceiverConn));
-	conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
+	conn->streamConn = PQconnectStartParams(keys, vals,
+											/* expand_dbname = */ true);
+	/* Check for conn status. */
+	if (PQstatus(conn->streamConn) == CONNECTION_BAD)
+	{
+		*err = pstrdup(PQerrorMessage(conn->streamConn));
+		return NULL;
+	}
+
+	/* Poll connection. */
+	do
+	{
+		int			rc;
+		int			extra_flag;
+
+		/* Advance the connection state machine by one step. */
+		status = PQconnectPoll(conn->streamConn);
+
+		/* Next action based upon status value. */
+		switch (status)
+		{
+			case PGRES_POLLING_READING:
+			case PGRES_POLLING_WRITING:
+				/* Wait only for the socket event libpq asked for. */
+				extra_flag = (status == PGRES_POLLING_READING) ?
+					WL_SOCKET_READABLE : WL_SOCKET_WRITEABLE;
+
+				rc = WaitLatchOrSocket(&MyProc->procLatch,
+									   WL_POSTMASTER_DEATH |
+									   WL_LATCH_SET | extra_flag,
+									   PQsocket(conn->streamConn),
+									   0,
+									   WAIT_EVENT_LIBPQWALRECEIVER);
+				if (rc & WL_POSTMASTER_DEATH)
+					exit(1);
+
+				/* Interrupted. */
+				if (rc & WL_LATCH_SET)
+				{
+					/* Clear the latch only once we are about to act on it. */
+					ResetLatch(&MyProc->procLatch);
+					CHECK_FOR_INTERRUPTS();
+				}
+				break;
+
+			/* Either can continue polling or finished one way or another. */
+			default:
+				break;
+		}
+
+		/* Loop until we have OK or FAILED status. */
+	} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
+
+	/* Check the status. */
 	if (PQstatus(conn->streamConn) != CONNECTION_OK)
 	{
 		*err = pstrdup(PQerrorMessage(conn->streamConn));
@@ -508,7 +562,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
 								   WL_LATCH_SET,
 								   PQsocket(streamConn),
 								   0,
-								   WAIT_EVENT_LIBPQWALRECEIVER_READ);
+								   WAIT_EVENT_LIBPQWALRECEIVER);
 			if (rc & WL_POSTMASTER_DEATH)
 				exit(1);
 
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index de8225b..e088474 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -764,7 +764,7 @@ typedef enum
 	WAIT_EVENT_CLIENT_WRITE,
 	WAIT_EVENT_SSL_OPEN_SERVER,
 	WAIT_EVENT_WAL_RECEIVER_WAIT_START,
-	WAIT_EVENT_LIBPQWALRECEIVER_READ,
+	WAIT_EVENT_LIBPQWALRECEIVER,
 	WAIT_EVENT_WAL_SENDER_WAIT_WAL,
 	WAIT_EVENT_WAL_SENDER_WRITE_DATA
 } WaitEventClient;
-- 
2.7.4

From 0c843de52ceeb04bab5da774dc0ea8a56cfe08ba Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Fri, 20 Jan 2017 22:22:33 +0100
Subject: [PATCH 2/5] Close replication connection when slot creation gets
 canceled

---
 src/backend/commands/subscriptioncmds.c | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 2b6d322..e0add94 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -301,10 +301,20 @@ CreateSubscription(CreateSubscriptionStmt *stmt)
 			ereport(ERROR,
 					(errmsg("could not connect to the publisher: %s", err)));
 
-		walrcv_create_slot(wrconn, slotname, false, &lsn);
-		ereport(NOTICE,
-				(errmsg("created replication slot \"%s\" on publisher",
-						slotname)));
+		PG_TRY();
+		{
+			walrcv_create_slot(wrconn, slotname, false, &lsn);
+			ereport(NOTICE,
+					(errmsg("created replication slot \"%s\" on publisher",
+							slotname)));
+		}
+		PG_CATCH();
+		{
+			/* Close the connection in case of failure. */
+			walrcv_disconnect(wrconn);
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
 
 		/* And we are done with the remote side. */
 		walrcv_disconnect(wrconn);
-- 
2.7.4

From c7a2748c05dea330b166743680f7f668a904e181 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Fri, 20 Jan 2017 22:22:53 +0100
Subject: [PATCH 3/5] Always initialize stringinfo buffers in walsender

---
 src/backend/replication/walsender.c | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f3082c3..e03642a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -241,6 +241,14 @@ InitWalSender(void)
 	 */
 	MarkPostmasterChildWalSender();
 	SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
+
+	/*
+	 * Allocate buffers that will be used for each outgoing and incoming
+	 * message.
+	 */
+	initStringInfo(&output_message);
+	initStringInfo(&reply_message);
+	initStringInfo(&tmpbuf);
 }
 
 /*
@@ -816,8 +824,6 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 	}
 
-	initStringInfo(&output_message);
-
 	if (cmd->kind == REPLICATION_KIND_LOGICAL)
 	{
 		LogicalDecodingContext *ctx;
@@ -1802,14 +1808,6 @@ static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
 	/*
-	 * Allocate buffers that will be used for each outgoing and incoming
-	 * message.  We do this just once to reduce palloc overhead.
-	 */
-	initStringInfo(&output_message);
-	initStringInfo(&reply_message);
-	initStringInfo(&tmpbuf);
-
-	/*
 	 * Initialize the last reply timestamp. That enables timeout processing
 	 * from hereon.
 	 */
-- 
2.7.4

From d90701b04887c4bcb21e11d7ec44af65bb180f67 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Sun, 22 Jan 2017 23:16:57 +0100
Subject: [PATCH 4/5] Fix after trigger execution in logical replication

---
 src/backend/replication/logical/worker.c   |  15 ++++
 src/test/subscription/t/003_constraints.pl | 113 +++++++++++++++++++++++++++++
 2 files changed, 128 insertions(+)
 create mode 100644 src/test/subscription/t/003_constraints.pl

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7d86736..6229bef 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -176,6 +176,9 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
 	if (resultRelInfo->ri_TrigDesc)
 		estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
 
+	/* Prepare to catch AFTER triggers. */
+	AfterTriggerBeginQuery();
+
 	return estate;
 }
 
@@ -536,6 +539,10 @@ apply_handle_insert(StringInfo s)
 	/* Cleanup. */
 	ExecCloseIndices(estate->es_result_relation_info);
 	PopActiveSnapshot();
+
+	/* Handle queued AFTER triggers. */
+	AfterTriggerEndQuery(estate);
+
 	ExecResetTupleTable(estate->es_tupleTable, false);
 	FreeExecutorState(estate);
 
@@ -676,6 +683,10 @@ apply_handle_update(StringInfo s)
 	/* Cleanup. */
 	ExecCloseIndices(estate->es_result_relation_info);
 	PopActiveSnapshot();
+
+	/* Handle queued AFTER triggers. */
+	AfterTriggerEndQuery(estate);
+
 	EvalPlanQualEnd(&epqstate);
 	ExecResetTupleTable(estate->es_tupleTable, false);
 	FreeExecutorState(estate);
@@ -763,6 +774,10 @@ apply_handle_delete(StringInfo s)
 	/* Cleanup. */
 	ExecCloseIndices(estate->es_result_relation_info);
 	PopActiveSnapshot();
+
+	/* Handle queued AFTER triggers. */
+	AfterTriggerEndQuery(estate);
+
 	EvalPlanQualEnd(&epqstate);
 	ExecResetTupleTable(estate->es_tupleTable, false);
 	FreeExecutorState(estate);
diff --git a/src/test/subscription/t/003_constraints.pl b/src/test/subscription/t/003_constraints.pl
new file mode 100644
index 0000000..203322f
--- /dev/null
+++ b/src/test/subscription/t/003_constraints.pl
@@ -0,0 +1,113 @@
+# Test foreign key and trigger handling on the logical replication subscriber
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 4;
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Setup structure on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_fk (bid int PRIMARY KEY);");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_fk (bid int PRIMARY KEY);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR ALL TABLES;");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub");
+
+# Wait for subscriber to finish initialization
+my $caughtup_query =
+"SELECT pg_current_xlog_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$appname';";
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_fk (bid) VALUES (1);");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_fk_ref (id, bid) VALUES (1,1);");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+
+# Check data on subscriber
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk");
+is($result, qq(1|1|1), 'check replicated tab_fk inserts on subscriber');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref");
+is($result, qq(1|1|1), 'check replicated tab_fk_ref inserts on subscriber');
+
+# Drop the referenced table (and with it the FK) on the publisher
+$node_publisher->safe_psql('postgres',
+	"DROP TABLE tab_fk CASCADE;");
+
+# Insert data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_fk_ref (id, bid) VALUES (2,2);");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+
+# FK is not enforced on subscriber
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref");
+is($result, qq(2|1|2), 'check FK ignored on subscriber');
+
+# Add replica trigger
+$node_subscriber->safe_psql('postgres', qq{
+CREATE FUNCTION filter_basic_dml_fn() RETURNS TRIGGER AS \$\$
+BEGIN
+    IF (TG_OP = 'INSERT') THEN
+        IF (NEW.id < 10) THEN
+            RETURN NEW;
+        ELSE
+            RETURN NULL;
+        END IF;
+    ELSE
+        RAISE WARNING 'Unknown action';
+        RETURN NULL;
+    END IF;
+END;
+\$\$ LANGUAGE plpgsql;
+CREATE TRIGGER filter_basic_dml_trg
+BEFORE INSERT ON tab_fk_ref
+FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn();
+ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg;
+});
+
+# Insert data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_fk_ref (id, bid) VALUES (10,10);");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+
+# The row should be skipped on subscriber
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref");
+is($result, qq(2|1|2), 'check skipped insert on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
2.7.4

From faf97e6f698de1f2d6fd8f286033a6bf723d0ec7 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Thu, 19 Jan 2017 00:59:01 +0100
Subject: [PATCH 5/5] Add RENAME support for PUBLICATIONs and SUBSCRIPTIONs

---
 src/backend/commands/alter.c               |  6 ++++
 src/backend/commands/publicationcmds.c     | 50 ++++++++++++++++++++++++++++++
 src/backend/commands/subscriptioncmds.c    | 50 ++++++++++++++++++++++++++++++
 src/backend/parser/gram.y                  | 18 +++++++++++
 src/backend/replication/logical/worker.c   | 16 +++++++++-
 src/bin/psql/tab-complete.c                |  6 ++--
 src/include/commands/publicationcmds.h     |  2 ++
 src/include/commands/subscriptioncmds.h    |  2 ++
 src/test/regress/expected/publication.out  | 10 +++++-
 src/test/regress/expected/subscription.out | 10 +++++-
 src/test/regress/sql/publication.sql       |  6 +++-
 src/test/regress/sql/subscription.sql      |  6 +++-
 src/test/subscription/t/001_rep_changes.pl | 11 ++++++-
 13 files changed, 185 insertions(+), 8 deletions(-)

diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 768fcc8..1a4154c 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -351,6 +351,12 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TYPE:
 			return RenameType(stmt);
 
+		case OBJECT_PUBLICATION:
+			return RenamePublication(stmt);
+
+		case OBJECT_SUBSCRIPTION:
+			return RenameSubscription(stmt);
+
 		case OBJECT_AGGREGATE:
 		case OBJECT_COLLATION:
 		case OBJECT_CONVERSION:
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 63dcc10..5bad7fc 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -754,3 +754,53 @@ AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
 
 	heap_close(rel, RowExclusiveLock);
 }
+
+/*
+ * Rename the publication stmt->subname to stmt->newname.
+ *
+ * Raises an error if the publication does not exist, if the current user
+ * does not own it, or if the new name is already in use.
+ */
+ObjectAddress
+RenamePublication(RenameStmt *stmt)
+{
+	Oid			pubid;
+	HeapTuple	tup;
+	Relation	rel;
+	ObjectAddress address;
+
+	rel = heap_open(PublicationRelationId, RowExclusiveLock);
+
+	tup = SearchSysCacheCopy1(PUBLICATIONNAME,
+							  CStringGetDatum(stmt->subname));
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("publication \"%s\" does not exist", stmt->subname)));
+
+	/* Must be owner; check this before probing whether the new name exists. */
+	pubid = HeapTupleGetOid(tup);
+	if (!pg_publication_ownercheck(pubid, GetUserId()))
+		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_PUBLICATION,
+					   stmt->subname);
+
+	/* Make sure the new name doesn't exist. */
+	if (OidIsValid(get_publication_oid(stmt->newname, true)))
+		ereport(ERROR,
+				(errcode(ERRCODE_DUPLICATE_OBJECT),
+				 errmsg("publication \"%s\" already exists", stmt->newname)));
+
+	/* Rename: overwrite the name in our tuple copy and store it back. */
+	namestrcpy(&(((Form_pg_publication) GETSTRUCT(tup))->pubname),
+			   stmt->newname);
+	simple_heap_update(rel, &tup->t_self, tup);
+	CatalogUpdateIndexes(rel, tup);
+
+	InvokeObjectPostAlterHook(PublicationRelationId, pubid, 0);
+	ObjectAddressSet(address, PublicationRelationId, pubid);
+
+	heap_close(rel, NoLock);
+	heap_freetuple(tup);
+
+	return address;
+}
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index e0add94..a5a057e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -657,3 +657,53 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
 
 	heap_close(rel, RowExclusiveLock);
 }
+
+/*
+ * Rename the subscription stmt->subname to stmt->newname.
+ *
+ * The subscription is looked up under MyDatabaseId.  Raises an error if it
+ * does not exist, if the current user does not own it, or if the new name
+ * is already in use.
+ */
+ObjectAddress
+RenameSubscription(RenameStmt *stmt)
+{
+	Oid			subid;
+	HeapTuple	tup;
+	Relation	rel;
+	ObjectAddress address;
+
+	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
+							  CStringGetDatum(stmt->subname));
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("subscription \"%s\" does not exist", stmt->subname)));
+
+	/* Must be owner; check this before probing whether the new name exists. */
+	subid = HeapTupleGetOid(tup);
+	if (!pg_subscription_ownercheck(subid, GetUserId()))
+		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
+					   stmt->subname);
+
+	/* Make sure the new name doesn't exist. */
+	if (OidIsValid(get_subscription_oid(stmt->newname, true)))
+		ereport(ERROR,
+				(errcode(ERRCODE_DUPLICATE_OBJECT),
+				 errmsg("subscription \"%s\" already exists", stmt->newname)));
+
+	/* Rename: overwrite the name in our tuple copy and store it back. */
+	namestrcpy(&(((Form_pg_subscription) GETSTRUCT(tup))->subname),
+			   stmt->newname);
+	simple_heap_update(rel, &tup->t_self, tup);
+	CatalogUpdateIndexes(rel, tup);
+
+	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
+	ObjectAddressSet(address, SubscriptionRelationId, subid);
+
+	heap_close(rel, NoLock);
+	heap_freetuple(tup);
+
+	return address;
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index a8e35fe..712dfdd 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -8475,6 +8475,24 @@ RenameStmt: ALTER AGGREGATE aggregate_with_argtypes RENAME TO name
 					n->missing_ok = false;
 					$$ = (Node *)n;
 				}
+			| ALTER PUBLICATION name RENAME TO name
+				{
+					RenameStmt *n = makeNode(RenameStmt);
+					n->renameType = OBJECT_PUBLICATION;
+					n->subname = $3;
+					n->newname = $6;
+					n->missing_ok = false;
+					$$ = (Node *)n;
+				}
+			| ALTER SUBSCRIPTION name RENAME TO name
+				{
+					RenameStmt *n = makeNode(RenameStmt);
+					n->renameType = OBJECT_SUBSCRIPTION;
+					n->subname = $3;
+					n->newname = $6;
+					n->missing_ok = false;
+					$$ = (Node *)n;
+				}
 		;
 
 opt_column: COLUMN									{ $$ = COLUMN; }
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 6229bef..2a7e6f5 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1264,6 +1264,21 @@ reread_subscription(void)
 	}
 
 	/*
+	 * Exit if subscription name was changed (it's used for
+	 * fallback_application_name). The launcher will start new worker.
+	 */
+	if (strcmp(newsub->name, MySubscription->name) != 0)
+	{
+		ereport(LOG,
+				(errmsg("logical replication worker for subscription \"%s\" will "
+						"restart because subscription was renamed",
+						MySubscription->name)));
+
+		walrcv_disconnect(wrconn);
+		proc_exit(0);
+	}
+
+	/*
 	 * Exit if the subscription was removed.
 	 * This normally should not happen as the worker gets killed
 	 * during DROP SUBSCRIPTION.
@@ -1297,7 +1312,6 @@ reread_subscription(void)
 
 	/* Check for other changes that should never happen too. */
 	if (newsub->dbid != MySubscription->dbid ||
-		strcmp(newsub->name, MySubscription->name) != 0 ||
 		strcmp(newsub->slotname, MySubscription->slotname) != 0)
 	{
 		elog(ERROR, "subscription %u changed unexpectedly",
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index d6fffcf..a8256e8 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1438,7 +1438,8 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER PUBLICATION <name> ...*/
 	else if (Matches3("ALTER","PUBLICATION",MatchAny))
 	{
-		COMPLETE_WITH_LIST5("WITH", "ADD TABLE", "SET TABLE", "DROP TABLE", "OWNER TO");
+		COMPLETE_WITH_LIST6("WITH", "ADD TABLE", "SET TABLE", "DROP TABLE",
+							"OWNER TO", "RENAME TO");
 	}
 	/* ALTER PUBLICATION <name> .. WITH ( ... */
 	else if (HeadMatches3("ALTER", "PUBLICATION",MatchAny) && TailMatches2("WITH", "("))
@@ -1449,7 +1450,8 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER SUBSCRIPTION <name> ... */
 	else if (Matches3("ALTER","SUBSCRIPTION",MatchAny))
 	{
-		COMPLETE_WITH_LIST6("WITH", "CONNECTION", "SET PUBLICATION", "ENABLE", "DISABLE", "OWNER TO");
+		COMPLETE_WITH_LIST7("WITH", "CONNECTION", "SET PUBLICATION", "ENABLE",
+							"DISABLE", "OWNER TO", "RENAME TO");
 	}
 	else if (HeadMatches3("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches2("WITH", "("))
 	{
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index cdacfa6..7053373 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -26,4 +26,6 @@ extern void RemovePublicationRelById(Oid proid);
 extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId);
 extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId);
 
+extern ObjectAddress RenamePublication(RenameStmt *stmt);
+
 #endif   /* PUBLICATIONCMDS_H */
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index 87c1a27..65c8238 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -25,4 +25,6 @@ extern void DropSubscription(DropSubscriptionStmt *stmt);
 extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
 extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
 
+extern ObjectAddress RenameSubscription(RenameStmt *stmt);
+
 #endif   /* SUBSCRIPTIONCMDS_H */
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 5784b0f..6416fbb 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -148,7 +148,15 @@ DROP TABLE testpub_tbl1;
  t       | t       | t
 (1 row)
 
-DROP PUBLICATION testpub_default;
+ALTER PUBLICATION testpub_default RENAME TO testpub_foo;
+\dRp testpub_foo
+                         List of publications
+    Name     |          Owner           | Inserts | Updates | Deletes 
+-------------+--------------------------+---------+---------+---------
+ testpub_foo | regress_publication_user | t       | t       | t
+(1 row)
+
+DROP PUBLICATION testpub_foo;
 DROP PUBLICATION testpib_ins_trunct;
 DROP PUBLICATION testpub_fortbl;
 DROP SCHEMA pub_test CASCADE;
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 2ccec98..cb1ab4e 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -61,6 +61,14 @@ ALTER SUBSCRIPTION testsub DISABLE;
 (1 row)
 
 COMMIT;
-DROP SUBSCRIPTION testsub NODROP SLOT;
+ALTER SUBSCRIPTION testsub RENAME TO testsub_foo;
+\dRs
+                         List of subscriptions
+    Name     |           Owner           | Enabled |    Publication     
+-------------+---------------------------+---------+--------------------
+ testsub_foo | regress_subscription_user | f       | {testpub,testpub1}
+(1 row)
+
+DROP SUBSCRIPTION testsub_foo NODROP SLOT;
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 8779788..9563ea1 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -73,7 +73,11 @@ DROP TABLE testpub_tbl1;
 
 \dRp+ testpub_default
 
-DROP PUBLICATION testpub_default;
+ALTER PUBLICATION testpub_default RENAME TO testpub_foo;
+
+\dRp testpub_foo
+
+DROP PUBLICATION testpub_foo;
 DROP PUBLICATION testpib_ins_trunct;
 DROP PUBLICATION testpub_fortbl;
 
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 68c17d5..fce6069 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -38,7 +38,11 @@ ALTER SUBSCRIPTION testsub DISABLE;
 
 COMMIT;
 
-DROP SUBSCRIPTION testsub NODROP SLOT;
+ALTER SUBSCRIPTION testsub RENAME TO testsub_foo;
+
+\dRs
+
+DROP SUBSCRIPTION testsub_foo NODROP SLOT;
 
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index b51740b..a9c4b01 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -169,8 +169,17 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_full");
 is($result, qq(11|0|100), 'check replicated insert after alter publication');
 
+# check restart on rename
+$oldpid = $node_publisher->safe_psql('postgres',
+	"SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';");
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub RENAME TO tap_sub_renamed");
+$node_publisher->poll_query_until('postgres',
+	"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';")
+  or die "Timed out while waiting for apply to restart";
+
 # check all the cleanup
-$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed");
 
 $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
-- 
2.7.4

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to