From 17ff5462c3c4137796e044e95e609cbe77df673c Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Fri, 26 Feb 2021 02:58:49 -0500
Subject: [PATCH v4] Avoid repeated decoding of prepared transactions.

Prepared transactions were decoded again after a restart on COMMIT PREPARED
when two-phase commits were enabled. This was done to avoid missing a prepared
transaction that is not part of the initial snapshot. Now, such a missed
PREPARE is identified by a new LSN called snapshot_was_exported_at, which is
stored in the slot and snapbuild structures. Only prepared transactions that
were prepared prior to this LSN will be replayed on a COMMIT PREPARED.
---
 contrib/test_decoding/expected/twophase.out        | 38 +++++++---------------
 contrib/test_decoding/expected/twophase_stream.out | 28 ++--------------
 doc/src/sgml/catalogs.sgml                         | 11 +++++++
 doc/src/sgml/logicaldecoding.sgml                  |  9 ++---
 src/backend/catalog/system_views.sql               |  1 +
 src/backend/replication/logical/decode.c           |  2 ++
 src/backend/replication/logical/logical.c          |  3 +-
 src/backend/replication/logical/reorderbuffer.c    | 14 ++++----
 src/backend/replication/logical/snapbuild.c        | 23 ++++++++++++-
 src/backend/replication/slotfuncs.c                |  7 +++-
 src/include/catalog/pg_proc.dat                    |  6 ++--
 src/include/replication/reorderbuffer.h            |  1 +
 src/include/replication/slot.h                     |  6 ++++
 src/include/replication/snapbuild.h                |  4 ++-
 src/test/regress/expected/rules.out                |  3 +-
 15 files changed, 83 insertions(+), 73 deletions(-)

diff --git a/contrib/test_decoding/expected/twophase.out b/contrib/test_decoding/expected/twophase.out
index f9f6bed..c51870f 100644
--- a/contrib/test_decoding/expected/twophase.out
+++ b/contrib/test_decoding/expected/twophase.out
@@ -33,14 +33,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 
 COMMIT PREPARED 'test_prepared#1';
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                        data                        
-----------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:1
- table public.test_prepared1: INSERT: id[integer]:2
- PREPARE TRANSACTION 'test_prepared#1'
+               data                
+-----------------------------------
  COMMIT PREPARED 'test_prepared#1'
-(5 rows)
+(1 row)
 
 -- Test that rollback of a prepared xact is decoded.
 BEGIN;
@@ -103,13 +99,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 
 COMMIT PREPARED 'test_prepared#3';
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                                  data                                   
--------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
- PREPARE TRANSACTION 'test_prepared#3'
+               data                
+-----------------------------------
  COMMIT PREPARED 'test_prepared#3'
-(4 rows)
+(1 row)
 
 -- make sure stuff still works
 INSERT INTO test_prepared1 VALUES (6);
@@ -159,14 +152,10 @@ RESET statement_timeout;
 COMMIT PREPARED 'test_prepared_lock';
 -- consume the commit
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                                   data                                    
----------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
- table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
- PREPARE TRANSACTION 'test_prepared_lock'
+                 data                 
+--------------------------------------
  COMMIT PREPARED 'test_prepared_lock'
-(5 rows)
+(1 row)
 
 -- Test savepoints and sub-xacts. Creating savepoints will create
 -- sub-xacts implicitly.
@@ -189,13 +178,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 COMMIT PREPARED 'test_prepared_savepoint';
 -- consume the commit
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                            data                            
-------------------------------------------------------------
- BEGIN
- table public.test_prepared_savepoint: INSERT: a[integer]:1
- PREPARE TRANSACTION 'test_prepared_savepoint'
+                   data                    
+-------------------------------------------
  COMMIT PREPARED 'test_prepared_savepoint'
-(4 rows)
+(1 row)
 
 -- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
 BEGIN;
diff --git a/contrib/test_decoding/expected/twophase_stream.out b/contrib/test_decoding/expected/twophase_stream.out
index 3acc4acd3..d54e640 100644
--- a/contrib/test_decoding/expected/twophase_stream.out
+++ b/contrib/test_decoding/expected/twophase_stream.out
@@ -60,32 +60,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-
 COMMIT PREPARED 'test1';
 --should show the COMMIT PREPARED and the other changes in the transaction
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
-                            data                             
--------------------------------------------------------------
- BEGIN
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
- PREPARE TRANSACTION 'test1'
+          data           
+-------------------------
  COMMIT PREPARED 'test1'
