Code review only of 0001-0005.

I noticed you had two 0008, btw.

On Fri, Apr 07, 2023 at 11:12:26AM -0700, Andres Freund wrote:
> Hi,
> 
> On 2023-04-07 08:47:57 -0700, Andres Freund wrote:
> > Integrated all of these.
> 
> From 0e038eb5dfddec500fbf4625775d1fa508a208f6 Mon Sep 17 00:00:00 2001
> From: Andres Freund <and...@anarazel.de>
> Date: Thu, 6 Apr 2023 20:00:07 -0700
> Subject: [PATCH va67 1/9] Replace a replication slot's invalidated_at LSN with
>  an enum
> 
> diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
> index 8872c80cdfe..ebcb637baed 100644
> --- a/src/include/replication/slot.h
> +++ b/src/include/replication/slot.h
> @@ -37,6 +37,17 @@ typedef enum ReplicationSlotPersistency
>       RS_TEMPORARY
>  } ReplicationSlotPersistency;
>  
> +/*
> + * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
> + * 'invalidated' field is set to a value other than _NONE.
> + */
> +typedef enum ReplicationSlotInvalidationCause
> +{
> +     RS_INVAL_NONE,
> +     /* required WAL has been removed */

I just wonder if RS_INVAL_WAL is too generic. Something like
RS_INVAL_WAL_MISSING or similar may be better since it seems there are
other invalidation causes that may be related to WAL.

> +     RS_INVAL_WAL,
> +} ReplicationSlotInvalidationCause;
> +

0002 LGTM

> From 52c25cc15abc4470d19e305d245b9362e6b8d6a3 Mon Sep 17 00:00:00 2001
> From: Andres Freund <and...@anarazel.de>
> Date: Fri, 7 Apr 2023 09:32:48 -0700
> Subject: [PATCH va67 3/9] Support invalidating replication slots due to
>  horizon and wal_level
> MIME-Version: 1.0
> Content-Type: text/plain; charset=UTF-8
> Content-Transfer-Encoding: 8bit
> 
> Needed for supporting logical decoding on a standby. The new invalidation
> methods will be used in a subsequent commit.
> 

You are probably aware, but applying 0003 and 0004 each gives me two
warnings:

warning: 1 line adds whitespace errors.
Warning: commit message did not conform to UTF-8.
You may want to amend it after fixing the message, or set the config
variable i18n.commitEncoding to the encoding your project uses.

> diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
> index df23b7ed31e..c2a9accebf6 100644
> --- a/src/backend/replication/slot.c
> +++ b/src/backend/replication/slot.c
> @@ -1241,8 +1241,58 @@ ReplicationSlotReserveWal(void)
>  }
>  
>  /*
> - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
> - * and mark it invalid, if necessary and possible.
> + * Report that replication slot needs to be invalidated
> + */
> +static void
> +ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
> +                                        bool terminating,
> +                                        int pid,
> +                                        NameData slotname,
> +                                        XLogRecPtr restart_lsn,
> +                                        XLogRecPtr oldestLSN,
> +                                        TransactionId 
> snapshotConflictHorizon)
> +{
> +     StringInfoData err_detail;
> +     bool            hint = false;
> +
> +     initStringInfo(&err_detail);
> +
> +     switch (cause)
> +     {
> +             case RS_INVAL_WAL:
> +                     hint = true;
> +                     appendStringInfo(&err_detail, _("The slot's restart_lsn 
> %X/%X exceeds the limit by %llu bytes."),
> +                                                      
> LSN_FORMAT_ARGS(restart_lsn),

I'm not sure what the below cast is meant to do. If you are trying to
protect against overflow/underflow, I think you'd need to cast before
doing the subtraction.

> +                                                      (unsigned long long) 
> (oldestLSN - restart_lsn));
> +                     break;
> +             case RS_INVAL_HORIZON:
> +                     appendStringInfo(&err_detail, _("The slot conflicted 
> with xid horizon %u."),
> +                                                      
> snapshotConflictHorizon);
> +                     break;
> +
> +             case RS_INVAL_WAL_LEVEL:
> +                     appendStringInfo(&err_detail, _("Logical decoding on 
> standby requires wal_level to be at least logical on the primary server"));
> +                     break;
> +             case RS_INVAL_NONE:
> +                     pg_unreachable();
> +     }

This ereport is quite hard to read. Is there any simplification you can
do of the ternaries without undue duplication?

> +     ereport(LOG,
> +                     terminating ?
> +                     errmsg("terminating process %d to release replication 
> slot \"%s\"",
> +                                pid, NameStr(slotname)) :
> +                     errmsg("invalidating obsolete replication slot \"%s\"",
> +                                NameStr(slotname)),
> +                     errdetail_internal("%s", err_detail.data),
> +                     hint ? errhint("You might need to increase 
> max_slot_wal_keep_size.") : 0);
> +
> +     pfree(err_detail.data);
> +}
> +
> +/*
> + * Helper for InvalidateObsoleteReplicationSlots
> + *
> + * Acquires the given slot and mark it invalid, if necessary and possible.
>   *
>   * Returns whether ReplicationSlotControlLock was released in the interim 
> (and
>   * in that case we're not holding the lock at return, otherwise we are).
> @@ -1253,7 +1303,10 @@ ReplicationSlotReserveWal(void)
>   * for syscalls, so caller must restart if we return true.
>   */
>  static bool
> -InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> +InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
> +                                                        ReplicationSlot *s,
> +                                                        XLogRecPtr oldestLSN,
> +                                                        Oid dboid, 
> TransactionId snapshotConflictHorizon,
>                                                          bool *invalidated)
>  {
>       int                     last_signaled_pid = 0;
> @@ -1264,6 +1317,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, 
> XLogRecPtr oldestLSN,
>               XLogRecPtr      restart_lsn;
>               NameData        slotname;
>               int                     active_pid = 0;
> +             ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE;
>  
>               Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, 
> LW_SHARED));
>  
> @@ -1286,10 +1340,45 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, 
> XLogRecPtr oldestLSN,
>               restart_lsn = s->data.restart_lsn;
>  
>               /*
> -              * If the slot is already invalid or is fresh enough, we don't 
> need to
> -              * do anything.
> +              * If the slot is already invalid or is a non conflicting slot, 
> we
> +              * don't need to do anything.
>                */
> -             if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= 
> oldestLSN)
> +             if (s->data.invalidated == RS_INVAL_NONE)
> +             {
> +                     switch (cause)
> +                     {
> +                             case RS_INVAL_WAL:
> +                                     if (s->data.restart_lsn != 
> InvalidXLogRecPtr &&
> +                                             s->data.restart_lsn < oldestLSN)
> +                                             conflict = cause;
> +                                     break;

Should the below be an error, i.e. a physical slot being passed an
RS_INVAL_HORIZON invalidation cause?

> +                             case RS_INVAL_HORIZON:
> +                                     if (!SlotIsLogical(s))
> +                                             break;
> +                                     /* invalid DB oid signals a shared 
> relation */
> +                                     if (dboid != InvalidOid && dboid != 
> s->data.database)
> +                                             break;
> +                                     if 
> (TransactionIdIsValid(s->effective_xmin) &&
> +                                             
> TransactionIdPrecedesOrEquals(s->effective_xmin,
> +                                                                             
>                           snapshotConflictHorizon))
> +                                             conflict = cause;
> +                                     else if 
> (TransactionIdIsValid(s->effective_catalog_xmin) &&
> +                                                      
> TransactionIdPrecedesOrEquals(s->effective_catalog_xmin,
> +                                                                             
>                                    snapshotConflictHorizon))
> +                                             conflict = cause;
> +                                     break;
> +                             case RS_INVAL_WAL_LEVEL:
> +                                     if (SlotIsLogical(s))
> +                                             conflict = cause;
> +                                     break;

Having all three of default, pg_unreachable(), and break seems a bit like
overkill. Perhaps remove the break?

> +                             default:
> +                                     pg_unreachable();
> +                                     break;
> +                     }
> +             }
> +

> @@ -1390,14 +1476,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, 
> XLogRecPtr oldestLSN,
>                       ReplicationSlotMarkDirty();
>                       ReplicationSlotSave();
>                       ReplicationSlotRelease();
> +                     pgstat_drop_replslot(s);
>  
> -                     ereport(LOG,
> -                                     errmsg("invalidating obsolete 
> replication slot \"%s\"",
> -                                                NameStr(slotname)),
> -                                     errdetail("The slot's restart_lsn %X/%X 
> exceeds the limit by %llu bytes.",
> -                                                       
> LSN_FORMAT_ARGS(restart_lsn),
> -                                                       (unsigned long long) 
> (oldestLSN - restart_lsn)),
> -                                     errhint("You might need to increase 
> max_slot_wal_keep_size."));
> +                     ReportSlotInvalidation(conflict, false, active_pid,
> +                                                                slotname, 
> restart_lsn,
> +                                                                oldestLSN, 
> snapshotConflictHorizon);
>  
>                       /* done with this slot for now */
>                       break;
> @@ -1410,19 +1493,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, 
> XLogRecPtr oldestLSN,
>  }
>  
>  /*
> - * Mark any slot that points to an LSN older than the given segment
> - * as invalid; it requires WAL that's about to be removed.
> + * Invalidate slots that require resources about to be removed.
>   *
>   * Returns true when any slot have got invalidated.
>   *
> + * Whether a slot needs to be invalidated depends on the cause. A slot is
> + * removed if it:
> + * - RS_INVAL_WAL: requires a LSN older than the given segment
> + * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon, in the 
> given db
> +     dboid may be InvalidOid for shared relations

the comma above reduces readability

is this what you mean?

RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
db; dboid may be InvalidOid for shared relations

> From 311a1d8f9c2d1acf0c22e091d53f7a533073c8b7 Mon Sep 17 00:00:00 2001
> From: Andres Freund <and...@anarazel.de>
> Date: Fri, 7 Apr 2023 09:56:02 -0700
> Subject: [PATCH va67 4/9] Handle logical slot conflicts on standby
> MIME-Version: 1.0
> Content-Type: text/plain; charset=UTF-8
> Content-Transfer-Encoding: 8bit
> 
> During WAL replay on standby, when slot conflict is identified, invalidate
> such slots. Also do the same thing if wal_level on the primary server is
> reduced to below logical and there are existing logical slots on
> standby. Introduce a new ProcSignalReason value for slot conflict recovery.
> 
> Author: "Drouvot, Bertrand" <bertranddrouvot...@gmail.com>
> Author: Andres Freund <and...@anarazel.de>
> Author: Amit Khandekar <amitdkhan...@gmail.com> (in an older version)
> Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot...@gmail.com>
> Reviewed-by: Andres Freund <and...@anarazel.de>
> Reviewed-by: Robert Haas <robertmh...@gmail.com>
> Reviewed-by: Fabrízio de Royes Mello <fabriziome...@gmail.com>
> Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostg...@gmail.com>
> Reviewed-by: Amit Kapila <amit.kapil...@gmail.com>
> Reviewed-by: Alvaro Herrera <alvhe...@alvh.no-ip.org>
> Discussion: 
> https://postgr.es/m/20230407075009.igg7be27ha2ht...@awork3.anarazel.de
> ---
>  src/include/storage/procsignal.h     |  1 +
>  src/include/storage/standby.h        |  2 ++
>  src/backend/access/gist/gistxlog.c   |  2 ++
>  src/backend/access/hash/hash_xlog.c  |  1 +
>  src/backend/access/heap/heapam.c     |  3 +++
>  src/backend/access/nbtree/nbtxlog.c  |  2 ++
>  src/backend/access/spgist/spgxlog.c  |  1 +
>  src/backend/access/transam/xlog.c    | 15 +++++++++++++++
>  src/backend/replication/slot.c       |  8 +++++++-
>  src/backend/storage/ipc/procsignal.c |  3 +++
>  src/backend/storage/ipc/standby.c    | 20 +++++++++++++++++++-
>  src/backend/tcop/postgres.c          |  9 +++++++++
>  12 files changed, 65 insertions(+), 2 deletions(-)
> 
> diff --git a/src/include/storage/procsignal.h 
> b/src/include/storage/procsignal.h
> index 905af2231ba..2f52100b009 100644
> --- a/src/include/storage/procsignal.h
> +++ b/src/include/storage/procsignal.h
> @@ -42,6 +42,7 @@ typedef enum
>       PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
>       PROCSIG_RECOVERY_CONFLICT_LOCK,
>       PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
> +     PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
>       PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
>       PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
>  
> diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
> index 2effdea126f..41f4dc372e6 100644
> --- a/src/include/storage/standby.h
> +++ b/src/include/storage/standby.h
> @@ -30,8 +30,10 @@ extern void InitRecoveryTransactionEnvironment(void);
>  extern void ShutdownRecoveryTransactionEnvironment(void);
>  
>  extern void ResolveRecoveryConflictWithSnapshot(TransactionId 
> snapshotConflictHorizon,
> +                                                                             
>                 bool isCatalogRel,
>                                                                               
>                 RelFileLocator locator);
>  extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId 
> snapshotConflictHorizon,
> +                                                                             
>                            bool isCatalogRel,
>                                                                               
>                            RelFileLocator locator);
>  extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
>  extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
> diff --git a/src/backend/access/gist/gistxlog.c 
> b/src/backend/access/gist/gistxlog.c
> index b7678f3c144..9a86fb3feff 100644
> --- a/src/backend/access/gist/gistxlog.c
> +++ b/src/backend/access/gist/gistxlog.c
> @@ -197,6 +197,7 @@ gistRedoDeleteRecord(XLogReaderState *record)
>               XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
>  
>               
> ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
> +                                                                             
>         xldata->isCatalogRel,
>                                                                               
>         rlocator);
>       }
>  
> @@ -390,6 +391,7 @@ gistRedoPageReuse(XLogReaderState *record)
>        */
>       if (InHotStandby)
>               
> ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
> +                                                                             
>                    xlrec->isCatalogRel,
>                                                                               
>                    xlrec->locator);
>  }
>  
> diff --git a/src/backend/access/hash/hash_xlog.c 
> b/src/backend/access/hash/hash_xlog.c
> index f2dd9be8d3f..e8e06c62a95 100644
> --- a/src/backend/access/hash/hash_xlog.c
> +++ b/src/backend/access/hash/hash_xlog.c
> @@ -1003,6 +1003,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
>  
>               XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
>               
> ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
> +                                                                             
>         xldata->isCatalogRel,
>                                                                               
>         rlocator);
>       }
>  
> diff --git a/src/backend/access/heap/heapam.c 
> b/src/backend/access/heap/heapam.c
> index 8b13e3f8925..f389ceee1ea 100644
> --- a/src/backend/access/heap/heapam.c
> +++ b/src/backend/access/heap/heapam.c
> @@ -8769,6 +8769,7 @@ heap_xlog_prune(XLogReaderState *record)
>        */
>       if (InHotStandby)
>               
> ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
> +                                                                             
>         xlrec->isCatalogRel,
>                                                                               
>         rlocator);
>  
>       /*
> @@ -8940,6 +8941,7 @@ heap_xlog_visible(XLogReaderState *record)
>        */
>       if (InHotStandby)
>               
> ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
> +                                                                             
>         xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
>                                                                               
>         rlocator);
>  
>       /*
> @@ -9061,6 +9063,7 @@ heap_xlog_freeze_page(XLogReaderState *record)
>  
>               XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
>               
> ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
> +                                                                             
>         xlrec->isCatalogRel,
>                                                                               
>         rlocator);
>       }
>  
> diff --git a/src/backend/access/nbtree/nbtxlog.c 
> b/src/backend/access/nbtree/nbtxlog.c
> index 414ca4f6deb..c87e46ed66e 100644
> --- a/src/backend/access/nbtree/nbtxlog.c
> +++ b/src/backend/access/nbtree/nbtxlog.c
> @@ -669,6 +669,7 @@ btree_xlog_delete(XLogReaderState *record)
>               XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
>  
>               
> ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
> +                                                                             
>         xlrec->isCatalogRel,
>                                                                               
>         rlocator);
>       }
>  
> @@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
>  
>       if (InHotStandby)
>               
> ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
> +                                                                             
>                    xlrec->isCatalogRel,
>                                                                               
>                    xlrec->locator);
>  }
>  
> diff --git a/src/backend/access/spgist/spgxlog.c 
> b/src/backend/access/spgist/spgxlog.c
> index b071b59c8ac..459ac929ba5 100644
> --- a/src/backend/access/spgist/spgxlog.c
> +++ b/src/backend/access/spgist/spgxlog.c
> @@ -879,6 +879,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
>  
>               XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
>               
> ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
> +                                                                             
>         xldata->isCatalogRel,
>                                                                               
>         locator);
>       }
>  
> diff --git a/src/backend/access/transam/xlog.c 
> b/src/backend/access/transam/xlog.c
> index 1485e8f9ca9..5227fc675c8 100644
> --- a/src/backend/access/transam/xlog.c
> +++ b/src/backend/access/transam/xlog.c
> @@ -7965,6 +7965,21 @@ xlog_redo(XLogReaderState *record)
>               /* Update our copy of the parameters in pg_control */
>               memcpy(&xlrec, XLogRecGetData(record), 
> sizeof(xl_parameter_change));
>  
> +             /*
> +              * Invalidate logical slots if we are in hot standby and the 
> primary
> +              * does not have a WAL level sufficient for logical decoding. 
> No need
> +              * to search for potentially conflicting logically slots if 
> standby is
> +              * running with wal_level lower than logical, because in that 
> case, we
> +              * would have either disallowed creation of logical slots or
> +              * invalidated existing ones.
> +              */
> +             if (InRecovery && InHotStandby &&
> +                     xlrec.wal_level < WAL_LEVEL_LOGICAL &&
> +                     wal_level >= WAL_LEVEL_LOGICAL)
> +                     InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL,
> +                                                                             
>            0, InvalidOid,
> +                                                                             
>            InvalidTransactionId);
> +
>               LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
>               ControlFile->MaxConnections = xlrec.MaxConnections;
>               ControlFile->max_worker_processes = xlrec.max_worker_processes;
> diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
> index c2a9accebf6..1b1b51e21ed 100644
> --- a/src/backend/replication/slot.c
> +++ b/src/backend/replication/slot.c
> @@ -1443,7 +1443,13 @@ 
> InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
>                                                                          
> slotname, restart_lsn,
>                                                                          
> oldestLSN, snapshotConflictHorizon);
>  
> -                             (void) kill(active_pid, SIGTERM);
> +                             if (MyBackendType == B_STARTUP)

