From 42e2cc8a1cdc92d0bca9f83fabc511670bee6bad Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Tue, 30 Jun 2020 11:49:40 +0530
Subject: [PATCH v31 4/4] Gracefully handle concurrent aborts of transactions
 being decoded.

Concurrent aborts are not an issue when decoding committed transactions, and
we never decode transactions that abort before the decoding starts.

But for an upcoming patch that allows decoding of in-progress
transactions, this may cause failures when the output plugin consults
catalogs (both system and user-defined).

We handle such failures by returning ERRCODE_TRANSACTION_ROLLBACK
sqlerrcode from system table scan APIs to the backend or WALSender
decoding a specific uncommitted transaction. The decoding logic on the
receipt of such an sqlerrcode aborts the ongoing decoding and returns
gracefully.

Author: Dilip Kumar, Nikhil Sontakke, Amit Kapila
Reviewed-by: Amit Kapila
Tested-by: Neha Sharma and Mahendra Singh Thalor
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
---
 doc/src/sgml/logicaldecoding.sgml         |  9 +++--
 src/backend/access/heap/heapam.c          | 10 ++++++
 src/backend/access/index/genam.c          | 53 +++++++++++++++++++++++++++++
 src/backend/access/table/tableam.c        |  8 +++++
 src/backend/access/transam/xact.c         | 19 +++++++++++
 src/backend/replication/logical/logical.c | 10 ++++++
 src/include/access/tableam.h              | 55 +++++++++++++++++++++++++++++++
 src/include/access/xact.h                 |  4 +++
 src/include/replication/logical.h         |  1 +
 9 files changed, 166 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 18116c8..98b47b0 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -433,9 +433,12 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
 ALTER TABLE user_catalog_table SET (user_catalog_table = true);
 CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
 </programlisting>
-     Any actions leading to transaction ID assignment are prohibited. That, among others,
-     includes writing to tables, performing DDL changes, and
-     calling <literal>pg_current_xact_id()</literal>.
+     Note that access to user catalog tables or regular system catalog tables
+     in the output plugins has to be done via the <literal>systable_*</literal>
+     scan APIs only. Access via the <literal>heap_*</literal> scan APIs will
+     error out. Additionally, any actions leading to transaction ID assignment
+     are prohibited. That, among others, includes writing to tables, performing
+     DDL changes, and calling <literal>pg_current_xact_id()</literal>.
     </para>
    </sect2>
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 537913d..0022b31 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1288,6 +1288,16 @@ heap_getnext(TableScanDesc sscan, ScanDirection direction)
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg_internal("only heap AM is supported")));
 
+	/*
+	 * We don't expect direct calls to heap_getnext with valid CheckXidAlive
+	 * for catalog or regular tables.  See detailed comments in xact.c where
+	 * these variables are declared.  Normally we have such a check at the
+	 * tableam API level, but heap_getnext is called from many places so we
+	 * need to ensure it here.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+		elog(ERROR, "unexpected heap_getnext call during logical decoding");
+
 	/* Note: no locking manipulations needed */
 
 	if (scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE)
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index dfba5ae..9d9a70a 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -28,6 +28,7 @@
 #include "lib/stringinfo.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
+#include "storage/procarray.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -429,10 +430,37 @@ systable_beginscan(Relation heapRelation,
 		sysscan->iscan = NULL;
 	}
 
+	/*
+	 * If CheckXidAlive is set then set a flag to indicate that a system
+	 * table scan is in progress.  See detailed comments in xact.c where these
+	 * variables are declared.
+	 */
+	if (TransactionIdIsValid(CheckXidAlive))
+		bsysscan = true;
+
 	return sysscan;
 }
 
 /*
+ * HandleConcurrentAbort - Handle concurrent abort of the CheckXidAlive.
+ *
+ * Error out if the transaction identified by CheckXidAlive has aborted.  We
+ * can't directly use TransactionIdDidAbort because, after a crash, such a
+ * transaction might not have been marked as aborted.  See detailed comments
+ * in xact.c where the variable is declared.
+ */
+static inline void
+HandleConcurrentAbort(void)
+{
+	if (TransactionIdIsValid(CheckXidAlive) &&
+		!TransactionIdIsInProgress(CheckXidAlive) &&
+		!TransactionIdDidCommit(CheckXidAlive))
+		ereport(ERROR,
+				(errcode(ERRCODE_TRANSACTION_ROLLBACK),
+				 errmsg("transaction aborted during system catalog scan")));
+}
+
+/*
  * systable_getnext --- get next tuple in a heap-or-index scan
  *
  * Returns NULL if no more tuples available.
@@ -481,6 +509,12 @@ systable_getnext(SysScanDesc sysscan)
 		}
 	}
 
+	/*
+	 * Handle the concurrent abort while fetching the catalog tuple during
+	 * logical streaming of a transaction.
+	 */
+	HandleConcurrentAbort();
+
 	return htup;
 }
 
