From aea5b0bc00cb4c282b3e659c492b825880c2e8e6 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Fri, 26 Feb 2021 02:58:49 -0500
Subject: [PATCH v6 1/2] Avoid repeated decoding of prepared transactions after
 the restart.

In commit a271a1b50e, we allowed decoding at prepare time and the prepare
was decoded again if there is a restart after decoding it. It was done
that way because we can't distinguish between the cases where we have not
decoded the prepare because it was prior to consistent snapshot or we have
decoded it earlier but restarted. To distinguish between these two cases,
we have introduced an initial_consistent_point at the slot level which is
an LSN at which we found a consistent point at the time of slot creation.
This is also the point where we have exported a snapshot for the initial
copy. So, prepared transactions prior to this point are sent along with
commit prepared.

This commit bumps SNAPBUILD_VERSION because of change in SnapBuild. It
will break existing slots which is fine in a major release.

Author: Ajin Cherian, based on idea by Andres Freund
Reviewed-by: Amit Kapila and Vignesh C
Discussion: https://postgr.es/m/d0f60d60-133d-bf8d-bd70-47784d8fabf3@enterprisedb.com
---
 contrib/test_decoding/expected/twophase.out        | 38 +++++++---------------
 contrib/test_decoding/expected/twophase_stream.out | 28 ++--------------
 doc/src/sgml/logicaldecoding.sgml                  |  9 ++---
 src/backend/replication/logical/decode.c           |  2 ++
 src/backend/replication/logical/logical.c          |  3 +-
 src/backend/replication/logical/reorderbuffer.c    | 10 +++---
 src/backend/replication/logical/snapbuild.c        | 26 +++++++++++++--
 src/include/replication/reorderbuffer.h            |  1 +
 src/include/replication/slot.h                     |  7 ++++
 src/include/replication/snapbuild.h                |  4 ++-
 10 files changed, 61 insertions(+), 67 deletions(-)

diff --git a/contrib/test_decoding/expected/twophase.out b/contrib/test_decoding/expected/twophase.out
index f9f6bed..c51870f 100644
--- a/contrib/test_decoding/expected/twophase.out
+++ b/contrib/test_decoding/expected/twophase.out
@@ -33,14 +33,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 
 COMMIT PREPARED 'test_prepared#1';
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                        data                        
-----------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:1
- table public.test_prepared1: INSERT: id[integer]:2
- PREPARE TRANSACTION 'test_prepared#1'
+               data                
+-----------------------------------
  COMMIT PREPARED 'test_prepared#1'
-(5 rows)
+(1 row)
 
 -- Test that rollback of a prepared xact is decoded.
 BEGIN;
@@ -103,13 +99,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 
 COMMIT PREPARED 'test_prepared#3';
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                                  data                                   
--------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
- PREPARE TRANSACTION 'test_prepared#3'
+               data                
+-----------------------------------
  COMMIT PREPARED 'test_prepared#3'
-(4 rows)
+(1 row)
 
 -- make sure stuff still works
 INSERT INTO test_prepared1 VALUES (6);
@@ -159,14 +152,10 @@ RESET statement_timeout;
 COMMIT PREPARED 'test_prepared_lock';
 -- consume the commit
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                                   data                                    
----------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
- table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
- PREPARE TRANSACTION 'test_prepared_lock'
+                 data                 
+--------------------------------------
  COMMIT PREPARED 'test_prepared_lock'
-(5 rows)
+(1 row)
 
 -- Test savepoints and sub-xacts. Creating savepoints will create
 -- sub-xacts implicitly.
@@ -189,13 +178,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 COMMIT PREPARED 'test_prepared_savepoint';
 -- consume the commit
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                            data                            
-------------------------------------------------------------
- BEGIN
- table public.test_prepared_savepoint: INSERT: a[integer]:1
- PREPARE TRANSACTION 'test_prepared_savepoint'
+                   data                    
+-------------------------------------------
  COMMIT PREPARED 'test_prepared_savepoint'
