Alvaro Herrera <[email protected]> wrote:
> So I've been trying to understand the "Introduce an option to make
> logical replication database specific." patch and I have to confess I
> just cannot.
>
> As far as I can read, the point is that if we reach
> SnapBuildProcessRunningXacts() when db_specific is true (which means
> standby_decode is called in an output plugin that has set
> need_shared_catalogs to false), _and_ we've not reached consistent state
> yet, then we'll call LogStandbySnapshot with our DB oid to emit a new
> xl_running_xacts message.
Right.
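
For reference, a plugin would opt in from its startup callback. The following
is only a sketch with stand-in types: the real definitions live in
replication/output_plugin.h, and the real callback also receives the decoding
context and an is_init flag. The names init_default_options(),
my_plugin_startup() and decoder_is_db_specific() are hypothetical and exist
only for illustration.

```c
#include <assert.h>
#include <stdbool.h>

/* Stand-ins for the definitions in replication/output_plugin.h. */
typedef enum
{
    OUTPUT_PLUGIN_BINARY_OUTPUT,
    OUTPUT_PLUGIN_TEXTUAL_OUTPUT
} OutputPluginOutputType;

typedef struct OutputPluginOptions
{
    OutputPluginOutputType output_type;
    bool        receive_rewrites;
    bool        need_shared_catalogs;   /* field added by this patch */
} OutputPluginOptions;

/*
 * Hypothetical helper: models the defaults the plugin sees on entry to its
 * startup callback. With the patch, need_shared_catalogs starts out true.
 */
void
init_default_options(OutputPluginOptions *opt)
{
    opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
    opt->receive_rewrites = false;
    opt->need_shared_catalogs = true;
}

/* Hypothetical, simplified startup callback of a plugin that opts in. */
void
my_plugin_startup(OutputPluginOptions *opt)
{
    opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
    opt->receive_rewrites = false;

    /* Promise: no callback of this plugin reads shared catalogs. */
    opt->need_shared_catalogs = false;
}

/* The value standby_decode() passes on as 'db_specific'. */
bool
decoder_is_db_specific(const OutputPluginOptions *opt)
{
    return !opt->need_shared_catalogs;
}
```

A plugin that does not touch the new field keeps the existing (cluster-wide)
behavior.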
> So the WAL-decoding process emits WAL. I don't know if in normal
> conditions logical decoding processes emit WAL. If this is exceptional,
> I think we should add a comment.
Emitting WAL in logical decoding is not exceptional: SnapBuildWaitSnapshot()
already calls LogStandbySnapshot(), in order to get to the next phase.
> Now, this additional WAL message will be processed by all other
> processes decoding WAL. Perhaps it will be ignored by most of them.
Right. That's one thing that I realized late yesterday, after having posted
the latest version of the patch. In SnapBuildProcessRunningXacts(), we need
    if (OidIsValid(running->dbid))
        return;
rather than
    if (db_specific)
        return;
because other backends can also generate their database-specific records.
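
To illustrate the difference, here is a toy model of the two checks. It is
only a sketch: the struct is a stand-in carrying just the dbid field, and
skip_cleanup_by_flag() / skip_cleanup_by_record() are hypothetical names, not
functions in the patch.

```c
#include <assert.h>
#include <stdbool.h>

typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)
#define OidIsValid(oid) ((oid) != InvalidOid)

/* Stand-in for xl_running_xacts: only the field that matters here. */
typedef struct xl_running_xacts
{
    Oid         dbid;           /* InvalidOid means cluster-wide record */
} xl_running_xacts;

/* Previous version: decide by the mode of the decoder reading the record. */
bool
skip_cleanup_by_flag(const xl_running_xacts *running, bool db_specific)
{
    (void) running;
    return db_specific;
}

/* Fixed version: decide by what the record itself covers. */
bool
skip_cleanup_by_record(const xl_running_xacts *running, bool db_specific)
{
    (void) db_specific;
    return OidIsValid(running->dbid);
}
```

With the flag-based check, a cluster-wide decoder (db_specific == false) that
reads a database-specific record written by another backend would not skip it
and would use the incomplete record for cleanup. The record-based check skips
it regardless of who is reading.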
> But
> most importantly, it will also reach back to ourselves, at which point
> we can hopefully use it to see that we might have reached consistent
> state within our database. Then we know our snapshot is ready to be
> used.
>
> Is this correct?
Yes.
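
The loop can be sketched with a toy WAL and a toy database-specific decoder.
This is a heavily simplified model, not the snapbuild.c logic: all Toy* types
and toy_* functions are hypothetical, and the decoder is assumed to become
consistent on the first record for its own database, whereas the real code
goes through SnapBuildFindSnapshot() and may have to wait for running
transactions.

```c
#include <assert.h>
#include <stdbool.h>

typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)

#define TOY_WAL_MAX 8

/* Toy running-xacts record: only remembers which database it covers. */
typedef struct ToyRecord
{
    Oid         dbid;           /* InvalidOid means cluster-wide */
} ToyRecord;

typedef struct ToyWal
{
    ToyRecord   recs[TOY_WAL_MAX];
    int         nrecs;
} ToyWal;

/* Toy decoder that always runs in database-specific mode. */
typedef struct ToyDecoder
{
    Oid         my_dbid;
    bool        consistent;
    int         next;           /* index of the next record to read */
} ToyDecoder;

/* Stand-in for LogStandbySnapshot(dbid): append a record to the toy WAL. */
void
toy_log_standby_snapshot(ToyWal *wal, Oid dbid)
{
    assert(wal->nrecs < TOY_WAL_MAX);
    wal->recs[wal->nrecs++].dbid = dbid;
}

/* Decode one record; returns false once the end of the toy WAL is reached. */
bool
toy_decode_one(ToyWal *wal, ToyDecoder *dec)
{
    ToyRecord  *rec;

    if (dec->next >= wal->nrecs)
        return false;
    rec = &wal->recs[dec->next++];

    if (dec->consistent)
        return true;

    if (rec->dbid != dec->my_dbid)
    {
        /* Not usable for startup: request a record for our database. */
        toy_log_standby_snapshot(wal, dec->my_dbid);
        return true;
    }

    /* Our own record came back (simplification: consistent right away). */
    dec->consistent = true;
    return true;
}
```

Starting from a WAL that contains a single cluster-wide record, the first call
appends a record for the decoder's database and the second call consumes it.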
> I think the reason it's safe to skip a lot of the processing caused by
> this additional process, is that xl_running_xacts messages are also
> emitted in other places in a non-database specific manner. So all the
> other places that are emitting that message continue to exist and
> cause logical decoders to operate in the same way as before.
Yes. I'm still wondering, though, whether, after having used the
database-specific record to reach CONSISTENT, we should enforce one
cluster-wide record, so that the cleanup (in "our backend") takes place sooner
rather than later. Not sure about that.
> I think we should sprinkle lots of comments in several places about
> this. For example, I propose that standby_redo() should have something
> like
>
> * If 'dbid' is valid, only gather transactions running in that database.
> + * Such records should not be the only ones emitted, because this has
> + * potentially dangerous side-effects which make some places ignore them:
> + *
> + * 1. SnapBuildProcessRunningXacts will skip computing the xmin and restart
> + * point from its input record if the record's xmin is older than the
> + * snapbuilder's current xmin; this should normally be fine because that
> + * information will be updated from other xl_running_xacts records.
> + * 2. standby_redo will likewise skip processing such a record
> *
> (are there other things that should be mentioned?)
I added something like that, but - due to the reference to
SnapBuildProcessRunningXacts() - less verbose about snapbuild.c.
> Also, LogStandbySnapshot() should have a comment explaining that passing
> a valid dboid is a weird corner case which is to be used with care, and
> that functions X Y and Z are going to ignore snapshots carrying a valid
> dbid.
One more check added in standby_decode() (and mentioned in that function in
the comment).
> Why do we call SnapBuildFindSnapshot() to do this, instead of doing it
> directly in SnapBuildProcessRunningXacts? Seems like it would be more
> straightforward.
Yes, fixed.
One more problem I found when testing w/o background worker
(contrib/test_decoding) was that accessSharedCatalogsInDecoding was not set
back to true. Both AllocateSnapshotBuilder() and FreeSnapshotBuilder() do that
now.
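
A toy model of the flag's lifecycle, to show why the reset matters when one
backend runs several decoding sessions in a row. This is a sketch only: the
toy_* names are hypothetical stand-ins, and toy_scan_allowed() merely restates
the condition of the Assert added to systable_beginscan().

```c
#include <assert.h>
#include <stdbool.h>

/* Stand-in for the backend-global accessSharedCatalogsInDecoding. */
bool        toy_access_shared_catalogs = true;

/* Models the reset done in AllocateSnapshotBuilder(). */
void
toy_allocate_builder(void)
{
    toy_access_shared_catalogs = true;
}

/* Models the reset done in FreeSnapshotBuilder(). */
void
toy_free_builder(void)
{
    toy_access_shared_catalogs = true;
}

/* Models SnapBuildProcessRunningXacts() running in db-specific mode. */
void
toy_start_db_specific(void)
{
    toy_access_shared_catalogs = false;
}

/* The condition asserted in systable_beginscan(), as a predicate. */
bool
toy_scan_allowed(bool historic_snapshot_active, bool rel_is_shared)
{
    return !historic_snapshot_active ||
        toy_access_shared_catalogs ||
        !rel_is_shared;
}
```

Without the reset, a later session in the same backend whose plugin does need
shared catalogs would trip the assertion on its first shared catalog scan.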
--
Antonin Houska
Web: https://www.cybertec-postgresql.com
From f04837b6958ea486dbc256ed7f8eb377eace67cf Mon Sep 17 00:00:00 2001
From: Antonin Houska <[email protected]>
Date: Mon, 6 Apr 2026 10:49:18 +0200
Subject: [PATCH] Introduce an option to make logical replication database
specific.
By default, logical decoding assumes access to shared catalogs, so the
snapshot builder needs to consider cluster-wide XIDs during startup. That in
turn means that, if any transaction is already running (and has XID assigned),
the snapshot builder needs to wait for its completion, as it does not know if
that transaction performed catalog changes earlier.
A possible problem with this concept is that if REPACK (CONCURRENTLY) is
running in some database, backends running the same command in other databases
get stuck until the first one has committed. Thus only a single backend in the
cluster can run REPACK (CONCURRENTLY) at any time. Likewise, REPACK
(CONCURRENTLY) can block walsenders starting on behalf of subscriptions
throughout the cluster.
This patch adds a new option for logical replication output plugins, which
lets a plugin declare that it does not use shared catalogs (i.e. catalogs that
can be changed by transactions running in other databases in the cluster). In
that case, no snapshot the backend uses during decoding needs to contain
information about transactions running in other databases. Thus the snapshot
builder only needs to wait for completion of transactions in the current
database.
Currently we only use this option in the REPACK background worker. It could
possibly be used in the plugin for logical replication too, however that would
need a thorough analysis of that plugin.
The patch bumps the WAL version number, due to a new field in xl_running_xacts.
---
contrib/pg_visibility/pg_visibility.c | 4 +-
doc/src/sgml/logicaldecoding.sgml | 4 ++
src/backend/access/index/genam.c | 8 +++
src/backend/access/rmgrdesc/standbydesc.c | 2 +
src/backend/access/transam/xlog.c | 2 +-
src/backend/access/transam/xlogfuncs.c | 2 +-
src/backend/postmaster/bgwriter.c | 2 +-
src/backend/replication/logical/decode.c | 16 +++++-
src/backend/replication/logical/logical.c | 3 +
src/backend/replication/logical/snapbuild.c | 64 ++++++++++++++++++++-
src/backend/replication/slot.c | 2 +-
src/backend/storage/ipc/procarray.c | 23 +++++++-
src/backend/storage/ipc/standby.c | 24 +++++++-
src/include/access/genam.h | 7 +++
src/include/access/xlog_internal.h | 2 +-
src/include/replication/output_plugin.h | 1 +
src/include/replication/snapbuild.h | 3 +-
src/include/storage/procarray.h | 2 +-
src/include/storage/standby.h | 3 +-
src/include/storage/standbydefs.h | 1 +
20 files changed, 157 insertions(+), 18 deletions(-)
diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c
index dfab0b64cf5..d564bd2a00c 100644
--- a/contrib/pg_visibility/pg_visibility.c
+++ b/contrib/pg_visibility/pg_visibility.c
@@ -621,7 +621,7 @@ GetStrictOldestNonRemovableTransactionId(Relation rel)
else if (rel == NULL || rel->rd_rel->relisshared)
{
/* Shared relation: take into account all running xids */
- runningTransactions = GetRunningTransactionData();
+ runningTransactions = GetRunningTransactionData(InvalidOid);
LWLockRelease(ProcArrayLock);
LWLockRelease(XidGenLock);
return runningTransactions->oldestRunningXid;
@@ -632,7 +632,7 @@ GetStrictOldestNonRemovableTransactionId(Relation rel)
* Normal relation: take into account xids running within the current
* database
*/
- runningTransactions = GetRunningTransactionData();
+ runningTransactions = GetRunningTransactionData(InvalidOid);
LWLockRelease(ProcArrayLock);
LWLockRelease(XidGenLock);
return runningTransactions->oldestDatabaseRunningXid;
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 6dc49108997..28089d1053c 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -959,6 +959,7 @@ typedef struct OutputPluginOptions
{
OutputPluginOutputType output_type;
bool receive_rewrites;
+ bool need_shared_catalogs;
} OutputPluginOptions;
</programlisting>
<literal>output_type</literal> has to either be set to
@@ -969,6 +970,9 @@ typedef struct OutputPluginOptions
also be called for changes made by heap rewrites during certain DDL
operations. These are of interest to plugins that handle DDL
replication, but they require special handling.
+ <literal>need_shared_catalogs</literal> can be set to false if you are
+ sure the plugin functions do not access shared system catalogs. It can
+ speed up creation of replication slots that use this plugin.
</para>
<para>
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index 1408989c568..44df2605068 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -394,6 +394,14 @@ systable_beginscan(Relation heapRelation,
SysScanDesc sysscan;
Relation irel;
+ /*
+ * If this backend promised that it won't access shared catalogs during
+ * logical decoding, this seems to be the right place to check.
+ */
+ Assert(!HistoricSnapshotActive() ||
+ accessSharedCatalogsInDecoding ||
+ !heapRelation->rd_rel->relisshared);
+
if (indexOK &&
!IgnoreSystemIndexes &&
!ReindexIsProcessingIndex(indexId))
diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index 0a291354ae2..685d1bdb024 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -41,6 +41,8 @@ standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
for (i = 0; i < xlrec->subxcnt; i++)
appendStringInfo(buf, " %u", xlrec->xids[xlrec->xcnt + i]);
}
+
+ appendStringInfo(buf, "; dbid: %u", xlrec->dbid);
}
void
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index b82af9a85c0..3f08a832ca6 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7735,7 +7735,7 @@ CreateCheckPoint(int flags)
* recovery we don't need to write running xact data.
*/
if (!shutdown && XLogStandbyInfoActive())
- LogStandbySnapshot();
+ LogStandbySnapshot(InvalidOid);
START_CRIT_SECTION();
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index 65bbaeda59c..0f5979691e6 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -245,7 +245,7 @@ pg_log_standby_snapshot(PG_FUNCTION_ARGS)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("pg_log_standby_snapshot() can only be used if \"wal_level\" >= \"replica\"")));
- recptr = LogStandbySnapshot();
+ recptr = LogStandbySnapshot(InvalidOid);
/*
* As a convenience, return the WAL location of the last inserted record
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index 1d8947774a9..a30de4262eb 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -289,7 +289,7 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len)
if (now >= timeout &&
last_snapshot_lsn <= GetLastImportantRecPtr())
{
- last_snapshot_lsn = LogStandbySnapshot();
+ last_snapshot_lsn = LogStandbySnapshot(InvalidOid);
last_snapshot_ts = now;
}
}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 57aaef57c61..49a59605515 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -381,7 +381,15 @@ standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
- SnapBuildProcessRunningXacts(builder, buf->origptr, running);
+
+ /*
+ * If the plugin does not access shared catalogs, only keep
+ * track of transactions in the current database. This is
+ * currently the only use case for database-specific running
+ * xacts record.
+ */
+ SnapBuildProcessRunningXacts(builder, buf->origptr, running,
+ !ctx->options.need_shared_catalogs);
/*
* Abort all transactions that we keep track of, that are
@@ -391,8 +399,12 @@ standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* all running transactions which includes prepared ones,
* while shutdown checkpoints just know that no non-prepared
* transactions are in progress.
+ *
+ * The database-specific records might work here too, but it's
+ * not their purpose.
*/
- ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
+ if (!OidIsValid(running->dbid))
+ ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
}
break;
case XLOG_STANDBY_LOCK:
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index f20d0c542f3..8ceaf64d164 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -285,6 +285,9 @@ StartupDecodingContext(List *output_plugin_options,
ctx->write = do_write;
ctx->update_progress = update_progress;
+ /* Assume shared catalog access. The startup callback can change it. */
+ ctx->options.need_shared_catalogs = true;
+
ctx->output_plugin_options = output_plugin_options;
ctx->fast_forward = fast_forward;
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index b4269a3b102..a10e8c0ca77 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -125,6 +125,7 @@
#include <sys/stat.h>
#include <unistd.h>
+#include "access/genam.h"
#include "access/heapam_xlog.h"
#include "access/transam.h"
#include "access/xact.h"
@@ -154,6 +155,14 @@
static ResourceOwner SavedResourceOwnerDuringExport = NULL;
static bool ExportInProgress = false;
+/*
+ * If a backend is going to do logical decoding and the output plugin does
+ * not need to access shared catalogs, setting this variable to false can make
+ * the decoding startup faster. In particular, the backend will not need to
+ * wait for completion of already running transactions in other databases.
+ */
+bool accessSharedCatalogsInDecoding = true;
+
/* ->committed and ->catchange manipulation */
static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
@@ -170,7 +179,8 @@ static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, Transaction
uint32 xinfo);
/* xlog reading helper functions for SnapBuildProcessRunningXacts */
-static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
+static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn,
+ xl_running_xacts *running);
static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
/* serialization functions */
@@ -226,6 +236,9 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
MemoryContextSwitchTo(oldcontext);
+ /* The default is that shared catalogs are used. */
+ accessSharedCatalogsInDecoding = true;
+
return builder;
}
@@ -244,6 +257,9 @@ FreeSnapshotBuilder(SnapBuild *builder)
builder->snapshot = NULL;
}
+ /* The default is that shared catalogs are used. */
+ accessSharedCatalogsInDecoding = true;
+
/* other resources are deallocated via memory context reset */
MemoryContextDelete(context);
}
@@ -1136,7 +1152,8 @@ SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
* anymore.
*/
void
-SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
+SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running,
+ bool db_specific)
{
ReorderBufferTXN *txn;
TransactionId xmin;
@@ -1148,6 +1165,33 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
*/
if (builder->state < SNAPBUILD_CONSISTENT)
{
+ /*
+ * To reduce the potential for unnecessarily waiting for completion of
+ * unrelated transactions, the caller can declare that only
+ * transactions of the current database are relevant at this
+ * stage.
+ */
+ if (db_specific)
+ {
+ /*
+ * If we must only keep track of transactions running in the current
+ * database, we need transaction info from exactly that database.
+ */
+ if (running->dbid != MyDatabaseId)
+ {
+ LogStandbySnapshot(MyDatabaseId);
+
+ return;
+ }
+
+ /*
+ * We'd better be able to check during scan if the plugin does not
+ * lie.
+ */
+ if (accessSharedCatalogsInDecoding)
+ accessSharedCatalogsInDecoding = false;
+ }
+
/* returns false if there's no point in performing cleanup just yet */
if (!SnapBuildFindSnapshot(builder, lsn, running))
return;
@@ -1155,6 +1199,16 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
else
SnapBuildSerialize(builder, lsn);
+ /*
+ * Database-specific transaction info may exist to reach the CONSISTENT
+ * state faster, however the code below makes no use of it. Moreover, such
+ * a record might cause problems because the following normal (cluster-wide)
+ * record can have a lower value of oldestRunningXid. In that case, let's
+ * wait with the cleanup for the next regular cluster-wide record.
+ */
+ if (OidIsValid(running->dbid))
+ return;
+
/*
* Update range of interesting xids based on the running xacts
* information. We don't increase ->xmax using it, because once we are in
@@ -1465,7 +1519,11 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
*/
if (!RecoveryInProgress())
{
- LogStandbySnapshot();
+ /*
+ * If the last running-xacts record was database-specific, the next one
+ * needs to be too - at least until we're in the CONSISTENT state.
+ */
+ LogStandbySnapshot(running->dbid);
}
}
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 21a213a0ebf..a1f37e59dbc 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1756,7 +1756,7 @@ ReplicationSlotReserveWal(void)
XLogRecPtr flushptr;
/* make sure we have enough information to start */
- flushptr = LogStandbySnapshot();
+ flushptr = LogStandbySnapshot(InvalidOid);
/* and make sure it's fsynced to disk */
XLogFlush(flushptr);
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index f540bb6b23f..9299bcebbda 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -2623,9 +2623,11 @@ ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
*
* Note that if any transaction has overflowed its cached subtransactions
* then there is no real need include any subtransactions.
+ *
+ * If 'dbid' is valid, only gather transactions running in that database.
*/
RunningTransactions
-GetRunningTransactionData(void)
+GetRunningTransactionData(Oid dbid)
{
/* result workspace */
static RunningTransactionsData CurrentRunningXactsData;
@@ -2700,6 +2702,18 @@ GetRunningTransactionData(void)
if (!TransactionIdIsValid(xid))
continue;
+ /*
+ * Filter by database OID if requested.
+ */
+ if (OidIsValid(dbid))
+ {
+ int pgprocno = arrayP->pgprocnos[index];
+ PGPROC *proc = &allProcs[pgprocno];
+
+ if (proc->databaseId != dbid)
+ continue;
+ }
+
/*
* Be careful not to exclude any xids before calculating the values of
* oldestRunningXid and suboverflowed, since these are used to clean
@@ -2750,6 +2764,12 @@ GetRunningTransactionData(void)
PGPROC *proc = &allProcs[pgprocno];
int nsubxids;
+ /*
+ * Filter by database OID if requested.
+ */
+ if (OidIsValid(dbid) && proc->databaseId != dbid)
+ continue;
+
/*
* Save subtransaction XIDs. Other backends can't add or remove
* entries while we're holding XidGenLock.
@@ -2783,6 +2803,7 @@ GetRunningTransactionData(void)
* increases if slots do.
*/
+ CurrentRunningXacts->dbid = dbid;
CurrentRunningXacts->xcnt = count - subcount;
CurrentRunningXacts->subxcnt = subcount;
CurrentRunningXacts->subxid_status = suboverflowed ? SUBXIDS_IN_SUBTRANS : SUBXIDS_IN_ARRAY;
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index de9092fdf5b..29af7733948 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -1188,6 +1188,14 @@ standby_redo(XLogReaderState *record)
xl_running_xacts *xlrec = (xl_running_xacts *) XLogRecGetData(record);
RunningTransactionsData running;
+ /*
+ * Records issued for a specific database are not suitable for physical
+ * replication because that affects the whole cluster. In particular,
+ * the list of XIDs is probably incomplete here.
+ */
+ if (OidIsValid(xlrec->dbid))
+ return;
+
running.xcnt = xlrec->xcnt;
running.subxcnt = xlrec->subxcnt;
running.subxid_status = xlrec->subxid_overflow ? SUBXIDS_MISSING : SUBXIDS_IN_ARRAY;
@@ -1277,11 +1285,22 @@ standby_redo(XLogReaderState *record)
* as there's no independent knob to just enable logical decoding. For
* details of how this is used, check snapbuild.c's introductory comment.
*
+ * If 'dbid' is valid, only gather transactions running in that
+ * database. snapbuild.c can use such running xacts information for faster
+ * startup, but it still needs normal (cluster-wide) records during the actual
+ * decoding - see standby_decode() and SnapBuildProcessRunningXacts() for
+ * details. Other processes (e.g. checkpointer) issue the cluster-wide records
+ * whether logical decoding is active or not.
+ *
+ * Please be careful about using this argument for other purposes. In
+ * particular, physical replication *must* ignore the database-specific
+ * records, exactly because they do not cover the whole cluster - see
+ * standby_redo().
*
* Returns the RecPtr of the last inserted record.
*/
XLogRecPtr
-LogStandbySnapshot(void)
+LogStandbySnapshot(Oid dbid)
{
XLogRecPtr recptr;
RunningTransactions running;
@@ -1314,7 +1333,7 @@ LogStandbySnapshot(void)
* Log details of all in-progress transactions. This should be the last
* record we write, because standby will open up when it sees this.
*/
- running = GetRunningTransactionData();
+ running = GetRunningTransactionData(dbid);
/*
* GetRunningTransactionData() acquired ProcArrayLock, we must release it.
@@ -1358,6 +1377,7 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
xl_running_xacts xlrec;
XLogRecPtr recptr;
+ xlrec.dbid = CurrRunningXacts->dbid;
xlrec.xcnt = CurrRunningXacts->xcnt;
xlrec.subxcnt = CurrRunningXacts->subxcnt;
xlrec.subxid_overflow = (CurrRunningXacts->subxid_status != SUBXIDS_IN_ARRAY);
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index b69320a7fc8..c9a52277be1 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -136,6 +136,13 @@ typedef struct IndexOrderByDistance
bool isnull;
} IndexOrderByDistance;
+
+/*
+ * Keep track of whether logical decoding in this backend promised not to
+ * access shared catalogs, as a safety check. Somewhat misplaced, but ...
+ */
+extern bool accessSharedCatalogsInDecoding;
+
/*
* generalized index_ interface routines (in indexam.c)
*/
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 10c18d39ff8..13ae3ad4fbb 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -32,7 +32,7 @@
/*
* Each page of XLOG file has a header like this:
*/
-#define XLOG_PAGE_MAGIC 0xD11E /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD11F /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 842fcde67f9..917f3cff232 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -27,6 +27,7 @@ typedef struct OutputPluginOptions
{
OutputPluginOutputType output_type;
bool receive_rewrites;
+ bool need_shared_catalogs;
} OutputPluginOptions;
/*
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index a22a83a2f23..d02530a912a 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -92,7 +92,8 @@ extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
XLogRecPtr lsn,
xl_heap_new_cid *xlrec);
extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
- xl_running_xacts *running);
+ xl_running_xacts *running,
+ bool db_specific);
extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
extern bool SnapBuildSnapshotExists(XLogRecPtr lsn);
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index d718a5b542f..ec89c448220 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -47,7 +47,7 @@ extern bool ProcArrayInstallImportedXmin(TransactionId xmin,
VirtualTransactionId *sourcevxid);
extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc);
-extern RunningTransactions GetRunningTransactionData(void);
+extern RunningTransactions GetRunningTransactionData(Oid dbid);
extern bool TransactionIdIsInProgress(TransactionId xid);
extern TransactionId GetOldestNonRemovableTransactionId(Relation rel);
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 6a314c693cd..8715c08e94f 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -126,6 +126,7 @@ typedef enum
typedef struct RunningTransactionsData
{
+ Oid dbid; /* only track xacts in this database */
int xcnt; /* # of xact ids in xids[] */
int subxcnt; /* # of subxact ids in xids[] */
subxids_array_status subxid_status;
@@ -143,7 +144,7 @@ typedef RunningTransactionsData *RunningTransactions;
extern void LogAccessExclusiveLock(Oid dbOid, Oid relOid);
extern void LogAccessExclusiveLockPrepare(void);
-extern XLogRecPtr LogStandbySnapshot(void);
+extern XLogRecPtr LogStandbySnapshot(Oid dbid);
extern void LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
bool relcacheInitFileInval);
diff --git a/src/include/storage/standbydefs.h b/src/include/storage/standbydefs.h
index 231d251fd51..e75b7078766 100644
--- a/src/include/storage/standbydefs.h
+++ b/src/include/storage/standbydefs.h
@@ -46,6 +46,7 @@ typedef struct xl_standby_locks
*/
typedef struct xl_running_xacts
{
+ Oid dbid; /* only track xacts in this database */
int xcnt; /* # of xact ids in xids[] */
int subxcnt; /* # of subxact ids in xids[] */
bool subxid_overflow; /* snapshot overflowed, subxids missing */
--
2.47.3