On Fri, Feb 13, 2015 at 4:57 PM, Michael Paquier wrote:
> Moved patch to CF 2015-02 to not lose track of it, also because it does not
> seem it received a proper review.

This patch does not apply anymore, so attached is a rebased version.
The comments mentioned here have not been addressed:
http://www.postgresql.org/message-id/54a7bf61.9080...@vmware.com
Also, what kind of tests have been done? Logical decoding cannot be
used while a node is in recovery.
Regards,
-- 
Michael
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4a20569..3036ce6 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -216,7 +216,8 @@ static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, Transac
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
-
+static XLogRecPtr GetLatestRequestPtr(void);
+static TimeLineID ReadSendTimeLine(TimeLineID tli);
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -535,8 +536,6 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->timeline != 0)
 	{
-		XLogRecPtr	switchpoint;
-
 		sendTimeLine = cmd->timeline;
 		if (sendTimeLine == ThisTimeLineID)
 		{
@@ -545,18 +544,13 @@ StartReplication(StartReplicationCmd *cmd)
 		}
 		else
 		{
-			List	   *timeLineHistory;
-
 			sendTimeLineIsHistoric = true;
 
 			/*
 			 * Check that the timeline the client requested for exists, and
 			 * the requested start location is on that timeline.
 			 */
-			timeLineHistory = readTimeLineHistory(ThisTimeLineID);
-			switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
-										 &sendTimeLineNextTLI);
-			list_free_deep(timeLineHistory);
+			(void) ReadSendTimeLine(cmd->timeline);
 
 			/*
 			 * Found the requested timeline in the history. Check that
@@ -576,8 +570,8 @@ StartReplication(StartReplicationCmd *cmd)
 			 * that's older than the switchpoint, if it's still in the same
 			 * WAL segment.
 			 */
-			if (!XLogRecPtrIsInvalid(switchpoint) &&
-				switchpoint < cmd->startpoint)
+			if (!XLogRecPtrIsInvalid(sendTimeLineValidUpto) &&
+				sendTimeLineValidUpto < cmd->startpoint)
 			{
 				ereport(ERROR,
 						(errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
@@ -586,10 +580,9 @@ StartReplication(StartReplicationCmd *cmd)
 								cmd->timeline),
 						 errdetail("This server's history forked from timeline %u at %X/%X.",
 								   cmd->timeline,
-								   (uint32) (switchpoint >> 32),
-								   (uint32) (switchpoint))));
+								   (uint32) (sendTimeLineValidUpto >> 32),
+								   (uint32) (sendTimeLineValidUpto))));
 			}
-			sendTimeLineValidUpto = switchpoint;
 		}
 	}
 	else
@@ -928,6 +921,8 @@ static void
 StartLogicalReplication(StartReplicationCmd *cmd)
 {
 	StringInfoData buf;
+	XLogRecPtr	FlushPtr;
+	List	   *timeLineHistory;
 
 	/* make sure that our requirements are still fulfilled */
 	CheckLogicalDecodingRequirements();
@@ -940,6 +935,8 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 * Force a disconnect, so that the decoding code doesn't need to care
 	 * about an eventual switch from running in recovery, to running in a
 	 * normal environment. Client code is expected to handle reconnects.
+	 * This covers the race condition where we are promoted half way
+	 * through starting up.
 	 */
 	if (am_cascading_walsender && !RecoveryInProgress())
 	{
@@ -948,6 +945,14 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 		walsender_ready_to_stop = true;
 	}
 
+	if (am_cascading_walsender)
+	{
+		/* this also updates ThisTimeLineID */
+		FlushPtr = GetStandbyFlushRecPtr();
+	}
+	else
+		FlushPtr = GetFlushRecPtr();
+
 	WalSndSetState(WALSNDSTATE_CATCHUP);
 
 	/* Send a CopyBothResponse message, and start streaming */
@@ -974,6 +979,24 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
 
 	/*
+	 * Find the timeline for the start location, or throw an error.
+	 *
+	 * Logical replication relies upon replication slots. Each slot has a
+	 * single timeline history baked into it, so this should be easy.
+	 */
+	timeLineHistory = readTimeLineHistory(ThisTimeLineID);
+	sendTimeLine = tliOfPointInHistory(logical_startptr, timeLineHistory);
+	if (sendTimeLine != ThisTimeLineID)
+	{
+		sendTimeLineIsHistoric = true;
+		sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, timeLineHistory,
+										 &sendTimeLineNextTLI);
+	}
+	list_free_deep(timeLineHistory);
+
+	streamingDoneSending = streamingDoneReceiving = false;
+
+	/*
 	 * Report the location after which we'll send out further commits as the
 	 * current sentPtr.
 	 */
@@ -2179,93 +2202,10 @@ XLogSendPhysical(void)
 		return;
 	}
 
-	/* Figure out how far we can safely send the WAL. */
-	if (sendTimeLineIsHistoric)
-	{
-		/*
-		 * Streaming an old timeline that's in this server's history, but is
-		 * not the one we're currently inserting or replaying. It can be
-		 * streamed up to the point where we switched off that timeline.
-		 */
-		SendRqstPtr = sendTimeLineValidUpto;
-	}
-	else if (am_cascading_walsender)
-	{
-		/*
-		 * Streaming the latest timeline on a standby.
-		 *
-		 * Attempt to send all WAL that has already been replayed, so that we
-		 * know it's valid. If we're receiving WAL through streaming
-		 * replication, it's also OK to send any WAL that has been received
-		 * but not replayed.
-		 *
-		 * The timeline we're recovering from can change, or we can be
-		 * promoted. In either case, the current timeline becomes historic. We
-		 * need to detect that so that we don't try to stream past the point
-		 * where we switched to another timeline. We check for promotion or
-		 * timeline switch after calculating FlushPtr, to avoid a race
-		 * condition: if the timeline becomes historic just after we checked
-		 * that it was still current, it's still be OK to stream it up to the
-		 * FlushPtr that was calculated before it became historic.
-		 */
-		bool		becameHistoric = false;
-
-		SendRqstPtr = GetStandbyFlushRecPtr();
-
-		if (!RecoveryInProgress())
-		{
-			/*
-			 * We have been promoted. RecoveryInProgress() updated
-			 * ThisTimeLineID to the new current timeline.
-			 */
-			am_cascading_walsender = false;
-			becameHistoric = true;
-		}
-		else
-		{
-			/*
-			 * Still a cascading standby. But is the timeline we're sending
-			 * still the one recovery is recovering from? ThisTimeLineID was
-			 * updated by the GetStandbyFlushRecPtr() call above.
-			 */
-			if (sendTimeLine != ThisTimeLineID)
-				becameHistoric = true;
-		}
-
-		if (becameHistoric)
-		{
-			/*
-			 * The timeline we were sending has become historic. Read the
-			 * timeline history file of the new timeline to see where exactly
-			 * we forked off from the timeline we were sending.
-			 */
-			List	   *history;
-
-			history = readTimeLineHistory(ThisTimeLineID);
-			sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
-
-			Assert(sendTimeLine < sendTimeLineNextTLI);
-			list_free_deep(history);
-
-			sendTimeLineIsHistoric = true;
-
-			SendRqstPtr = sendTimeLineValidUpto;
-		}
-	}
-	else
-	{
-		/*
-		 * Streaming the current timeline on a master.
-		 *
-		 * Attempt to send all data that's already been written out and
-		 * fsync'd to disk.  We cannot go further than what's been written out
-		 * given the current implementation of XLogRead().  And in any case
-		 * it's unsafe to send WAL that is not securely down to disk on the
-		 * master: if the master subsequently crashes and restarts, slaves
-		 * must not have applied any WAL that gets lost on the master.
-		 */
-		SendRqstPtr = GetFlushRecPtr();
-	}
+	/*
+	 * Get the SendRqstPtr and follow any timeline changes.
+	 */
+	SendRqstPtr = GetLatestRequestPtr();
 
 	/*
 	 * If this is a historic timeline and we've reached the point where we
@@ -2402,6 +2342,7 @@ XLogSendPhysical(void)
 static void
 XLogSendLogical(void)
 {
+	XLogRecPtr	SendRqstPtr;
 	XLogRecord *record;
 	char	   *errm;
 
@@ -2436,6 +2377,42 @@ XLogSendLogical(void)
 			WalSndCaughtUp = true;
 	}
 
+	/*
+	 * We don't need the SendRqstPtr, but we want to follow timeline
+	 * changes and set sendTimeLineIsHistoric if required.
+	 */
+	if (!sendTimeLineIsHistoric)
+		SendRqstPtr = GetLatestRequestPtr();
+
+	/*
+	 * If this is a historic timeline and we've reached the point where we
+	 * forked to the next timeline, switch to new timeline.
+	 */
+	if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
+	{
+		/* close the current file. */
+		if (sendFile >= 0)
+			close(sendFile);
+		sendFile = -1;
+
+		elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
+			 (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
+			 (uint32) (sentPtr >> 32), (uint32) sentPtr);
+
+		/*
+		 * Did we reach the current timeline yet? If not, switch to the
+		 * next one and follow that to its endpoint.
+		 */
+		if (sendTimeLineNextTLI == ThisTimeLineID)
+		{
+			sendTimeLineIsHistoric = false;
+			sendTimeLine = sendTimeLineNextTLI;
+			sendTimeLineValidUpto = InvalidXLogRecPtr;
+		}
+		else
+			sendTimeLine = ReadSendTimeLine(sendTimeLineNextTLI);
+	}
+
 	/* Update shared memory status */
 	{
 		/* use volatile pointer to prevent code rearrangement */
@@ -2445,6 +2422,8 @@ XLogSendLogical(void)
 		walsnd->sentPtr = sentPtr;
 		SpinLockRelease(&walsnd->mutex);
 	}
