On Wed, 23 Aug 2023 at 14:21, Hayato Kuroda (Fujitsu) <kuroda.hay...@fujitsu.com> wrote: > > Dear Vignesh, > > > Here is a patch to persist to disk logical slots during a shutdown > > checkpoint if the updated confirmed_flush_lsn has not yet been > > persisted. > > Thanks for making the patch with different approach! Here are comments. > > 01. RestoreSlotFromDisk > > ``` > slot->candidate_xmin_lsn = InvalidXLogRecPtr; > slot->candidate_restart_lsn = InvalidXLogRecPtr; > slot->candidate_restart_valid = InvalidXLogRecPtr; > + slot->last_persisted_confirmed_flush = InvalidXLogRecPtr; > ``` > > last_persisted_confirmed_flush was set to InvalidXLogRecPtr, but isn't it > better > to use cp.slotdata. confirmed_flush? Assuming that the server is shut down > immediately, > your patch forces to save.
Modified > 02. t/002_always_persist.pl > > The original author of the patch is me, but I found that the test could pass > without your patch. This is because pg_logical_slot_get_changes()-> > pg_logical_slot_get_changes_guts(confirm = true) always mark the slot as > dirty. > IIUC we must use the logical replication system to verify the persistence. > Attached test can pass only when patch is applied. Updated the test based on your another_test with slight modifications. The attached v4 version patch has the changes for the same. Regards, Vignesh
From 67ecd2665e6dfa8f5446140e16e45592644cead4 Mon Sep 17 00:00:00 2001 From: Julien Rouhaud <julien.rouh...@free.fr> Date: Fri, 14 Apr 2023 13:49:09 +0800 Subject: [PATCH v4] Persist to disk logical slots during a shutdown checkpoint if the updated confirmed_flush_lsn has not yet been persisted. It's entirely possible for a logical slot to have a confirmed_flush_lsn higher than the last value saved on disk while not being marked as dirty. It's currently not a problem to lose that value during a clean shutdown / restart cycle, but a later patch adding support for pg_upgrade of publications and logical slots will rely on that value being properly persisted to disk. Author: Julien Rouhaud Reviewed-by: Wang Wei, Peter Smith, Masahiko Sawada --- src/backend/access/transam/xlog.c | 2 +- src/backend/replication/slot.c | 33 ++++-- src/include/replication/slot.h | 5 +- src/test/subscription/meson.build | 1 + src/test/subscription/t/034_always_persist.pl | 106 ++++++++++++++++++ 5 files changed, 135 insertions(+), 12 deletions(-) create mode 100644 src/test/subscription/t/034_always_persist.pl diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 60c0b7ec3a..6dced61cf4 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7026,7 +7026,7 @@ static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags) { CheckPointRelationMap(); - CheckPointReplicationSlots(); + CheckPointReplicationSlots(flags & CHECKPOINT_IS_SHUTDOWN); CheckPointSnapBuild(); CheckPointLogicalRewriteHeap(); CheckPointReplicationOrigin(); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 1dc27264f6..7fa683d563 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -109,7 +109,8 @@ static void ReplicationSlotDropPtr(ReplicationSlot *slot); /* internal persistency functions */ static void RestoreSlotFromDisk(const char *name); static void CreateSlotOnDisk(ReplicationSlot 
*slot); -static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel); +static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel, + bool is_shutdown); /* * Report shared-memory space needed by ReplicationSlotsShmemInit. @@ -321,6 +322,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->candidate_xmin_lsn = InvalidXLogRecPtr; slot->candidate_restart_valid = InvalidXLogRecPtr; slot->candidate_restart_lsn = InvalidXLogRecPtr; + slot->last_persisted_confirmed_flush = InvalidXLogRecPtr; /* * Create the slot on disk. We haven't actually marked the slot allocated @@ -783,7 +785,7 @@ ReplicationSlotSave(void) Assert(MyReplicationSlot != NULL); sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name)); - SaveSlotToPath(MyReplicationSlot, path, ERROR); + SaveSlotToPath(MyReplicationSlot, path, ERROR, false); } /* @@ -1565,11 +1567,10 @@ restart: /* * Flush all replication slots to disk. * - * This needn't actually be part of a checkpoint, but it's a convenient - * location. + * is_shutdown is true in case of a shutdown checkpoint. */ void -CheckPointReplicationSlots(void) +CheckPointReplicationSlots(bool is_shutdown) { int i; @@ -1594,7 +1595,7 @@ CheckPointReplicationSlots(void) /* save the slot to disk, locking is handled in SaveSlotToPath() */ sprintf(path, "pg_replslot/%s", NameStr(s->data.name)); - SaveSlotToPath(s, path, LOG); + SaveSlotToPath(s, path, LOG, is_shutdown); } LWLockRelease(ReplicationSlotAllocationLock); } @@ -1700,7 +1701,7 @@ CreateSlotOnDisk(ReplicationSlot *slot) /* Write the actual state file. */ slot->dirty = true; /* signal that we really need to write */ - SaveSlotToPath(slot, tmppath, ERROR); + SaveSlotToPath(slot, tmppath, ERROR, false); /* Rename the directory into place. */ if (rename(tmppath, path) != 0) @@ -1726,7 +1727,8 @@ CreateSlotOnDisk(ReplicationSlot *slot) * Shared functionality between saving and creating a replication slot. 
*/ static void -SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) +SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel, + bool is_shutdown) { char tmppath[MAXPGPATH]; char path[MAXPGPATH]; @@ -1740,8 +1742,16 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) slot->just_dirtied = false; SpinLockRelease(&slot->mutex); - /* and don't do anything if there's nothing to write */ - if (!was_dirty) + /* + * Don't do anything if there's nothing to write, unless this is called for + * a logical slot during a shutdown checkpoint and if the updated + * confirmed_flush LSN has not yet been persisted, as we want to persist + * the updated confirmed_flush LSN in that case, even if that's the only + * modification. + */ + if (!was_dirty && + !(SlotIsLogical(slot) && is_shutdown && + (slot->data.confirmed_flush != slot->last_persisted_confirmed_flush))) return; LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE); @@ -1871,6 +1881,8 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) SpinLockAcquire(&slot->mutex); if (!slot->just_dirtied) slot->dirty = false; + + slot->last_persisted_confirmed_flush = slot->data.confirmed_flush; SpinLockRelease(&slot->mutex); LWLockRelease(&slot->io_in_progress_lock); @@ -2067,6 +2079,7 @@ RestoreSlotFromDisk(const char *name) /* initialize in memory state */ slot->effective_xmin = cp.slotdata.xmin; slot->effective_catalog_xmin = cp.slotdata.catalog_xmin; + slot->last_persisted_confirmed_flush = cp.slotdata.confirmed_flush; slot->candidate_catalog_xmin = InvalidTransactionId; slot->candidate_xmin_lsn = InvalidXLogRecPtr; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index a8a89dc784..b519f7af5f 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -178,6 +178,9 @@ typedef struct ReplicationSlot XLogRecPtr candidate_xmin_lsn; XLogRecPtr candidate_restart_valid; XLogRecPtr candidate_restart_lsn; + + /* The last 
persisted confirmed flush lsn */ + XLogRecPtr last_persisted_confirmed_flush; } ReplicationSlot; #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid) @@ -241,7 +244,7 @@ extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslo extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); extern void StartupReplicationSlots(void); -extern void CheckPointReplicationSlots(void); +extern void CheckPointReplicationSlots(bool is_shutdown); extern void CheckSlotRequirements(void); extern void CheckSlotPermissions(void); diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index bd673a9d68..cdd2f8ba47 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -40,6 +40,7 @@ tests += { 't/031_column_list.pl', 't/032_subscribe_use_index.pl', 't/033_run_as_table_owner.pl', + 't/034_always_persist.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/034_always_persist.pl b/src/test/subscription/t/034_always_persist.pl new file mode 100644 index 0000000000..9973476fff --- /dev/null +++ b/src/test/subscription/t/034_always_persist.pl @@ -0,0 +1,106 @@ + +# Copyright (c) 2023, PostgreSQL Global Development Group + +# Test logical replication slots are always persisted to disk during a shutdown +# checkpoint. 
+ +use strict; +use warnings; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +sub compare_confirmed_flush +{ + my ($node, $confirmed_flush_from_log) = @_; + + # Fetch Latest checkpoint location from the control file itself + my ($stdout, $stderr) = run_command([ 'pg_controldata', $node->data_dir ]); + my @control_data = split("\n", $stdout); + my $latest_checkpoint = undef; + foreach (@control_data) + { + if ($_ =~ /^Latest checkpoint location:\s*(.*)$/mg) + { + $latest_checkpoint = $1; + last; + } + } + die "Latest checkpoint location not found in control file found\n" + unless defined($latest_checkpoint); + + # Is it same as the value read from log? + ok($latest_checkpoint eq $confirmed_flush_from_log, + "Check the decoding starts from the confirmed_flush which is the same as the latest_checkpoint"); + + return; +} + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('pub'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', q{ +autovacuum = off +checkpoint_timeout = 1h +}); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('sub'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create table +$node_publisher->safe_psql('postgres', "CREATE TABLE test_tbl (id int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tbl (id int)"); + +# Insert some data +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tbl VALUES (generate_series(1, 5));" +); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub" +); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub'); + +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM test_tbl" +); + +is($result, qq(5), "check initial copy was done"); + +# Set wal_receiver_status_interval to zero to suppress keepalive messages +# between nodes. +$node_subscriber->append_conf('postgresql.conf', q{ +wal_receiver_status_interval = 0 +}); +$node_subscriber->reload(); + +my $offset = -s $node_publisher->logfile; + +# Restart publisher once. If the slot has persisted, the confirmed_flush_lsn +# becomes the same as the latest checkpoint location, which means the +# SHUTDOWN_CHECKPOINT record. +$node_publisher->restart(); + +# Wait until the walsender creates decoding context +$node_publisher->wait_for_log( + qr/Streaming transactions committing after ([A-F0-9]+\/[A-F0-9]+), reading WAL from ([A-F0-9]+\/[A-F0-9]+)./, + $offset +); + +# Extract confirmed_flush from the logfile +my $log_contents = slurp_file($node_publisher->logfile, $offset); +$log_contents =~ + qr/Streaming transactions committing after ([A-F0-9]+\/[A-F0-9]+), reading WAL from ([A-F0-9]+\/[A-F0-9]+)./ + or die "could not get confirmed_flush_lsn"; + +compare_confirmed_flush($node_publisher, $1); + +done_testing(); -- 2.34.1