@@ -517,6 +551,12 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup)
 											sysscan->slot,
 											freshsnap);
 
+	/*
+	 * Handle the concurrent abort while fetching the catalog tuple during
+	 * logical streaming of a transaction.
+	 */
+	HandleConcurrentAbort();
+
 	return result;
 }
 
@@ -545,6 +585,13 @@ systable_endscan(SysScanDesc sysscan)
 	if (sysscan->snapshot)
 		UnregisterSnapshot(sysscan->snapshot);
 
+	/*
+	 * Reset the bsysscan flag at the end of the systable scan.  See
+	 * detailed comments in xact.c where these variables are declared.
+	 */
+	if (TransactionIdIsValid(CheckXidAlive))
+		bsysscan = false;
+
 	pfree(sysscan);
 }
 
@@ -643,6 +690,12 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction)
 	if (htup && sysscan->iscan->xs_recheck)
 		elog(ERROR, "system catalog scans with lossy index conditions are not implemented");
 
+	/*
+	 * Handle the concurrent abort while fetching the catalog tuple during
+	 * logical streaming of a transaction.
+	 */
+	HandleConcurrentAbort();
+
 	return htup;
 }
 
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index 4b2bb29..a61e279 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -235,6 +235,14 @@ table_tuple_get_latest_tid(TableScanDesc scan, ItemPointer tid)
 	const TableAmRoutine *tableam = rel->rd_tableam;
 
 	/*
+	 * We don't expect direct calls to table_tuple_get_latest_tid with valid
+	 * CheckXidAlive for catalog or regular tables.  See detailed comments in
+	 * xact.c where these variables are declared.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+		elog(ERROR, "unexpected table_tuple_get_latest_tid call during logical decoding");
+
+	/*
 	 * Since this can be called with user-supplied TID, don't trust the input
 	 * too much.
 	 */
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index d93b40f..c7f1877 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -83,6 +83,19 @@ bool		XactDeferrable;
 int			synchronous_commit = SYNCHRONOUS_COMMIT_ON;
 
 /*
+ * CheckXidAlive is the xid of a possibly ongoing (sub)transaction.  It is
+ * currently used in logical decoding: such a transaction can abort while it
+ * is being decoded, in which case we skip decoding it.  To detect that, we
+ * check whether CheckXidAlive has aborted after fetching each tuple from a
+ * system table.  Because that check exists only in the systable_* APIs,
+ * direct tableam or heap API access is rejected during logical decoding;
+ * bsysscan is set while a systable_* scan is in progress so that the
+ * accesses made on its behalf are allowed.
+ */
+TransactionId CheckXidAlive = InvalidTransactionId;
+bool		bsysscan = false;
+
+/*
  * When running as a parallel worker, we place only a single
  * TransactionStateData on the parallel worker's state stack, and the XID
  * reflected there will be that of the *innermost* currently-active
@@ -2670,6 +2683,9 @@ AbortTransaction(void)
 	/* Forget about any active REINDEX. */
 	ResetReindexState(s->nestingLevel);
 
+	/* Reset logical streaming state. */
+	ResetLogicalStreamingState();
+
 	/* If in parallel mode, clean up workers and exit parallel mode. */
 	if (IsInParallelMode())
 	{
@@ -4972,6 +4988,9 @@ AbortSubTransaction(void)
 	/* Forget about any active REINDEX. */
 	ResetReindexState(s->nestingLevel);
 
+	/* Reset logical streaming state. */
+	ResetLogicalStreamingState();
+
 	/* Exit from parallel mode, if necessary. */
 	if (IsInParallelMode())
 	{
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 6ee59bd..8deff89 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1442,3 +1442,13 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		SpinLockRelease(&MyReplicationSlot->mutex);
 	}
 }
+
+/*
+ * Forget CheckXidAlive and bsysscan; called on (sub)transaction abort.
+ */
+void
+ResetLogicalStreamingState(void)
+{
+	bsysscan = false;
+	CheckXidAlive = InvalidTransactionId;
+}
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index b3d2a6d..acb6c38 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -19,6 +19,7 @@
 
 #include "access/relscan.h"
 #include "access/sdir.h"
+#include "access/xact.h"
 #include "utils/guc.h"
 #include "utils/rel.h"
 #include "utils/snapshot.h"
@@ -903,6 +904,15 @@ static inline bool
 table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
 {
 	slot->tts_tableOid = RelationGetRelid(sscan->rs_rd);
+
+	/*
+	 * We don't expect direct calls to table_scan_getnextslot with valid
+	 * CheckXidAlive for catalog or regular tables.  See detailed comments in
+	 * xact.c where these variables are declared.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+		elog(ERROR, "unexpected table_scan_getnextslot call during logical decoding");
+
 	return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot);
 }
 
@@ -1017,6 +1027,13 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan,
 						TupleTableSlot *slot,
 						bool *call_again, bool *all_dead)
 {
+	/*
+	 * We don't expect direct calls to table_index_fetch_tuple with valid
+	 * CheckXidAlive for catalog or regular tables.  See detailed comments in
+	 * xact.c where these variables are declared.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+		elog(ERROR, "unexpected table_index_fetch_tuple call during logical decoding");
 
 	return scan->rel->rd_tableam->index_fetch_tuple(scan, tid, snapshot,
 													slot, call_again,
@@ -1056,6 +1073,14 @@ table_tuple_fetch_row_version(Relation rel,
 							  Snapshot snapshot,
 							  TupleTableSlot *slot)
 {
+	/*
+	 * We don't expect direct calls to table_tuple_fetch_row_version with
+	 * valid CheckXidAlive for catalog or regular tables.  See detailed
+	 * comments in xact.c where these variables are declared.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+		elog(ERROR, "unexpected table_tuple_fetch_row_version call during logical decoding");
+
 	return rel->rd_tableam->tuple_fetch_row_version(rel, tid, snapshot, slot);
 }
 
@@ -1712,6 +1737,14 @@ static inline bool
 table_scan_bitmap_next_block(TableScanDesc scan,
 							 struct TBMIterateResult *tbmres)
 {
+	/*
+	 * We don't expect direct calls to table_scan_bitmap_next_block with valid
+	 * CheckXidAlive for catalog or regular tables.  See detailed comments in
+	 * xact.c where these variables are declared.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+		elog(ERROR, "unexpected table_scan_bitmap_next_block call during logical decoding");
+
 	return scan->rs_rd->rd_tableam->scan_bitmap_next_block(scan,
 														   tbmres);
 }
@@ -1729,6 +1762,14 @@ table_scan_bitmap_next_tuple(TableScanDesc scan,
 							 struct TBMIterateResult *tbmres,
 							 TupleTableSlot *slot)
 {
+	/*
+	 * We don't expect direct calls to table_scan_bitmap_next_tuple with valid
+	 * CheckXidAlive for catalog or regular tables.  See detailed comments in
+	 * xact.c where these variables are declared.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+		elog(ERROR, "unexpected table_scan_bitmap_next_tuple call during logical decoding");
+
 	return scan->rs_rd->rd_tableam->scan_bitmap_next_tuple(scan,
 														   tbmres,
 														   slot);
@@ -1747,6 +1788,13 @@ static inline bool
 table_scan_sample_next_block(TableScanDesc scan,
 							 struct SampleScanState *scanstate)
 {
+	/*
+	 * We don't expect direct calls to table_scan_sample_next_block with valid
+	 * CheckXidAlive for catalog or regular tables.  See detailed comments in
+	 * xact.c where these variables are declared.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+		elog(ERROR, "unexpected table_scan_sample_next_block call during logical decoding");
 	return scan->rs_rd->rd_tableam->scan_sample_next_block(scan, scanstate);
 }
 
@@ -1763,6 +1811,13 @@ table_scan_sample_next_tuple(TableScanDesc scan,
 							 struct SampleScanState *scanstate,
 							 TupleTableSlot *slot)
 {
+	/*
+	 * We don't expect direct calls to table_scan_sample_next_tuple with valid
+	 * CheckXidAlive for catalog or regular tables.  See detailed comments in
+	 * xact.c where these variables are declared.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+		elog(ERROR, "unexpected table_scan_sample_next_tuple call during logical decoding");
 	return scan->rs_rd->rd_tableam->scan_sample_next_tuple(scan, scanstate,
 														   slot);
 }
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index ac3f5e3..5f767eb 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -81,6 +81,10 @@ typedef enum
 /* Synchronous commit level */
 extern int	synchronous_commit;
 
+/* used during logical streaming of a transaction */
+extern TransactionId CheckXidAlive;
+extern bool bsysscan;
+
 /*
  * Miscellaneous flag bits to record events which occur on the top level
  * transaction. These flags are only persisted in MyXactFlags and are intended
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index deef318..b0fae98 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -121,5 +121,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
+extern void ResetLogicalStreamingState(void);
 
 #endif
-- 
1.8.3.1

