From 7e69b0f926743a3b15233b6c9c5b3436b927cb1d Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Thu, 25 Feb 2021 16:46:39 +0530
Subject: [PATCH v3 2/2] Fixed review comments.

---
 src/backend/replication/logical/decode.c      | 11 ++------
 src/backend/replication/logical/logical.c     | 14 ++--------
 .../replication/logical/reorderbuffer.c       | 28 +++++++++++++++----
 src/backend/replication/logical/snapbuild.c   | 18 +++++++-----
 src/include/replication/slot.h                |  7 ++---
 src/include/replication/snapbuild.h           |  4 +--
 6 files changed, 43 insertions(+), 39 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 9f6e5d52ab..7f83bbb6a3 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -663,7 +663,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
 	TimestampTz commit_time = parsed->xact_time;
 	RepOriginId origin_id = XLogRecGetOrigin(buf->record);
-	XLogRecPtr  snapshot_was_exported_at_lsn = InvalidXLogRecPtr;
 	int			i;
 
 	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
@@ -716,14 +715,8 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 */
 	if (two_phase)
 	{
-		/*
-		 * Get the LSN at which the snapshot for this slot was exported.
-		 * ReorderBufferFinishPrepared will decide based on this if the
-		 * transaction should be replayed on COMMIT PREPARED.
-		 */
-		snapshot_was_exported_at_lsn = SnapBuildExportLSNAt(ctx->snapshot_builder);
 		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
-									snapshot_was_exported_at_lsn,
+									SnapBuildExportAt(ctx->snapshot_builder),
 									commit_time, origin_id, origin_lsn,
 									parsed->twophase_gid, true);
 	}
@@ -782,6 +775,7 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	/* We can't start streaming unless a consistent state is reached. */
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
 	{
+		ReorderBufferSkipPrepare(ctx->reorder, xid);
 		return;
 	}
 
@@ -799,6 +793,7 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 */
 	if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
 	{
+		ReorderBufferSkipPrepare(ctx->reorder, xid);
 		ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
 		return;
 	}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5634635e94..2ea82c6513 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder = ReorderBufferAllocate();
 	ctx->snapshot_builder =
 		AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
-								need_full_snapshot, slot->data.snapshot_was_exported_at_lsn);
+								need_full_snapshot, slot->data.snapshot_was_exported_at);
 
 	ctx->reorder->private_data = ctx;
 
@@ -590,17 +590,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 
 	SpinLockAcquire(&slot->mutex);
 	slot->data.confirmed_flush = ctx->reader->EndRecPtr;
-
-	/*
-	 * The snapshot_was_exported_at_lsn point is required in two-phase
-	 * commits to handle prepared transactions that were not part of this
-	 * snapshot at export time. PREPAREs prior to this point need special
-	 * handling if two-phase commits are enabled.
-	 * The snapshot_was_exported_at_lsn is only updated once when
-	 * the slot is created and is not modified on restarts unlike the
-	 * confirmed_flush point.
-	 */
-	slot->data.snapshot_was_exported_at_lsn = ctx->reader->EndRecPtr;
+	slot->data.snapshot_was_exported_at = ctx->reader->EndRecPtr;
 	SpinLockRelease(&slot->mutex);
 }
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 5a3c986c85..8aefc7eaa7 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2623,6 +2623,21 @@ ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
 	return true;
 }
 
+/* Remember that we have skipped prepare */
+void
+ReorderBufferSkipPrepare(ReorderBuffer* rb, TransactionId xid)
+{
+	ReorderBufferTXN* txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+
+	/* unknown transaction, nothing to do */
+	if (txn == NULL)
+		return;
+
+	txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
+}
+
 /*
  * Prepare a two-phase transaction.
  *
@@ -2657,7 +2672,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
 void
 ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
 							XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
-							XLogRecPtr snapshot_was_exported_at_lsn,
+							XLogRecPtr snapshot_was_exported_at,
 							TimestampTz commit_time, RepOriginId origin_id,
 							XLogRecPtr origin_lsn, char *gid, bool is_commit)
 {
@@ -2682,12 +2697,13 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
 	txn->gid = pstrdup(gid);
 
 	/*
-	 * It is possible that this transaction was not decoded at prepare time
-	 * because by that time we didn't have a consistent snapshot.
-	 * In which case we need to replay the prepared transaction here because
-	 * downstream would not have seen this transaction yet.
+	 * It is possible that this transaction is not decoded at prepare time
+	 * either because by that time we didn't have a consistent snapshot or it
+	 * was decoded earlier but we have restarted. We only need to send the
+	 * prepare if it was not decoded earlier. We don't need to decode the xact
+	 * for aborts if it is not done already.
 	 */