-(23 rows)
+(1 row)
 
 -- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
 -- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index db29905..366a971 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -11479,6 +11479,17 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>snapshot_was_exported_at</structfield> <type>pg_lsn</type>
+      </para>
+      <para>
+       The address (<literal>LSN</literal>) at which the logical
+       slot found a consistent point at the time of slot creation.
+       <literal>NULL</literal> for physical slots.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>wal_status</structfield> <type>text</type>
       </para>
       <para>
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 6455664..18d592d 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -191,9 +191,6 @@ postgres=# COMMIT PREPARED 'test_prepared1';
 postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
     lsn    | xid |                    data                    
 -----------+-----+--------------------------------------------
- 0/1689DC0 | 529 | BEGIN 529
- 0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
- 0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
  0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
-(4 row)
+(1 row)
 
@@ -822,10 +819,8 @@ typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx
       <parameter>gid</parameter> field, which is part of the
       <parameter>txn</parameter> parameter, can be used in this callback to
       check if the plugin has already received this <command>PREPARE</command>
-      in which case it can skip the remaining changes of the transaction.
-      This can only happen if the user restarts the decoding after receiving
-      the <command>PREPARE</command> for a transaction but before receiving
-      the <command>COMMIT PREPARED</command>, say because of some error.
+      in which case it can either error out or skip the remaining changes of
+      the transaction.
       <programlisting>
        typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
                                                     ReorderBufferTXN *txn);
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index fa58afd..3f94398 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -893,6 +893,7 @@ CREATE VIEW pg_replication_slots AS
             L.catalog_xmin,
             L.restart_lsn,
             L.confirmed_flush_lsn,
+            L.snapshot_was_exported_at,
             L.wal_status,
             L.safe_wal_size
     FROM pg_get_replication_slots() AS L
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index afa1df0..7f83bbb 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -716,6 +716,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	if (two_phase)
 	{
 		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+									SnapBuildExportAt(ctx->snapshot_builder),
 									commit_time, origin_id, origin_lsn,
 									parsed->twophase_gid, true);
 	}
@@ -854,6 +855,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	{
 		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+									InvalidXLogRecPtr,
 									abort_time, origin_id, origin_lsn,
 									parsed->twophase_gid, false);
 	}
 	else
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index baeb45f..2ea82c6 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder = ReorderBufferAllocate();
 	ctx->snapshot_builder =
 		AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
-								need_full_snapshot);
+								need_full_snapshot, slot->data.snapshot_was_exported_at);
 
 	ctx->reorder->private_data = ctx;
 