-(4 rows)
+(1 row)
 
 -- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
 BEGIN;
diff --git a/contrib/test_decoding/expected/twophase_stream.out b/contrib/test_decoding/expected/twophase_stream.out
index 3acc4acd3..d54e640 100644
--- a/contrib/test_decoding/expected/twophase_stream.out
+++ b/contrib/test_decoding/expected/twophase_stream.out
@@ -60,32 +60,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-
 COMMIT PREPARED 'test1';
 --should show the COMMIT PREPARED and the other changes in the transaction
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
-                            data                             
--------------------------------------------------------------
- BEGIN
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
- PREPARE TRANSACTION 'test1'
+          data           
+-------------------------
  COMMIT PREPARED 'test1'
-(23 rows)
+(1 row)
 
 -- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
 -- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 6455664..18d592d 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -191,9 +191,6 @@ postgres=# COMMIT PREPARED 'test_prepared1';
 postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
     lsn    | xid |                    data                    
 -----------+-----+--------------------------------------------
- 0/1689DC0 | 529 | BEGIN 529
- 0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
- 0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
  0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
-(4 row)
+(1 row)
 
@@ -822,10 +819,8 @@ typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx
       <parameter>gid</parameter> field, which is part of the
       <parameter>txn</parameter> parameter, can be used in this callback to
       check if the plugin has already received this <command>PREPARE</command>
-      in which case it can skip the remaining changes of the transaction.
-      This can only happen if the user restarts the decoding after receiving
-      the <command>PREPARE</command> for a transaction but before receiving
-      the <command>COMMIT PREPARED</command>, say because of some error.
+      in which case it can either error out or skip the remaining changes of
+      the transaction.
       <programlisting>
        typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
                                                     ReorderBufferTXN *txn);
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index afa1df0..423188d 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -716,6 +716,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	if (two_phase)
 	{
 		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+									SnapBuildInitialConsistentPoint(ctx->snapshot_builder),
 									commit_time, origin_id, origin_lsn,
 									parsed->twophase_gid, true);
 	}
@@ -854,6 +855,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	{
 		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+									InvalidXLogRecPtr,
 									abort_time, origin_id, origin_lsn,
 									parsed->twophase_gid, false);
 	}
 	else
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index baeb45f..3f6d723 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder = ReorderBufferAllocate();
 	ctx->snapshot_builder =
 		AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
-								need_full_snapshot);
+								need_full_snapshot, slot->data.initial_consistent_point);
 
 	ctx->reorder->private_data = ctx;
 
