Hello.

At Tue, 24 Jul 2018 16:47:41 +0900, Masahiko Sawada <sawada.m...@gmail.com> wrote in <CAD21AoD0rChq7wQE=_o95quopcqgjcvg9omwdh07nt5cm81...@mail.gmail.com>
> On Mon, Jul 23, 2018 at 4:16 PM, Kyotaro HORIGUCHI
> <horiguchi.kyot...@lab.ntt.co.jp> wrote:
> > Hello.
> >
> > At Fri, 20 Jul 2018 10:13:58 +0900, Masahiko Sawada <sawada.m...@gmail.com>
> > wrote in
> > <CAD21AoDayePWwu4t=vpp5p1qfdsbvks1d8j76bxp5rbxopb...@mail.gmail.com>
..
> > Instead, I made the field be shown in flat "bytes" using bigint,
> > which can be nicely shown using pg_size_pretty;
>
> Thank you for updating. I agree with showing the remain in bytes.
>
> Here are review comments for the v6 patch.
>
> @@ -967,9 +969,9 @@ postgres=# SELECT * FROM pg_create_physical_replication_slot('node_a_slot');
>   node_a_slot |
>
>  postgres=# SELECT * FROM pg_replication_slots;
> -  slot_name  | slot_type | datoid | database | active | xmin | restart_lsn | confirmed_flush_lsn
> --------------+-----------+--------+----------+--------+------+-------------+---------------------
> - node_a_slot | physical  |        |          | f      |      |             |
> +  slot_name  | slot_type | datoid | database | active | xmin | restart_lsn | confirmed_flush_lsn | wal_status | min_keep_lsn
> +-------------+-----------+--------+----------+--------+------+-------------+---------------------+------------+--------------
> + node_a_slot | physical  |        |          | f      |      |             |                     | unknown    | 0/1000000
>
> This hunk should be updated.

Perhaps you need a fresh database cluster.

> -----
> +/*
> + * Returns minimum segment number the next checktpoint must leave considering
> + * wal_keep_segments, replication slots and max_slot_wal_keep_size.
> + *
> + * If resetBytes is not NULL, returns remaining LSN bytes to advance until any
> + * slot loses reserving a WAL record.
> + */
> +static XLogSegNo
> +GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN, XLogRecPtr restartLSN, uint64 *restBytes)
> +{
>
> You're assuming that the minSlotLSN is the minimum LSN of replication
> slots but it's not mentioned anywhere. Since you check minSlotSeg <=

I added descriptions of the parameters to the function comment.

> currSeg but not force it, if a caller sets a wrong value to minSlotLSN
> this function will return a wrong value with no complaints. Similarly

I don't think such a case can happen on a sane system. Even if it
happens, the function behaves in the same way as when minSlotLSN is
invalid. KeepLogSeg() also behaves in the same way, and WAL recycling
will be performed as pg_replication_slots predicted. Nothing can
improve the behavior, so I think placing an assertion there is
overkill.

> there is no explanation about the restartLSN, so you can add it. I'm
> not sure the argument name restartLSN is suitable for the function in
> xlog.c but I'd defer it to committers.

Done.

> Since this function assumes that both restartLSN and *restBytes are
> valid or invalid (and NULL) it's better to add assertions for safety.
> The current code accepts even the case where only either argument is
> valid.
> -----
> +		if (limitSegs > 0 && currSeg <= restartSeg + limitSegs)
> +		{

Even if the caller gives InvalidXLogRecPtr as restartLSN, which is an
insane situation, the function just treats the value as zero and
returns the "correct" value for that restartLSN, which doesn't harm
anything.

> +			/*
> +			 * This slot still has all required segments. Calculate how many
> +			 * LSN bytes the slot has until it loses restart_lsn.
> +			 */
> +			fragbytes = wal_segment_size - (currLSN % wal_segment_size);
> +			*restBytes =
> +				(restartSeg + limitSegs - currSeg) * wal_segment_size
> +				+ fragbytes;
> +		}
> +	}
>
> This code doesn't consider the case where wal_keep_segments >
> max_slot_keep_size. In the case I think we should use (currSeg -
> wal_keep_segments) as the lower bound in order to avoid showing
> "streaming" in the wal_status although the remain is 0.

Thanks. It should use keepSegs instead of limitSegs. Fixed.

> -----
> +			*restBytes =
> +				(restartSeg + limitSegs - currSeg) * wal_segment_size
> +				+ fragbytes;
>
> Maybe you can use XLogSegNoOffsetToRecPtr instead.

Indeed. I'm not sure it is easier to read, though. (Maybe the
functions should use wal_segment_size out-of-band. (That is, not
passed as a parameter)).

> -----
> + * 0 means that WAL record at targetLSN is alredy removed.
> + * 1 means that WAL record at tagetLSN is availble.
> + * 2 means that WAL record at tagetLSN is availble but about to be removed by
>
> s/alredy/already/
> s/tagetLSN/targetLSN/
> s/availble/available/
> -----
> + * If resetBytes is not NULL, returns remaining LSN bytes to advance until any
> + * slot loses reserving a WAL record.
>
> s/resetBytes/restBytes/

Ugggh! Sorry that my fingers are extra-fat.. Fixed. I rechecked
through the whole patch and found one more typo.

> -----
> +        Specify the maximum size of WAL files
> +        that <link linkend="streaming-replication-slots">replication
> +        slots</link> are allowed to reatin in the <filename>pg_wal</filename>
>
> s/reatin/retain/

Thank you. I also found other leftovers in catalogs.sgml and
high-availability.sgml.

# The latter file seems to need amendment for v11.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

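For reference, the remaining-bytes arithmetic discussed above (with keepSegs in place of limitSegs) can be tried outside the server roughly like this. It is only a sketch and not code from the patch: remaining_bytes() and its parameter names are made up for illustration, plain uint64_t stands in for XLogRecPtr and XLogSegNo, and the segment number is assumed to be simply LSN / segment size, as XLByteToSeg computes it.

```c
#include <stdint.h>

/*
 * Hypothetical stand-alone version of the "remain" computation.
 * Returns how many bytes the write position can still advance before
 * the segment holding restart_lsn falls out of the kept range, or 0
 * when nothing is kept or the slot is already outside that range.
 */
uint64_t
remaining_bytes(uint64_t curr_lsn, uint64_t restart_lsn,
				uint64_t keep_segs, uint64_t seg_size)
{
	uint64_t	curr_seg = curr_lsn / seg_size;		/* like XLByteToSeg */
	uint64_t	restart_seg = restart_lsn / seg_size;
	uint64_t	fragbytes;

	if (keep_segs == 0 || curr_seg > restart_seg + keep_segs)
		return 0;

	/* bytes left in the segment that is currently being written */
	fragbytes = seg_size - (curr_lsn % seg_size);

	/* whole segments still available, plus the rest of the current one */
	return (restart_seg + keep_segs - curr_seg) * seg_size + fragbytes;
}
```

For example, with 16MB segments, a slot whose restart_lsn is in segment 3, keep_segs = 4 and the write position 100 bytes into segment 5, the result is two whole segments plus the unwritten rest of segment 5.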
From 83284a5e3f23ad45492ce54421c6af9c86e1d598 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 11 Jan 2018 15:00:32 +0900
Subject: [PATCH 4/4] Documentation for slot-limit feature

---
 doc/src/sgml/catalogs.sgml          | 28 ++++++++++++++++++++++++++++
 doc/src/sgml/config.sgml            | 22 ++++++++++++++++++++++
 doc/src/sgml/high-availability.sgml | 14 ++++++++------
 3 files changed, 58 insertions(+), 6 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index fffb79f713..5acee4d0e8 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -9886,6 +9886,34 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </entry>
      </row>

+     <row>
+      <entry><structfield>wal_status</structfield></entry>
+      <entry><type>text</type></entry>
+      <entry></entry>
+
+      <entry>Availability of WAL records claimed by the
+      slot. <literal>streaming</literal>, <literal>keeping</literal>,
+      <literal>lost</literal>
+      or <literal>unknown</literal>. <literal>streaming</literal> means that
+      the claimed records are available. <literal>keeping</literal> means that
+      some of them are to be removed by the next checkpoint.
+      <literal>lost</literal> means that some of them have been removed. The
+      last two states are seen only when
+      <xref linkend="guc-max-slot-wal-keep-size"/> is not zero. If the slot
+      doesn't have a valid restart_lsn, this field
+      is <literal>unknown</literal>.
+      </entry>
+     </row>
+
+     <row>
+      <entry><structfield>remain</structfield></entry>
+      <entry><type>bigint</type></entry>
+      <entry></entry>
+      <entry>The amount in bytes that WAL location (LSN) can advance until the
+      slot may lose required WAL records.
+      </entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index bee4afbe4e..9d190c3daa 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3116,6 +3116,28 @@ include_dir 'conf.d'
        </listitem>
       </varlistentry>

+      <varlistentry id="guc-max-slot-wal-keep-size" xreflabel="max_slot_wal_keep_size">
+       <term><varname>max_slot_wal_keep_size</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>max_slot_wal_keep_size</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Specify the maximum size of WAL files
+         that <link linkend="streaming-replication-slots">replication
+         slots</link> are allowed to retain in the <filename>pg_wal</filename>
+         directory at checkpoint time.
+         If <varname>max_slot_wal_keep_size</varname> is zero (the default),
+         replication slots retain an unlimited amount of WAL files.
+        </para>
+        <para>
+         The value is rounded down to a multiple of the WAL file
+         size.
+        </para>
+       </listitem>
+      </varlistentry>
+
      <varlistentry id="guc-wal-sender-timeout" xreflabel="wal_sender_timeout">
       <term><varname>wal_sender_timeout</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index 934eb9052d..39068b8f82 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -927,9 +927,11 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
     <xref linkend="guc-archive-command"/>.
     However, these methods often result in retaining more WAL segments than
     required, whereas replication slots retain only the number of segments
-    known to be needed.  An advantage of these methods is that they bound
-    the space requirement for <literal>pg_wal</literal>; there is currently no way
-    to do this using replication slots.
+    known to be needed.  On the other hand, replication slots can retain so
+    many WAL segments that they fill up the space allotted
+    for <literal>pg_wal</literal>;
+    <xref linkend="guc-max-slot-wal-keep-size"/> limits the size of WAL files
+    retained by replication slots.
    </para>
    <para>
     Similarly, <xref linkend="guc-hot-standby-feedback"/>
@@ -967,9 +969,9 @@ postgres=# SELECT * FROM pg_create_physical_replication_slot('node_a_slot');
  node_a_slot |

 postgres=# SELECT * FROM pg_replication_slots;
-  slot_name  | slot_type | datoid | database | active | xmin | restart_lsn | confirmed_flush_lsn
--------------+-----------+--------+----------+--------+------+-------------+---------------------
- node_a_slot | physical  |        |          | f      |      |             |
+  slot_name  | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | remain
+-------------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+--------
+ node_a_slot |        | physical  |        |          | f         | f      |            |      |              |             |                     | unknown    |      0
 (1 row)
 </programlisting>
      To configure the standby to use this slot, <varname>primary_slot_name</varname>
-- 
2.16.3

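As a side note on the rounding described in the config.sgml text above, here is a small stand-alone sketch of how a megabyte setting turns into whole segments. It assumes the conversion is a plain integer division of the setting by the segment size in megabytes, which is how I read the ConvertToXSegs() call in patch 1/4. limit_in_segments() and effective_limit_bytes() are hypothetical helper names, not functions in the patch, and the segment size is assumed to be at least 1MB.

```c
#include <stdint.h>

#define MEGABYTE (UINT64_C(1024) * 1024)

/*
 * Hypothetical helper: number of whole WAL segments that fit into a
 * limit given in megabytes.  Assumes seg_size is at least 1MB.
 */
uint64_t
limit_in_segments(uint64_t limit_mb, uint64_t seg_size)
{
	return limit_mb / (seg_size / MEGABYTE);
}

/* Hypothetical helper: the limit after rounding down, in bytes. */
uint64_t
effective_limit_bytes(uint64_t limit_mb, uint64_t seg_size)
{
	return limit_in_segments(limit_mb, seg_size) * seg_size;
}
```

With 16MB segments, a setting of 40MB would behave like 32MB under this reading, and a non-zero setting smaller than one segment would round down to zero segments.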
From 3e589f127df280f56bc8382426f9f3b1f16f21f1 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 17:33:53 +0900
Subject: [PATCH 3/4] TAP test for the slot limit feature

---
 src/test/recovery/t/016_replslot_limit.pl | 161 ++++++++++++++++++++++++++++++
 1 file changed, 161 insertions(+)
 create mode 100644 src/test/recovery/t/016_replslot_limit.pl

diff --git a/src/test/recovery/t/016_replslot_limit.pl b/src/test/recovery/t/016_replslot_limit.pl
new file mode 100644
index 0000000000..15820e049e
--- /dev/null
+++ b/src/test/recovery/t/016_replslot_limit.pl
@@ -0,0 +1,161 @@
+# Test for replication slot limit
+# Ensure that max_slot_wal_keep_size limits the number of WAL files to
+# be kept by replication slot.
+
+use strict;
+use warnings;
+use File::Path qw(rmtree);
+use PostgresNode;
+use TestLib;
+use Test::More tests => 7;
+use Time::HiRes qw(usleep);
+
+$ENV{PGDATABASE} = 'postgres';
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->append_conf('postgresql.conf', qq(
+min_wal_size = 32MB
+max_wal_size = 48MB
+));
+$node_master->start;
+$node_master->safe_psql('postgres', "SELECT pg_create_physical_replication_slot('rep1')");
+
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create a standby linking to it using a replication slot
+my $node_standby = get_new_node('standby_1');
+$node_standby->init_from_backup($node_master, $backup_name, has_streaming => 1, primary_slot_name => 'rep1');
+$node_standby->append_conf('recovery.conf', qq(
+primary_slot_name = 'rep1'
+));
+$node_standby->start;
+
+# Wait until standby has replayed enough data on the standby
+my $start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+# Stop standby
+$node_standby->stop;
+
+
+# Preparation done, currently the slot must be secured.
+my $result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|streaming", 'check initial state of standby');
+
+# Advance WAL by ten segments (= 160MB) on master
+advance_wal($node_master, 10);
+
+# All segments still must be secured after a checkpoint.
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|streaming", 'check that slot is keeping all segments');
+
+# The standby can connect to the master
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+
+# Advance WAL again
+advance_wal($node_master, 10);
+
+# Set max_slot_wal_keep_size on master
+my $max_slot_wal_keep_size_mb = 32;
+$node_master->append_conf('postgresql.conf', qq(
+max_slot_wal_keep_size = ${max_slot_wal_keep_size_mb}MB
+));
+$node_master->reload;
+
+# Some segments become 'keeping'
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|keeping", 'check that some segments are about to be removed');
+
+# The standby can still connect to the master
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+ok(!find_in_log($node_standby,
+				"requested WAL segment [0-9A-F]+ has already been removed"),
+   'check that no replication failure is caused by insecure state');
+
+# Advance WAL again
+my $logstart = get_log_size($node_master);
+advance_wal($node_master, 10);
+
+# WARNING should be issued
+ok(find_in_log($node_master,
+			   "some replication slots have lost required WAL segments",
+			   $logstart),
+   'check that the warning is correctly logged');
+
+# This slot should be broken
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|lost", 'check that overflown segments have been removed');
+
+# The standby can no longer connect to the master
+$logstart = get_log_size($node_standby);
+$node_standby->start;
+
+my $failed = 0;
+for (my $i = 0 ; $i < 10000 ; $i++)
+{
+	if (find_in_log($node_standby,
+					"requested WAL segment [0-9A-F]+ has already been removed",
+					$logstart))
+	{
+		$failed = 1;
+		last;
+	}
+	usleep(100_000);
+}
+ok($failed, 'check replication has been broken');
+
+$node_standby->stop;
+
+#####################################
+# Advance WAL of $node by $n segments
+sub advance_wal
+{
+	my ($node, $n) = @_;
+
+	# Advance by $n segments (= (16 * $n) MB) on master
+	for (my $i = 0 ; $i < $n ; $i++)
+	{
+		$node->safe_psql('postgres', "CREATE TABLE t (a int); DROP TABLE t; SELECT pg_switch_wal();");
+	}
+
+	$node->safe_psql('postgres', "CHECKPOINT;");
+}
+
+# return the size of logfile of $node in bytes
+sub get_log_size
+{
+	my ($node) = @_;
+
+	return (stat $node->logfile)[7];
+}
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+	my ($node, $pat, $off) = @_;
+
+	$off = 0 unless defined $off;
+	my $log = TestLib::slurp_file($node->logfile);
+	return 0 if (length($log) <= $off);
+
+	$log = substr($log, $off);
+
+	return $log =~ m/$pat/;
+}
-- 
2.16.3

From 712dda58cf51abc5ea64110d566c323e807aa8dd Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:23:25 +0900
Subject: [PATCH 2/4] Add monitoring aid for max_slot_wal_keep_size.

Adds two columns "wal_status" and "remain" to pg_replication_slots.
With max_slot_wal_keep_size set, long-disconnected slots may lose sync.
The two columns show whether the slot can be reconnected or is about
to lose reserved WAL segments, and the remaining bytes of WAL that can
be written until the slot loses reserved WAL records.
---
 contrib/test_decoding/expected/ddl.out |   4 +-
 src/backend/access/transam/xlog.c      | 130 ++++++++++++++++++++++++++++++++-
 src/backend/catalog/system_views.sql   |   4 +-
 src/backend/replication/slotfuncs.c    |  32 +++++++-
 src/include/access/xlog.h              |   1 +
 src/include/catalog/pg_proc.dat        |   6 +-
 src/test/regress/expected/rules.out    |   6 +-
 7 files changed, 171 insertions(+), 12 deletions(-)

diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index b7c76469fc..6b6a2df213 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -706,7 +706,7 @@ SELECT pg_drop_replication_slot('regression_slot');

 /* check that the slot is gone */
 SELECT * FROM pg_replication_slots;
- slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
------------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
+ slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | remain
+-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+--------
 (0 rows)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 982eedad32..c9f28fd890 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -868,7 +868,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
-static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr);
+static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr, XLogRecPtr restartLSN, uint64 *restBytes);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);

@@ -9490,15 +9490,114 @@ CreateRestartPoint(int flags)
 	return true;
 }

+
+/*
+ * Returns the segment number of the oldest file in XLOG directory.
+ */
+static XLogSegNo
+GetOldestXLogFileSegNo(void)
+{
+	DIR		   *xldir;
+	struct dirent *xlde;
+	XLogSegNo	segno = 0;
+
+	xldir = AllocateDir(XLOGDIR);
+	while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
+	{
+		TimeLineID	tli;
+		XLogSegNo	fsegno;
+
+		/* Ignore files that are not XLOG segments */
+		if (!IsXLogFileName(xlde->d_name) &&
+			!IsPartialXLogFileName(xlde->d_name))
+			continue;
+
+		XLogFromFileName(xlde->d_name, &tli, &fsegno, wal_segment_size);
+
+		/* get minimum segment ignoring timeline ID */
+		if (segno == 0 || fsegno < segno)
+			segno = fsegno;
+	}
+
+	FreeDir(xldir);
+
+	return segno;
+}
+
+/*
+ * Check if the record on the given targetLSN is present in XLOG files.
+ *
+ * Returns one of three values.
+ * 0 means that WAL record at targetLSN is already removed.
+ * 1 means that WAL record at targetLSN is available.
+ * 2 means that WAL record at targetLSN is available but about to be removed by
+ * the next checkpoint.
+ */
+int
+IsLsnStillAvaiable(XLogRecPtr targetLSN, uint64 *restBytes)
+{
+	XLogRecPtr	currpos;
+	XLogRecPtr	slotPtr;
+	XLogSegNo	targetSeg;
+	XLogSegNo	tailSeg;
+	XLogSegNo	oldestSeg;
+
+	Assert(!XLogRecPtrIsInvalid(targetLSN));
+	Assert(restBytes);
+
+	currpos = GetXLogWriteRecPtr();
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	oldestSeg = XLogCtl->lastRemovedSegNo;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	/*
+	 * oldestSeg is zero before at least one segment has been removed since
+	 * startup. Use oldest segno taken from file names.
+	 */
+	if (oldestSeg == 0)
+	{
+		static XLogSegNo oldestFileSeg = 0;
+
+		if (oldestFileSeg == 0)
+			oldestFileSeg = GetOldestXLogFileSegNo();
+		/* let it have the same meaning with lastRemovedSegNo here */
+		oldestSeg = oldestFileSeg - 1;
+	}
+
+	/* oldest segment is just after the last removed segment */
+	oldestSeg++;
+
+	XLByteToSeg(targetLSN, targetSeg, wal_segment_size);
+
+	slotPtr = XLogGetReplicationSlotMinimumLSN();
+	tailSeg = GetOldestKeepSegment(currpos, slotPtr, targetLSN, restBytes);
+
+	/* targetSeg is being reserved by slots */
+	if (tailSeg <= targetSeg)
+		return 1;
+
+	/* targetSeg is not reserved but still available */
+	if (oldestSeg <= targetSeg)
+		return 2;
+
+	/* targetSeg has gone */
+	return 0;
+}
+
 /*
  * Returns minimum segment number the next checkpoint must leave considering
  * wal_keep_segments, replication slots and max_slot_wal_keep_size.
  *
  * currLSN is the current insert location
  * minSlotLSN is the minimum restart_lsn of all active slots
+ * restartLSN is restart_lsn of a slot.
+ *
+ * If restBytes is not NULL, returns remaining LSN bytes to advance until the
+ * segment which contains restartLSN will be removed.
  */
 static XLogSegNo
-GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
+GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN, XLogRecPtr restartLSN, uint64 *restBytes)
 {
 	uint64		keepSegs = 0;
 	XLogSegNo	currSeg;
@@ -9530,6 +9629,30 @@ GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
 	if (wal_keep_segments > 0 && keepSegs < wal_keep_segments)
 		keepSegs = wal_keep_segments;

+	/*
+	 * Return remaining LSN bytes to advance until the slot gives up reserving
+	 * WAL records if requested.
+	 */
+	if (restBytes)
+	{
+		uint64		fragbytes;
+		XLogSegNo	restartSeg;
+
+		*restBytes = 0;
+
+		XLByteToSeg(restartLSN, restartSeg, wal_segment_size);
+		if (keepSegs > 0 && currSeg <= restartSeg + keepSegs)
+		{
+			/*
+			 * This slot still has all required segments. Calculate how many
+			 * LSN bytes the slot has until it loses restart_lsn.
+			 */
+			fragbytes = wal_segment_size - (currLSN % wal_segment_size);
+			XLogSegNoOffsetToRecPtr(restartSeg + keepSegs - currSeg, fragbytes,
+									wal_segment_size, *restBytes);
+		}
+	}
+
 	/* avoid underflow, don't go below 1 */
 	if (currSeg <= keepSegs)
 		return 1;
@@ -9558,7 +9681,8 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 	/*
 	 * We should keep certain number of WAL segments after this checkpoint.
 	 */
-	minSegNo = GetOldestKeepSegment(recptr, slotminptr);
+	minSegNo =
+		GetOldestKeepSegment(recptr, slotminptr, InvalidXLogRecPtr, NULL);

 	/*
 	 * warn if the checkpoint flushes the segments required by replication
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 7251552419..5db294f64e 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -797,7 +797,9 @@ CREATE VIEW pg_replication_slots AS
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
-            L.confirmed_flush_lsn
+            L.confirmed_flush_lsn,
+            L.wal_status,
+            L.remain
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);

diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 8782bad4a2..d9ed9e8cf2 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -185,7 +185,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 11
+#define PG_GET_REPLICATION_SLOTS_COLS 13
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -307,6 +307,36 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;

+		if (restart_lsn == InvalidXLogRecPtr)
+		{
+			values[i++] = CStringGetTextDatum("unknown");
+			values[i++] = Int64GetDatum(0);
+		}
+		else
+		{
+			uint64		remaining_bytes;
+			char	   *status;
+
+			switch (IsLsnStillAvaiable(restart_lsn, &remaining_bytes))
+			{
+				case 0:
+					status = "lost";
+					break;
+				case 1:
+					status = "streaming";
+					break;
+				case 2:
+					status = "keeping";
+					break;
+				default:
+					status = "unknown";
+					break;
+			}
+
+			values[i++] = CStringGetTextDatum(status);
+			values[i++] = Int64GetDatum(remaining_bytes);
+		}
+
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
 	LWLockRelease(ReplicationSlotControlLock);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 12cd0d1d10..ad9d1dec29 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -269,6 +269,7 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
+extern int	IsLsnStillAvaiable(XLogRecPtr targetLSN, uint64 *restBytes);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
 extern void UpdateFullPageWrites(void);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a14651010f..4a096c9478 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -9796,9 +9796,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,remain}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 744d501e31..dcd5f19644 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1451,8 +1451,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
-    l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+    l.confirmed_flush_lsn,
+    l.wal_status,
+    l.remain
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, remain)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.16.3

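To make the three states easier to follow, the decision at the end of IsLsnStillAvaiable() in the patch above can be isolated into a pure function, as sketched below. This is an illustration only: classify_segment(), wal_status_name() and the WalAvailability enum are hypothetical names, segment numbers are plain uint64_t, and the caller is assumed to have already computed the oldest segment to keep (tail_seg) and the last removed segment.

```c
#include <stdint.h>

/* Hypothetical names for the patch's 0 / 1 / 2 return values. */
typedef enum
{
	WAL_LOST = 0,				/* segment already removed */
	WAL_STREAMING = 1,			/* segment still reserved */
	WAL_KEEPING = 2				/* on disk, but next checkpoint removes it */
} WalAvailability;

WalAvailability
classify_segment(uint64_t target_seg, uint64_t tail_seg,
				 uint64_t last_removed_seg)
{
	/* the oldest segment on disk is just after the last removed one */
	uint64_t	oldest_seg = last_removed_seg + 1;

	if (tail_seg <= target_seg)
		return WAL_STREAMING;
	if (oldest_seg <= target_seg)
		return WAL_KEEPING;
	return WAL_LOST;
}

/* Maps a state to the text shown in the wal_status column. */
const char *
wal_status_name(WalAvailability a)
{
	switch (a)
	{
		case WAL_LOST:
			return "lost";
		case WAL_STREAMING:
			return "streaming";
		case WAL_KEEPING:
			return "keeping";
	}
	return "unknown";
}
```

For instance, with tail_seg = 8 and segment 5 the last one removed, a slot pointing at segment 10 is "streaming", one at segment 7 is "keeping", and one at segment 5 is "lost".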
From d29a5c6d53dbb88077f00c8a30d7a1ca0a5a24d5 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:20:20 +0900
Subject: [PATCH 1/4] Add WAL relief vent for replication slots

Adds a capability to limit the number of segments kept by replication
slots by a GUC variable.
---
 src/backend/access/transam/xlog.c             | 106 ++++++++++++++++++++------
 src/backend/utils/misc/guc.c                  |  12 +++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/access/xlog.h                     |   1 +
 4 files changed, 95 insertions(+), 25 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 493f1db7b9..982eedad32 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -105,6 +105,7 @@ int			wal_level = WAL_LEVEL_MINIMAL;
 int			CommitDelay = 0;	/* precommit delay in microseconds */
 int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int			wal_retrieve_retry_interval = 5000;
+int			max_slot_wal_keep_size_mb = 0;

 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -867,6 +868,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
+static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);

@@ -9488,6 +9490,53 @@ CreateRestartPoint(int flags)
 	return true;
 }

+/*
+ * Returns minimum segment number the next checkpoint must leave considering
+ * wal_keep_segments, replication slots and max_slot_wal_keep_size.
+ *
+ * currLSN is the current insert location
+ * minSlotLSN is the minimum restart_lsn of all active slots
+ */
+static XLogSegNo
+GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
+{
+	uint64		keepSegs = 0;
+	XLogSegNo	currSeg;
+	XLogSegNo	minSlotSeg;
+
+	XLByteToSeg(currLSN, currSeg, wal_segment_size);
+	XLByteToSeg(minSlotLSN, minSlotSeg, wal_segment_size);
+
+	/*
+	 * Calculate keep segments by slots first. The second term of the
+	 * condition is just a sanity check.
+	 */
+	if (minSlotLSN != InvalidXLogRecPtr && minSlotSeg <= currSeg)
+		keepSegs = currSeg - minSlotSeg;
+
+	/* Cap keepSegs by max_slot_wal_keep_size */
+	if (max_slot_wal_keep_size_mb > 0)
+	{
+		uint64		limitSegs;
+
+		limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+
+		/* Apply max_slot_wal_keep_size to keeping segments */
+		if (limitSegs < keepSegs)
+			keepSegs = limitSegs;
+	}
+
+	/* but, keep at least wal_keep_segments segments if any */
+	if (wal_keep_segments > 0 && keepSegs < wal_keep_segments)
+		keepSegs = wal_keep_segments;
+
+	/* avoid underflow, don't go below 1 */
+	if (currSeg <= keepSegs)
+		return 1;
+
+	return currSeg - keepSegs;
+}
+
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
@@ -9499,38 +9548,45 @@ CreateRestartPoint(int flags)
 static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 {
-	XLogSegNo	segno;
-	XLogRecPtr	keep;
+	XLogRecPtr	slotminptr = InvalidXLogRecPtr;
+	XLogSegNo	minSegNo;
+	XLogSegNo	slotSegNo;

-	XLByteToSeg(recptr, segno, wal_segment_size);
-	keep = XLogGetReplicationSlotMinimumLSN();
+	if (max_replication_slots > 0)
+		slotminptr = XLogGetReplicationSlotMinimumLSN();

-	/* compute limit for wal_keep_segments first */
-	if (wal_keep_segments > 0)
+	/*
+	 * We should keep certain number of WAL segments after this checkpoint.
+	 */
+	minSegNo = GetOldestKeepSegment(recptr, slotminptr);
+
+	/*
+	 * warn if the checkpoint flushes the segments required by replication
+	 * slots.
+	 */
+	if (!XLogRecPtrIsInvalid(slotminptr))
 	{
-		/* avoid underflow, don't go below 1 */
-		if (segno <= wal_keep_segments)
-			segno = 1;
+		static XLogSegNo prev_lost_segs = 0;	/* avoid duplicate messages */
+
+		XLByteToSeg(slotminptr, slotSegNo, wal_segment_size);
+
+		if (slotSegNo < minSegNo)
+		{
+			XLogSegNo	lost_segs = minSegNo - slotSegNo;
+			if (prev_lost_segs != lost_segs)
+				ereport(WARNING,
+						(errmsg ("some replication slots have lost required WAL segments"),
+						 errdetail("The most affected slot has lost %ld segments.",
+								   (long) lost_segs)));
+			prev_lost_segs = lost_segs;
+		}
 		else
-			segno = segno - wal_keep_segments;
-	}
-
-	/* then check whether slots limit removal further */
-	if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
-	{
-		XLogSegNo	slotSegNo;
-
-		XLByteToSeg(keep, slotSegNo, wal_segment_size);
-
-		if (slotSegNo <= 0)
-			segno = 1;
-		else if (slotSegNo < segno)
-			segno = slotSegNo;
+			prev_lost_segs = 0;
 	}

 	/* don't delete WAL segments newer than the calculated segment */
-	if (segno < *logSegNo)
-		*logSegNo = segno;
+	if (minSegNo < *logSegNo)
+		*logSegNo = minSegNo;
 }

 /*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index c5ba149996..dd65f8f17c 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2538,6 +2538,18 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},

+	{
+		{"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the maximum size of extra WALs kept by replication slots."),
+			NULL,
+			GUC_UNIT_MB
+		},
+		&max_slot_wal_keep_size_mb,
+		0, 0,
+		MAX_KILOBYTES, /* XXX: This is in megabytes, like max/min_wal_size */
+		NULL, NULL, NULL
+	},
+
 	{
 		{"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING,
 			gettext_noop("Sets the maximum time to wait for WAL replication."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index c0d3fb8491..cb5b2bcc89 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -238,6 +238,7 @@
 #max_wal_senders = 10		# max number of walsender processes
 				# (change requires restart)
 #wal_keep_segments = 0		# in logfile segments; 0 disables
+#max_slot_wal_keep_size = 0	# in megabytes; 0 disables
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables

 #max_replication_slots = 10	# max number of replication slots
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 421ba6d775..12cd0d1d10 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -98,6 +98,7 @@ extern int	wal_segment_size;
 extern int	min_wal_size_mb;
 extern int	max_wal_size_mb;
 extern int	wal_keep_segments;
+extern int	max_slot_wal_keep_size_mb;
 extern int	XLOGbuffers;
 extern int	XLogArchiveTimeout;
 extern int	wal_retrieve_retry_interval;
-- 
2.16.3
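
The order in which GetOldestKeepSegment() in the patch above combines the slot position, the size limit and wal_keep_segments is easy to get wrong, so here is a stand-alone sketch of just that arithmetic. It is a simplification under stated assumptions, not the patch's code: oldest_keep_segment() and its parameters are hypothetical, everything is expressed in segment numbers rather than LSNs, slot_valid stands in for the InvalidXLogRecPtr check, and limit_segs == 0 is taken to mean "no limit" (the patch instead tests the megabyte setting before converting it to segments).

```c
#include <stdint.h>

/*
 * Hypothetical stand-alone version of the retention arithmetic.
 * Returns the oldest segment number that the next checkpoint must keep.
 */
uint64_t
oldest_keep_segment(uint64_t curr_seg, int slot_valid, uint64_t min_slot_seg,
					uint64_t limit_segs, uint64_t wal_keep_segments)
{
	uint64_t	keep_segs = 0;

	/* segments wanted by the slots; the second test is a sanity check */
	if (slot_valid && min_slot_seg <= curr_seg)
		keep_segs = curr_seg - min_slot_seg;

	/* cap what slots may hold back (the max_slot_wal_keep_size step) */
	if (limit_segs > 0 && limit_segs < keep_segs)
		keep_segs = limit_segs;

	/* but never keep fewer than wal_keep_segments */
	if (wal_keep_segments > 0 && keep_segs < wal_keep_segments)
		keep_segs = wal_keep_segments;

	/* avoid underflow, don't go below segment 1 */
	if (curr_seg <= keep_segs)
		return 1;

	return curr_seg - keep_segs;
}
```

With the write position in segment 100 and the oldest slot at segment 80, this keeps from segment 80 without a limit, from 98 with a two-segment limit, and from 95 when wal_keep_segments = 5 is also set, which shows that wal_keep_segments wins over the slot limit.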