@@ -590,6 +590,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 
 	SpinLockAcquire(&slot->mutex);
 	slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	slot->data.snapshot_was_exported_at = ctx->reader->EndRecPtr;
 	SpinLockRelease(&slot->mutex);
 }
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c3b9632..8aefc7e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2672,6 +2672,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
 void
 ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
 							XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+							XLogRecPtr snapshot_was_exported_at,
 							TimestampTz commit_time, RepOriginId origin_id,
 							XLogRecPtr origin_lsn, char *gid, bool is_commit)
 {
@@ -2698,12 +2699,11 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
 	/*
 	 * It is possible that this transaction is not decoded at prepare time
 	 * either because by that time we didn't have a consistent snapshot or it
-	 * was decoded earlier but we have restarted. We can't distinguish between
-	 * those two cases so we send the prepare in both the cases and let
-	 * downstream decide whether to process or skip it. We don't need to
-	 * decode the xact for aborts if it is not done already.
+	 * was decoded earlier but we have restarted. We only need to send the
+	 * prepare if it was not decoded earlier. We don't need to decode the xact
+	 * for aborts if it is not done already.
 	 */
-	if (!rbtxn_prepared(txn) && is_commit)
+	if ((txn->final_lsn < snapshot_was_exported_at) && is_commit)
 	{
 		txn->txn_flags |= RBTXN_PREPARE;
 
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index e117887..fe486c4 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -165,6 +165,16 @@ struct SnapBuild
 	XLogRecPtr	start_decoding_at;
 
 	/*
+	 * LSN at which we found a consistent point at the time of slot creation.
+	 * This is also the point where we have exported snapshot for initial copy.
+	 *
+	 * The prepared transactions that are not covered by initial snapshot need
+	 * to be sent later along with commit prepared and they must be before this
+	 * point.
+	 */
+	XLogRecPtr	snapshot_was_exported_at;
+
+	/*
 	 * Don't start decoding WAL until the "xl_running_xacts" information
 	 * indicates there are no running xids with an xid smaller than this.
 	 */
@@ -269,7 +279,8 @@ SnapBuild *
 AllocateSnapshotBuilder(ReorderBuffer *reorder,
 						TransactionId xmin_horizon,
 						XLogRecPtr start_lsn,
-						bool need_full_snapshot)
+						bool need_full_snapshot,
+						XLogRecPtr snapshot_was_exported_at)
 {
 	MemoryContext context;
 	MemoryContext oldcontext;
@@ -297,6 +308,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
 	builder->initial_xmin_horizon = xmin_horizon;
 	builder->start_decoding_at = start_lsn;
 	builder->building_full_snapshot = need_full_snapshot;
+	builder->snapshot_was_exported_at = snapshot_was_exported_at;
 
 	MemoryContextSwitchTo(oldcontext);
 
@@ -357,6 +369,15 @@ SnapBuildCurrentState(SnapBuild *builder)
 }
 
 /*
+ * Return the LSN at which the snapshot was exported
+ */
+XLogRecPtr
+SnapBuildExportAt(SnapBuild *builder)
+{
+	return builder->snapshot_was_exported_at;
+}
+
+/*
  * Should the contents of transaction ending at 'ptr' be decoded?
  */
 bool
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d24bb5b..f5efdff 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -236,7 +236,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 13
+#define PG_GET_REPLICATION_SLOTS_COLS 14
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -344,6 +344,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
+		if (slot_contents.data.snapshot_was_exported_at != InvalidXLogRecPtr)
+			values[i++] = LSNGetDatum(slot_contents.data.snapshot_was_exported_at);
+		else
+			nulls[i++] = true;
+
 		/*
 		 * If invalidated_at is valid and restart_lsn is invalid, we know for
 		 * certain that the slot has been invalidated.  Otherwise, test
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 1604412..e1e4f3e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10496,9 +10496,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,pg_lsn,text,int8}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,snapshot_was_exported_at,wal_status,safe_wal_size}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index bab31bf..1dbb50e 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -643,6 +643,7 @@ void		ReorderBufferCommit(ReorderBuffer *, TransactionId,
 								TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
 void		ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
 										XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+										XLogRecPtr snapshot_was_exported_at,
 										TimestampTz commit_time,
 										RepOriginId origin_id, XLogRecPtr origin_lsn,
 										char *gid, bool is_commit);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 38a9a0b..5764293 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -91,6 +91,12 @@ typedef struct ReplicationSlotPersistentData
 	 */
 	XLogRecPtr	confirmed_flush;
 
+	/*
+	 * LSN at which we found a consistent point at the time of slot creation.
+	 * This is also the point where we have exported snapshot for initial copy.
+	 */
+	XLogRecPtr	snapshot_was_exported_at;
+
 	/* plugin name */
 	NameData	plugin;
 } ReplicationSlotPersistentData;
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index d9f187a..f15ac66 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -61,7 +61,8 @@ extern void CheckPointSnapBuild(void);
 
 extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
 										  TransactionId xmin_horizon, XLogRecPtr start_lsn,
-										  bool need_full_snapshot);
+										  bool need_full_snapshot,
+										  XLogRecPtr snapshot_was_exported_at);
 extern void FreeSnapshotBuilder(SnapBuild *cache);
 
 extern void SnapBuildSnapDecRefcount(Snapshot snap);
@@ -75,6 +76,7 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
 											TransactionId xid);
 
 extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
+extern XLogRecPtr SnapBuildExportAt(SnapBuild *builder);
 
 extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
 							   TransactionId xid, int nsubxacts,
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 10a1f34..10647d4 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1476,9 +1476,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.catalog_xmin,
     l.restart_lsn,
     l.confirmed_flush_lsn,
+    l.snapshot_was_exported_at,
     l.wal_status,
     l.safe_wal_size
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size)
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, snapshot_was_exported_at, wal_status, safe_wal_size)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
1.8.3.1