Is SendProcSignal() marked warn_unused_result or something? I don't see
other callers that ignore its return value casting it to void.

> +                                     (void) SendProcSignal(active_pid,
> +                                                                             
>   PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
> +                                                                             
>   InvalidBackendId);
> +                             else
> +                                     (void) kill(active_pid, SIGTERM);
> +
>                               last_signaled_pid = active_pid;
>                       }

> diff --git a/src/backend/storage/ipc/standby.c 
> b/src/backend/storage/ipc/standby.c
> index 9f56b4e95cf..3b5d654347e 100644
> --- a/src/backend/storage/ipc/standby.c
> +++ b/src/backend/storage/ipc/standby.c
> @@ -24,6 +24,7 @@
>  #include "access/xlogutils.h"
>  #include "miscadmin.h"
>  #include "pgstat.h"
> +#include "replication/slot.h"
>  #include "storage/bufmgr.h"
>  #include "storage/lmgr.h"
>  #include "storage/proc.h"
> @@ -466,6 +467,7 @@ 
> ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
>   */
>  void
>  ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
> +                                                                     bool 
> isCatalogRel,
>                                                                       
> RelFileLocator locator)
>  {
>       VirtualTransactionId *backends;
> @@ -491,6 +493,16 @@ ResolveRecoveryConflictWithSnapshot(TransactionId 
> snapshotConflictHorizon,
>                                                                               
>    PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
>                                                                               
>    WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
>                                                                               
>    true);
> +
> +     /*
> +      * Note that WaitExceedsMaxStandbyDelay() is not taken into account here
> +      * (as opposed to ResolveRecoveryConflictWithVirtualXIDs() above). That
> +      * seems OK, given that this kind of conflict should not normally be

do you mean "when using a physical replication slot"?

> +      * reached, e.g. by using a physical replication slot.
> +      */
> +     if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
> +             InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, 
> locator.dbOid,
> +                                                                             
>    snapshotConflictHorizon);
>  }


0005 LGTM

- Melanie


Reply via email to