Hello,
On 2026-Apr-06, Mihail Nikalayeu wrote:
> > Yeah but I don't want the virtual tuple to be materialized (which would
> > happen in tts_virtual_materialize if I set shouldFree=true). The memory
> > should be freed in
> > ResetPerTupleExprContext(chgcxt->cc_estate);
> > anyway, right? Maybe deserves a comment.
>
> Not sure, ResetPerTupleExprContext resets just "ExecutorState".
> But slots are created in another memory context.
>
> Also, we can't reset slot->tts_mcxt itself - it will free the slot also.
What I ended up doing is simply not switching to the slot's context in
restore_tuple. Switching there was just dumb (mea culpa). So all the
memory we allocate for the slot lives in the executor context, and goes
away on the ResetPerTupleExprContext call at the bottom of the loop. BTW
I don't understand why you say that function only resets ExecutorState --
as far as I can tell, it does this

    #define ResetPerTupleExprContext(estate) \
        do { \
            if ((estate)->es_per_tuple_exprcontext) \
                ResetExprContext((estate)->es_per_tuple_exprcontext); \
        } while (0)

which in turn does

    #define ResetExprContext(econtext) \
        MemoryContextReset((econtext)->ecxt_per_tuple_memory)

which AFAICT is exactly what we want.
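
To spell out the pattern I'm relying on, here is a toy model in plain C.
This is not PostgreSQL code: ToyContext, toy_alloc, toy_reset and
toy_process are made-up names, and the fixed-size bump arena is only a
stand-in for a real MemoryContext. The point is just that whatever is
allocated per tuple disappears at the reset at the bottom of the loop, so
usage never grows beyond one tuple's worth.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Toy stand-in for a MemoryContext: a fixed-size bump arena (hypothetical). */
typedef struct ToyContext
{
    char        buf[1024];
    size_t      used;
} ToyContext;

/* Allocate from the arena; returns NULL when there is not enough room. */
static void *
toy_alloc(ToyContext *cxt, size_t size)
{
    void       *p;

    assert(cxt != NULL);
    if (size > sizeof(cxt->buf) - cxt->used)
        return NULL;
    p = cxt->buf + cxt->used;
    cxt->used += size;
    return p;
}

/* Rough analogue of MemoryContextReset(): forget everything allocated so far. */
static void
toy_reset(ToyContext *cxt)
{
    cxt->used = 0;
}

/*
 * Process ntuples "tuples", allocating per_tuple bytes for each one and
 * resetting the arena at the bottom of every iteration.  Returns the peak
 * arena usage seen during the loop.
 */
static size_t
toy_process(ToyContext *cxt, int ntuples, size_t per_tuple)
{
    size_t      peak = 0;

    for (int i = 0; i < ntuples; i++)
    {
        char       *data = toy_alloc(cxt, per_tuple);

        if (data == NULL)
            break;              /* arena exhausted; cannot happen if we reset */
        memset(data, 0, per_tuple);
        if (cxt->used > peak)
            peak = cxt->used;
        toy_reset(cxt);         /* plays the role of ResetPerTupleExprContext() */
    }
    return peak;
}
```

With a 1024-byte arena, 100 tuples of 200 bytes each would not fit
without the reset; with it, the peak stays at a single tuple's allocation.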
Anyway, here are the three missing parts. I have not yet edited the
deadlock-checker one to protect autovacuum from processing tables under
repack.
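
For anyone skimming, the core idea of 0001 can be sketched roughly like
this. It's a toy model, not the patch itself: ToyProc, ToyOid, ToyXid and
toy_collect_running are hypothetical names, and the real
GetRunningTransactionData() also deals with locking, subtransactions and
oldestRunningXid, all of which are ignored here. The only point is that a
valid dbid filters out backends connected to other databases, while an
invalid one keeps the cluster-wide behavior.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t ToyOid;        /* hypothetical stand-in for Oid */
typedef uint32_t ToyXid;        /* hypothetical stand-in for TransactionId */

#define TOY_INVALID_OID 0       /* plays the role of InvalidOid */
#define TOY_INVALID_XID 0       /* backend has no XID assigned */

/* One entry per backend: which database it is in and its XID, if any. */
typedef struct ToyProc
{
    ToyOid      databaseId;
    ToyXid      xid;
} ToyProc;

/*
 * Collect the XIDs of running transactions into out[], which must have room
 * for nprocs entries.  If dbid is valid, only backends connected to that
 * database are considered; otherwise all backends are.  Returns the number
 * of XIDs stored.
 */
static size_t
toy_collect_running(const ToyProc *procs, size_t nprocs, ToyOid dbid,
                    ToyXid *out)
{
    size_t      count = 0;

    for (size_t i = 0; i < nprocs; i++)
    {
        /* Skip backends that have not been assigned an XID. */
        if (procs[i].xid == TOY_INVALID_XID)
            continue;

        /* Filter by database OID if requested. */
        if (dbid != TOY_INVALID_OID && procs[i].databaseId != dbid)
            continue;

        out[count++] = procs[i].xid;
    }
    return count;
}
```

A snapshot builder that only cares about database 1 would pass dbid = 1
and so would not have to wait for transactions that show up only in the
cluster-wide list.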
--
Álvaro Herrera Breisgau, Deutschland — https://www.EnterpriseDB.com/
From b114dd66af9423f3ac611f83ce033f91b2553d5b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C3=81lvaro=20Herrera?= <[email protected]>
Date: Tue, 7 Apr 2026 00:07:17 +0200
Subject: [PATCH v56 1/3] Allow logical replication snapshots to be
database-specific
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
By default, logical decoding assumes access to shared catalogs, so
the snapshot builder needs to consider cluster-wide XIDs during startup.
That in turn means that, if any transaction is already running (and has
XID assigned), the snapshot builder needs to wait for its completion, as
it does not know if that transaction performed catalog changes earlier.
A possible problem with this concept is that if REPACK (CONCURRENTLY) is
running in some database, backends running the same command in other
databases get stuck until the first one has committed. Thus only a
single backend in the cluster can run REPACK (CONCURRENTLY) at any time.
Likewise, REPACK (CONCURRENTLY) can block walsenders starting on behalf
of subscriptions throughout the cluster.
This patch adds a new option for logical decoding output plugins to
declare that they do not use shared catalogs (i.e. catalogs that can be
changed by transactions running in other databases in the cluster). In
that case, none of the snapshots the backend uses during decoding need to
contain information about transactions running in other databases. Thus
the snapshot builder only needs to wait for completion of transactions
in the current database.
Currently we only use this option in the REPACK background worker. It
could possibly be used in the plugin for logical replication too;
however, that would need a thorough analysis of that plugin.
Bump WAL version number, due to a new field in xl_running_xacts.
Author: Antonin Houska <[email protected]>
Reviewed-by: Álvaro Herrera <[email protected]>
Discussion: https://postgr.es/m/90475.1775218118@localhost
---
contrib/pg_visibility/pg_visibility.c | 4 +-
doc/src/sgml/logicaldecoding.sgml | 4 ++
src/backend/access/index/genam.c | 8 +++
src/backend/access/rmgrdesc/standbydesc.c | 2 +
src/backend/access/transam/xlog.c | 2 +-
src/backend/access/transam/xlogfuncs.c | 2 +-
src/backend/postmaster/bgwriter.c | 2 +-
src/backend/replication/logical/decode.c | 17 +++++-
src/backend/replication/logical/logical.c | 3 +
src/backend/replication/logical/snapbuild.c | 63 ++++++++++++++++++++-
src/backend/replication/pgrepack/pgrepack.c | 7 +++
src/backend/replication/slot.c | 2 +-
src/backend/storage/ipc/procarray.c | 23 +++++++-
src/backend/storage/ipc/standby.c | 24 +++++++-
src/include/access/genam.h | 1 +
src/include/access/xlog_internal.h | 2 +-
src/include/miscadmin.h | 9 +++
src/include/replication/output_plugin.h | 1 +
src/include/replication/snapbuild.h | 3 +-
src/include/storage/procarray.h | 2 +-
src/include/storage/standby.h | 3 +-
src/include/storage/standbydefs.h | 1 +
22 files changed, 167 insertions(+), 18 deletions(-)
diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c
index dfab0b64cf5..d564bd2a00c 100644
--- a/contrib/pg_visibility/pg_visibility.c
+++ b/contrib/pg_visibility/pg_visibility.c
@@ -621,7 +621,7 @@ GetStrictOldestNonRemovableTransactionId(Relation rel)
else if (rel == NULL || rel->rd_rel->relisshared)
{
/* Shared relation: take into account all running xids */
- runningTransactions = GetRunningTransactionData();
+ runningTransactions = GetRunningTransactionData(InvalidOid);
LWLockRelease(ProcArrayLock);
LWLockRelease(XidGenLock);
return runningTransactions->oldestRunningXid;
@@ -632,7 +632,7 @@ GetStrictOldestNonRemovableTransactionId(Relation rel)
* Normal relation: take into account xids running within the current
* database
*/
- runningTransactions = GetRunningTransactionData();
+ runningTransactions = GetRunningTransactionData(InvalidOid);
LWLockRelease(ProcArrayLock);
LWLockRelease(XidGenLock);
return runningTransactions->oldestDatabaseRunningXid;
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 6dc49108997..9b1d68d0de6 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -959,6 +959,7 @@ typedef struct OutputPluginOptions
{
OutputPluginOutputType output_type;
bool receive_rewrites;
+ bool need_shared_catalogs;
} OutputPluginOptions;
</programlisting>
<literal>output_type</literal> has to either be set to
@@ -969,6 +970,9 @@ typedef struct OutputPluginOptions
also be called for changes made by heap rewrites during certain DDL
operations. These are of interest to plugins that handle DDL
replication, but they require special handling.
+ <literal>need_shared_catalogs</literal> can be set to false if you are
+ certain the plugin functions do not access shared system catalogs.
+ Doing so can speed up creation of replication slots that use this plugin.
</para>
<para>
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index 1408989c568..97d44b84622 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -394,6 +394,14 @@ systable_beginscan(Relation heapRelation,
SysScanDesc sysscan;
Relation irel;
+ /*
+ * If this backend promised that it won't access shared catalogs during
+ * logical decoding, this is the right place to verify.
+ */
+ Assert(!HistoricSnapshotActive() ||
+ accessSharedCatalogsInDecoding ||
+ !heapRelation->rd_rel->relisshared);
+
if (indexOK &&
!IgnoreSystemIndexes &&
!ReindexIsProcessingIndex(indexId))
diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index 0a291354ae2..685d1bdb024 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -41,6 +41,8 @@ standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
for (i = 0; i < xlrec->subxcnt; i++)
appendStringInfo(buf, " %u", xlrec->xids[xlrec->xcnt + i]);
}
+
+ appendStringInfo(buf, "; dbid: %u", xlrec->dbid);
}
void
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index b82af9a85c0..3f08a832ca6 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7735,7 +7735,7 @@ CreateCheckPoint(int flags)
* recovery we don't need to write running xact data.
*/
if (!shutdown && XLogStandbyInfoActive())
- LogStandbySnapshot();
+ LogStandbySnapshot(InvalidOid);
START_CRIT_SECTION();
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index 65bbaeda59c..0f5979691e6 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -245,7 +245,7 @@ pg_log_standby_snapshot(PG_FUNCTION_ARGS)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("pg_log_standby_snapshot() can only be used if \"wal_level\" >= \"replica\"")));
- recptr = LogStandbySnapshot();
+ recptr = LogStandbySnapshot(InvalidOid);
/*
* As a convenience, return the WAL location of the last inserted record
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index 1d8947774a9..a30de4262eb 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -289,7 +289,7 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len)
if (now >= timeout &&
last_snapshot_lsn <= GetLastImportantRecPtr())
{
- last_snapshot_lsn = LogStandbySnapshot();
+ last_snapshot_lsn = LogStandbySnapshot(InvalidOid);
last_snapshot_ts = now;
}
}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c9fea8cad28..38c5a4f5540 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -382,7 +382,16 @@ standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
- SnapBuildProcessRunningXacts(builder, buf->origptr, running);
+ /*
+ * Update this decoder's idea of transactions currently
+ * running. In doing so we will determine whether we have
+ * reached consistent status.
+ *
+ * If the output plugin doesn't need access to shared
+ * catalogs, we can ignore transactions in other databases.
+ */
+ SnapBuildProcessRunningXacts(builder, buf->origptr, running,
+ !ctx->options.need_shared_catalogs);
/*
* Abort all transactions that we keep track of, that are
@@ -392,8 +401,12 @@ standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* all running transactions which includes prepared ones,
* while shutdown checkpoints just know that no non-prepared
* transactions are in progress.
+ *
+ * The database-specific records might work here too, but it's
+ * not their purpose.
*/
- ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
+ if (!OidIsValid(running->dbid))
+ ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
}
break;
case XLOG_STANDBY_LOCK:
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index f20d0c542f3..8ceaf64d164 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -285,6 +285,9 @@ StartupDecodingContext(List *output_plugin_options,
ctx->write = do_write;
ctx->update_progress = update_progress;
+ /* Assume shared catalog access. The startup callback can change it. */
+ ctx->options.need_shared_catalogs = true;
+
ctx->output_plugin_options = output_plugin_options;
ctx->fast_forward = fast_forward;
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index b4269a3b102..ffb4ab2cf2a 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -154,6 +154,14 @@
static ResourceOwner SavedResourceOwnerDuringExport = NULL;
static bool ExportInProgress = false;
+/*
+ * If a backend is going to do logical decoding and the output plugin does
+ * not need to access shared catalogs, setting this variable to false can make
+ * the decoding startup faster. In particular, the backend will not need to
+ * wait for completion of already running transactions in other databases.
+ */
+bool accessSharedCatalogsInDecoding = true;
+
/* ->committed and ->catchange manipulation */
static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
@@ -170,7 +178,8 @@ static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, Transaction
uint32 xinfo);
/* xlog reading helper functions for SnapBuildProcessRunningXacts */
-static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
+static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn,
+ xl_running_xacts *running);
static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
/* serialization functions */
@@ -226,6 +235,9 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
MemoryContextSwitchTo(oldcontext);
+ /* The default is that shared catalogs are used. */
+ accessSharedCatalogsInDecoding = true;
+
return builder;
}
@@ -244,6 +256,9 @@ FreeSnapshotBuilder(SnapBuild *builder)
builder->snapshot = NULL;
}
+ /* The default is that shared catalogs are used. */
+ accessSharedCatalogsInDecoding = true;
+
/* other resources are deallocated via memory context reset */
MemoryContextDelete(context);
}
@@ -1136,7 +1151,8 @@ SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
* anymore.
*/
void
-SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
+SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running,
+ bool db_specific)
{
ReorderBufferTXN *txn;
TransactionId xmin;
@@ -1148,6 +1164,33 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
*/
if (builder->state < SNAPBUILD_CONSISTENT)
{
+ /*
+ * To reduce the potential for unnecessarily waiting for completion of
+ * unrelated transactions, the caller can declare that only
+ * transactions of the current database are relevant at this stage.
+ */
+ if (db_specific)
+ {
+ /*
+ * If we must only keep track of transactions running in the
+ * current database, we need transaction info from exactly that
+ * database.
+ */
+ if (running->dbid != MyDatabaseId)
+ {
+ LogStandbySnapshot(MyDatabaseId);
+
+ return;
+ }
+
+ /*
+ * Record the plugin's promise, so that catalog scans can verify
+ * that it does not lie.
+ */
+ if (accessSharedCatalogsInDecoding)
+ accessSharedCatalogsInDecoding = false;
+ }
+
/* returns false if there's no point in performing cleanup just yet */
if (!SnapBuildFindSnapshot(builder, lsn, running))
return;
@@ -1155,6 +1198,16 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
else
SnapBuildSerialize(builder, lsn);
+ /*
+ * Database-specific transaction info may exist to reach CONSISTENT state
+ * faster; however, the code below makes no use of it. Moreover, such a
+ * record might cause problems because the following normal (cluster-wide)
+ * record can have a lower value of oldestRunningXid. In that case, let's
+ * wait with the cleanup for the next regular cluster-wide record.
+ */
+ if (OidIsValid(running->dbid))
+ return;
+
/*
* Update range of interesting xids based on the running xacts
* information. We don't increase ->xmax using it, because once we are in
@@ -1465,7 +1518,11 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
*/
if (!RecoveryInProgress())
{
- LogStandbySnapshot();
+ /*
+ * If the last transaction info was about a specific database, the next
+ * one needs to be too - at least until we're in the CONSISTENT state.
+ */
+ LogStandbySnapshot(running->dbid);
}
}
diff --git a/src/backend/replication/pgrepack/pgrepack.c b/src/backend/replication/pgrepack/pgrepack.c
index 457b6518a8e..4da66aa0f44 100644
--- a/src/backend/replication/pgrepack/pgrepack.c
+++ b/src/backend/replication/pgrepack/pgrepack.c
@@ -52,6 +52,13 @@ repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
/* Probably unnecessary, as we don't use the SQL interface ... */
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
+ /*
+ * REPACK doesn't need access to shared catalogs, so we can speed up the
+ * historic snapshot creation by setting this flag. We'll only have to
+ * wait for transactions in our database.
+ */
+ opt->need_shared_catalogs = false;
+
if (ctx->output_plugin_options != NIL)
{
ereport(ERROR,
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 21a213a0ebf..a1f37e59dbc 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1756,7 +1756,7 @@ ReplicationSlotReserveWal(void)
XLogRecPtr flushptr;
/* make sure we have enough information to start */
- flushptr = LogStandbySnapshot();
+ flushptr = LogStandbySnapshot(InvalidOid);
/* and make sure it's fsynced to disk */
XLogFlush(flushptr);
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index f540bb6b23f..9299bcebbda 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -2623,9 +2623,11 @@ ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
*
* Note that if any transaction has overflowed its cached subtransactions
* then there is no real need include any subtransactions.
+ *
+ * If 'dbid' is valid, only gather transactions running in that database.
*/
RunningTransactions
-GetRunningTransactionData(void)
+GetRunningTransactionData(Oid dbid)
{
/* result workspace */
static RunningTransactionsData CurrentRunningXactsData;
@@ -2700,6 +2702,18 @@ GetRunningTransactionData(void)
if (!TransactionIdIsValid(xid))
continue;
+ /*
+ * Filter by database OID if requested.
+ */
+ if (OidIsValid(dbid))
+ {
+ int pgprocno = arrayP->pgprocnos[index];
+ PGPROC *proc = &allProcs[pgprocno];
+
+ if (proc->databaseId != dbid)
+ continue;
+ }
+
/*
* Be careful not to exclude any xids before calculating the values of
* oldestRunningXid and suboverflowed, since these are used to clean
@@ -2750,6 +2764,12 @@ GetRunningTransactionData(void)
PGPROC *proc = &allProcs[pgprocno];
int nsubxids;
+ /*
+ * Filter by database OID if requested.
+ */
+ if (OidIsValid(dbid) && proc->databaseId != dbid)
+ continue;
+
/*
* Save subtransaction XIDs. Other backends can't add or remove
* entries while we're holding XidGenLock.
@@ -2783,6 +2803,7 @@ GetRunningTransactionData(void)
* increases if slots do.
*/
+ CurrentRunningXacts->dbid = dbid;
CurrentRunningXacts->xcnt = count - subcount;
CurrentRunningXacts->subxcnt = subcount;
CurrentRunningXacts->subxid_status = suboverflowed ? SUBXIDS_IN_SUBTRANS : SUBXIDS_IN_ARRAY;
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index de9092fdf5b..29af7733948 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -1188,6 +1188,14 @@ standby_redo(XLogReaderState *record)
xl_running_xacts *xlrec = (xl_running_xacts *) XLogRecGetData(record);
RunningTransactionsData running;
+ /*
+ * Records issued for a specific database are not suitable for physical
+ * replication, because that affects the whole cluster. In particular,
+ * the list of XIDs is probably incomplete here.
+ */
+ if (OidIsValid(xlrec->dbid))
+ return;
+
running.xcnt = xlrec->xcnt;
running.subxcnt = xlrec->subxcnt;
running.subxid_status = xlrec->subxid_overflow ? SUBXIDS_MISSING : SUBXIDS_IN_ARRAY;
@@ -1277,11 +1285,22 @@ standby_redo(XLogReaderState *record)
* as there's no independent knob to just enable logical decoding. For
* details of how this is used, check snapbuild.c's introductory comment.
*
+ * If 'dbid' is valid, only gather transactions running in that
+ * database. snapbuild.c can use such running xacts information for faster
+ * startup, but it still needs normal (cluster-wide) records during the
+ * actual decoding - see standby_decode() and SnapBuildProcessRunningXacts()
+ * for details. Other processes (e.g. checkpointer) issue the cluster-wide
+ * records whether logical decoding is active or not.
+ *
+ * Please be careful about using this argument for other purposes. In
+ * particular, physical replication *must* ignore the database-specific
+ * records, exactly because they do not cover the whole cluster - see
+ * standby_redo().
*
* Returns the RecPtr of the last inserted record.
*/
XLogRecPtr
-LogStandbySnapshot(void)
+LogStandbySnapshot(Oid dbid)
{
XLogRecPtr recptr;
RunningTransactions running;
@@ -1314,7 +1333,7 @@ LogStandbySnapshot(void)
* Log details of all in-progress transactions. This should be the last
* record we write, because standby will open up when it sees this.
*/
- running = GetRunningTransactionData();
+ running = GetRunningTransactionData(dbid);
/*
* GetRunningTransactionData() acquired ProcArrayLock, we must release it.
@@ -1358,6 +1377,7 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
xl_running_xacts xlrec;
XLogRecPtr recptr;
+ xlrec.dbid = CurrRunningXacts->dbid;
xlrec.xcnt = CurrRunningXacts->xcnt;
xlrec.subxcnt = CurrRunningXacts->subxcnt;
xlrec.subxid_overflow = (CurrRunningXacts->subxid_status != SUBXIDS_IN_ARRAY);
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index b69320a7fc8..de55821e414 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -136,6 +136,7 @@ typedef struct IndexOrderByDistance
bool isnull;
} IndexOrderByDistance;
+
/*
* generalized index_ interface routines (in indexam.c)
*/
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 10c18d39ff8..13ae3ad4fbb 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -32,7 +32,7 @@
/*
* Each page of XLOG file has a header like this:
*/
-#define XLOG_PAGE_MAGIC 0xD11E /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD11F /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 2e10e3c814d..93b7816c09c 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -310,6 +310,15 @@ extern void PreventCommandIfReadOnly(const char *cmdname);
extern void PreventCommandIfParallelMode(const char *cmdname);
extern void PreventCommandDuringRecovery(const char *cmdname);
+/* in replication/snapbuild.c */
+
+/*
+ * Keep track of whether logical decoding in this backend promised not to
+ * access shared catalogs, as a safety check. This is checked by genam.c when
+ * a catalog scan takes place to verify that no shared catalogs are accessed.
+ */
+extern bool accessSharedCatalogsInDecoding;
+
/*****************************************************************************
* pdir.h -- *
* POSTGRES directory path definitions. *
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 842fcde67f9..917f3cff232 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -27,6 +27,7 @@ typedef struct OutputPluginOptions
{
OutputPluginOutputType output_type;
bool receive_rewrites;
+ bool need_shared_catalogs;
} OutputPluginOptions;
/*
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index a22a83a2f23..d02530a912a 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -92,7 +92,8 @@ extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
XLogRecPtr lsn,
xl_heap_new_cid *xlrec);
extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
- xl_running_xacts *running);
+ xl_running_xacts *running,
+ bool db_specific);
extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
extern bool SnapBuildSnapshotExists(XLogRecPtr lsn);
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index d718a5b542f..ec89c448220 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -47,7 +47,7 @@ extern bool ProcArrayInstallImportedXmin(TransactionId xmin,
VirtualTransactionId *sourcevxid);
extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc);
-extern RunningTransactions GetRunningTransactionData(void);
+extern RunningTransactions GetRunningTransactionData(Oid dbid);
extern bool TransactionIdIsInProgress(TransactionId xid);
extern TransactionId GetOldestNonRemovableTransactionId(Relation rel);
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 6a314c693cd..8715c08e94f 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -126,6 +126,7 @@ typedef enum
typedef struct RunningTransactionsData
{
+ Oid dbid; /* only track xacts in this database */
int xcnt; /* # of xact ids in xids[] */
int subxcnt; /* # of subxact ids in xids[] */
subxids_array_status subxid_status;
@@ -143,7 +144,7 @@ typedef RunningTransactionsData *RunningTransactions;
extern void LogAccessExclusiveLock(Oid dbOid, Oid relOid);
extern void LogAccessExclusiveLockPrepare(void);
-extern XLogRecPtr LogStandbySnapshot(void);
+extern XLogRecPtr LogStandbySnapshot(Oid dbid);
extern void LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
bool relcacheInitFileInval);
diff --git a/src/include/storage/standbydefs.h b/src/include/storage/standbydefs.h
index 231d251fd51..e75b7078766 100644
--- a/src/include/storage/standbydefs.h
+++ b/src/include/storage/standbydefs.h
@@ -46,6 +46,7 @@ typedef struct xl_standby_locks
*/
typedef struct xl_running_xacts
{
+ Oid dbid; /* only track xacts in this database */
int xcnt; /* # of xact ids in xids[] */
int subxcnt; /* # of subxact ids in xids[] */
bool subxid_overflow; /* snapshot overflowed, subxids missing */
--
2.47.3
From 8f2ad019a2dc82435bd26d6304f58b6b54b891bf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C3=81lvaro=20Herrera?= <[email protected]>
Date: Wed, 1 Apr 2026 19:54:14 +0200
Subject: [PATCH v56 2/3] Reserve replication slots specifically for REPACK
This keeps REPACK from competing with other operations that use
replication slots, which makes configuration easier.
---
doc/src/sgml/config.sgml | 16 ++++
doc/src/sgml/ref/repack.sgml | 6 +-
src/backend/commands/repack.c | 2 +-
src/backend/commands/repack_worker.c | 7 +-
src/backend/replication/logical/launcher.c | 2 +-
src/backend/replication/logical/logical.c | 8 +-
.../replication/logical/logicalfuncs.c | 2 +-
src/backend/replication/logical/slotsync.c | 5 +-
src/backend/replication/slot.c | 84 ++++++++++++-------
src/backend/replication/slotfuncs.c | 19 +++--
src/backend/replication/walsender.c | 9 +-
src/backend/utils/misc/guc_parameters.dat | 8 ++
src/backend/utils/misc/postgresql.conf.sample | 2 +
src/include/replication/logical.h | 3 +-
src/include/replication/slot.h | 5 +-
15 files changed, 116 insertions(+), 62 deletions(-)
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 3324d2d3c49..3435732646e 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4639,6 +4639,22 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows
</listitem>
</varlistentry>
+ <varlistentry id="guc-max-repack-replication-slots" xreflabel="max_repack_replication_slots">
+ <term><varname>max_repack_replication_slots</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>max_repack_replication_slots</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Specifies the maximum number of replication slots reserved for use by
+ the <command>REPACK</command> command. The default is 5.
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+
<varlistentry id="guc-max-replication-slots" xreflabel="max_replication_slots">
<term><varname>max_replication_slots</varname> (<type>integer</type>)
<indexterm>
diff --git a/doc/src/sgml/ref/repack.sgml b/doc/src/sgml/ref/repack.sgml
index e993dfb3108..c532a39ee07 100644
--- a/doc/src/sgml/ref/repack.sgml
+++ b/doc/src/sgml/ref/repack.sgml
@@ -293,9 +293,9 @@ REPACK [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] USING
<listitem>
<para>
- The <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
- configuration parameter does not allow for creation of an additional
- replication slot.
+ The <link linkend="guc-max-repack-replication-slots"><varname>max_repack_replication_slots</varname></link>
+ configuration parameter does not allow for the creation of an
+ additional replication slot.
</para>
</listitem>
</itemizedlist>
diff --git a/src/backend/commands/repack.c b/src/backend/commands/repack.c
index 17b639b3b44..5e97fcf3818 100644
--- a/src/backend/commands/repack.c
+++ b/src/backend/commands/repack.c
@@ -3357,7 +3357,7 @@ start_repack_decoding_worker(Oid relid)
ereport(ERROR,
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of background worker slots"),
- errhint("You might need to increase \"%s\".", "max_worker_processes"));
+ errhint("You might need to increase \"%s\".", "max_repack_replication_slots"));
decoding_worker->seg = seg;
decoding_worker->error_mqh = mqh;
diff --git a/src/backend/commands/repack_worker.c b/src/backend/commands/repack_worker.c
index 94d034970b5..ea330310e27 100644
--- a/src/backend/commands/repack_worker.c
+++ b/src/backend/commands/repack_worker.c
@@ -212,7 +212,7 @@ repack_setup_logical_decoding(Oid relid)
* Make sure we can use logical decoding.
*/
CheckSlotPermissions();
- CheckLogicalDecodingRequirements();
+ CheckLogicalDecodingRequirements(true);
/*
* A single backend should not execute multiple REPACK commands at a time,
@@ -221,8 +221,8 @@ repack_setup_logical_decoding(Oid relid)
* RS_TEMPORARY so that the slot gets cleaned up on ERROR.
*/
snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
- ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, false,
- false);
+ ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, true,
+ false, false);
EnsureLogicalDecodingEnabled();
@@ -233,6 +233,7 @@ repack_setup_logical_decoding(Oid relid)
ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
NIL,
true,
+ true,
InvalidXLogRecPtr,
XL_ROUTINE(.page_read = read_local_xlog_page,
.segment_open = wal_segment_open,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 9e75a3e04ee..7adf4dbe0d1 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1575,7 +1575,7 @@ CreateConflictDetectionSlot(void)
errmsg("creating replication conflict detection slot"));
ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
- false, false);
+ false, false, false);
init_conflict_slot_xmin();
}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 8ceaf64d164..d8e02c53558 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -108,9 +108,9 @@ static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugi
* decoding.
*/
void
-CheckLogicalDecodingRequirements(void)
+CheckLogicalDecodingRequirements(bool repack)
{
- CheckSlotRequirements();
+ CheckSlotRequirements(repack);
/*
* NB: Adding a new requirement likely means that RestoreSlotFromDisk()
@@ -304,6 +304,7 @@ StartupDecodingContext(List *output_plugin_options,
* output_plugin_options -- contains options passed to the output plugin
* need_full_snapshot -- if true, must obtain a snapshot able to read all
* tables; if false, one that can read only catalogs is acceptable.
+ * for_repack -- if true, we're going to be decoding for REPACK.
* restart_lsn -- if given as invalid, it's this routine's responsibility to
* mark WAL as reserved by setting a convenient restart_lsn for the slot.
* Otherwise, we set for decoding to start from the given LSN without
@@ -324,6 +325,7 @@ LogicalDecodingContext *
CreateInitDecodingContext(const char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
+ bool for_repack,
XLogRecPtr restart_lsn,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
@@ -340,7 +342,7 @@ CreateInitDecodingContext(const char *plugin,
* On a standby, this check is also required while creating the slot.
* Check the comments in the function.
*/
- CheckLogicalDecodingRequirements();
+ CheckLogicalDecodingRequirements(for_repack);
/* shorter lines... */
slot = MyReplicationSlot;
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 9760818941d..512013b0ef0 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -115,7 +115,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
CheckSlotPermissions();
- CheckLogicalDecodingRequirements();
+ CheckLogicalDecodingRequirements(false);
if (PG_ARGISNULL(0))
ereport(ERROR,
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 8b53bd3ac7f..ae900f13467 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -434,7 +434,7 @@ get_local_synced_slots(void)
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (int i = 0; i < max_replication_slots; i++)
+ for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
@@ -823,6 +823,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
*/
ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
remote_slot->two_phase,
+ false,
remote_slot->failover,
true);
@@ -1707,7 +1708,7 @@ update_synced_slots_inactive_since(void)
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (int i = 0; i < max_replication_slots; i++)
+ for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index a1f37e59dbc..e6722fc0212 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -160,6 +160,8 @@ ReplicationSlot *MyReplicationSlot = NULL;
/* GUC variables */
int max_replication_slots = 10; /* the maximum number of replication
* slots */
+int max_repack_replication_slots = 5; /* the maximum number of slots
+ * for REPACK */
/*
* Invalidate replication slots that have remained idle longer than this
@@ -199,12 +201,13 @@ ReplicationSlotsShmemRequest(void *arg)
{
Size size;
- if (max_replication_slots == 0)
+ if (max_replication_slots + max_repack_replication_slots == 0)
return;
size = offsetof(ReplicationSlotCtlData, replication_slots);
size = add_size(size,
- mul_size(max_replication_slots, sizeof(ReplicationSlot)));
+ mul_size(max_replication_slots + max_repack_replication_slots,
+ sizeof(ReplicationSlot)));
ShmemRequestStruct(.name = "ReplicationSlot Ctl",
.size = size,
.ptr = (void **) &ReplicationSlotCtl,
@@ -217,7 +220,7 @@ ReplicationSlotsShmemRequest(void *arg)
static void
ReplicationSlotsShmemInit(void *arg)
{
- for (int i = 0; i < max_replication_slots; i++)
+ for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
@@ -366,6 +369,7 @@ IsSlotForConflictCheck(const char *name)
* db_specific: logical decoding is db specific; if the slot is going to
* be used for that pass true, otherwise false.
* two_phase: If enabled, allows decoding of prepared transactions.
+ * repack: If true, use a slot from the pool for REPACK.
* failover: If enabled, allows the slot to be synced to standbys so
* that logical replication can be resumed after failover.
* synced: True if the slot is synchronized from the primary server.
@@ -373,10 +377,11 @@ IsSlotForConflictCheck(const char *name)
void
ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency persistency,
- bool two_phase, bool failover, bool synced)
+ bool two_phase, bool repack, bool failover, bool synced)
{
ReplicationSlot *slot = NULL;
- int i;
+ int startpoint,
+ endpoint;
Assert(MyReplicationSlot == NULL);
@@ -425,12 +430,16 @@ ReplicationSlotCreate(const char *name, bool db_specific,
LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
/*
- * Check for name collision, and identify an allocatable slot. We need to
- * hold ReplicationSlotControlLock in shared mode for this, so that nobody
- * else can change the in_use flags while we're looking at them.
+ * Check for name collision (across the whole array), and identify an
+ * allocatable slot (in the array slice specific to our current use case:
+ * either general, or REPACK only). We need to hold
+ * ReplicationSlotControlLock in shared mode for this, so that nobody else
+ * can change the in_use flags while we're looking at them.
*/
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ startpoint = !repack ? 0 : max_replication_slots;
+ endpoint = max_replication_slots + (repack ? max_repack_replication_slots : 0);
+ for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
@@ -438,7 +447,9 @@ ReplicationSlotCreate(const char *name, bool db_specific,
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("replication slot \"%s\" already exists", name)));
- if (!s->in_use && slot == NULL)
+
+ if (i >= startpoint && i < endpoint &&
+ !s->in_use && slot == NULL)
slot = s;
}
LWLockRelease(ReplicationSlotControlLock);
@@ -448,7 +459,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
ereport(ERROR,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("all replication slots are in use"),
- errhint("Free one or increase \"max_replication_slots\".")));
+ errhint("Free one or increase \"%s\".",
+ repack ? "max_repack_replication_slots" : "max_replication_slots")));
/*
* Since this slot is not in use, nobody should be looking at any part of
@@ -541,7 +553,7 @@ SearchNamedReplicationSlot(const char *name, bool need_lock)
if (need_lock)
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
@@ -569,7 +581,8 @@ int
ReplicationSlotIndex(ReplicationSlot *slot)
{
Assert(slot >= ReplicationSlotCtl->replication_slots &&
- slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
+ slot < ReplicationSlotCtl->replication_slots +
+ (max_replication_slots + max_repack_replication_slots));
return slot - ReplicationSlotCtl->replication_slots;
}
@@ -863,7 +876,7 @@ ReplicationSlotCleanup(bool synced_only)
restart:
found_valid_logicalslot = false;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
@@ -1245,7 +1258,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
if (!already_locked)
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
TransactionId effective_xmin;
@@ -1300,7 +1313,7 @@ ReplicationSlotsComputeRequiredLSN(void)
Assert(ReplicationSlotCtl != NULL);
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
XLogRecPtr restart_lsn;
@@ -1367,12 +1380,12 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
XLogRecPtr result = InvalidXLogRecPtr;
int i;
- if (max_replication_slots <= 0)
+ if (max_replication_slots + max_repack_replication_slots <= 0)
return InvalidXLogRecPtr;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s;
XLogRecPtr restart_lsn;
@@ -1447,11 +1460,11 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
*nslots = *nactive = 0;
- if (max_replication_slots <= 0)
+ if (max_replication_slots + max_repack_replication_slots <= 0)
return false;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s;
@@ -1508,13 +1521,13 @@ ReplicationSlotsDropDBSlots(Oid dboid)
bool found_valid_logicalslot;
bool dropped = false;
- if (max_replication_slots <= 0)
+ if (max_replication_slots + max_repack_replication_slots <= 0)
return;
restart:
found_valid_logicalslot = false;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s;
char *slotname;
@@ -1611,11 +1624,11 @@ CheckLogicalSlotExists(void)
{
bool found = false;
- if (max_replication_slots <= 0)
+ if (max_replication_slots + max_repack_replication_slots <= 0)
return false;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (int i = 0; i < max_replication_slots; i++)
+ for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s;
bool invalidated;
@@ -1649,17 +1662,24 @@ CheckLogicalSlotExists(void)
* slots.
*/
void
-CheckSlotRequirements(void)
+CheckSlotRequirements(bool repack)
{
/*
* NB: Adding a new requirement likely means that RestoreSlotFromDisk()
* needs the same check.
*/
- if (max_replication_slots == 0)
+ if (!repack && max_replication_slots == 0)
ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slots can only be used if \"%s\" > 0",
+ "max_replication_slots"));
+
+ if (repack && max_repack_replication_slots == 0)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("REPACK can only be used if \"%s\" > 0",
+ "max_repack_replication_slots"));
if (wal_level < WAL_LEVEL_REPLICA)
ereport(ERROR,
@@ -2210,7 +2230,7 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes,
Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
Assert(possible_causes != RS_INVAL_NONE);
- if (max_replication_slots == 0)
+ if (max_replication_slots == 0 && max_repack_replication_slots == 0)
return invalidated;
XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
@@ -2218,7 +2238,7 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes,
restart:
found_valid_logicalslot = false;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (int i = 0; i < max_replication_slots; i++)
+ for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
bool released_lock = false;
@@ -2323,7 +2343,7 @@ CheckPointReplicationSlots(bool is_shutdown)
*/
LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
char path[MAXPGPATH];
@@ -2424,7 +2444,7 @@ StartupReplicationSlots(void)
FreeDir(replication_dir);
/* currently no slots exist, we're done. */
- if (max_replication_slots <= 0)
+ if (max_replication_slots + max_repack_replication_slots <= 0)
return;
/* Now that we have recovered all the data, compute replication xmin */
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 9f5e4f998fe..16fbd383735 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -53,7 +53,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
/* acquire replication slot, this will check for conflicting names */
ReplicationSlotCreate(name, false,
temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
- false, false);
+ false, false, false);
if (immediately_reserve)
{
@@ -90,7 +90,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
CheckSlotPermissions();
- CheckSlotRequirements();
+ CheckSlotRequirements(false);
create_physical_replication_slot(NameStr(*name),
immediately_reserve,
@@ -146,7 +146,7 @@ create_logical_replication_slot(char *name, char *plugin,
*/
ReplicationSlotCreate(name, true,
temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
- failover, false);
+ false, failover, false);
/*
* Ensure the logical decoding is enabled before initializing the logical
@@ -164,6 +164,7 @@ create_logical_replication_slot(char *name, char *plugin,
*/
ctx = CreateInitDecodingContext(plugin, NIL,
false, /* just catalogs is OK */
+ false, /* not repack */
restart_lsn,
XL_ROUTINE(.page_read = read_local_xlog_page,
.segment_open = wal_segment_open,
@@ -203,7 +204,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
CheckSlotPermissions();
- CheckLogicalDecodingRequirements();
+ CheckLogicalDecodingRequirements(false);
create_logical_replication_slot(NameStr(*name),
NameStr(*plugin),
@@ -240,7 +241,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
CheckSlotPermissions();
- CheckSlotRequirements();
+ CheckSlotRequirements(false);
ReplicationSlotDrop(NameStr(*name), true);
@@ -270,7 +271,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
currlsn = GetXLogWriteRecPtr();
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (slotno = 0; slotno < max_replication_slots; slotno++)
+ for (slotno = 0; slotno < max_replication_slots + max_repack_replication_slots; slotno++)
{
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
ReplicationSlot slot_contents;
@@ -648,9 +649,9 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
CheckSlotPermissions();
if (logical_slot)
- CheckLogicalDecodingRequirements();
+ CheckLogicalDecodingRequirements(false);
else
- CheckSlotRequirements();
+ CheckSlotRequirements(false);
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@@ -665,7 +666,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
* managed to create the new slot, we advance the new slot's restart_lsn
* to the source slot's updated restart_lsn the second time we lock it.
*/
- for (int i = 0; i < max_replication_slots; i++)
+ for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index b4a2117a7f9..bad45adb004 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1241,7 +1241,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
{
ReplicationSlotCreate(cmd->slotname, false,
cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
- false, false, false);
+ false, false, false, false);
if (reserve_wal)
{
@@ -1261,7 +1261,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Assert(cmd->kind == REPLICATION_KIND_LOGICAL);
- CheckLogicalDecodingRequirements();
+ CheckLogicalDecodingRequirements(false);
/*
* Initially create persistent slot as ephemeral - that allows us to
@@ -1272,7 +1272,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
*/
ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
- two_phase, failover, false);
+ two_phase, false, failover, false);
/*
* Do options check early so that we can bail before calling the
@@ -1330,6 +1330,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Assert(IsLogicalDecodingEnabled());
ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
+ false,
InvalidXLogRecPtr,
XL_ROUTINE(.page_read = logical_read_xlog_page,
.segment_open = WalSndSegmentOpen,
@@ -1487,7 +1488,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
QueryCompletion qc;
/* make sure that our requirements are still fulfilled */
- CheckLogicalDecodingRequirements();
+ CheckLogicalDecodingRequirements(false);
Assert(!MyReplicationSlot);
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index fcb6ab80583..632f3ba4989 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -2079,6 +2079,14 @@
max => 'MAX_BACKENDS',
},
+{ name => 'max_repack_replication_slots', type => 'int', context => 'PGC_POSTMASTER', group => 'REPLICATION_SENDING',
+ short_desc => 'Sets the maximum number of replication slots for use by REPACK.',
+ variable => 'max_repack_replication_slots',
+ boot_val => '5',
+ min => '0',
+ max => 'MAX_BACKENDS',
+},
+
/* see max_wal_senders */
{ name => 'max_replication_slots', type => 'int', context => 'PGC_POSTMASTER', group => 'REPLICATION_SENDING',
short_desc => 'Sets the maximum number of simultaneously defined replication slots.',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index e3e462f3efb..2e10eb4a36a 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -348,6 +348,8 @@
# (change requires restart)
#max_replication_slots = 10 # max number of replication slots
# (change requires restart)
+#max_repack_replication_slots = 5 # max number of replication slots for REPACK
+ # (change requires restart)
#wal_keep_size = 0 # in megabytes; 0 disables
#max_slot_wal_keep_size = -1 # in megabytes; -1 disables
#idle_replication_slot_timeout = 0 # in seconds; 0 disables
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index bc9d4ece672..bc075b16741 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -115,11 +115,12 @@ typedef struct LogicalDecodingContext
} LogicalDecodingContext;
-extern void CheckLogicalDecodingRequirements(void);
+extern void CheckLogicalDecodingRequirements(bool repack);
extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
+ bool for_repack,
XLogRecPtr restart_lsn,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 1a3557de607..77c8d0975b6 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -324,13 +324,14 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
/* GUCs */
extern PGDLLIMPORT int max_replication_slots;
+extern PGDLLIMPORT int max_repack_replication_slots;
extern PGDLLIMPORT char *synchronized_standby_slots;
extern PGDLLIMPORT int idle_replication_slot_timeout_secs;
/* management of individual slots */
extern void ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency persistency,
- bool two_phase, bool failover,
+ bool two_phase, bool repack, bool failover,
bool synced);
extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait);
@@ -373,7 +374,7 @@ extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname
extern void StartupReplicationSlots(void);
extern void CheckPointReplicationSlots(bool is_shutdown);
-extern void CheckSlotRequirements(void);
+extern void CheckSlotRequirements(bool repack);
extern void CheckSlotPermissions(void);
extern ReplicationSlotInvalidationCause
GetSlotInvalidationCause(const char *cause_name);
--
2.47.3
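
For readers skimming the patch above: the core of the ReplicationSlotCreate change is that the shared slot array is treated as two slices, a general one followed by one reserved for REPACK. Below is a minimal standalone sketch of just the slice selection, under stated assumptions: SketchSlot and sketch_find_free_slot are hypothetical names, the two globals merely stand in for the GUCs with the patch's default values, and the name-collision check (which the patch runs over the whole array) and all locking are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for the GUCs; the values are the patch's defaults. */
static int	max_replication_slots = 10;
static int	max_repack_replication_slots = 5;

/* Simplified stand-in for ReplicationSlot: only the in_use flag is kept. */
typedef struct SketchSlot
{
	bool		in_use;
} SketchSlot;

/*
 * Return the index of the first free slot in the slice that matches the
 * requested use, or -1 if that slice is full.  The general slice is
 * [0, max_replication_slots); the REPACK slice follows it.
 */
static int
sketch_find_free_slot(const SketchSlot *slots, bool repack)
{
	int			startpoint = repack ? max_replication_slots : 0;
	int			endpoint = max_replication_slots +
		(repack ? max_repack_replication_slots : 0);

	assert(slots != NULL);

	for (int i = startpoint; i < endpoint; i++)
	{
		if (!slots[i].in_use)
			return i;
	}
	return -1;
}
```

With this layout, filling the general slice never takes a slot away from REPACK, and vice versa, which is why the patch picks which GUC to name in the errhint according to the repack flag.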
From 4303eea0a72408183f9f5afcf8d2801df20f8ffe Mon Sep 17 00:00:00 2001
From: Antonin Houska <[email protected]>
Date: Wed, 1 Apr 2026 17:35:47 +0200
Subject: [PATCH v56 3/3] Error out any process that would block at REPACK
Any process waiting for REPACK (CONCURRENTLY) to release its lock would
cause REPACK to deadlock when it tries to upgrade that lock to
AccessExclusiveLock, losing all work done up to that point. We avoid this
by teaching the deadlock detector to raise an error in the waiting process
when it detects this condition.
---
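
As a reading aid, the lifecycle of the new flag can be sketched in isolation. This is only a sketch, assuming simplified types: SketchProc and the sketch_* functions are hypothetical, the ProcArrayLock handling and the mirroring into ProcGlobal->statusFlags are omitted, and the real check lives inside FindLockCycleRecurseMember, where it calls ereport(ERROR) instead of returning a boolean.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Same bit value the patch assigns to PROC_IN_CONCURRENT_REPACK. */
#define SKETCH_PROC_IN_CONCURRENT_REPACK 0x40

/* Simplified stand-in for PGPROC. */
typedef struct SketchProc
{
	int			pid;
	uint8_t		statusFlags;
} SketchProc;

/* Set before REPACK (CONCURRENTLY) takes its first lock on the table. */
static void
sketch_enter_repack(SketchProc *proc)
{
	proc->statusFlags |= SKETCH_PROC_IN_CONCURRENT_REPACK;
}

/* Cleared once REPACK holds AccessExclusiveLock on all relations. */
static void
sketch_leave_repack(SketchProc *proc)
{
	proc->statusFlags &= (uint8_t) ~SKETCH_PROC_IN_CONCURRENT_REPACK;
}

/*
 * Mirrors the condition added to the deadlock checker: only the process
 * running the check (checkProc == MyProc in the patch) errors out, and only
 * if the process blocking it has the flag set.
 */
static bool
sketch_must_error_out(const SketchProc *checkProc, const SketchProc *myProc,
					  const SketchProc *blocker)
{
	return checkProc == myProc &&
		(blocker->statusFlags & SKETCH_PROC_IN_CONCURRENT_REPACK) != 0;
}
```

The point of the ordering is that a waiter can only be told to fail while the flag is set, that is, between the start of REPACK (CONCURRENTLY) and the moment it has upgraded its locks.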
src/backend/commands/repack.c | 47 ++++++++---
src/backend/storage/lmgr/deadlock.c | 15 ++++
src/include/storage/proc.h | 6 +-
src/test/modules/injection_points/Makefile | 1 +
.../expected/repack_deadlock.out | 63 ++++++++++++++
src/test/modules/injection_points/meson.build | 1 +
.../specs/repack_deadlock.spec | 83 +++++++++++++++++++
7 files changed, 201 insertions(+), 15 deletions(-)
create mode 100644 src/test/modules/injection_points/expected/repack_deadlock.out
create mode 100644 src/test/modules/injection_points/specs/repack_deadlock.spec
diff --git a/src/backend/commands/repack.c b/src/backend/commands/repack.c
index 5e97fcf3818..4dae0e8ebe0 100644
--- a/src/backend/commands/repack.c
+++ b/src/backend/commands/repack.c
@@ -285,6 +285,18 @@ ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
* to understand and we don't lose any functionality.
*/
PreventInTransactionBlock(isTopLevel, "REPACK (CONCURRENTLY)");
+
+ /*
+ * Also set the PROC_IN_CONCURRENT_REPACK flag. This makes the
+ * deadlock checker cause anyone that would conflict with us to error
+ * out. It's important to set this flag ahead of actually locking the
+ * relation; it won't of course affect anyone until we do have a lock
+ * that others can conflict with.
+ */
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ MyProc->statusFlags |= PROC_IN_CONCURRENT_REPACK;
+ ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
+ LWLockRelease(ProcArrayLock);
}
/*
@@ -489,11 +501,8 @@ RepackLockLevel(bool concurrent)
* If indexOid is InvalidOid, the table will be rewritten in physical order
* instead of index order.
*
- * Note that, in the concurrent case, the function releases the lock at some
- * point, in order to get AccessExclusiveLock for the final steps (i.e. to
- * swap the relation files). To make things simpler, the caller should expect
- * OldHeap to be closed on return, regardless CLUOPT_CONCURRENT. (The
- * AccessExclusiveLock is kept till the end of the transaction.)
+ * On return, OldHeap is closed but locked with AccessExclusiveLock - the lock
+ * will be released at end of the transaction.
*
* 'cmd' indicates which command is being executed, to be used for error
* messages.
@@ -1002,10 +1011,8 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose,
* Note that the worker has to wait for all transactions with XID
* already assigned to finish. If some of those transactions is
* waiting for a lock conflicting with ShareUpdateExclusiveLock on our
- * table (e.g. it runs CREATE INDEX), we can end up in a deadlock.
- * Not sure this risk is worth unlocking/locking the table (and its
- * clustering index) and checking again if it's still eligible for
- * REPACK CONCURRENTLY.
+ * table (e.g. it runs CREATE INDEX), it should get an ERROR from the
+ * deadlock checker.
*/
start_repack_decoding_worker(tableOid);
@@ -3090,7 +3097,19 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
LockRelationOid(OldHeap->rd_rel->reltoastrelid, AccessExclusiveLock);
/*
- * Tuples and pages of the old heap will be gone, but the heap will stay.
+ * Now that we hold access-exclusive locks on all relations, we no
+ * longer want other processes to error out when trying to acquire a
+ * conflicting lock. Therefore, unset our flag.
+ */
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ MyProc->statusFlags &= ~PROC_IN_CONCURRENT_REPACK;
+ ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
+ LWLockRelease(ProcArrayLock);
+
+ /*
+ * Tuples and pages of the old heap will be gone, but the heap itself will
+ * stay. In order for predicate locks to continue to work, convert them
+ * to relation-level locks. We do this for both the table and its indexes.
*/
TransferPredicateLocksToHeapRelation(OldHeap);
foreach_ptr(RelationData, index, indexrels)
@@ -3364,9 +3383,11 @@ start_repack_decoding_worker(Oid relid)
/*
* The decoding setup must be done before the caller can have XID assigned
- * for any reason, otherwise the worker might end up in a deadlock,
- * waiting for the caller's transaction to end. Therefore wait here until
- * the worker indicates that it has the logical decoding initialized.
+ * for any reason, otherwise the worker might end up waiting for the
+ * caller's transaction to end. (The deadlock detector does not consider
+ * this a conflict because the worker is in the same locking group as the
+ * backend that launched it.) Therefore wait here until the worker
+ * indicates that it has initialized logical decoding.
*/
ConditionVariablePrepareToSleep(&shared->cv);
for (;;)
diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c
index b8962d875b6..c20ac682b0d 100644
--- a/src/backend/storage/lmgr/deadlock.c
+++ b/src/backend/storage/lmgr/deadlock.c
@@ -620,6 +620,21 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
proc->statusFlags & PROC_IS_AUTOVACUUM)
blocking_autovacuum_proc = proc;
+ /*
+ * Similarly, if we note that we're blocked by some
+ * process running REPACK (CONCURRENTLY), just fail. That
+ * process is going to upgrade its lock at some point, and
+ * it would be inappropriate for any other process to
+ * cause that to fail.
+ */
+ if (checkProc == MyProc &&
+ proc->statusFlags & PROC_IN_CONCURRENT_REPACK)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("could not wait for concurrent REPACK"),
+ errdetail("Process %d waits for REPACK running on process %d.",
+ MyProc->pid, proc->pid));
+
/* We're done looking at this proclock */
break;
}
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 3e1d1fad5f9..76c6bb44251 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -70,10 +70,12 @@ struct XidCache
#define PROC_AFFECTS_ALL_HORIZONS 0x20 /* this proc's xmin must be
* included in vacuum horizons
* in all databases */
+#define PROC_IN_CONCURRENT_REPACK 0x40 /* REPACK (CONCURRENTLY) */
-/* flags reset at EOXact */
+/* flags reset at EOXact. A bit of a misnomer ... */
#define PROC_VACUUM_STATE_MASK \
- (PROC_IN_VACUUM | PROC_IN_SAFE_IC | PROC_VACUUM_FOR_WRAPAROUND)
+ (PROC_IN_VACUUM | PROC_IN_SAFE_IC | PROC_VACUUM_FOR_WRAPAROUND | \
+ PROC_IN_CONCURRENT_REPACK)
/*
* Xmin-related flags. Make sure any flags that affect how the process' Xmin
diff --git a/src/test/modules/injection_points/Makefile b/src/test/modules/injection_points/Makefile
index 2cd7d87c533..f7663859fe2 100644
--- a/src/test/modules/injection_points/Makefile
+++ b/src/test/modules/injection_points/Makefile
@@ -15,6 +15,7 @@ REGRESS_OPTS = --dlpath=$(top_builddir)/src/test/regress
ISOLATION = basic \
inplace \
repack \
+ repack_deadlock \
repack_toast \
syscache-update-pruned \
heap_lock_update
diff --git a/src/test/modules/injection_points/expected/repack_deadlock.out b/src/test/modules/injection_points/expected/repack_deadlock.out
new file mode 100644
index 00000000000..a86e4767536
--- /dev/null
+++ b/src/test/modules/injection_points/expected/repack_deadlock.out
@@ -0,0 +1,63 @@
+Parsed test spec with 2 sessions
+
+starting permutation: wait_before_lock add_column wakeup_before_lock check1
+injection_points_attach
+-----------------------
+
+(1 row)
+
+step wait_before_lock:
+ REPACK (CONCURRENTLY) repack_deadlock USING INDEX repack_deadlock_pkey;
+ <waiting ...>
+step add_column:
+ alter table repack_deadlock add column noise text;
+ <waiting ...>
+step add_column: <... completed>
+ERROR: could not wait for concurrent REPACK
+step wakeup_before_lock:
+ SELECT injection_points_wakeup('repack-concurrently-before-lock');
+
+injection_points_wakeup
+-----------------------
+
+(1 row)
+
+step wait_before_lock: <... completed>
+step check1:
+ INSERT INTO relfilenodes(node)
+ SELECT relfilenode FROM pg_class WHERE relname='repack_deadlock';
+
+ SELECT count(DISTINCT node) FROM relfilenodes;
+
+ SELECT i, j FROM repack_deadlock ORDER BY i, j;
+
+ INSERT INTO data_s1(i, j)
+ SELECT i, j FROM repack_deadlock;
+
+ SELECT count(*)
+ FROM data_s1 d1 FULL JOIN data_s2 d2 USING (i, j)
+ WHERE d1.i ISNULL OR d2.i ISNULL;
+
+count
+-----
+ 1
+(1 row)
+
+i|j
+-+-
+1|1
+2|2
+3|3
+4|4
+(4 rows)
+
+count
+-----
+ 4
+(1 row)
+
+injection_points_detach
+-----------------------
+
+(1 row)
+
diff --git a/src/test/modules/injection_points/meson.build b/src/test/modules/injection_points/meson.build
index a414abb924b..1cd88d6db65 100644
--- a/src/test/modules/injection_points/meson.build
+++ b/src/test/modules/injection_points/meson.build
@@ -46,6 +46,7 @@ tests += {
'basic',
'inplace',
'repack',
+ 'repack_deadlock',
'repack_toast',
'syscache-update-pruned',
'heap_lock_update',
diff --git a/src/test/modules/injection_points/specs/repack_deadlock.spec b/src/test/modules/injection_points/specs/repack_deadlock.spec
new file mode 100644
index 00000000000..9d23a6588c2
--- /dev/null
+++ b/src/test/modules/injection_points/specs/repack_deadlock.spec
@@ -0,0 +1,83 @@
+# Test REPACK with a concurrent transaction that would cause a deadlock
+setup
+{
+ CREATE EXTENSION injection_points;
+
+ CREATE TABLE repack_deadlock(i int PRIMARY KEY, j int);
+ INSERT INTO repack_deadlock(i, j) VALUES (1, 1), (2, 2), (3, 3), (4, 4);
+
+ CREATE TABLE relfilenodes(node oid);
+
+ CREATE TABLE data_s1(i int, j int);
+ CREATE TABLE data_s2(i int, j int);
+}
+
+teardown
+{
+ DROP TABLE repack_deadlock;
+ DROP EXTENSION injection_points;
+
+ DROP TABLE relfilenodes;
+ DROP TABLE data_s1;
+ DROP TABLE data_s2;
+}
+
+session s1
+setup
+{
+ SELECT injection_points_set_local();
+ SELECT injection_points_attach('repack-concurrently-before-lock', 'wait');
+}
+# Run REPACK (CONCURRENTLY), which waits at the injection point until woken up.
+step wait_before_lock
+{
+ REPACK (CONCURRENTLY) repack_deadlock USING INDEX repack_deadlock_pkey;
+}
+# Check the table from the perspective of s1.
+#
+# Besides the contents, we also check that relfilenode has changed.
+
+# Have each session write the contents into a table and use FULL JOIN to check
+# if the outputs are identical.
+step check1
+{
+ INSERT INTO relfilenodes(node)
+ SELECT relfilenode FROM pg_class WHERE relname='repack_deadlock';
+
+ SELECT count(DISTINCT node) FROM relfilenodes;
+
+ SELECT i, j FROM repack_deadlock ORDER BY i, j;
+
+ INSERT INTO data_s1(i, j)
+ SELECT i, j FROM repack_deadlock;
+
+ SELECT count(*)
+ FROM data_s1 d1 FULL JOIN data_s2 d2 USING (i, j)
+ WHERE d1.i ISNULL OR d2.i ISNULL;
+}
+teardown
+{
+ SELECT injection_points_detach('repack-concurrently-before-lock');
+}
+
+session s2
+# Try to take a lock that conflicts with the one held by REPACK (CONCURRENTLY).
+# The deadlock checker should make this step fail with an ERROR rather than
+# let it wait.
+step add_column
+{
+ alter table repack_deadlock add column noise text;
+}
+
+step wakeup_before_lock
+{
+ SELECT injection_points_wakeup('repack-concurrently-before-lock');
+}
+
+# Test that a session which would block on REPACK (CONCURRENTLY) gets an ERROR,
+# and that REPACK itself then completes.
+permutation
+ wait_before_lock
+ add_column
+ wakeup_before_lock
+ check1
--
2.47.3