+
+	/* ps display updated by plugin, if desired */
 }
 
 /*
@@ -2947,3 +2926,106 @@ GetOldestWALSendPointer(void)
 }
 
 #endif
+
+static XLogRecPtr
+GetLatestRequestPtr(void)
+{
+	XLogRecPtr	SendRqstPtr;
+
+	/* Figure out how far we can safely send the WAL. */
+	if (sendTimeLineIsHistoric)
+	{
+		/*
+		 * Streaming an old timeline timeline that's in this server's history,
+		 * but is not the one we're currently inserting or replaying. It can
+		 * be streamed up to the point where we switched off that timeline.
+		 */
+		SendRqstPtr =  sendTimeLineValidUpto;
+	}
+	else if (am_cascading_walsender)
+	{
+		/*
+		 * Streaming the latest timeline on a standby.
+		 *
+		 * Attempt to send all WAL that has already been replayed, so that we
+		 * know it's valid. If we're receiving WAL through streaming
+		 * replication, it's also OK to send any WAL that has been received
+		 * but not replayed.
+		 *
+		 * The timeline we're recovering from can change, or we can be
+		 * promoted. In either case, the current timeline becomes historic. We
+		 * need to detect that so that we don't try to stream past the point
+		 * where we switched to another timeline. We check for promotion or
+		 * timeline switch after calculating FlushPtr, to avoid a race
+		 * condition: if the timeline becomes historic just after we checked
+		 * that it was still current, it's still be OK to stream it up to the
+		 * FlushPtr that was calculated before it became historic.
+		 */
+		bool		becameHistoric = false;
+
+		SendRqstPtr = GetStandbyFlushRecPtr();
+
+		if (!RecoveryInProgress())
+		{
+			/*
+			 * We have been promoted. RecoveryInProgress() updated
+			 * ThisTimeLineID to the new current timeline.
+			 */
+			am_cascading_walsender = false;
+			becameHistoric = true;
+		}
+		else
+		{
+			/*
+			 * Still a cascading standby. But is the timeline we're sending
+			 * still the one recovery is recovering from? ThisTimeLineID was
+			 * updated by the GetStandbyFlushRecPtr() call above.
+			 */
+			if (sendTimeLine != ThisTimeLineID)
+				becameHistoric = true;
+		}
+
+		if (becameHistoric)
+		{
+			/*
+			 * The timeline we were sending has become historic. Read the
+			 * timeline history file of the new timeline to see where exactly
+			 * we forked off from the timeline we were sending.
+			 */
+			(void) ReadSendTimeLine(ThisTimeLineID);
+
+			sendTimeLineIsHistoric = true;
+
+			SendRqstPtr = sendTimeLineValidUpto;
+		}
+	}
+	else
+	{
+		/*
+		 * Streaming the current timeline on a master.
+		 *
+		 * Attempt to send all data that's already been written out and
+		 * fsync'd to disk.  We cannot go further than what's been written out
+		 * given the current implementation of XLogRead().  And in any case
+		 * it's unsafe to send WAL that is not securely down to disk on the
+		 * master: if the master subsequently crashes and restarts, slaves
+		 * must not have applied any WAL that gets lost on the master.
+		 */
+		SendRqstPtr = GetFlushRecPtr();
+	}
+
+	return SendRqstPtr;
+}
+
+static TimeLineID
+ReadSendTimeLine(TimeLineID tli)
+{
+	List	   *history;
+
+	history = readTimeLineHistory(ThisTimeLineID);
+	sendTimeLineValidUpto = tliSwitchPoint(tli, history,
+										   &sendTimeLineNextTLI);
+
+	Assert(tli < sendTimeLineNextTLI);
+	list_free_deep(history);
+}
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to