Hi all

I made an unfortunate oversight in the logical decoding timeline-following
code: it doesn't work for logical decoding from the walsender, because I
left the required glue code out of the final cut of the patch.

The new src/test/recovery/ tests don't catch this because using
pg_recvlogical from the TAP tests is currently pretty awkward. Unlike the
SQL interface, pg_recvlogical has no way to specify a stop point, or to
return when there's no immediately pending data. So you have to read from
it until you figure it isn't going to return anything more, then kill it,
look at what it did return, and hope you didn't lose anything in
buffering. I don't much like relying on timeouts as part of normal
successful results, since they can upset some of the older and slower
buildfarm members. I'd rather be able to pass a --stoppos= or --n-xacts=
option, but it was a bit too late to add those.

As described in the separate mail I sent to -hackers, xlogreader is
currently pretty much timeline-agnostic. The timeline-tracking code needs
to know when the xlogreader does a random read so that it can do a fresh
timeline lookup, so its state is kept in the XLogReaderState struct. But
the walsender's xlogreader page read callback doesn't use the xlogreader's
state. It can't, because the underlying read code is also used for
physical replication, which passes the timeline to read from to the page
read function via globals. So for now, I just set those globals before
calling the page read function:

+       XLogReadDetermineTimeline(state);
+       sendTimeLineIsHistoric = state->currTLI == ThisTimeLineID;
+       sendTimeLine = state->currTLI;
+       sendTimeLineValidUpto = state->currTLIValidUntil;
+       sendTimeLineNextTLI = state->nextTLI;

As mentioned in a separate mail, I'd quite like to tackle the duplication
in the timeline-following logic in 9.7. At the same time I'd like to see
whether the _three_ separate xlog page read functions can be unified. But
that certainly isn't 9.6 material.
From c56a32b5ace8a48908da366e5f778fa98a125740 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Thu, 14 Apr 2016 15:43:20 +0800
Subject: [PATCH] Enable logical timeline following in the walsender

---
 src/backend/access/transam/xlogutils.c |  7 +++----
 src/backend/replication/walsender.c    | 11 +++++++----
 src/include/access/xlogreader.h        |  3 +++
 src/include/access/xlogutils.h         |  2 ++
 4 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index c3213ac..c3b0e5c 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -773,7 +773,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
  * might be reading from historical timeline data on a segment that's been
  * copied to a new timeline.
  */
-static void
+void
 XLogReadDetermineTimeline(XLogReaderState *state)
 {
 	/* Read the history on first time through */
@@ -856,12 +856,11 @@ XLogReadDetermineTimeline(XLogReaderState *state)
 			   state->currTLIValidUntil == InvalidXLogRecPtr)
 		{
 			XLogRecPtr	tliSwitch;
-			TimeLineID	nextTLI;
 
 			CHECK_FOR_INTERRUPTS();
 
 			tliSwitch = tliSwitchPoint(state->currTLI, state->timelineHistory,
-									   &nextTLI);
+									   &state->nextTLI);
 
 			/* round ValidUntil down to start of seg containing the switch */
 			state->currTLIValidUntil =
@@ -875,7 +874,7 @@ XLogReadDetermineTimeline(XLogReaderState *state)
 				 *
 				 * If that's the current TLI we'll stop searching.
 				 */
-				state->currTLI = nextTLI;
+				state->currTLI = state->nextTLI;
 				state->currTLIValidUntil = InvalidXLogRecPtr;
 			}
 		}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e4a0119..495bff2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -47,6 +47,7 @@
 #include "access/transam.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogutils.h"
 
 #include "catalog/pg_type.h"
 #include "commands/dbcommands.h"
@@ -756,6 +757,12 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	XLogRecPtr	flushptr;
 	int			count;
 
+	XLogReadDetermineTimeline(state);
+	sendTimeLineIsHistoric = state->currTLI == ThisTimeLineID;
+	sendTimeLine = state->currTLI;
+	sendTimeLineValidUpto = state->currTLIValidUntil;
+	sendTimeLineNextTLI = state->nextTLI;
+
 	/* make sure we have enough WAL available */
 	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
 
@@ -984,10 +991,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	pq_endmessage(&buf);
 	pq_flush();
 
-	/* setup state for XLogReadPage */
-	sendTimeLineIsHistoric = false;
-	sendTimeLine = ThisTimeLineID;
-
 	/*
 	 * Initialize position to the last ack'ed one, then the xlog records begin
 	 * to be shipped from that position.
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 300747d..bbee552 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -166,6 +166,9 @@ struct XLogReaderState
 	XLogRecPtr	currRecPtr;
 	/* timeline to read it from, 0 if a lookup is required */
 	TimeLineID	currTLI;
+	/* timeline that follows currTLI */
+	TimeLineID  nextTLI;
+
 	/*
 	 * Safe point to read to in currTLI.  If currTLI is historical, then this
 	 * is set to the end of the last whole segment that contains that TLI;
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index d027ea1..9a32ab7 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -52,4 +52,6 @@ extern int read_local_xlog_page(XLogReaderState *state,
 					 XLogRecPtr targetRecPtr, char *cur_page,
 					 TimeLineID *pageTLI);
 
+extern void XLogReadDetermineTimeline(XLogReaderState *state);
+
 #endif
-- 
2.1.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to