From 34621aef44b38af447c670f94da9e703bdd495fa Mon Sep 17 00:00:00 2001
From: "Zheng (Zane) Li" <zhelli@amazon.com>
Date: Fri, 20 May 2022 19:55:50 +0000
Subject: [PATCH] Introduce new fields
 ReplicationSlotPersistentData.non_xact_op_at, XLogReaderState.NonXactOpRecPtr
 and SnapBuild.start_decoding_nonxactop_at. These hold the end LSN of the last
 non-transactional message/operation decoded. We introduce these new fields in
 an attempt to solve the issue of missing decoding of non-transactional
 messages/operations under concurrency/before the build state reaches
 SNAPBUILD_CONSISTENT. Once the build state reaches SNAPBUILD_CONSISTENT,
 non_xact_op_at can be set to ReplicationSlotPersistentData.confirmed_flush.

---
 src/backend/replication/logical/decode.c      | 18 ++++++++++---
 src/backend/replication/logical/logical.c     | 25 +++++++++++++++---
 .../replication/logical/logicalfuncs.c        |  2 +-
 src/backend/replication/logical/snapbuild.c   | 26 +++++++++++++++++--
 src/backend/replication/slotfuncs.c           |  2 +-
 src/backend/replication/walsender.c           |  2 +-
 src/include/access/xlogreader.h               |  3 +++
 src/include/replication/logical.h             |  2 +-
 src/include/replication/slot.h                |  9 +++++++
 src/include/replication/snapbuild.h           |  5 +++-
 10 files changed, 80 insertions(+), 14 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index aa2427ba73..a0b700a662 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -589,12 +589,24 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (message->transactional &&
 		!SnapBuildProcessChange(builder, xid, buf->origptr))
 		return;
+
 	else if (!message->transactional &&
-			 (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
-			  SnapBuildXactNeedsSkip(builder, buf->origptr)))
+			 ((SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) ||
+			  /* Can't process non-transactional msg immediately if consumer functions are not provided */
+			  ctx->prepare_write == NULL ||
+			  SnapBuildNonXactOpNeedsSkip(builder, buf->origptr)))
 		return;
 
-	snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+	/*
+	 * Build or get a snapshot for non-transactional msg for immediate processing.
+	 * Transactional msg will be queued and processed upon decoding of the commit
+	 * record.
+	 */
+	if (!message->transactional)
+	{
+		snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+	}
+
 	ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
 							  message->transactional,
 							  message->message, /* first part of message is
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 625a7f4273..302a61efb8 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -208,7 +208,8 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder = ReorderBufferAllocate();
 	ctx->snapshot_builder =
 		AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
-								need_full_snapshot, slot->data.two_phase_at);
+								need_full_snapshot, slot->data.two_phase_at,
+								slot->data.non_xact_op_at);
 
 	ctx->reorder->private_data = ctx;
 
@@ -1216,6 +1217,12 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
 							  message_size, message);
 
+	if (!transactional)
+	{
+		SnapBuildSetNonXactOpAt(ctx->snapshot_builder, message_lsn);
+		ctx->reader->NonXactOpRecPtr = message_lsn;
+	}
+
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
 }
@@ -1531,6 +1538,12 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
 									 message_size, message);
 
+	if (!transactional)
+	{
+		SnapBuildSetNonXactOpAt(ctx->snapshot_builder, message_lsn);
+		ctx->reader->NonXactOpRecPtr = message_lsn;
+	}
+
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
 }
@@ -1648,7 +1661,7 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 
 	/* candidate already valid with the current flush position, apply */
 	if (updated_xmin)
-		LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
+		LogicalConfirmReceivedLocation(slot->data.confirmed_flush, InvalidXLogRecPtr);
 }
 
 /*
@@ -1726,14 +1739,14 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 
 	/* candidates are already valid with the current flush position, apply */
 	if (updated_lsn)
-		LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
+		LogicalConfirmReceivedLocation(slot->data.confirmed_flush, InvalidXLogRecPtr);
 }
 
 /*
  * Handle a consumer's confirmation having received all changes up to lsn.
  */
 void
-LogicalConfirmReceivedLocation(XLogRecPtr lsn)
+LogicalConfirmReceivedLocation(XLogRecPtr lsn, XLogRecPtr lsn_nonxact)
 {
 	Assert(lsn != InvalidXLogRecPtr);
 
@@ -1747,6 +1760,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		SpinLockAcquire(&MyReplicationSlot->mutex);
 
 		MyReplicationSlot->data.confirmed_flush = lsn;
+		if (lsn_nonxact != InvalidXLogRecPtr)
+			MyReplicationSlot->data.non_xact_op_at = lsn_nonxact;
 
 		/* if we're past the location required for bumping xmin, do so */
 		if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
@@ -1812,6 +1827,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 	{
 		SpinLockAcquire(&MyReplicationSlot->mutex);
 		MyReplicationSlot->data.confirmed_flush = lsn;
+		if (lsn_nonxact != InvalidXLogRecPtr)
+			MyReplicationSlot->data.non_xact_op_at = lsn_nonxact;
 		SpinLockRelease(&MyReplicationSlot->mutex);
 	}
 }
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 6058d36e0d..5d579218d8 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -294,7 +294,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 		 */
 		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
 		{
-			LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
+			LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr, ctx->reader->NonXactOpRecPtr);
 
 			/*
 			 * If only the confirmed_flush_lsn has changed the slot won't get
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 1119a12db9..d5d378836a 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -175,6 +175,8 @@ struct SnapBuild
 	 */
 	XLogRecPtr	two_phase_at;
 
