Hi,

On 2017-04-15 05:18:49 +0200, Petr Jelinek wrote:
> Hi, here is updated patch (details inline).

I'm not yet all that happy, sorry:

Looking at 0001:
- GetOldestSafeDecodingTransactionId() only guarantees to return an xid
  safe for decoding (note how procArray->replication_slot_catalog_xmin
  is checked), not one for the initial snapshot - so afaics this whole
  exercise doesn't guarantee much so far.
- A later commit introduces need_full_snapshot as a
  CreateInitDecodingContext parameter, but you don't use it here.  That
  seems wrong.
- I remain unhappy with the handling of the reset of effective_xmin in
  FreeDecodingContext(). What if we ERROR/FATAL out before that happens?
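To make the first and last points concrete, here's a rough, simplified model of the horizon choice and of the release-time reset, loosely following the structure of the attached patch.  It is not PostgreSQL code: ModelSlot, model_oldest_safe_xid() and model_slot_release() are hypothetical names, the inputs (next xid, running xids, slot horizons) are passed in explicitly instead of being read from shared memory, and all locking is ignored.

```c
#include <stdbool.h>
#include <stdint.h>

/* Local stand-ins mirroring PostgreSQL's definitions (simplified model). */
typedef uint32_t TransactionId;

#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)

static bool
xid_is_valid(TransactionId xid)
{
	return xid != InvalidTransactionId;
}

/* Wraparound-aware "a < b", the way TransactionIdPrecedes() treats normal xids. */
static bool
xid_precedes(TransactionId a, TransactionId b)
{
	if (a < FirstNormalTransactionId || b < FirstNormalTransactionId)
		return a < b;
	return (int32_t) (a - b) < 0;
}

/* Hypothetical model of the patched GetOldestSafeDecodingTransactionId(). */
static TransactionId
model_oldest_safe_xid(TransactionId next_xid,
					  TransactionId slot_xmin,
					  TransactionId slot_catalog_xmin,
					  const TransactionId *running, int nrunning,
					  bool catalog_only)
{
	TransactionId safe = next_xid;

	/* A slot's data xmin protects all tuples, so it is always usable. */
	if (xid_is_valid(slot_xmin) && xid_precedes(slot_xmin, safe))
		safe = slot_xmin;

	/* A slot's catalog xmin only protects catalog tuples. */
	if (catalog_only && xid_is_valid(slot_catalog_xmin) &&
		xid_precedes(slot_catalog_xmin, safe))
		safe = slot_catalog_xmin;

	/* Xids of still-running transactions are protected as well. */
	for (int i = 0; i < nrunning; i++)
		if (xid_is_valid(running[i]) && xid_precedes(running[i], safe))
			safe = running[i];

	return safe;
}

typedef struct ModelSlot
{
	TransactionId data_xmin;      /* persistent data xmin, usually invalid for logical slots */
	TransactionId effective_xmin; /* in-memory xmin, possibly only temporarily pegged */
} ModelSlot;

/* Hypothetical model of the reset the patch adds to ReplicationSlotRelease(). */
static void
model_slot_release(ModelSlot *slot)
{
	if (!xid_is_valid(slot->data_xmin) && xid_is_valid(slot->effective_xmin))
		slot->effective_xmin = InvalidTransactionId;
}
```

With next xid 1010, running xids 1000 and 1005, and another slot holding only a catalog_xmin of 900, the catalog-only call yields 900 while the full-snapshot call yields 1000: user-table rows removed by transactions between 900 and 1000 may already be gone, so 900 isn't a safe xmin for an exported data snapshot.  Doing the reset in the release step, rather than in FreeDecodingContext(), is the approach the attached patch takes.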

What do you think about something like the attached?  I've not yet
tested it in any way except running the regression tests.

- Andres
From b20c8e1edb31d517ecb714467a7acbeec1b926dc Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Sun, 23 Apr 2017 20:41:29 -0700
Subject: [PATCH] Preserve required !catalog tuples while computing initial
 decoding snapshot.

The logical decoding machinery already preserved all the required
catalog tuples, which is sufficient in the course of normal logical
decoding, but did not guarantee that non-catalog tuples were preserved
during computation of the initial snapshot when creating a slot over
the replication protocol.

This could cause a corrupted initial snapshot to be exported.  The
time window for issues is usually not terribly large, but on a busy
server it's perfectly possible to hit it.  Ongoing decoding is not
affected by this bug.

To avoid increased overhead for the SQL API, only retain additional
tuples when a logical slot is being created over the replication
protocol.  To do so this commit changes the signature of
CreateInitDecodingContext(), but it seems unlikely that it's being
used in an extension, so that's probably ok.

In a drive-by fix, fix handling of
ReplicationSlotsComputeRequiredXmin's already_locked argument, which
should only apply to ProcArrayLock, not ReplicationSlotControlLock.

Reported-By: Erik Rijkers
Analyzed-By: Petr Jelinek
Author: Petr Jelinek, heavily editorialized by Andres Freund
Reviewed-By: Andres Freund
Discussion: https://postgr.es/m/9a897b86-46e1-9915-ee4c-da02e4ff6...@2ndquadrant.com
Backport: 9.4, where logical decoding was introduced.
---
 src/backend/replication/logical/logical.c   | 25 +++++++++++++++++--------
 src/backend/replication/logical/snapbuild.c | 12 ++++++++++++
 src/backend/replication/slot.c              | 25 +++++++++++++++++++++----
 src/backend/replication/slotfuncs.c         |  4 ++--
 src/backend/replication/walsender.c         |  1 +
 src/backend/storage/ipc/procarray.c         | 14 +++++++++++---
 src/include/replication/logical.h           |  1 +
 src/include/storage/procarray.h             |  2 +-
 8 files changed, 66 insertions(+), 18 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5529ac8fb4..032e91c371 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -210,6 +210,7 @@ StartupDecodingContext(List *output_plugin_options,
 LogicalDecodingContext *
 CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
+						  bool need_full_snapshot,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write)
@@ -267,23 +268,31 @@ CreateInitDecodingContext(char *plugin,
 	 * the slot machinery about the new limit. Once that's done the
 	 * ProcArrayLock can be released as the slot machinery now is
 	 * protecting against vacuum.
+	 *
+	 * Note that, temporarily, the data, not just the catalog, xmin has to be
+	 * reserved if a data snapshot is to be exported.  Otherwise the initial
+	 * data snapshot created here is not guaranteed to be valid. After that
+	 * the data xmin doesn't need to be managed anymore and the global xmin
+	 * should be recomputed. As we are fine with losing the pegged data xmin
+	 * after crash - no chance a snapshot would get exported anymore - we can
+	 * get away with just setting the slot's
+	 * effective_xmin. ReplicationSlotRelease will reset it again.
+	 *
 	 * ----
 	 */
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
-	slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId();
-	slot->data.catalog_xmin = slot->effective_catalog_xmin;
+	xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
+
+	slot->effective_catalog_xmin = xmin_horizon;
+	slot->data.catalog_xmin = xmin_horizon;
+	if (need_full_snapshot)
+		slot->effective_xmin = xmin_horizon;
 
 	ReplicationSlotsComputeRequiredXmin(true);
 
 	LWLockRelease(ProcArrayLock);
 
-	/*
-	 * tell the snapshot builder to only assemble snapshot once reaching the
-	 * running_xact's record with the respective xmin.
-	 */
-	xmin_horizon = slot->data.catalog_xmin;
-
 	ReplicationSlotMarkDirty();
 	ReplicationSlotSave();
 
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 358ec28932..458a52b68b 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -606,6 +606,18 @@ SnapBuildExportSnapshot(SnapBuild *builder)
 
 	snap = SnapBuildInitialSnapshot(builder);
 
+#ifdef USE_ASSERT_CHECKING
+	{
+		TransactionId safeXid;
+
+		LWLockAcquire(ProcArrayLock, LW_SHARED);
+		safeXid = GetOldestSafeDecodingTransactionId(false);
+		LWLockRelease(ProcArrayLock);
+
+		Assert(TransactionIdPrecedesOrEquals(safeXid, builder->xmin));
+	}
+#endif
+
 	/*
 	 * now that we've built a plain snapshot, make it active and use the
 	 * normal mechanisms for exporting it
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index e8ad0f7b39..5f63d0484a 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -398,6 +398,22 @@ ReplicationSlotRelease(void)
 		SpinLockRelease(&slot->mutex);
 	}
 
+
+	/*
+	 * If slot needed to temporarily restrain both data and catalog xmin to
+	 * create the catalog snapshot, remove that temporary constraint.
+	 * Snapshots can only be exported while the initial snapshot is still
+	 * acquired.
+	 */
+	if (!TransactionIdIsValid(slot->data.xmin) &&
+		TransactionIdIsValid(slot->effective_xmin))
+	{
+		SpinLockAcquire(&slot->mutex);
+		slot->effective_xmin = InvalidTransactionId;
+		SpinLockRelease(&slot->mutex);
+		ReplicationSlotsComputeRequiredXmin(false);
+	}
+
 	MyReplicationSlot = NULL;
 
 	/* might not have been set when we've been a plain slot */
@@ -612,6 +628,9 @@ ReplicationSlotPersist(void)
 
 /*
  * Compute the oldest xmin across all slots and store it in the ProcArray.
+ *
+ * If already_locked is true, ProcArrayLock has already been acquired
+ * exclusively.
  */
 void
 ReplicationSlotsComputeRequiredXmin(bool already_locked)
@@ -622,8 +641,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 
 	Assert(ReplicationSlotCtl != NULL);
 
-	if (!already_locked)
-		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -652,8 +670,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 			agg_catalog_xmin = effective_catalog_xmin;
 	}
 
-	if (!already_locked)
-		LWLockRelease(ReplicationSlotControlLock);
+	LWLockRelease(ReplicationSlotControlLock);
 
 	ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
 }
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 7104c94795..6ee1e68819 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -131,8 +131,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	/*
 	 * Create logical decoding context, to build the initial snapshot.
 	 */
-	ctx = CreateInitDecodingContext(
-									NameStr(*plugin), NIL,
+	ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
+									false, /* do not build snapshot */
 									logical_read_local_xlog_page, NULL, NULL);
 
 	/* build initial snapshot, might take a while */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 064cf5ee28..43c8a73f3e 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -909,6 +909,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		}
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL,
+										true, /* build snapshot */
 										logical_read_xlog_page,
 										WalSndPrepareWrite, WalSndWriteData);
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index ebf6a92923..233eb606f5 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -2151,7 +2151,7 @@ GetOldestActiveTransactionId(void)
  * that the caller will immediately use the xid to peg the xmin horizon.
  */
 TransactionId
-GetOldestSafeDecodingTransactionId(void)
+GetOldestSafeDecodingTransactionId(bool catalogOnly)
 {
 	ProcArrayStruct *arrayP = procArray;
 	TransactionId oldestSafeXid;
@@ -2174,9 +2174,17 @@ GetOldestSafeDecodingTransactionId(void)
 	/*
 	 * If there's already a slot pegging the xmin horizon, we can start with
 	 * that value, it's guaranteed to be safe since it's computed by this
-	 * routine initially and has been enforced since.
+	 * routine initially and has been enforced since.  We can always use the
+	 * slot's general xmin horizon, but the catalog horizon is only usable
+	 * when only catalog data is going to be looked at.
 	 */
-	if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
+	if (TransactionIdIsValid(procArray->replication_slot_xmin) &&
+		TransactionIdPrecedes(procArray->replication_slot_xmin,
+							  oldestSafeXid))
+		oldestSafeXid = procArray->replication_slot_xmin;
+
+	if (catalogOnly &&
+		TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
 		TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
 							  oldestSafeXid))
 		oldestSafeXid = procArray->replication_slot_catalog_xmin;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 7d6c88efe3..80f04c3cb9 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -82,6 +82,7 @@ extern void CheckLogicalDecodingRequirements(void);
 
 extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
+						  bool need_full_snapshot,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write);
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 9b42e49524..805ecd25ec 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -88,7 +88,7 @@ extern bool TransactionIdIsInProgress(TransactionId xid);
 extern bool TransactionIdIsActive(TransactionId xid);
 extern TransactionId GetOldestXmin(Relation rel, int flags);
 extern TransactionId GetOldestActiveTransactionId(void);
-extern TransactionId GetOldestSafeDecodingTransactionId(void);
+extern TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly);
 
 extern VirtualTransactionId *GetVirtualXIDsDelayingChkpt(int *nvxids);
 extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids);
-- 
2.12.0.264.gd6db3f2165.dirty