-	if ((txn->final_lsn < snapshot_was_exported_at_lsn) && is_commit)
+	if ((txn->final_lsn < snapshot_was_exported_at) && is_commit)
 	{
 		txn->txn_flags |= RBTXN_PREPARE;
 
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 7622b1dba1..fe486c4c83 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -165,10 +165,14 @@ struct SnapBuild
 	XLogRecPtr	start_decoding_at;
 
 	/*
-	 * In two-phase commits, if the PREPARE is prior to this LSN, then the
-	 * whole transaction needs to be replayed at COMMIT PREPARED.
+	 * LSN at which we found a consistent point at the time of slot creation.
+	 * This is also the point where we have exported snapshot for initial copy.
+	 *
+	 * The prepared transactions that are not covered by initial snapshot needs
+	 * to be sent later along with commit prepared and they must be before this
+	 * point.
 	 */
-	XLogRecPtr  snapshot_was_exported_at_lsn;
+	XLogRecPtr  snapshot_was_exported_at;
 
 	/*
 	 * Don't start decoding WAL until the "xl_running_xacts" information
@@ -276,7 +280,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
 						TransactionId xmin_horizon,
 						XLogRecPtr start_lsn,
 						bool need_full_snapshot,
-						XLogRecPtr snapshot_was_exported_at_lsn)
+						XLogRecPtr snapshot_was_exported_at)
 {
 	MemoryContext context;
 	MemoryContext oldcontext;
@@ -304,7 +308,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
 	builder->initial_xmin_horizon = xmin_horizon;
 	builder->start_decoding_at = start_lsn;
 	builder->building_full_snapshot = need_full_snapshot;
-	builder->snapshot_was_exported_at_lsn = snapshot_was_exported_at_lsn;
+	builder->snapshot_was_exported_at = snapshot_was_exported_at;
 
 	MemoryContextSwitchTo(oldcontext);
 
@@ -368,9 +372,9 @@ SnapBuildCurrentState(SnapBuild *builder)
  * Return the LSN at which the snapshot was exported
  */
 XLogRecPtr
-SnapBuildExportLSNAt(SnapBuild *builder)
+SnapBuildExportAt(SnapBuild *builder)
 {
-	return builder->snapshot_was_exported_at_lsn;
+	return builder->snapshot_was_exported_at;
 }
 
 /*
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index ad8fb371a6..5764293555 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -92,11 +92,10 @@ typedef struct ReplicationSlotPersistentData
 	XLogRecPtr	confirmed_flush;
 
 	/*
-	 * LSN at which this slot found consistent point and snapshot exported.
-	 * This is required for two-phase transactions to decide if the whole
-	 * transaction should be replayed at COMMIT PREPARED.
+	 * LSN at which we found a consistent point at the time of slot creation.
+	 * This is also the point where we have exported snapshot for initial copy.
 	 */
-	XLogRecPtr  snapshot_was_exported_at_lsn;
+	XLogRecPtr  snapshot_was_exported_at;
 
 	/* plugin name */
 	NameData	plugin;
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index 0693115005..f15ac6639f 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -62,7 +62,7 @@ extern void CheckPointSnapBuild(void);
 extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
 										  TransactionId xmin_horizon, XLogRecPtr start_lsn,
 										  bool need_full_snapshot,
-										  XLogRecPtr snapshot_was_exported_at_lsn);
+										  XLogRecPtr snapshot_was_exported_at);
 extern void FreeSnapshotBuilder(SnapBuild *cache);
 
 extern void SnapBuildSnapDecRefcount(Snapshot snap);
@@ -76,7 +76,7 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
 											TransactionId xid);
 
 extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
-extern XLogRecPtr SnapBuildExportLSNAt(SnapBuild *builder);
+extern XLogRecPtr SnapBuildExportAt(SnapBuild *builder);
 
 extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
 							   TransactionId xid, int nsubxacts,
-- 
2.28.0.windows.1

