This patch is looking pretty good to me, modulo the failing pg_dump tests.

Attached is a fixup patch.  I have mainly updated some comments and
variable naming for (my) clarity.  No functional changes.

-- 
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 04f772f350773cce9890386c4a5924ee251ebbe6 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <peter_e@gmx.net>
Date: Tue, 21 Mar 2017 15:38:39 -0400
Subject: [PATCH] fixup! Logical replication support for initial data copy

---
 src/backend/catalog/pg_subscription.c              |   6 +-
 src/backend/commands/subscriptioncmds.c            |   2 +-
 .../libpqwalreceiver/libpqwalreceiver.c            |   9 +-
 src/backend/replication/logical/tablesync.c        | 190 +++++++++++----------
 src/backend/replication/logical/worker.c           |  22 +--
 src/include/replication/logical.h                  |   2 -
 src/include/replication/worker_internal.h          |   8 +-
 src/test/regress/expected/subscription.out         |   2 -
 src/test/regress/sql/subscription.sql              |   2 -
 src/test/subscription/t/001_rep_changes.pl         |   6 +-
 src/test/subscription/t/004_sync.pl                |   2 +-
 11 files changed, 127 insertions(+), 124 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 9b74892548..e420ec14d2 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -321,10 +321,8 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
 			return SUBREL_STATE_UNKNOWN;
 		}
 
-		ereport(ERROR,
-				(errcode(ERRCODE_UNDEFINED_OBJECT),
-				 errmsg("subscription table %u in subscription %u does not exist",
-						relid, subid)));
+		elog(ERROR, "subscription table %u in subscription %u does not exist",
+			 relid, subid);
 	}
 
 	/* Get the state. */
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index cba2d5c085..0784ca7951 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -992,7 +992,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 				(errmsg("could not receive list of replicated tables from the publisher: %s",
 						res->err)));
 
