Hi all,

I made an unfortunate oversight in the logical decoding timeline following code: it doesn't work for logical decoding from the walsender, because I left the required glue code out of the final cut of the patch.

The reason the new src/test/recovery/ tests don't notice this is that using pg_recvlogical from the TAP tests is currently pretty awkward. Unlike the SQL interface, pg_recvlogical has no way to specify a stop point or to return when there's no immediately pending data. So you have to read from it until you figure it's not going to return anything more, then kill it, look at what it did return, and hope you didn't lose anything in buffering.

I don't much like relying on timeouts as part of normal successful results, since they can upset some of the older and slower buildfarm members. I'd rather be able to pass a --stoppos= or a --n-xacts= option, but it was a bit too late to add those.

Per the separate mail I sent to hackers, xlogreader is currently pretty much timeline-agnostic. The timeline tracking code needs to know when the xlogreader does a random read so that it can do a fresh lookup, so its state is part of the XLogReaderState struct. But the walsender's xlogreader page read callback doesn't use the xlogreader's state, and it can't, because it's also used for physical replication, which communicates the timeline to read from to the page read function via globals. So for now, I just set the globals before calling the page read function:

    +	XLogReadDetermineTimeline(state);
    +	sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
    +	sendTimeLine = state->currTLI;
    +	sendTimeLineValidUpto = state->currTLIValidUntil;
    +	sendTimeLineNextTLI = state->nextTLI;

Per separate mail, I'd quite like to tackle the level of duplication in timeline following logic in 9.7, and maybe see if the _three_ separate read xlog page functions can be unified at the same time. But that sure isn't 9.6 material.
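
To make the glue described above concrete, here is a minimal, self-contained sketch of the pattern outside PostgreSQL. It is not the walsender code: `HistoryEntry`, `ReaderState`, `determine_timeline`, `prepare_page_read` and the `send_*` globals are hypothetical stand-ins for the timeline history list, XLogReaderState, XLogReadDetermineTimeline and the walsender's sendTimeLine* globals. Rounding the switch point down to a segment boundary is left out, and the sketch assumes a timeline counts as historic when it differs from the server's current timeline.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TimeLineID;
typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical history entry: timeline `tli` covers [begin, end). */
typedef struct HistoryEntry
{
	TimeLineID	tli;
	XLogRecPtr	begin;
	XLogRecPtr	end;		/* InvalidXLogRecPtr means "still current" */
} HistoryEntry;

/* Hypothetical stand-in for the timeline fields of XLogReaderState. */
typedef struct ReaderState
{
	TimeLineID	currTLI;
	TimeLineID	nextTLI;
	XLogRecPtr	currTLIValidUntil;
} ReaderState;

/* Stand-ins for the globals consulted by the shared page read code. */
static TimeLineID send_tli;
static TimeLineID send_next_tli;
static XLogRecPtr send_valid_upto;
static bool send_is_historic;

/*
 * Find the timeline containing `pos`. The history is ordered oldest first.
 * The state is left untouched if no entry covers `pos`.
 */
static void
determine_timeline(ReaderState *state, const HistoryEntry *history,
				   size_t nentries, XLogRecPtr pos)
{
	assert(state != NULL && history != NULL);

	for (size_t i = 0; i < nentries; i++)
	{
		const HistoryEntry *e = &history[i];

		if (pos >= e->begin &&
			(e->end == InvalidXLogRecPtr || pos < e->end))
		{
			state->currTLI = e->tli;
			state->currTLIValidUntil = e->end;
			/* the newest timeline has no successor; 0 marks "none" */
			state->nextTLI = (i + 1 < nentries) ? history[i + 1].tli : 0;
			return;
		}
	}
}

/* The glue: refresh the reader state, then publish it through globals. */
static void
prepare_page_read(ReaderState *state, const HistoryEntry *history,
				  size_t nentries, XLogRecPtr pos, TimeLineID this_tli)
{
	determine_timeline(state, history, nentries, pos);
	send_is_historic = (state->currTLI != this_tli);
	send_tli = state->currTLI;
	send_valid_upto = state->currTLIValidUntil;
	send_next_tli = state->nextTLI;
}
```

The point of the split is that the lookup result lives in the reader state, where a random read can invalidate it, while the page read code keeps consuming plain globals and needs no changes for physical replication.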

From c56a32b5ace8a48908da366e5f778fa98a125740 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Thu, 14 Apr 2016 15:43:20 +0800
Subject: [PATCH] Enable logical timeline following in the walsender

---
 src/backend/access/transam/xlogutils.c |  7 +++----
 src/backend/replication/walsender.c    | 11 +++++++----
 src/include/access/xlogreader.h        |  3 +++
 src/include/access/xlogutils.h         |  2 ++
 4 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index c3213ac..c3b0e5c 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -773,7 +773,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
  * might be reading from historical timeline data on a segment that's been
  * copied to a new timeline.
  */
-static void
+void
 XLogReadDetermineTimeline(XLogReaderState *state)
 {
 	/* Read the history on first time through */
@@ -856,12 +856,11 @@ XLogReadDetermineTimeline(XLogReaderState *state)
 			   state->currTLIValidUntil == InvalidXLogRecPtr)
 		{
 			XLogRecPtr	tliSwitch;
-			TimeLineID	nextTLI;
 
 			CHECK_FOR_INTERRUPTS();
 
 			tliSwitch = tliSwitchPoint(state->currTLI, state->timelineHistory,
-									   &nextTLI);
+									   &state->nextTLI);
 
 			/* round ValidUntil down to start of seg containing the switch */
 			state->currTLIValidUntil =
@@ -875,7 +874,7 @@ XLogReadDetermineTimeline(XLogReaderState *state)
 			 *
 			 * If that's the current TLI we'll stop searching.
 			 */
-			state->currTLI = nextTLI;
+			state->currTLI = state->nextTLI;
 			state->currTLIValidUntil = InvalidXLogRecPtr;
 		}
 	}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e4a0119..495bff2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -47,6 +47,7 @@
 #include "access/transam.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogutils.h"
 
 #include "catalog/pg_type.h"
 #include "commands/dbcommands.h"
@@ -756,6 +757,12 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	XLogRecPtr	flushptr;
 	int			count;
 
+	XLogReadDetermineTimeline(state);
+	sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
+	sendTimeLine = state->currTLI;
+	sendTimeLineValidUpto = state->currTLIValidUntil;
+	sendTimeLineNextTLI = state->nextTLI;
+
 	/* make sure we have enough WAL available */
 	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
 
@@ -984,10 +991,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	pq_endmessage(&buf);
 	pq_flush();
 
-	/* setup state for XLogReadPage */
-	sendTimeLineIsHistoric = false;
-	sendTimeLine = ThisTimeLineID;
-
 	/*
 	 * Initialize position to the last ack'ed one, then the xlog records begin
 	 * to be shipped from that position.
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 300747d..bbee552 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -166,6 +166,9 @@ struct XLogReaderState
 	XLogRecPtr	currRecPtr;
 	/* timeline to read it from, 0 if a lookup is required */
 	TimeLineID	currTLI;
+	/* timeline that follows currTLI */
+	TimeLineID	nextTLI;
+
 	/*
 	 * Safe point to read to in currTLI. If currTLI is historical, then this
 	 * is set to the end of the last whole segment that contains that TLI;
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index d027ea1..9a32ab7 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -52,4 +52,6 @@ extern int read_local_xlog_page(XLogReaderState *state,
 					 XLogRecPtr targetRecPtr, char *cur_page,
 					 TimeLineID *pageTLI);
 
+extern void XLogReadDetermineTimeline(XLogReaderState *state);
+
 #endif
-- 
2.1.0