On Fri, Feb 13, 2015 at 4:57 PM, Michael Paquier wrote:
> Moved patch to CF 2015-02 to not lose track of it, also because it does not
> seem it received a proper review.
This patch does not apply anymore, so attached is a rebased version.
The comments mentioned here have not been addressed:
http://www.postgresql.org/message-id/[email protected]
Also, what kind of tests have been done? Logical decoding cannot be
used while a node is in recovery.
Regards,
--
Michael
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4a20569..3036ce6 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -216,7 +216,8 @@ static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, Transac
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
-
+static XLogRecPtr GetLatestRequestPtr(void);
+static TimeLineID ReadSendTimeLine(TimeLineID tli);
/* Initialize walsender process before entering the main command loop */
void
@@ -535,8 +536,6 @@ StartReplication(StartReplicationCmd *cmd)
if (cmd->timeline != 0)
{
- XLogRecPtr switchpoint;
-
sendTimeLine = cmd->timeline;
if (sendTimeLine == ThisTimeLineID)
{
@@ -545,18 +544,13 @@ StartReplication(StartReplicationCmd *cmd)
}
else
{
- List *timeLineHistory;
-
sendTimeLineIsHistoric = true;
/*
* Check that the timeline the client requested for exists, and
* the requested start location is on that timeline.
*/
- timeLineHistory = readTimeLineHistory(ThisTimeLineID);
- switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
- &sendTimeLineNextTLI);
- list_free_deep(timeLineHistory);
+ (void) ReadSendTimeLine(cmd->timeline);
/*
* Found the requested timeline in the history. Check that
@@ -576,8 +570,8 @@ StartReplication(StartReplicationCmd *cmd)
* that's older than the switchpoint, if it's still in the same
* WAL segment.
*/
- if (!XLogRecPtrIsInvalid(switchpoint) &&
- switchpoint < cmd->startpoint)
+ if (!XLogRecPtrIsInvalid(sendTimeLineValidUpto) &&
+ sendTimeLineValidUpto < cmd->startpoint)
{
ereport(ERROR,
(errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
@@ -586,10 +580,9 @@ StartReplication(StartReplicationCmd *cmd)
cmd->timeline),
errdetail("This server's history forked from timeline %u at %X/%X.",
cmd->timeline,
- (uint32) (switchpoint >> 32),
- (uint32) (switchpoint))));
+ (uint32) (sendTimeLineValidUpto >> 32),
+ (uint32) (sendTimeLineValidUpto))));
}
- sendTimeLineValidUpto = switchpoint;
}
}
else
@@ -928,6 +921,8 @@ static void
StartLogicalReplication(StartReplicationCmd *cmd)
{
StringInfoData buf;
+ XLogRecPtr FlushPtr;
+ List *timeLineHistory;
/* make sure that our requirements are still fulfilled */
CheckLogicalDecodingRequirements();
@@ -940,6 +935,8 @@ StartLogicalReplication(StartReplicationCmd *cmd)
* Force a disconnect, so that the decoding code doesn't need to care
* about an eventual switch from running in recovery, to running in a
* normal environment. Client code is expected to handle reconnects.
+ * This covers the race condition where we are promoted half way
+ * through starting up.
*/
if (am_cascading_walsender && !RecoveryInProgress())
{
@@ -948,6 +945,14 @@ StartLogicalReplication(StartReplicationCmd *cmd)
walsender_ready_to_stop = true;
}
+ if (am_cascading_walsender)
+ {
+ /* this also updates ThisTimeLineID */
+ FlushPtr = GetStandbyFlushRecPtr();
+ }
+ else
+ FlushPtr = GetFlushRecPtr();
+
WalSndSetState(WALSNDSTATE_CATCHUP);
/* Send a CopyBothResponse message, and start streaming */
@@ -974,6 +979,24 @@ StartLogicalReplication(StartReplicationCmd *cmd)
logical_startptr = MyReplicationSlot->data.restart_lsn;
/*
+ * Find the timeline for the start location, or throw an error.
+ *
+ * Logical replication relies upon replication slots. Each slot has a
+ * single timeline history baked into it, so this should be easy.
+ */
+ timeLineHistory = readTimeLineHistory(ThisTimeLineID);
+ sendTimeLine = tliOfPointInHistory(logical_startptr, timeLineHistory);
+ if (sendTimeLine != ThisTimeLineID)
+ {
+ sendTimeLineIsHistoric = true;
+ sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, timeLineHistory,
+ &sendTimeLineNextTLI);
+ }
+ list_free_deep(timeLineHistory);
+
+ streamingDoneSending = streamingDoneReceiving = false;
+
+ /*
* Report the location after which we'll send out further commits as the
* current sentPtr.
*/
@@ -2179,93 +2202,10 @@ XLogSendPhysical(void)
return;
}
- /* Figure out how far we can safely send the WAL. */
- if (sendTimeLineIsHistoric)
- {
- /*
- * Streaming an old timeline that's in this server's history, but is
- * not the one we're currently inserting or replaying. It can be
- * streamed up to the point where we switched off that timeline.
- */
- SendRqstPtr = sendTimeLineValidUpto;
- }
- else if (am_cascading_walsender)
- {
- /*
- * Streaming the latest timeline on a standby.
- *
- * Attempt to send all WAL that has already been replayed, so that we
- * know it's valid. If we're receiving WAL through streaming
- * replication, it's also OK to send any WAL that has been received
- * but not replayed.
- *
- * The timeline we're recovering from can change, or we can be
- * promoted. In either case, the current timeline becomes historic. We
- * need to detect that so that we don't try to stream past the point
- * where we switched to another timeline. We check for promotion or
- * timeline switch after calculating FlushPtr, to avoid a race
- * condition: if the timeline becomes historic just after we checked
- * that it was still current, it's still be OK to stream it up to the
- * FlushPtr that was calculated before it became historic.
- */
- bool becameHistoric = false;
-
- SendRqstPtr = GetStandbyFlushRecPtr();
-
- if (!RecoveryInProgress())
- {
- /*
- * We have been promoted. RecoveryInProgress() updated
- * ThisTimeLineID to the new current timeline.
- */
- am_cascading_walsender = false;
- becameHistoric = true;
- }
- else
- {
- /*
- * Still a cascading standby. But is the timeline we're sending
- * still the one recovery is recovering from? ThisTimeLineID was
- * updated by the GetStandbyFlushRecPtr() call above.
- */
- if (sendTimeLine != ThisTimeLineID)
- becameHistoric = true;
- }
-
- if (becameHistoric)
- {
- /*
- * The timeline we were sending has become historic. Read the
- * timeline history file of the new timeline to see where exactly
- * we forked off from the timeline we were sending.
- */
- List *history;
-
- history = readTimeLineHistory(ThisTimeLineID);
- sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
-
- Assert(sendTimeLine < sendTimeLineNextTLI);
- list_free_deep(history);
-
- sendTimeLineIsHistoric = true;
-
- SendRqstPtr = sendTimeLineValidUpto;
- }
- }
- else
- {
- /*
- * Streaming the current timeline on a master.
- *
- * Attempt to send all data that's already been written out and
- * fsync'd to disk. We cannot go further than what's been written out
- * given the current implementation of XLogRead(). And in any case
- * it's unsafe to send WAL that is not securely down to disk on the
- * master: if the master subsequently crashes and restarts, slaves
- * must not have applied any WAL that gets lost on the master.
- */
- SendRqstPtr = GetFlushRecPtr();
- }
+ /*
+ * Get the SendRqstPtr and follow any timeline changes.
+ */
+ SendRqstPtr = GetLatestRequestPtr();
/*
* If this is a historic timeline and we've reached the point where we
@@ -2402,6 +2342,7 @@ XLogSendPhysical(void)
static void
XLogSendLogical(void)
{
+ XLogRecPtr SendRqstPtr;
XLogRecord *record;
char *errm;
@@ -2436,6 +2377,42 @@ XLogSendLogical(void)
WalSndCaughtUp = true;
}
+ /*
+ * We don't need the SendRqstPtr, but we want to follow timeline
+ * changes and set sendTimeLineIsHistoric if required.
+ */
+ if (!sendTimeLineIsHistoric)
+ SendRqstPtr = GetLatestRequestPtr();
+
+ /*
+ * If this is a historic timeline and we've reached the point where we
+ * forked to the next timeline, switch to new timeline.
+ */
+ if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
+ {
+ /* close the current file. */
+ if (sendFile >= 0)
+ close(sendFile);
+ sendFile = -1;
+
+ elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
+ (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
+ (uint32) (sentPtr >> 32), (uint32) sentPtr);
+
+ /*
+ * Did we reach the current timeline yet? If not, switch to the
+ * next one and follow that to its endpoint.
+ */
+ if (sendTimeLineNextTLI == ThisTimeLineID)
+ {
+ sendTimeLineIsHistoric = false;
+ sendTimeLine = sendTimeLineNextTLI;
+ sendTimeLineValidUpto = InvalidXLogRecPtr;
+ }
+ else
+ sendTimeLine = ReadSendTimeLine(sendTimeLineNextTLI);
+ }
+
/* Update shared memory status */
{
/* use volatile pointer to prevent code rearrangement */
@@ -2445,6 +2422,8 @@ XLogSendLogical(void)
walsnd->sentPtr = sentPtr;
SpinLockRelease(&walsnd->mutex);
}
+
+ /* ps display updated by plugin, if desired */
}
/*
@@ -2947,3 +2926,106 @@ GetOldestWALSendPointer(void)
}
#endif
+
+/*
+ * GetLatestRequestPtr
+ *
+ * Work out how far WAL can safely be sent right now and return that
+ * position, following any timeline change on the way.
+ *
+ * Side effects: when a promotion is detected this clears
+ * am_cascading_walsender; when the timeline being sent turns out to have
+ * become historic it sets sendTimeLineIsHistoric and, through
+ * ReadSendTimeLine(), refreshes sendTimeLineValidUpto and
+ * sendTimeLineNextTLI.
+ */
+static XLogRecPtr
+GetLatestRequestPtr(void)
+{
+	XLogRecPtr	SendRqstPtr;
+
+	/* Figure out how far we can safely send the WAL. */
+	if (sendTimeLineIsHistoric)
+	{
+		/*
+		 * Streaming an old timeline that's in this server's history, but is
+		 * not the one we're currently inserting or replaying. It can be
+		 * streamed up to the point where we switched off that timeline.
+		 */
+		SendRqstPtr = sendTimeLineValidUpto;
+	}
+	else if (am_cascading_walsender)
+	{
+		/*
+		 * Streaming the latest timeline on a standby.
+		 *
+		 * Attempt to send all WAL that has already been replayed, so that we
+		 * know it's valid. If we're receiving WAL through streaming
+		 * replication, it's also OK to send any WAL that has been received
+		 * but not replayed.
+		 *
+		 * The timeline we're recovering from can change, or we can be
+		 * promoted. In either case, the current timeline becomes historic. We
+		 * need to detect that so that we don't try to stream past the point
+		 * where we switched to another timeline. We check for promotion or
+		 * timeline switch after calculating FlushPtr, to avoid a race
+		 * condition: if the timeline becomes historic just after we checked
+		 * that it was still current, it's still OK to stream it up to the
+		 * FlushPtr that was calculated before it became historic.
+		 */
+		bool		becameHistoric = false;
+
+		SendRqstPtr = GetStandbyFlushRecPtr();
+
+		if (!RecoveryInProgress())
+		{
+			/*
+			 * We have been promoted. RecoveryInProgress() updated
+			 * ThisTimeLineID to the new current timeline.
+			 */
+			am_cascading_walsender = false;
+			becameHistoric = true;
+		}
+		else
+		{
+			/*
+			 * Still a cascading standby. But is the timeline we're sending
+			 * still the one recovery is recovering from? ThisTimeLineID was
+			 * updated by the GetStandbyFlushRecPtr() call above.
+			 */
+			if (sendTimeLine != ThisTimeLineID)
+				becameHistoric = true;
+		}
+
+		if (becameHistoric)
+		{
+			/*
+			 * The timeline we were sending has become historic. Read the
+			 * timeline history file of the new timeline to see where exactly
+			 * we forked off from the timeline we were sending.
+			 *
+			 * The lookup must be for sendTimeLine, not ThisTimeLineID: it is
+			 * the end of the timeline being sent that bounds what we may
+			 * stream, and ReadSendTimeLine() asserts that the timeline it is
+			 * given precedes its successor.
+			 */
+			(void) ReadSendTimeLine(sendTimeLine);
+
+			sendTimeLineIsHistoric = true;
+
+			SendRqstPtr = sendTimeLineValidUpto;
+		}
+	}
+	else
+	{
+		/*
+		 * Streaming the current timeline on a master.
+		 *
+		 * Attempt to send all data that's already been written out and
+		 * fsync'd to disk. We cannot go further than what's been written out
+		 * given the current implementation of XLogRead(). And in any case
+		 * it's unsafe to send WAL that is not securely down to disk on the
+		 * master: if the master subsequently crashes and restarts, slaves
+		 * must not have applied any WAL that gets lost on the master.
+		 */
+		SendRqstPtr = GetFlushRecPtr();
+	}
+
+	return SendRqstPtr;
+}
+
+/*
+ * ReadSendTimeLine
+ *
+ * Read the timeline history of ThisTimeLineID and look up timeline 'tli'
+ * in it.  The switch point found is stored in sendTimeLineValidUpto and
+ * the timeline that follows 'tli' in sendTimeLineNextTLI.
+ *
+ * Returns 'tli', so that a caller advancing to the next timeline can
+ * assign the result straight to sendTimeLine; other callers may ignore it.
+ */
+static TimeLineID
+ReadSendTimeLine(TimeLineID tli)
+{
+	List	   *history;
+
+	history = readTimeLineHistory(ThisTimeLineID);
+	sendTimeLineValidUpto = tliSwitchPoint(tli, history,
+										   &sendTimeLineNextTLI);
+
+	/* the timeline we looked up must precede the one that follows it */
+	Assert(tli < sendTimeLineNextTLI);
+	list_free_deep(history);
+
+	/*
+	 * 'tli' was passed by value, so it is still the timeline the caller
+	 * asked about even when the argument was sendTimeLineNextTLI, which has
+	 * just been overwritten above.
+	 */
+	return tli;
+}
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers