From 017c25df53759f3a88a96b73e2240cfba444d4b4 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Wed, 24 Feb 2021 04:34:40 -0500
Subject: [PATCH v2] Avoid repeated decoding of prepared transactions.

Prepared transactions were decoded again after a restart on COMMIT PREPARED
when two-phase commits were enabled. This was done to avoid missing a prepared
transaction that is not part of initial snapshot. Now, this missing PREPARE is identified
by defining a new LSN called snapshot_was_exported_at_lsn and stored in the
slot and snapbuild structures. Prepared transactions that were prior this LSN
will be replayed on a COMMIT PREPARED.
---
 contrib/test_decoding/expected/twophase.out        | 38 +++++++---------------
 contrib/test_decoding/expected/twophase_stream.out | 28 ++--------------
 src/backend/replication/logical/decode.c           | 11 +++++--
 src/backend/replication/logical/logical.c          | 13 +++++++-
 src/backend/replication/logical/reorderbuffer.c    | 28 ++++------------
 src/backend/replication/logical/snapbuild.c        | 19 ++++++++++-
 src/include/replication/reorderbuffer.h            |  1 +
 src/include/replication/slot.h                     |  7 ++++
 src/include/replication/snapbuild.h                |  4 ++-
 9 files changed, 71 insertions(+), 78 deletions(-)

diff --git a/contrib/test_decoding/expected/twophase.out b/contrib/test_decoding/expected/twophase.out
index f9f6bed..c51870f 100644
--- a/contrib/test_decoding/expected/twophase.out
+++ b/contrib/test_decoding/expected/twophase.out
@@ -33,14 +33,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 
 COMMIT PREPARED 'test_prepared#1';
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                        data                        
-----------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:1
- table public.test_prepared1: INSERT: id[integer]:2
- PREPARE TRANSACTION 'test_prepared#1'
+               data                
+-----------------------------------
  COMMIT PREPARED 'test_prepared#1'
-(5 rows)
+(1 row)
 
 -- Test that rollback of a prepared xact is decoded.
 BEGIN;
@@ -103,13 +99,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 
 COMMIT PREPARED 'test_prepared#3';
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                                  data                                   
--------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
- PREPARE TRANSACTION 'test_prepared#3'
+               data                
+-----------------------------------
  COMMIT PREPARED 'test_prepared#3'
-(4 rows)
+(1 row)
 
 -- make sure stuff still works
 INSERT INTO test_prepared1 VALUES (6);
@@ -159,14 +152,10 @@ RESET statement_timeout;
 COMMIT PREPARED 'test_prepared_lock';
 -- consume the commit
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                                   data                                    
----------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
- table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
- PREPARE TRANSACTION 'test_prepared_lock'
+                 data                 
+--------------------------------------
  COMMIT PREPARED 'test_prepared_lock'
-(5 rows)
+(1 row)
 
 -- Test savepoints and sub-xacts. Creating savepoints will create
 -- sub-xacts implicitly.
@@ -189,13 +178,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 COMMIT PREPARED 'test_prepared_savepoint';
 -- consume the commit
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                            data                            
-------------------------------------------------------------
- BEGIN
- table public.test_prepared_savepoint: INSERT: a[integer]:1
- PREPARE TRANSACTION 'test_prepared_savepoint'
+                   data                    
+-------------------------------------------
  COMMIT PREPARED 'test_prepared_savepoint'
-(4 rows)
+(1 row)
 
 -- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
 BEGIN;
diff --git a/contrib/test_decoding/expected/twophase_stream.out b/contrib/test_decoding/expected/twophase_stream.out
index 3acc4acd3..d54e640 100644
--- a/contrib/test_decoding/expected/twophase_stream.out
+++ b/contrib/test_decoding/expected/twophase_stream.out
@@ -60,32 +60,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-
 COMMIT PREPARED 'test1';
 --should show the COMMIT PREPARED and the other changes in the transaction
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
-                            data                             
--------------------------------------------------------------
- BEGIN
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
- PREPARE TRANSACTION 'test1'
+          data           
+-------------------------
  COMMIT PREPARED 'test1'