+	XLogRecPtr	start_decoding_nonxactop_at;
+
 	/*
 	 * Don't start decoding WAL until the "xl_running_xacts" information
 	 * indicates there are no running xids with an xid smaller than this.
@@ -281,7 +283,8 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
 						TransactionId xmin_horizon,
 						XLogRecPtr start_lsn,
 						bool need_full_snapshot,
-						XLogRecPtr two_phase_at)
+						XLogRecPtr two_phase_at,
+						XLogRecPtr non_xact_op_at)
 {
 	MemoryContext context;
 	MemoryContext oldcontext;
@@ -310,6 +313,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
 	builder->start_decoding_at = start_lsn;
 	builder->building_full_snapshot = need_full_snapshot;
 	builder->two_phase_at = two_phase_at;
+	builder->start_decoding_nonxactop_at = non_xact_op_at+1;
 
 	MemoryContextSwitchTo(oldcontext);
 
@@ -387,6 +391,15 @@ SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
 	builder->two_phase_at = ptr;
 }
 
+/*
+ * Set the LSN at which decoding of non-transactional ops starts.
+ */
+void
+SnapBuildSetNonXactOpAt(SnapBuild *builder, XLogRecPtr ptr)
+{
+	builder->start_decoding_nonxactop_at = ptr;
+}
+
 /*
  * Should the contents of transaction ending at 'ptr' be decoded?
  */
@@ -396,6 +409,15 @@ SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
 	return ptr < builder->start_decoding_at;
 }
 
+/*
+ * Should the non-transactional op at 'ptr' be skipped, i.e. not decoded?
+ */
+bool
+SnapBuildNonXactOpNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
+{
+	return ptr < builder->start_decoding_nonxactop_at;
+}
+
 /*
  * Increase refcount of a snapshot.
  *
@@ -661,7 +683,7 @@ SnapBuildExportSnapshot(SnapBuild *builder)
 Snapshot
 SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
 {
-	Assert(builder->state == SNAPBUILD_CONSISTENT);
+	Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
 
 	/* only build a new snapshot if we don't have a prebuilt one */
 	if (builder->snapshot == NULL)
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ca945994ef..3bbbe74058 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -530,7 +530,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 
 		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
 		{
-			LogicalConfirmReceivedLocation(moveto);
+			LogicalConfirmReceivedLocation(moveto, moveto);
 
 			/*
 			 * If only the confirmed_flush LSN has changed the slot won't get
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e42671722a..b1373e52fd 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2170,7 +2170,7 @@ ProcessStandbyReplyMessage(void)
 	if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
 	{
 		if (SlotIsLogical(MyReplicationSlot))
-			LogicalConfirmReceivedLocation(flushPtr);
+			LogicalConfirmReceivedLocation(flushPtr, flushPtr);
 		else
 			PhysicalConfirmReceivedLocation(flushPtr);
 	}
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index e73ea4a840..b197a6a354 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -206,6 +206,9 @@ struct XLogReaderState
 	XLogRecPtr	ReadRecPtr;		/* start of last record read */
 	XLogRecPtr	EndRecPtr;		/* end+1 of last record read */
 
+	/* end of last non-transactional op record read */
+	XLogRecPtr	NonXactOpRecPtr;
+
 	/*
 	 * Set at the end of recovery: the start point of a partial record at the
 	 * end of WAL (InvalidXLogRecPtr if there wasn't one), and the start
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index edadacd589..c9714b51f0 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -136,7 +136,7 @@ extern void FreeDecodingContext(LogicalDecodingContext *ctx);
 extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin);
 extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 												  XLogRecPtr restart_lsn);
-extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
+extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn, XLogRecPtr lsn_nonxact);
 
 extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 									  TransactionId xid, const char *gid);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 8c9f3321d5..941c6f6da3 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -83,6 +83,15 @@ typedef struct ReplicationSlotPersistentData
 	 */
 	XLogRecPtr	confirmed_flush;
 
+	/*
+	 * Oldest LSN that the client has acked receipt for a non-transactional
+	 * operation such as a non-transactional message. This is introduced
+	 * to solve missing non-transactional decoding when creating a logical
+	 * replication slot under concurrency. In the normal situation
+	 * (SNAPBUILD_CONSISTENT) we can make non_xact_op_at = confirmed_flush.
+	 */
+	XLogRecPtr	non_xact_op_at;
+
 	/*
 	 * LSN at which we enabled two_phase commit for this slot or LSN at which
 	 * we found a consistent point at the time of slot creation.
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index d179251aad..7b055dba7c 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -62,7 +62,8 @@ extern void CheckPointSnapBuild(void);
 extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
 										  TransactionId xmin_horizon, XLogRecPtr start_lsn,
 										  bool need_full_snapshot,
-										  XLogRecPtr two_phase_at);
+										  XLogRecPtr two_phase_at,
+										  XLogRecPtr non_xact_op_at);
 extern void FreeSnapshotBuilder(SnapBuild *cache);
 
 extern void SnapBuildSnapDecRefcount(Snapshot snap);
@@ -77,8 +78,10 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
 											TransactionId xid);
 
 extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
+extern bool SnapBuildNonXactOpNeedsSkip(SnapBuild *builder, XLogRecPtr ptr);
 extern XLogRecPtr SnapBuildGetTwoPhaseAt(SnapBuild *builder);
 extern void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr);
+extern void SnapBuildSetNonXactOpAt(SnapBuild *builder, XLogRecPtr ptr);
 
 extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
 							   TransactionId xid, int nsubxacts,
-- 
2.32.0