-	/* Proccess tables. */
+	/* Process tables. */
 	slot = MakeSingleTupleTableSlot(res->tupledesc);
 	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 	{
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 3176182523..4dd8eef1f9 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -792,13 +792,12 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
  * Convert tuple query result to tuplestore.
  */
 static void
-libpqrcv_proccessTuples(PGresult *pgres, WalRcvExecResult *walres,
+libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
 						const int nRetTypes, const Oid *retTypes)
 {
 	int		tupn;
 	int		coln;
 	int		nfields = PQnfields(pgres);
-	char   *cstrs[MaxTupleAttributeNumber];
 	HeapTuple		tuple;
 	AttInMetadata  *attinmeta;
 	MemoryContext	rowcontext;
@@ -830,9 +829,11 @@ libpqrcv_proccessTuples(PGresult *pgres, WalRcvExecResult *walres,
 									   "libpqrcv query result context",
 									   ALLOCSET_DEFAULT_SIZES);
 
-	/* Proccess returned rows. */
+	/* Process returned rows. */
 	for (tupn = 0; tupn < PQntuples(pgres); tupn++)
 	{
+		char   *cstrs[MaxTupleAttributeNumber];
+
 		CHECK_FOR_INTERRUPTS();
 
 		/* Do the allocations in temporary context. */
@@ -885,7 +886,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
 		case PGRES_SINGLE_TUPLE:
 		case PGRES_TUPLES_OK:
 			walres->status = WALRCV_OK_TUPLES;
-			libpqrcv_proccessTuples(pgres, walres, nRetTypes, retTypes);
+			libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
 			break;
 
 		case PGRES_COPY_IN:
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6c67a5ea9f..3e16b0d576 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -36,7 +36,7 @@
  *		  - if the apply is in front of the sync in the wal stream the new
  *			state is set to CATCHUP and apply loops until the sync process
  *			catches up to the same LSN as apply
- *		  - if the sync if in front of the apply in the wal stream the new
+ *		  - if the sync is in front of the apply in the wal stream the new
  *			state is set to SYNCDONE
  *		  - if both apply and sync are at the same position in the wal stream
  *			the state of the table is set to READY
@@ -104,7 +104,6 @@
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 
-static List *table_states = NIL;
 static bool table_states_valid = false;
 
 StringInfo	copybuf = NULL;
@@ -112,10 +111,10 @@ StringInfo	copybuf = NULL;
 /*
  * Exit routine for synchronization worker.
  */
-static void
-finish_sync_worker(char *slotname)
+static void pg_attribute_noreturn()
+finish_sync_worker(void)
 {
-	/* Commit any outstanding trasnsaction, */
+	/* Commit any outstanding transaction. */
 	if (IsTransactionState())
 		CommitTransactionCommand();
 
@@ -193,41 +192,37 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
  * worker.
  *
  * If the sync worker is in catch up mode and reached the predetermined
- * synchronization point in wal stream, it will mark the table as ready and
- * finish.
+ * synchronization point in the WAL stream, mark the table as READY and
+ * finish.  If it caught up too far, set to SYNCDONE and finish.  Things will
+ * then proceed in the "sync in front" scenario.
  */
 static void
-process_syncing_tables_for_sync(char *slotname, XLogRecPtr end_lsn)
+process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 {
-	TimeLineID	tli;
-
 	Assert(IsTransactionState());
 
-	/*
-	 * Synchronization worker has catched up with apply. Update the table
-	 * state and finish.
-	 */
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 
 	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
-		end_lsn >= MyLogicalRepWorker->relstate_lsn)
+		current_lsn >= MyLogicalRepWorker->relstate_lsn)
 	{
+		TimeLineID	tli;
+
 		MyLogicalRepWorker->relstate =
-			(end_lsn == MyLogicalRepWorker->relstate_lsn)
+			(current_lsn == MyLogicalRepWorker->relstate_lsn)
 			? SUBREL_STATE_READY
 			: SUBREL_STATE_SYNCDONE;
-		MyLogicalRepWorker->relstate_lsn = end_lsn;
+		MyLogicalRepWorker->relstate_lsn = current_lsn;
 
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-		/* The synchronization is done so write it into catalog. */
 		SetSubscriptionRelState(MyLogicalRepWorker->subid,
 								MyLogicalRepWorker->relid,
 								MyLogicalRepWorker->relstate,
 								MyLogicalRepWorker->relstate_lsn);
 
 		walrcv_endstreaming(wrconn, &tli);
-		finish_sync_worker(slotname);
+		finish_sync_worker();
 	}
 	else
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
@@ -236,28 +231,31 @@ process_syncing_tables_for_sync(char *slotname, XLogRecPtr end_lsn)
 /*
  * Handle table synchronization cooperation from the apply worker.
  *
- * Walk over all subscription tables that are individually tracked by apply
- * process (currently all that have state other than SUBREL_STATE_READY) and
- * manage synchronization for them.
+ * Walk over all subscription tables that are individually tracked by the
+ * apply process (currently, all that have state other than
+ * SUBREL_STATE_READY) and manage synchronization for them.
  *
- * In case there are tables that need synchronized and are not being
- * synchronized yet (and there are free slots for sync workers) it will start
- * sync workers for them.
+ * If there are tables that need synchronizing and are not being synchronized
+ * yet, start sync workers for them (if there are free slots for sync
+ * workers).
  *
- * For tables that are being synchronized already, it will check if sync
- * workers either need action from the apply worker or have finished.
+ * For tables that are being synchronized already, check if sync workers
+ * either need action from the apply worker or have finished.
  *
- * The usual action needed by apply is to mark table for catchup and wait for
- * the catchup to happen. In case that sync worker got in front of apply
- * worker it will mark the table as synced but not ready yet as it needs to be
- * tracked until apply reaches the same position to which it was synced.
+ * The usual scenario is that the apply got ahead of the sync while the sync
+ * ran, and then the action needed by apply is to mark a table for CATCHUP and
+ * wait for the catchup to happen.  In the less common case that sync worker
+ * got in front of the apply worker, the table is marked as SYNCDONE but not
+ * ready yet, as it needs to be tracked until apply reaches the same position
+ * to which it was synced.
  *
- * In case the synchronization position is reached the table can be marked
- * as ready and no longer tracked.
+ * If the synchronization position is reached, then the table can be marked as
+ * READY and is no longer tracked.
  */
 static void
-process_syncing_tables_for_apply(char *slotname, XLogRecPtr end_lsn)
+process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 {
+	static List *table_states = NIL;
 	ListCell   *lc;
 
 	Assert(!IsTransactionState());
@@ -303,82 +301,85 @@ process_syncing_tables_for_apply(char *slotname, XLogRecPtr end_lsn)
 		{
 			/*
 			 * Apply has caught up to the position where the table sync
-			 * has finished, time to mark the table as ready so that
+			 * has finished.  Time to mark the table as ready so that
 			 * apply will just continue to replicate it normally.
 			 */
-			if (end_lsn >= rstate->lsn)
+			if (current_lsn >= rstate->lsn)
 			{
 				rstate->state = SUBREL_STATE_READY;
-				rstate->lsn = end_lsn;
+				rstate->lsn = current_lsn;
 				StartTransactionCommand();
 				SetSubscriptionRelState(MyLogicalRepWorker->subid,
 										rstate->relid, rstate->state,
 										rstate->lsn);
 				CommitTransactionCommand();
 			}
-
-			continue;
 		}
 		else
 		{
-			LogicalRepWorker   *worker;
-			int					nworkers = 0;
+			LogicalRepWorker   *syncworker;
+			int					nsyncworkers = 0;
 
 			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-			worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
-											rstate->relid, false);
-			if (worker)
+			syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+												rstate->relid, false);
+			if (syncworker)
 			{
-				SpinLockAcquire(&worker->relmutex);
-				rstate->state = worker->relstate;
-				rstate->lsn = worker->relstate_lsn;
-				SpinLockRelease(&worker->relmutex);
+				SpinLockAcquire(&syncworker->relmutex);
+				rstate->state = syncworker->relstate;
+				rstate->lsn = syncworker->relstate_lsn;
+				SpinLockRelease(&syncworker->relmutex);
 			}
 			else
-				nworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+				/*
+				 * If no sync worker for this table yet, could running sync
+				 * workers for this subscription, while we have the lock, for
+				 * later.
+				 */
+				nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
 			LWLockRelease(LogicalRepWorkerLock);
 
 			/*
 			 * There is a worker synchronizing the relation and waiting for
 			 * apply to do something.
 			 */
-			if (worker && rstate->state == SUBREL_STATE_SYNCWAIT)
+			if (syncworker && rstate->state == SUBREL_STATE_SYNCWAIT)
 			{
 				/*
-				 * When the synchronization process is at the catchup phase.
-				 *
 				 * There are three possible synchronization situations here.
-				 * a) Apply is infront of the table sync, in this case we
-				 *    tell the table sync to catch up.
-				 * b) Apply is behind the table sync, in that case we tell
-				 *    the table sync to mark the table as syncdone and finish.
-				 * c) Apply and table sync are at the same position, in which
-				 *    case we tell table sync to mark the table as ready and
-				 *    finish.
 				 *
-				 * In any case we'll need to wait for tablesync to change
+				 * a) Apply is in front of the table sync: We tell the table
+				 *    sync to CATCHUP.
+				 *
+				 * b) Apply is behind the table sync: We tell the table sync
+				 *    to mark the table as SYNCDONE and finish.
+
+				 * c) Apply and table sync are at the same position: We tell
+				 *    table sync to mark the table as READY and finish.
+				 *
+				 * In any case we'll need to wait for table sync to change
 				 * the state in catalog and only then continue ourselves.
 				 */
-				if (end_lsn > rstate->lsn)
+				if (current_lsn > rstate->lsn)
 				{
 					rstate->state = SUBREL_STATE_CATCHUP;
-					rstate->lsn = end_lsn;
+					rstate->lsn = current_lsn;
 				}
-				else if (end_lsn == rstate->lsn)
+				else if (current_lsn == rstate->lsn)
 				{
 					rstate->state = SUBREL_STATE_READY;
-					rstate->lsn = end_lsn;
+					rstate->lsn = current_lsn;
 				}
 				else
 					rstate->state = SUBREL_STATE_SYNCDONE;
 
-				SpinLockAcquire(&worker->relmutex);
-				worker->relstate = rstate->state;
-				worker->relstate_lsn = rstate->lsn;
-				SpinLockRelease(&worker->relmutex);
+				SpinLockAcquire(&syncworker->relmutex);
+				syncworker->relstate = rstate->state;
+				syncworker->relstate_lsn = rstate->lsn;
+				SpinLockRelease(&syncworker->relmutex);
 
-				/* Signal the worker as it may be waiting for us. */
-				logicalrep_worker_wakeup_ptr(worker);
+				/* Signal the sync worker, as it may be waiting for us. */
+				logicalrep_worker_wakeup_ptr(syncworker);
 
 				/*
 				 * Enter busy loop and wait for synchronization status
@@ -392,7 +393,7 @@ process_syncing_tables_for_apply(char *slotname, XLogRecPtr end_lsn)
 			 * there is some free sync worker slot, start new sync worker
 			 * for the table.
 			 */
-			else if (!worker && nworkers < max_sync_workers_per_subscription)
+			else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription)
 			{
 				logicalrep_worker_launch(MyLogicalRepWorker->dbid,
 										 MySubscription->oid,
@@ -408,12 +409,12 @@ process_syncing_tables_for_apply(char *slotname, XLogRecPtr end_lsn)
  * Process state possible change(s) of tables that are being synchronized.
  */
 void
-process_syncing_tables(char *slotname, XLogRecPtr end_lsn)
+process_syncing_tables(XLogRecPtr current_lsn)
 {
 	if (am_tablesync_worker())
-		process_syncing_tables_for_sync(slotname, end_lsn);
+		process_syncing_tables_for_sync(current_lsn);
 	else
-		process_syncing_tables_for_apply(slotname, end_lsn);
+		process_syncing_tables_for_apply(current_lsn);
 }
 
 /*
@@ -446,8 +447,8 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel)
 }
 
 /*
- * Callback for the COPY FROM which reads from the remote connection
- * and passes the data back to our local COPY.
+ * Data source callback for the COPY FROM, which reads from the remote
+ * connection and passes the data back to our local COPY.
  */
 static int
 copy_read_data(void *outbuf, int minread, int maxread)
@@ -482,13 +483,9 @@ copy_read_data(void *outbuf, int minread, int maxread)
 			CHECK_FOR_INTERRUPTS();
 
 			if (len == 0)
-			{
 				break;
-			}
 			else if (len < 0)
-			{
 				return bytesread;
-			}
 			else
 			{
 				/* Process the data */
@@ -552,7 +549,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	lrel->nspname = nspname;
 	lrel->relname = relname;
 
-	/* First fetch Oid and replication identity. */
+	/* First fetch Oid and replica identity. */
 	initStringInfo(&cmd);
 	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident"
 						   "  FROM pg_catalog.pg_class c,"
@@ -605,7 +602,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 				(errmsg("could not fetch table info for table \"%s.%s\": %s",
 						nspname, relname, res->err)));
 
-	/* We don't know number of rows infront so allocate enough space. */
+	/* We don't know number of rows coming, so allocate enough space. */
 	lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
 	lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
 	lrel->attkeys = NULL;
@@ -695,7 +692,7 @@ copy_table(Relation rel)
 char *
 LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 {
-	char			slotname[NAMEDATALEN];
+	char		   *slotname;
 	char		   *err;
 
 	/* Check the state of the table synchronization. */
@@ -710,14 +707,18 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	CommitTransactionCommand();
 
 	/*
-	 * We are limited to 63 characters of the name length so we cut the
-	 * original slot name to 36 chars because the "_sync_" adds 6, each
-	 * each unsigned integer (oid) has maximum of 10 characters and we have
-	 * one additional "_" separator between slot name and subscription oid.
+	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
+	 * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
+	 * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
+	 * NAMEDATALEN on the remote that matters, but this scheme will also work
+	 * reasonably if that is different.)
 	 */
-	snprintf(slotname, NAMEDATALEN, "%.36s_%u_sync_%u",
-			 MySubscription->slotname, MySubscription->oid,
-			 MyLogicalRepWorker->relid);
+	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */
+	slotname = psprintf("%.*s_%u_sync_%u",
+						NAMEDATALEN - 28,
+						MySubscription->slotname,
+						MySubscription->oid,
+						MyLogicalRepWorker->relid);
 
 	wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
 	if (wrconn == NULL)
@@ -787,7 +788,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 				copy_table(rel);
 
-				res = walrcv_exec(wrconn, "ROLLBACK", 0, NULL);
+				res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
 				if (res->status != WALRCV_OK_COMMAND)
 					ereport(ERROR,
 							(errmsg("table copy could not finish transaction on publisher"),
@@ -821,18 +822,19 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 											MyLogicalRepWorker->relid,
 											MyLogicalRepWorker->relstate,
 											MyLogicalRepWorker->relstate_lsn);
-					finish_sync_worker(slotname);
+					finish_sync_worker();
 				}
 				break;
 			}
 		case SUBREL_STATE_SYNCDONE:
 		case SUBREL_STATE_READY:
 			/* Nothing to do here but finish. */
-			finish_sync_worker(slotname);
+			finish_sync_worker();
+			break;
 		default:
 			elog(ERROR, "unknown relation state \"%c\"",
 				 MyLogicalRepWorker->relstate);
 	}
 
-	return pstrdup(slotname);
+	return slotname;
 }
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 2396d24b43..bbf3506be0 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -109,7 +109,6 @@ WalReceiverConn	   *wrconn = NULL;
 Subscription	   *MySubscription = NULL;
 bool				MySubscriptionValid = false;
 
-static char		   *myslotname = NULL;
 bool				in_remote_transaction = false;
 static XLogRecPtr	remote_final_lsn = InvalidXLogRecPtr;
 
@@ -129,18 +128,18 @@ static void reread_subscription(void);
  * Note we need to do smaller or equals comparison for SYNCDONE state because
  * it might hold position of end of intitial slot consistent point WAL
  * record + 1 (ie start of next record) and next record can be COMMIT of
- * transaction we are now proccessing (which is what we set remote_final_lsn
+ * transaction we are now processing (which is what we set remote_final_lsn
  * to in apply_handle_begin).
  */
 static bool
 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 {
-	return (!am_tablesync_worker() &&
-			(rel->state == SUBREL_STATE_READY ||
-			 (rel->state == SUBREL_STATE_SYNCDONE &&
-			  rel->statelsn <= remote_final_lsn))) ||
-		(am_tablesync_worker() &&
-		 MyLogicalRepWorker->relid == rel->localreloid);
+	if (am_tablesync_worker())
+		return MyLogicalRepWorker->relid == rel->localreloid;
+	else
+		return (rel->state == SUBREL_STATE_READY ||
+				(rel->state == SUBREL_STATE_SYNCDONE &&
+				 rel->statelsn <= remote_final_lsn));
 }
 
 /*
@@ -459,8 +458,8 @@ apply_handle_commit(StringInfo s)
 
 	in_remote_transaction = false;
 
-	/* Proccess any tables that are being synchronized in parallel. */
-	process_syncing_tables(myslotname, commit_data.end_lsn);
+	/* Process any tables that are being synchronized in parallel. */
+	process_syncing_tables(commit_data.end_lsn);
 
 	pgstat_report_activity(STATE_IDLE, NULL);
 }
@@ -1110,7 +1109,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 				reread_subscription();
 
 			/* Process any table synchronization changes. */
-			process_syncing_tables(myslotname, last_received);
+			process_syncing_tables(last_received);
 		}
 
 		/* Cleanup the memory. */
@@ -1420,6 +1419,7 @@ ApplyWorkerMain(Datum main_arg)
 	MemoryContext	oldctx;
 	char			originname[NAMEDATALEN];
 	XLogRecPtr		origin_startpos;
+	char		   *myslotname;
 	WalRcvStreamOptions options;
 
 	/* Attach to slot */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index a5088c4cba..d10dd2c90a 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -99,8 +99,6 @@ extern LogicalDecodingContext *CreateCopyDecodingContext(
 					  List *output_plugin_options,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write);
-extern void DecodingContextProccessTuple(LogicalDecodingContext *ctx,
-							 Relation rel, HeapTuple tup);
 extern List *DecodingContextGetTableList(LogicalDecodingContext *ctx);
 
 extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index f642b1c6dc..bf96d340ca 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -71,10 +71,14 @@ extern int logicalrep_sync_worker_count(Oid subid);
 
 extern void logicalrep_worker_sigterm(SIGNAL_ARGS);
 extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
-void process_syncing_tables(char *slotname, XLogRecPtr end_lsn);
+void process_syncing_tables(XLogRecPtr current_lsn);
 void invalidate_syncing_table_states(Datum arg, int cacheid,
 									 uint32 hashvalue);
 
-#define am_tablesync_worker() OidIsValid(MyLogicalRepWorker->relid)
+static inline bool
+am_tablesync_worker(void)
+{
+	return OidIsValid(MyLogicalRepWorker->relid);
+}
 
 #endif   /* WORKER_INTERNAL_H */
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index d8dc55a129..0912bef657 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -14,7 +14,6 @@ CREATE SUBSCRIPTION testsub PUBLICATION foo;
 ERROR:  syntax error at or near "PUBLICATION"
 LINE 1: CREATE SUBSCRIPTION testsub PUBLICATION foo;
                                     ^
-set client_min_messages to error;
 -- fail - cannot do CREATE SUBSCRIPTION CREATE SLOT inside transaction block
 BEGIN;
 CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub WITH (CREATE SLOT);
@@ -23,7 +22,6 @@ COMMIT;
 CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub;
 ERROR:  invalid connection string syntax: missing "=" after "testconn" in connection info string
 
-reset client_min_messages;
 CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (NOCONNECT);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 62c99d8b20..c1199ee629 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -12,14 +12,12 @@ CREATE SUBSCRIPTION testsub CONNECTION 'foo';
 -- fail - no connection
 CREATE SUBSCRIPTION testsub PUBLICATION foo;
 
-set client_min_messages to error;
 -- fail - cannot do CREATE SUBSCRIPTION CREATE SLOT inside transaction block
 BEGIN;
 CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub WITH (CREATE SLOT);
 COMMIT;
 
 CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub;
-reset client_min_messages;
 
 CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (NOCONNECT);
 
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 23eb39d666..d1817f57da 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -3,7 +3,7 @@
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 13;
+use Test::More tests => 14;
 
 # Initialize publisher node
 my $node_publisher = get_new_node('publisher');
@@ -209,5 +209,9 @@
   $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
 is($result, qq(0), 'check replication slot was dropped on publisher');
 
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber');
+
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index 98cdf08b48..87541a0e6e 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -1,4 +1,4 @@
-# Basic logical replication test
+# Tests for logical replication table syncing
 use strict;
 use warnings;
 use PostgresNode;
-- 
2.12.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to