@@ -590,6 +590,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 
 	SpinLockAcquire(&slot->mutex);
 	slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	slot->data.initial_consistent_point = ctx->reader->EndRecPtr;
 	SpinLockRelease(&slot->mutex);
 }
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c3b9632..91600ac 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2672,6 +2672,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
 void
 ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
 							XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+							XLogRecPtr initial_consistent_point,
 							TimestampTz commit_time, RepOriginId origin_id,
 							XLogRecPtr origin_lsn, char *gid, bool is_commit)
 {
@@ -2698,12 +2699,11 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
 	/*
 	 * It is possible that this transaction is not decoded at prepare time
 	 * either because by that time we didn't have a consistent snapshot or it
-	 * was decoded earlier but we have restarted. We can't distinguish between
-	 * those two cases so we send the prepare in both the cases and let
-	 * downstream decide whether to process or skip it. We don't need to
-	 * decode the xact for aborts if it is not done already.
+	 * was decoded earlier but we have restarted. We only need to send the
+	 * prepare if it was not decoded earlier. We don't need to decode the xact
+	 * for aborts if it is not done already.
 	 */
-	if (!rbtxn_prepared(txn) && is_commit)
+	if ((txn->final_lsn < initial_consistent_point) && is_commit)
 	{
 		txn->txn_flags |= RBTXN_PREPARE;
 
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index e117887..c42005e 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -165,6 +165,17 @@ struct SnapBuild
 	XLogRecPtr	start_decoding_at;
 
 	/*
+	 * LSN at which we found a consistent point at the time of slot creation.
+	 * This is also the point where we have exported a snapshot for the
+	 * initial copy.
+	 *
+	 * The prepared transactions that are not covered by the initial snapshot
+	 * need to be sent later along with commit prepared and they must be
+	 * before this point.
+	 */
+	XLogRecPtr	initial_consistent_point;
+
+	/*
 	 * Don't start decoding WAL until the "xl_running_xacts" information
 	 * indicates there are no running xids with an xid smaller than this.
 	 */
@@ -269,7 +280,8 @@ SnapBuild *
 AllocateSnapshotBuilder(ReorderBuffer *reorder,
 						TransactionId xmin_horizon,
 						XLogRecPtr start_lsn,
-						bool need_full_snapshot)
+						bool need_full_snapshot,
+						XLogRecPtr initial_consistent_point)
 {
 	MemoryContext context;
 	MemoryContext oldcontext;
@@ -297,6 +309,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
 	builder->initial_xmin_horizon = xmin_horizon;
 	builder->start_decoding_at = start_lsn;
 	builder->building_full_snapshot = need_full_snapshot;
+	builder->initial_consistent_point = initial_consistent_point;
 
 	MemoryContextSwitchTo(oldcontext);
 
@@ -357,6 +370,15 @@ SnapBuildCurrentState(SnapBuild *builder)
 }
 
 /*
+ * Return the LSN at which the snapshot was exported
+ */
+XLogRecPtr
+SnapBuildInitialConsistentPoint(SnapBuild *builder)
+{
+	return builder->initial_consistent_point;
+}
+
+/*
  * Should the contents of transaction ending at 'ptr' be decoded?
  */
 bool
@@ -1422,7 +1444,7 @@ typedef struct SnapBuildOnDisk
 	offsetof(SnapBuildOnDisk, version)
 
 #define SNAPBUILD_MAGIC 0x51A1E001
-#define SNAPBUILD_VERSION 3
+#define SNAPBUILD_VERSION 4
 
 /*
  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index bab31bf..565a961 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -643,6 +643,7 @@ void		ReorderBufferCommit(ReorderBuffer *, TransactionId,
 								TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
 void		ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
 										XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+										XLogRecPtr initial_consistent_point,
 										TimestampTz commit_time,
 										RepOriginId origin_id, XLogRecPtr origin_lsn,
 										char *gid, bool is_commit);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 38a9a0b..5c3fde2 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -91,6 +91,13 @@ typedef struct ReplicationSlotPersistentData
 	 */
 	XLogRecPtr	confirmed_flush;
 
+	/*
+	 * LSN at which we found a consistent point at the time of slot creation.
+	 * This is also the point where we have exported a snapshot for the
+	 * initial copy.
+	 */
+	XLogRecPtr	initial_consistent_point;
+
 	/* plugin name */
 	NameData	plugin;
 } ReplicationSlotPersistentData;
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index d9f187a..fbabce6 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -61,7 +61,8 @@ extern void CheckPointSnapBuild(void);
 
 extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
 										  TransactionId xmin_horizon, XLogRecPtr start_lsn,
-										  bool need_full_snapshot);
+										  bool need_full_snapshot,
+										  XLogRecPtr initial_consistent_point);
 extern void FreeSnapshotBuilder(SnapBuild *cache);
 
 extern void SnapBuildSnapDecRefcount(Snapshot snap);
@@ -75,6 +76,7 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
 											TransactionId xid);
 
 extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
+extern XLogRecPtr SnapBuildInitialConsistentPoint(SnapBuild *builder);
 
 extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
 							   TransactionId xid, int nsubxacts,
-- 
1.8.3.1

