Hi all

Updated timeline-following patch attached.

There's a change in read_local_xlog_page to ensure we maintain
ThisTimeLineID properly; otherwise it's just comment changes.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
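For anyone skimming before reading the diff: the core rule is that the timeline for a page is chosen from the last LSN of the segment containing it, so the reader moves to the new timeline's copy of a segment as soon as it enters the segment holding the switch. Below is a minimal standalone sketch of that rule, not the patch's code. `HistoryEntry`, `tli_of_point` and `tli_for_page` are hypothetical stand-ins for the server's timeline history list, tliOfPointInHistory() and the lookup done in XLogReadDetermineTimeline(), and 16MB segments are assumed.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t Lsn;			/* stand-in for XLogRecPtr */
typedef uint32_t Tli;			/* stand-in for TimeLineID */

#define SEG_SIZE	((Lsn) 16 * 1024 * 1024)	/* assumed 16MB WAL segments */
#define INVALID_LSN ((Lsn) 0)

/*
 * Hypothetical history entry: timeline tli covers LSNs in [begin, end).
 * end == INVALID_LSN means the timeline is still current.
 */
typedef struct
{
	Tli			tli;
	Lsn			begin;
	Lsn			end;
} HistoryEntry;

/* Return the timeline that owns ptr, or 0 if no entry covers it. */
static Tli
tli_of_point(const HistoryEntry *history, size_t n, Lsn ptr)
{
	for (size_t i = 0; i < n; i++)
	{
		if (ptr >= history[i].begin &&
			(history[i].end == INVALID_LSN || ptr < history[i].end))
			return history[i].tli;
	}
	return 0;
}

/*
 * Choose the timeline to read want_page from by looking at the LAST byte
 * of the segment containing it, so that we switch eagerly to the newest
 * timeline that has a copy of this segment.
 */
static Tli
tli_for_page(const HistoryEntry *history, size_t n, Lsn want_page)
{
	Lsn			end_of_segment = ((want_page / SEG_SIZE) + 1) * SEG_SIZE - 1;

	assert(want_page / SEG_SIZE == end_of_segment / SEG_SIZE);
	return tli_of_point(history, n, end_of_segment);
}
```

With a switch from timeline 1 to timeline 2 at 40MB, a page at 32MB resolves to timeline 2 in this sketch even though that LSN itself was written on timeline 1, because the 32MB to 48MB segment contains the switch.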
From d42ceaec47793f67c55523d1aeb72be61c4f2dea Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Thu, 1 Sep 2016 10:16:55 +0800
Subject: [PATCH] Teach xlogreader to follow timeline switches

The XLogReader was timeline-agnostic and assumed that all WAL segments
requested would be on ThisTimeLineID.

When decoding from a logical slot, it's necessary for xlog reading to be able
to read xlog from historical (i.e. not current) timelines. Otherwise decoding
fails after failover to a physical replica because the oldest still-needed
archives are in the historical timeline.

Supporting logical decoding timeline following is a pre-requisite for logical
decoding on physical standby servers. It also makes it possible to promote a
replica with logical slots to a master and replay from those slots, allowing
logical decoding applications to follow physical failover.

Logical slots cannot actually be created or advanced on a replica so this is
mostly foundation work for subsequent changes to enable logical decoding on
standbys.

Tests are included to exercise the functionality using a cold disk-level copy
of the master that's started up as a replica with slots intact, but the
intended use of the functionality is with logical decoding on a standby.

Note that an earlier version of logical decoding timeline following was
committed to 9.6 as 24c5f1a103ce, 3a3b309041b0, 82c83b337202, and
f07d18b6e94d. It was then reverted by c1543a81a7a8 just after 9.6 feature
freeze when issues were discovered too late to safely fix them in the 9.6
release cycle.

The prior approach failed to consider that a record could be split across
pages that are on different segments, where the new segment contains the
start of a new timeline. In that case the old segment might be missing or
renamed with a .partial suffix.

This patch reworks the logic to be page-based and in the process simplifies
how the last timeline for a segment is looked up.
---
 src/backend/access/transam/xlogutils.c             | 213 +++++++++++++++++++--
 src/backend/replication/logical/logicalfuncs.c     |   8 +-
 src/backend/replication/walsender.c                |  11 +-
 src/include/access/xlogreader.h                    |  16 ++
 src/include/access/xlogutils.h                     |   3 +
 src/test/recovery/Makefile                         |   2 +
 .../recovery/t/009_logical_decoding_timelines.pl   | 130 +++++++++++++
 7 files changed, 364 insertions(+), 19 deletions(-)
 create mode 100644 src/test/recovery/t/009_logical_decoding_timelines.pl

diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index b2b9fcb..28c07d3 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -19,6 +19,7 @@

 #include <unistd.h>

+#include "access/timeline.h"
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
@@ -662,6 +663,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 	/* state maintained across calls */
 	static int sendFile = -1;
 	static XLogSegNo sendSegNo = 0;
+	static TimeLineID sendTLI = 0;
 	static uint32 sendOff = 0;

 	p = buf;
@@ -677,7 +679,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 		startoff = recptr % XLogSegSize;

 		/* Do we need to switch to a different xlog segment? */
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) ||
+			sendTLI != tli)
 		{
 			char path[MAXPGPATH];

@@ -704,6 +707,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 									path)));
 			}
 			sendOff = 0;
+			sendTLI = tli;
 		}

 		/* Need to seek in the file? */
@@ -754,6 +758,133 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 }

 /*
+ * Determine which timeline to read an xlog page from and set the
+ * XLogReaderState's currTLI to that timeline ID.
+ *
+ * We care about timelines in xlogreader when we might be reading xlog
+ * generated prior to a promotion, either if we're currently a standby in
+ * recovery or if we're a promoted master reading xlogs generated by the old
+ * master before our promotion.
+ *
+ * wantPage must be set to the start address of the page to read and
+ * wantLength to the amount of the page that will be read, up to
+ * XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ.
+ *
+ * We switch to an xlog segment from the new timeline eagerly when on a
+ * historical timeline, as soon as we reach the start of the xlog segment
+ * containing the timeline switch. The server copied the segment to the new
+ * timeline so all the data up to the switch point is the same, but there's no
+ * guarantee the old segment will still exist. It may have been deleted or
+ * renamed with a .partial suffix so we can't necessarily keep reading from
+ * the old TLI even though tliSwitchPoint says it's OK.
+ *
+ * We can't just check the timeline when we read a page on a different segment
+ * to the last page. We could've received a timeline switch from a cascading
+ * upstream, so the current segment ends abruptly (possibly getting renamed to
+ * .partial) and we have to switch to a new one. Even in the middle of reading
+ * a page we could have to dump the cached page and switch to a new TLI.
+ *
+ * Because of this, callers MAY NOT assume that currTLI is the timeline that
+ * will be in a page's xlp_tli; the page may begin on an older timeline or we
+ * might be reading from historical timeline data on a segment that's been
+ * copied to a new timeline.
+ *
+ * The caller must also make sure it doesn't read past the current replay
+ * position (using GetWalRcvWriteRecPtr) if executing in recovery, so it
+ * doesn't fail to notice that the current timeline became historical. The
+ * caller must also update ThisTimeLineID with the result of
+ * GetWalRcvWriteRecPtr and must check RecoveryInProgress().
+ */
+void
+XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
+{
+	const XLogRecPtr lastReadPage = state->readSegNo * XLogSegSize + state->readOff;
+
+	Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
+	Assert(wantLength <= XLOG_BLCKSZ);
+	Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ);
+
+	/*
+	 * If the desired page is currently read in and valid, we have nothing to do.
+	 *
+	 * The caller should've ensured that it didn't previously advance readOff
+	 * past the valid limit of this timeline, so it doesn't matter if the current
+	 * TLI has since become historical.
+	 */
+	if (lastReadPage == wantPage &&
+		state->readLen != 0 &&
+		lastReadPage + state->readLen >= wantPage + Min(wantLength,XLOG_BLCKSZ-1))
+		return;
+
+	/*
+	 * If we're reading from the current timeline, it hasn't become historical
+	 * and the page we're reading is after the last page read, we can again
+	 * just carry on. (Seeking backwards requires a check to make sure the older
+	 * page isn't on a prior timeline).
+	 *
+	 * ThisTimeLineID might've become historical since we last looked, but the
+	 * caller is required not to read past the flush limit it saw at the time
+	 * it looked up the timeline. There's nothing we can do about it if
+	 * StartupXLOG() renames it to .partial concurrently.
+	 */
+	if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage)
+	{
+		Assert(state->currTLIValidUntil == InvalidXLogRecPtr);
+		return;
+	}
+
+	/*
+	 * If we're just reading pages from a previously validated historical
+	 * timeline and the timeline we're reading from is valid until the
+	 * end of the current segment we can just keep reading.
+	 */
+	if (state->currTLIValidUntil != InvalidXLogRecPtr &&
+		state->currTLI != ThisTimeLineID &&
+		state->currTLI != 0 &&
+		(wantPage + wantLength) / XLogSegSize < state->currTLIValidUntil / XLogSegSize)
+		return;
+
+	/*
+	 * If we reach this point we're either looking up a page for random access,
+	 * the current timeline just became historical, or we're reading from a new
+	 * segment containing a timeline switch. In all cases we need to determine
+	 * the newest timeline on the segment.
+	 *
+	 * If it's the current timeline we can just keep reading from here unless
+	 * we detect a timeline switch that makes the current timeline historical.
+	 * If it's a historical timeline we can read all the segment on the newest
+	 * timeline because it contains all the old timelines' data too. So only
+	 * one switch check is required.
+	 */
+	{
+		/*
+		 * We need to re-read the timeline history in case it's been changed
+		 * by a promotion or replay from a cascaded replica.
+		 */
+		List *timelineHistory = readTimeLineHistory(ThisTimeLineID);
+
+		XLogRecPtr endOfSegment = (((wantPage / XLogSegSize) + 1) * XLogSegSize) - 1;
+
+		Assert(wantPage / XLogSegSize == endOfSegment / XLogSegSize);
+
+		/* Find the timeline of the last LSN on the segment containing wantPage. */
+		state->currTLI = tliOfPointInHistory(endOfSegment, timelineHistory);
+		state->currTLIValidUntil = tliSwitchPoint(state->currTLI, timelineHistory,
+												  &state->nextTLI);
+
+		Assert(state->currTLIValidUntil == InvalidXLogRecPtr ||
+			   wantPage + wantLength < state->currTLIValidUntil);
+
+		list_free_deep(timelineHistory);
+
+		elog(DEBUG3, "switched to timeline %u valid until %X/%X",
+			 state->currTLI,
+			 (uint32)(state->currTLIValidUntil >> 32),
+			 (uint32)(state->currTLIValidUntil));
+	}
+}
+
+/*
  * read_page callback for reading local xlog files
  *
  * Public because it would likely be very helpful for someone writing another
@@ -774,28 +905,84 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	int count;

 	loc = targetPagePtr + reqLen;
+
+	/* Loop waiting for xlog to be available if necessary */
 	while (1)
 	{
 		/*
-		 * TODO: we're going to have to do something more intelligent about
-		 * timelines on standbys. Use readTimeLineHistory() and
-		 * tliOfPointInHistory() to get the proper LSN? For now we'll catch
-		 * that case earlier, but the code and TODO is left in here for when
-		 * that changes.
+		 * Determine the limit of xlog we can currently read to, and what the
+		 * most recent timeline is.
+		 *
+		 * RecoveryInProgress() will update ThisTimeLineID when it first
+		 * notices recovery finishes, so we only have to maintain it for the
+		 * local process until recovery ends.
 		 */
 		if (!RecoveryInProgress())
-		{
-			*pageTLI = ThisTimeLineID;
 			read_upto = GetFlushRecPtr();
+		else
+			read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
+
+		*pageTLI = ThisTimeLineID;
+
+		/*
+		 * Check which timeline to get the record from.
+		 *
+		 * We have to do it each time through the loop because if we're in
+		 * recovery as a cascading standby, the current timeline might've
+		 * become historical. We can't rely on RecoveryInProgress() because
+		 * in a standby configuration like
+		 *
+		 *    A => B => C
+		 *
+		 * if we're a logical decoding session on C, and B gets promoted, our
+		 * timeline will change while we remain in recovery.
+		 *
+		 * We can't just keep reading from the old timeline as the last WAL
+		 * archive in the timeline will get renamed to .partial by StartupXLOG().
+		 *
+		 * If that happens after our caller updated ThisTimeLineID but before
+		 * we actually read the xlog page, we might still try to read from the
+		 * old (now renamed) segment and fail. There's not much we can do about
+		 * this, but it can only happen when we're a leaf of a cascading
+		 * standby whose master gets promoted while we're decoding, so a
+		 * one-off ERROR isn't too bad.
+		 */
+		XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+
+		if (state->currTLI == ThisTimeLineID)
+		{
+
+			if (loc <= read_upto)
+				break;
+
+			CHECK_FOR_INTERRUPTS();
+			pg_usleep(1000L);
 		}
 		else
-			read_upto = GetXLogReplayRecPtr(pageTLI);
+		{
+			/*
+			 * We're on a historical timeline, so limit reading to the switch
+			 * point where we moved to the next timeline.
+			 *
+			 * We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know
+			 * about the new timeline, so we must've received past the end of
+			 * it.
+			 */
+			read_upto = state->currTLIValidUntil;

-		if (loc <= read_upto)
+			/*
+			 * Setting pageTLI to our wanted record's TLI is slightly wrong;
+			 * the page might begin on an older timeline if it contains a
+			 * timeline switch, since its xlog segment will have been copied
+			 * from the prior timeline. This is pretty harmless though, as
+			 * nothing cares so long as the timeline doesn't go backwards. We
+			 * should read the page header instead; FIXME someday.
+			 */
+			*pageTLI = state->currTLI;
+
+			/* No need to wait on a historical timeline */
 			break;
-
-		CHECK_FOR_INTERRUPTS();
-		pg_usleep(1000L);
+		}
 	}

 	if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 41c5000..c251b92 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -235,11 +235,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	rsinfo->setResult = p->tupstore;
 	rsinfo->setDesc = p->tupdesc;

-	/* compute the current end-of-wal */
+	/*
+	 * Compute the current end-of-wal and maintain ThisTimeLineID.
+	 * RecoveryInProgress() will update ThisTimeLineID on promotion.
+	 */
 	if (!RecoveryInProgress())
 		end_of_wal = GetFlushRecPtr();
 	else
-		end_of_wal = GetXLogReplayRecPtr(NULL);
+		end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);

 	ReplicationSlotAcquire(NameStr(*name));

@@ -280,6 +283,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 		/* invalidate non-timetravel entries */
 		InvalidateSystemCaches();

+		/* Decode until we run out of records */
 		while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
 			   (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
 		{
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 0f6b828..90eb991 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -48,6 +48,7 @@
 #include "access/transam.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogutils.h"

 #include "catalog/pg_type.h"
 #include "commands/dbcommands.h"
@@ -721,6 +722,12 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	XLogRecPtr flushptr;
 	int count;

+	XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+	sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
+	sendTimeLine = state->currTLI;
+	sendTimeLineValidUpto = state->currTLIValidUntil;
+	sendTimeLineNextTLI = state->nextTLI;
+
 	/* make sure we have enough WAL available */
 	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);

@@ -974,10 +981,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	pq_endmessage(&buf);
 	pq_flush();

-	/* setup state for XLogReadPage */
-	sendTimeLineIsHistoric = false;
-	sendTimeLine = ThisTimeLineID;
-
 	/*
 	 * Initialize position to the last ack'ed one, then the xlog records begin
 	 * to be shipped from that position.
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 663d3e7..a1beeb5 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -161,6 +161,22 @@ struct XLogReaderState

 	/* beginning of the WAL record being read. */
 	XLogRecPtr currRecPtr;
+	/* timeline to read it from, 0 if a lookup is required */
+	TimeLineID currTLI;
+	/*
+	 * Safe point to read to in currTLI if current TLI is historical
+	 * (tliSwitchPoint) or InvalidXLogRecPtr if on current timeline.
+	 *
+	 * Actually set to the start of the segment containing the timeline
+	 * switch that ends currTLI's validity, not the LSN of the switch
+	 * itself, since we can't assume the old segment will be present.
+	 */
+	XLogRecPtr currTLIValidUntil;
+	/*
+	 * If currTLI is not the most recent known timeline, the next timeline to
+	 * read from when currTLIValidUntil is reached.
+	 */
+	TimeLineID nextTLI;

 	/* Buffer for current ReadRecord result (expandable) */
 	char *readRecordBuf;
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 567a7f3..25a9942 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -52,4 +52,7 @@ extern int read_local_xlog_page(XLogReaderState *state,
 					 XLogRecPtr targetRecPtr, char *cur_page,
 					 TimeLineID *pageTLI);

+extern void XLogReadDetermineTimeline(XLogReaderState *state,
+						  XLogRecPtr wantPage, uint32 wantLength);
+
 #endif
diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile
index 9d03d33..142a1b8 100644
--- a/src/test/recovery/Makefile
+++ b/src/test/recovery/Makefile
@@ -9,6 +9,8 @@
 #
 #-------------------------------------------------------------------------

+EXTRA_INSTALL=contrib/test_decoding
+
 subdir = src/test/recovery
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
diff --git a/src/test/recovery/t/009_logical_decoding_timelines.pl b/src/test/recovery/t/009_logical_decoding_timelines.pl
new file mode 100644
index 0000000..09830dc
--- /dev/null
+++ b/src/test/recovery/t/009_logical_decoding_timelines.pl
@@ -0,0 +1,130 @@
+# Demonstrate that logical decoding can follow timeline switches.
+#
+# Logical replication slots can follow timeline switches but it's
+# normally not possible to have a logical slot on a replica where
+# promotion and a timeline switch can occur. The only ways
+# we can create that circumstance are:
+#
+# * By doing a filesystem-level copy of the DB, since pg_basebackup
+#   excludes pg_replslot but we can copy it directly; or
+#
+# * by creating a slot directly at the C level on the replica and
+#   advancing it as we go using the low level APIs. It can't be done
+#   from SQL since logical decoding isn't allowed on replicas.
+#
+# This module uses the first approach to show that timeline following
+# on a logical slot works.
+#
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 7;
+use RecursiveCopy;
+use File::Copy;
+use IPC::Run ();
+use Scalar::Util qw(blessed);
+
+my ($stdout, $stderr, $ret);
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
+$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
+$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
+$node_master->dump_info;
+$node_master->start;
+
+diag "Testing logical timeline following with a filesystem-level copy";
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('beforebb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+my $backup_name = 'b1';
+$node_master->backup_fs_hot($backup_name);
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+	$node_master, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$node_replica->start;
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('afterbb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+# Verify that only the before base_backup slot is on the replica
+$stdout = $node_replica->safe_psql('postgres',
+	'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, 'before_basebackup',
+	'Expected to find only slot before_basebackup on replica');
+
+# Boom, crash
+$node_master->stop('immediate');
+
+$node_replica->promote;
+$node_replica->poll_query_until('postgres',
+	"SELECT NOT pg_is_in_recovery();");
+
+$node_replica->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('after failover');");
+
+# Shouldn't be able to read from slot created after base backup
+($ret, $stdout, $stderr) = $node_replica->psql('postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');"
+);
+is($ret, 3, 'replaying from after_basebackup slot fails');
+like(
+	$stderr,
+	qr/replication slot "after_basebackup" does not exist/,
+	'after_basebackup slot missing');
+
+# Should be able to read from slot created before base backup
+($ret, $stdout, $stderr) = $node_replica->psql(
+	'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+	timeout => 30);
+is($ret, 0, 'replay from slot before_basebackup succeeds');
+
+my $final_expected_output_bb = q(BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT);
+is($stdout, $final_expected_output_bb, 'decoded expected data from slot before_basebackup');
+is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+
+# So far we've peeked the slots, so when we fetch the same info over
+# pg_recvlogical we should get complete results. First, find out the commit lsn
+# of the last transaction. There's no max(pg_lsn), so:
+
+my $endpos = $node_replica->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL) ORDER BY location DESC LIMIT 1;");
+
+# now use the walsender protocol to peek the slot changes and make sure we see
+# the same results.
+
+$stdout = $node_replica->pg_recvlogical_upto('postgres', 'before_basebackup',
+	$endpos, 30, 'include-xids' => '0', 'skip-empty-xacts' => '1');
+
+# walsender likes to add a newline
+chomp($stdout);
+is($stdout, $final_expected_output_bb, 'got same output from walsender via pg_recvlogical on before_basebackup');
+
+# We don't need the standby anymore
+$node_replica->teardown_node();
-- 
2.5.5
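
As a footnote to the patch, here is a rough standalone sketch of the fast-path test XLogReadDetermineTimeline() uses to keep reading on an already validated historical timeline: the read must end in a segment strictly before the one containing the switch point. It is only an illustration under assumptions. `can_keep_reading_historical` is a hypothetical name, the currTLI checks from the real condition are left out, and 16MB segments with 8kB pages are assumed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;			/* stand-in for XLogRecPtr */

#define SEG_SIZE	((Lsn) 16 * 1024 * 1024)	/* assumed 16MB WAL segments */
#define INVALID_LSN ((Lsn) 0)

/*
 * Hypothetical helper mirroring the segment comparison in the patch:
 * return true when want_length bytes starting at want_page end in a
 * segment that comes strictly before the segment holding valid_until,
 * so no new timeline lookup is needed.
 */
static bool
can_keep_reading_historical(Lsn want_page, uint32_t want_length, Lsn valid_until)
{
	/* valid_until is only set while on a validated historical timeline */
	if (valid_until == INVALID_LSN)
		return false;

	return (want_page + want_length) / SEG_SIZE < valid_until / SEG_SIZE;
}
```

Note that with this arithmetic a full-page read of the last page in the segment just before the switch segment already falls through to the lookup, since `want_page + want_length` lands on the next segment boundary. That is conservative and costs only an extra history read.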

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)