From 129947ab2d0ba223862ed1c87be0f96b51645ba0 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Thu, 18 Feb 2021 20:18:16 -0500
Subject: [PATCH] Don't allow repeated decoding of prepared transactions.

Enforce a full snapshot while decoding with two-phase enabled. This
allows the decoder to differentiate between prepared transactions that
were sent prior to restart and prepared transactions that were not sent
because they preceded the consistent snapshot.
---
 contrib/test_decoding/expected/twophase.out        | 38 +++++++---------------
 contrib/test_decoding/expected/twophase_stream.out | 28 ++--------------
 src/backend/replication/logical/decode.c           |  5 ++-
 src/backend/replication/logical/logical.c          |  8 +++++
 src/backend/replication/logical/reorderbuffer.c    | 15 +++++++++
 src/backend/replication/logical/snapbuild.c        |  9 +++++
 src/include/replication/reorderbuffer.h            |  1 +
 src/include/replication/snapbuild.h                |  1 +
 8 files changed, 53 insertions(+), 52 deletions(-)

diff --git a/contrib/test_decoding/expected/twophase.out b/contrib/test_decoding/expected/twophase.out
index f9f6bed..c51870f 100644
--- a/contrib/test_decoding/expected/twophase.out
+++ b/contrib/test_decoding/expected/twophase.out
@@ -33,14 +33,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 
 COMMIT PREPARED 'test_prepared#1';
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                        data                        
-----------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:1
- table public.test_prepared1: INSERT: id[integer]:2
- PREPARE TRANSACTION 'test_prepared#1'
+               data                
+-----------------------------------
  COMMIT PREPARED 'test_prepared#1'
-(5 rows)
+(1 row)
 
 -- Test that rollback of a prepared xact is decoded.
 BEGIN;
@@ -103,13 +99,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 
 COMMIT PREPARED 'test_prepared#3';
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                                  data                                   
--------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
- PREPARE TRANSACTION 'test_prepared#3'
+               data                
+-----------------------------------
  COMMIT PREPARED 'test_prepared#3'
-(4 rows)
+(1 row)
 
 -- make sure stuff still works
 INSERT INTO test_prepared1 VALUES (6);
@@ -159,14 +152,10 @@ RESET statement_timeout;
 COMMIT PREPARED 'test_prepared_lock';
 -- consume the commit
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                                   data                                    
----------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
- table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
- PREPARE TRANSACTION 'test_prepared_lock'
+                 data                 
+--------------------------------------
  COMMIT PREPARED 'test_prepared_lock'
-(5 rows)
+(1 row)
 
 -- Test savepoints and sub-xacts. Creating savepoints will create
 -- sub-xacts implicitly.
@@ -189,13 +178,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 COMMIT PREPARED 'test_prepared_savepoint';
 -- consume the commit
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                            data                            
-------------------------------------------------------------
- BEGIN
- table public.test_prepared_savepoint: INSERT: a[integer]:1
- PREPARE TRANSACTION 'test_prepared_savepoint'
+                   data                    
+-------------------------------------------
  COMMIT PREPARED 'test_prepared_savepoint'
-(4 rows)
+(1 row)
 
 -- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
 BEGIN;
diff --git a/contrib/test_decoding/expected/twophase_stream.out b/contrib/test_decoding/expected/twophase_stream.out
index 3acc4acd3..d54e640 100644
--- a/contrib/test_decoding/expected/twophase_stream.out
+++ b/contrib/test_decoding/expected/twophase_stream.out
@@ -60,32 +60,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-
 COMMIT PREPARED 'test1';
 --should show the COMMIT PREPARED and the other changes in the transaction
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
-                            data                             
--------------------------------------------------------------
- BEGIN
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
- PREPARE TRANSACTION 'test1'
+          data           
+-------------------------
  COMMIT PREPARED 'test1'
-(23 rows)
+(1 row)
 
 -- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
 -- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index afa1df0..00d789d 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -789,10 +789,13 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 * SnapBuildProcessRunningXacts. But we need to process cache
 	 * invalidations if there are any for the reasons mentioned in
 	 * DecodeCommit.
+	 *
+	 * We need to mark the transaction as prepared, so that we don't resend it
+	 * at COMMIT PREPARED.
 	 */
 	if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
 	{
-		ReorderBufferSkipPrepare(ctx->reorder, xid);
+		ReorderBufferMarkPrepare(ctx->reorder, xid);
 		ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
 		return;
 	}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 0977aec..d98bc92 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -431,6 +431,10 @@ CreateInitDecodingContext(const char *plugin,
 		startup_cb_wrapper(ctx, &ctx->options, true);
 	MemoryContextSwitchTo(old_context);
 
+	/* With two-phase enabled, only a full snapshot can be used */
+	if (ctx->twophase)
+		SetSnapBuildType(ctx->snapshot_builder, true);
+
 	ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
 
 	return ctx;
@@ -534,6 +538,10 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
 
+	/* With two-phase enabled, only a full snapshot can be used */
+	if (ctx->twophase)
+		SetSnapBuildType(ctx->snapshot_builder, true);
+
 	ereport(LOG,
 			(errmsg("starting logical decoding for slot \"%s\"",
 					NameStr(slot->data.name)),
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 5a62ab8..4730cf0 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2638,6 +2638,21 @@ ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
 	txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
 }
 
+/* Flag the transaction for xid as prepared (RBTXN_PREPARE), if it is known */
+void
+ReorderBufferMarkPrepare(ReorderBuffer *rb, TransactionId xid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+
+	/* lookup returned no transaction for this xid, so nothing to flag */
+	if (txn == NULL)
+		return;
+
+	txn->txn_flags |= RBTXN_PREPARE;
+}
+
 /*
  * Prepare a two-phase transaction.
  *
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 752cf2d..5e6899e 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -357,6 +357,15 @@ SnapBuildCurrentState(SnapBuild *builder)
 }
 
 /*
+ * Set the builder's building_full_snapshot flag to need_full_snapshot.
+ */
+void
+SetSnapBuildType(SnapBuild *builder, bool need_full_snapshot)
+{
+	builder->building_full_snapshot = need_full_snapshot;
+}
+
+/*
  * Should the contents of transaction ending at 'ptr' be decoded?
  */
 bool
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index bab31bf..8824e60 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -676,6 +676,7 @@ bool		ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
 											 TimestampTz prepare_time,
 											 RepOriginId origin_id, XLogRecPtr origin_lsn);
 void		ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid);
+void		ReorderBufferMarkPrepare(ReorderBuffer *rb, TransactionId xid);
 void		ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid);
 ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
 TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index d9f187a..786d0d4 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -69,6 +69,7 @@ extern void SnapBuildSnapDecRefcount(Snapshot snap);
 extern Snapshot SnapBuildInitialSnapshot(SnapBuild *builder);
 extern const char *SnapBuildExportSnapshot(SnapBuild *snapstate);
 extern void SnapBuildClearExportedSnapshot(void);
+extern void SetSnapBuildType(SnapBuild *builder, bool need_full_snapshot);
 
 extern SnapBuildState SnapBuildCurrentState(SnapBuild *snapstate);
 extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
-- 
1.8.3.1