-(23 rows)
+(1 row)
 
 -- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
 -- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index afa1df0..9f6e5d5 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -663,6 +663,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
 	TimestampTz commit_time = parsed->xact_time;
 	RepOriginId origin_id = XLogRecGetOrigin(buf->record);
+	XLogRecPtr  snapshot_was_exported_at_lsn = InvalidXLogRecPtr;
 	int			i;
 
 	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
@@ -715,7 +716,14 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 */
 	if (two_phase)
 	{
+		/*
+		 * Get the LSN at which the snapshot for this slot was exported.
+		 * ReorderBufferFinishPrepared will decide based on this if the
+		 * transaction should be replayed on COMMIT PREPARED.
+		 */
+		snapshot_was_exported_at_lsn = SnapBuildExportLSNAt(ctx->snapshot_builder);
 		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+									snapshot_was_exported_at_lsn,
 									commit_time, origin_id, origin_lsn,
 									parsed->twophase_gid, true);
 	}
@@ -774,7 +782,6 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	/* We can't start streaming unless a consistent state is reached. */
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
 	{
-		ReorderBufferSkipPrepare(ctx->reorder, xid);
 		return;
 	}
 
@@ -792,7 +799,6 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 */
 	if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
 	{
-		ReorderBufferSkipPrepare(ctx->reorder, xid);
 		ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
 		return;
 	}
@@ -854,6 +860,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	{
 		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
 									abort_time, origin_id, origin_lsn,
+									InvalidXLogRecPtr,
 									parsed->twophase_gid, false);
 	}
 	else
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index baeb45f..5634635 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder = ReorderBufferAllocate();
 	ctx->snapshot_builder =
 		AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
-								need_full_snapshot);
+								need_full_snapshot, slot->data.snapshot_was_exported_at_lsn);
 
 	ctx->reorder->private_data = ctx;
 
@@ -590,6 +590,17 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 
 	SpinLockAcquire(&slot->mutex);
 	slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+
+	/*
+	 * The snapshot_was_exported_at_lsn point is required in two-phase
+	 * commits to handle prepared transactions that were not part of this
+	 * snapshot at export time. PREPAREs prior to this point need special
+	 * handling if two-phase commits are enabled.
+	 * The snapshot_was_exported_at_lsn is only updated once when
+	 * the slot is created and is not modified on restarts unlike the
+	 * confirmed_flush point.
+	 */
+	slot->data.snapshot_was_exported_at_lsn = ctx->reader->EndRecPtr;
 	SpinLockRelease(&slot->mutex);
 }
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c3b9632..5a3c986 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2623,21 +2623,6 @@ ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
 	return true;
 }
 
-/* Remember that we have skipped prepare */
-void
-ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
-{
-	ReorderBufferTXN *txn;
-
-	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
-
-	/* unknown transaction, nothing to do */
-	if (txn == NULL)
-		return;
-
-	txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
-}
-
 /*
  * Prepare a two-phase transaction.
  *
@@ -2672,6 +2657,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
 void
 ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
 							XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+							XLogRecPtr snapshot_was_exported_at_lsn,
 							TimestampTz commit_time, RepOriginId origin_id,
 							XLogRecPtr origin_lsn, char *gid, bool is_commit)
 {
@@ -2696,14 +2682,12 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
 	txn->gid = pstrdup(gid);
 
 	/*
-	 * It is possible that this transaction is not decoded at prepare time
-	 * either because by that time we didn't have a consistent snapshot or it
-	 * was decoded earlier but we have restarted. We can't distinguish between
-	 * those two cases so we send the prepare in both the cases and let
-	 * downstream decide whether to process or skip it. We don't need to
-	 * decode the xact for aborts if it is not done already.
+	 * It is possible that this transaction was not decoded at prepare time
+	 * because by that time we didn't have a consistent snapshot.
+	 * In which case we need to replay the prepared transaction here because
+	 * downstream would not have seen this transaction yet.
 	 */
-	if (!rbtxn_prepared(txn) && is_commit)
+	if ((txn->final_lsn < snapshot_was_exported_at_lsn) && is_commit)
 	{
 		txn->txn_flags |= RBTXN_PREPARE;
 
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index e117887..7622b1d 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -165,6 +165,12 @@ struct SnapBuild
 	XLogRecPtr	start_decoding_at;
 
 	/*
+	 * In two-phase commits, if the PREPARE is prior to this LSN, then the
+	 * whole transaction needs to be replayed at COMMIT PREPARED.
+	 */
+	XLogRecPtr  snapshot_was_exported_at_lsn;
+
+	/*
 	 * Don't start decoding WAL until the "xl_running_xacts" information
 	 * indicates there are no running xids with an xid smaller than this.
 	 */
@@ -269,7 +275,8 @@ SnapBuild *
 AllocateSnapshotBuilder(ReorderBuffer *reorder,
 						TransactionId xmin_horizon,
 						XLogRecPtr start_lsn,
-						bool need_full_snapshot)
+						bool need_full_snapshot,
+						XLogRecPtr snapshot_was_exported_at_lsn)
 {
 	MemoryContext context;
 	MemoryContext oldcontext;
@@ -297,6 +304,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
 	builder->initial_xmin_horizon = xmin_horizon;
 	builder->start_decoding_at = start_lsn;
 	builder->building_full_snapshot = need_full_snapshot;
+	builder->snapshot_was_exported_at_lsn = snapshot_was_exported_at_lsn;
 
 	MemoryContextSwitchTo(oldcontext);
 
@@ -357,6 +365,15 @@ SnapBuildCurrentState(SnapBuild *builder)
 }
 
 /*
+ * Return the LSN at which the snapshot was exported
+ */
+XLogRecPtr
+SnapBuildExportLSNAt(SnapBuild *builder)
+{
+	return builder->snapshot_was_exported_at_lsn;
+}
+
+/*
  * Should the contents of transaction ending at 'ptr' be decoded?
  */
 bool
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index bab31bf..1dbb50e 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -643,6 +643,7 @@ void		ReorderBufferCommit(ReorderBuffer *, TransactionId,
 								TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
 void		ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
 										XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+										XLogRecPtr snapshot_consistency_lsn,
 										TimestampTz commit_time,
 										RepOriginId origin_id, XLogRecPtr origin_lsn,
 										char *gid, bool is_commit);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 38a9a0b..ad8fb37 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -91,6 +91,13 @@ typedef struct ReplicationSlotPersistentData
 	 */
 	XLogRecPtr	confirmed_flush;
 
+	/*
+	 * LSN at which this slot found consistent point and snapshot exported.
+	 * This is required for two-phase transactions to decide if the whole
+	 * transaction should be replayed at COMMIT PREPARED.
+	 */
+	XLogRecPtr  snapshot_was_exported_at_lsn;
+
 	/* plugin name */
 	NameData	plugin;
 } ReplicationSlotPersistentData;
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index d9f187a..0693115 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -61,7 +61,8 @@ extern void CheckPointSnapBuild(void);
 
 extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
 										  TransactionId xmin_horizon, XLogRecPtr start_lsn,
-										  bool need_full_snapshot);
+										  bool need_full_snapshot,
+										  XLogRecPtr snapshot_was_exported_at_lsn);
 extern void FreeSnapshotBuilder(SnapBuild *cache);
 
 extern void SnapBuildSnapDecRefcount(Snapshot snap);
@@ -75,6 +76,7 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
 											TransactionId xid);
 
 extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
+extern XLogRecPtr SnapBuildExportLSNAt(SnapBuild *builder);
 
 extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
 							   TransactionId xid, int nsubxacts,
-- 
1.8.3.1

