On 03/06/17 16:12, Jeff Janes wrote:
> 
> On Fri, Jun 2, 2017 at 4:10 PM, Petr Jelinek
> <petr.jeli...@2ndquadrant.com> wrote:
> 
> 
>     While I was testing something for different thread I noticed that I
>     manage transactions incorrectly in this patch. Fixed here, I didn't test
>     it much yet (it takes a while as you know :) ). Not sure if it's related
>     to the issue you've seen though.
> 
> 
> This conflicts again with Peter E's recent commit
> 3c9bc2157a4f465b3c070d1250597568d2dc285f
> 

Rebased on top of current HEAD.
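
For anyone reviewing the handover, the sequence described in the updated header comment of tablesync.c can be modelled roughly as below. This is only a toy sketch in plain C, not code from the patch: the Handover struct, the ST_* enum and all four functions are hypothetical names, LSNs are plain integers, and the READY condition (apply position at or past the SYNCDONE position) is an assumption based on the example flows in that comment.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for the SUBREL_STATE_* values; not the real ones. */
typedef enum { ST_INIT, ST_SYNCWAIT, ST_CATCHUP, ST_SYNCDONE, ST_READY } RelState;

/* Toy model of one table: in-memory worker state kept apart from the catalog. */
typedef struct
{
    RelState shmem_state;    /* worker state in shared memory */
    RelState catalog_state;  /* state stored in pg_subscription_rel */
    uint64_t state_lsn;      /* stream position attached to the state */
} Handover;

/* Sync finishes the copy at sync_lsn and starts waiting for apply. */
static void
sync_copy_done(Handover *h, uint64_t sync_lsn)
{
    h->shmem_state = ST_SYNCWAIT;
    h->state_lsn = sync_lsn;
}

/* Apply sees SYNCWAIT and always requests CATCHUP, whichever side is ahead. */
static void
apply_request_catchup(Handover *h, uint64_t apply_lsn)
{
    assert(h->shmem_state == ST_SYNCWAIT);
    h->shmem_state = ST_CATCHUP;
    if (apply_lsn > h->state_lsn)
        h->state_lsn = apply_lsn;   /* target is the later of the two positions */
}

/*
 * Sync replays the stream up to the target and records SYNCDONE in the
 * catalog.  Returns how far it had to replay, which may be zero.
 */
static uint64_t
sync_catch_up(Handover *h, uint64_t sync_lsn)
{
    assert(h->shmem_state == ST_CATCHUP);
    assert(sync_lsn <= h->state_lsn);
    h->catalog_state = ST_SYNCDONE;
    return h->state_lsn - sync_lsn;
}

/* Apply marks the table READY once it has reached the SYNCDONE position. */
static void
apply_advance(Handover *h, uint64_t apply_lsn)
{
    if (h->catalog_state == ST_SYNCDONE && apply_lsn >= h->state_lsn)
        h->catalog_state = ST_READY;
}
```

Running the "Apply is in front" flow (sync at 8, apply at 10) through these functions makes sync replay two positions before SYNCDONE, while the "Sync in front" flow (sync at 10, apply at 8) replays nothing and leaves apply filtering the table until it reaches 10. Both flows go through the same four steps, which is the point of the change.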

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
From 067edf450967c76d000c0b4a2cddf719fae45f7f Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Wed, 31 May 2017 01:51:45 +0200
Subject: [PATCH] Improve handover logic between sync and apply workers

Make the apply worker's busy wait check the catalog instead of the shmem
state, so that the next transaction is guaranteed to see the expected
table synchronization state.

Also make the handover always go through the same set of steps, which
makes the overall process easier to understand and debug.
---
 src/backend/replication/logical/tablesync.c | 215 ++++++++++++++++------------
 1 file changed, 123 insertions(+), 92 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6e268f3..8926914 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -27,27 +27,21 @@
  *		 synchronization has finished.
  *
  *	  The stream position synchronization works in multiple steps.
- *	   - Sync finishes copy and sets table state as SYNCWAIT and waits
+ *	   - Sync finishes copy and sets worker state as SYNCWAIT and waits
  *		 for state to change in a loop.
  *	   - Apply periodically checks tables that are synchronizing for SYNCWAIT.
- *		 When the desired state appears it will compare its position in the
- *		 stream with the SYNCWAIT position and based on that changes the
- *		 state to based on following rules:
- *		  - if the apply is in front of the sync in the WAL stream the new
- *			state is set to CATCHUP and apply loops until the sync process
- *			catches up to the same LSN as apply
- *		  - if the sync is in front of the apply in the WAL stream the new
- *			state is set to SYNCDONE
- *		  - if both apply and sync are at the same position in the WAL stream
- *			the state of the table is set to READY
- *	   - If the state was set to CATCHUP sync will read the stream and
- *		 apply changes until it catches up to the specified stream
- *		 position and then sets state to READY and signals apply that it
- *		 can stop waiting and exits, if the state was set to something
- *		 else than CATCHUP the sync process will simply end.
- *	   - If the state was set to SYNCDONE by apply, the apply will
- *		 continue tracking the table until it reaches the SYNCDONE stream
- *		 position at which point it sets state to READY and stops tracking.
+ *		 When the desired state appears it will set the worker state to
+ *		 CATCHUP and start a loop, waiting until either the table state is
+ *		 set to SYNCDONE or the sync worker exits.
+ *	   - After the worker sees the state change to CATCHUP, it will read the
+ *	     stream and apply changes until it catches up to the specified stream
+ *		 position and then sets state to SYNCDONE. Note that there might be
+ *		 zero changes applied between CATCHUP and SYNCDONE states as the sync
+ *		 worker might be ahead of the apply.
+ *	   - Once the state was set to SYNCDONE, the apply will continue tracking
+ *	     the table until it reaches the SYNCDONE stream position at which
+ *	     point it sets state to READY and stops tracking. Again there might
+ *	     be zero changes in between.
  *
  *	  The catalog pg_subscription_rel is used to keep information about
  *	  subscribed tables and their state and some transient state during
@@ -56,26 +50,29 @@
  *	  Example flows look like this:
  *	   - Apply is in front:
  *		  sync:8
- *			-> set SYNCWAIT
+ *			-> set in memory SYNCWAIT
  *		  apply:10
- *			-> set CATCHUP
+ *			-> set in memory CATCHUP
  *			-> enter wait-loop
  *		  sync:10
- *			-> set READY
+ *			-> set in catalog SYNCDONE
  *			-> exit
  *		  apply:10
  *			-> exit wait-loop
  *			-> continue rep
+ *		  apply:11
+ *		    -> set in catalog READY
  *	   - Sync in front:
  *		  sync:10
- *			-> set SYNCWAIT
+ *			-> set in memory SYNCWAIT
  *		  apply:8
- *			-> set SYNCDONE
+ *			-> set in memory CATCHUP
  *			-> continue per-table filtering
- *		  sync:10
+ *		  sync:10
+ *			-> set in catalog SYNCDONE
  *			-> exit
  *		  apply:10
- *			-> set READY
+ *			-> set in catalog READY
  *			-> stop per-table filtering
  *			-> continue rep
  *-------------------------------------------------------------------------
@@ -100,6 +97,7 @@
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 
+#include "utils/snapmgr.h"
 #include "storage/ipc.h"
 
 #include "utils/builtins.h"
@@ -130,41 +128,54 @@ finish_sync_worker(void)
 	/* And flush all writes. */
 	XLogFlush(GetXLogWriteRecPtr());
 
-	/* Find the main apply worker and signal it. */
-	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
-
 	StartTransactionCommand();
 	ereport(LOG,
 			(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
 					MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
 	CommitTransactionCommand();
 
+	/* Find the main apply worker and signal it. */
+	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+
 	/* Stop gracefully */
 	proc_exit(0);
 }
 
 /*
- * Wait until the table synchronization change.
+ * Wait until the relation synchronization state is set in catalog to the
+ * expected one.
  *
- * If called from apply worker, it will wait for the synchronization worker to
- * change table state in shmem.  If called from synchronization worker, it
- * will wait for apply worker to change table state in shmem.
+ * Used when transitioning from CATCHUP state to SYNCDONE.
  *
- * Returns false if the opposite worker has disappeared or the table state has
- * been reset.
+ * Returns false if the synchronization worker has disappeared or table state
+ * has been reset.
  */
 static bool
-wait_for_sync_status_change(Oid relid, char origstate)
+wait_for_relation_state_change(Oid relid, char expected_state)
 {
-	int			rc;
-	char		state = origstate;
+	int		rc;
+	char	state;
 
 	for (;;)
 	{
-		LogicalRepWorker *worker;
+		LogicalRepWorker   *worker;
+		XLogRecPtr			statelsn;
 
 		CHECK_FOR_INTERRUPTS();
 
+		/* XXX use cache invalidation here to improve performance? */
+		PushActiveSnapshot(GetLatestSnapshot());
+		state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
+										relid, &statelsn, true);
+		PopActiveSnapshot();
+
+		if (state == SUBREL_STATE_UNKNOWN)
+			return false;
+
+		if (state == expected_state)
+			return true;
+
+		/* Check if the sync worker is still running and bail if not. */
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
 		/* Check if the opposite worker is still running and bail if not. */
@@ -176,23 +187,54 @@ wait_for_sync_status_change(Oid relid, char origstate)
 			LWLockRelease(LogicalRepWorkerLock);
 			return false;
 		}
+		LWLockRelease(LogicalRepWorkerLock);
 
-		/*
-		 * If I'm the synchronization worker, look at my own state.  Otherwise
-		 * look at the state of the synchronization worker we found above.
-		 */
-		if (am_tablesync_worker())
-			worker = MyLogicalRepWorker;
+		rc = WaitLatch(&MyProc->procLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+					   1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
 
-		Assert(worker->relid == relid);
-		state = worker->relstate;
+		/* emergency bailout if postmaster has died */
+		if (rc & WL_POSTMASTER_DEATH)
+			proc_exit(1);
 
-		LWLockRelease(LogicalRepWorkerLock);
+		ResetLatch(&MyProc->procLatch);
+	}
 
-		if (state == SUBREL_STATE_UNKNOWN)
+	return false;
+}
+
+/*
+ * Wait until the apply worker changes the state of our synchronization
+ * worker to the expected one.
+ *
+ * Used when transitioning from SYNCWAIT state to CATCHUP.
+ *
+ * Returns false if the apply worker has disappeared or table state has been
+ * reset.
+ */
+static bool
+wait_for_worker_state_change(char expected_state)
+{
+	int		rc;
+
+	for (;;)
+	{
+		LogicalRepWorker   *worker;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* Bail if the apply worker has died. */
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+										InvalidOid, false);
+		if (!worker)
+		{
+			LWLockRelease(LogicalRepWorkerLock);
 			return false;
+		}
+		LWLockRelease(LogicalRepWorkerLock);
 
-		if (state != origstate)
+		if (MyLogicalRepWorker->relstate == expected_state)
 			return true;
 
 		rc = WaitLatch(&MyProc->procLatch,
@@ -239,10 +281,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 	{
 		TimeLineID	tli;
 
-		MyLogicalRepWorker->relstate =
-			(current_lsn == MyLogicalRepWorker->relstate_lsn)
-			? SUBREL_STATE_READY
-			: SUBREL_STATE_SYNCDONE;
+		MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
 		MyLogicalRepWorker->relstate_lsn = current_lsn;
 
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
@@ -416,45 +455,29 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			if (syncworker && rstate->state == SUBREL_STATE_SYNCWAIT)
 			{
 				/*
-				 * There are three possible synchronization situations here.
-				 *
-				 * a) Apply is in front of the table sync: We tell the table
-				 * sync to CATCHUP.
-				 *
-				 * b) Apply is behind the table sync: We tell the table sync
-				 * to mark the table as SYNCDONE and finish.
-				 *
-				 * c) Apply and table sync are at the same position: We tell
-				 * table sync to mark the table as READY and finish.
-				 *
-				 * In any case we'll need to wait for table sync to change the
-				 * state in catalog and only then continue ourselves.
+				 * Tell the sync worker it can catch up now.  We'll wait for it
+				 * so that it does not get lost.
 				 */
-				if (current_lsn > rstate->lsn)
-				{
-					rstate->state = SUBREL_STATE_CATCHUP;
-					rstate->lsn = current_lsn;
-				}
-				else if (current_lsn == rstate->lsn)
-				{
-					rstate->state = SUBREL_STATE_READY;
-					rstate->lsn = current_lsn;
-				}
-				else
-					rstate->state = SUBREL_STATE_SYNCDONE;
-
 				SpinLockAcquire(&syncworker->relmutex);
-				syncworker->relstate = rstate->state;
-				syncworker->relstate_lsn = rstate->lsn;
+				syncworker->relstate = SUBREL_STATE_CATCHUP;
+				syncworker->relstate_lsn =
+					Max(syncworker->relstate_lsn, current_lsn);
 				SpinLockRelease(&syncworker->relmutex);
 
 				/* Signal the sync worker, as it may be waiting for us. */
 				logicalrep_worker_wakeup_ptr(syncworker);
 
 				/*
-				 * Enter busy loop and wait for synchronization status change.
+				 * Enter busy loop and wait for synchronization worker to
+				 * reach expected state (or die trying).
 				 */
-				wait_for_sync_status_change(rstate->relid, rstate->state);
+				if (!started_tx)
+				{
+					StartTransactionCommand();
+					started_tx = true;
+				}
+				wait_for_relation_state_change(rstate->relid,
+											   SUBREL_STATE_SYNCDONE);
 			}
 
 			/*
@@ -889,19 +912,27 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 				MyLogicalRepWorker->relstate_lsn = *origin_startpos;
 				SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+				/* Wait for the main apply worker to tell us to catch up. */
+				wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
+
 				/*
-				 * Wait for main apply worker to either tell us to catchup or
-				 * that we are done.
+				 * There are now two possible states here.
+				 * a) Sync is behind the apply. If that's the case we need
+				 *    to catch up with it by consuming the logical replication
+				 *    stream up to the relstate_lsn.
+				 * b) Sync is caught up with the apply so it can just set the
+				 *    state to SYNCDONE and finish.
 				 */
-				wait_for_sync_status_change(MyLogicalRepWorker->relid,
-											MyLogicalRepWorker->relstate);
-				if (MyLogicalRepWorker->relstate != SUBREL_STATE_CATCHUP)
+				if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
 				{
-					/* Update the new state. */
+					/*
+					 * Update the new state in catalog.  No need to bother
+					 * with the shmem state as we are exiting for good.
+					 */
 					SetSubscriptionRelState(MyLogicalRepWorker->subid,
 											MyLogicalRepWorker->relid,
-											MyLogicalRepWorker->relstate,
-											MyLogicalRepWorker->relstate_lsn);
+											SUBREL_STATE_SYNCDONE,
+											*origin_startpos);
 					finish_sync_worker();
 				}
 				break;
-- 
2.7.4
