From ae3312b8669246dc425179e083b61602d197dfc6 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Fri, 10 Mar 2023 09:03:15 +1100
Subject: [PATCH v1] Add macros for ReorderBufferTXN toptxn.

Make toptxn related code more readable by introducing some simple macros.
---
 contrib/test_decoding/test_decoding.c           |  4 +--
 src/backend/replication/logical/reorderbuffer.c | 38 +++++++++++--------------
 src/backend/replication/pgoutput/pgoutput.c     |  6 ++--
 src/include/replication/reorderbuffer.h         |  3 ++
 4 files changed, 24 insertions(+), 27 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index b7e6048..d8710ee 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -815,11 +815,11 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
 	 * maintain the output_plugin_private only under the toptxn so if this is
 	 * not the toptxn then fetch the toptxn.
 	 */
-	ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
+	ReorderBufferTXN *toptxn = get_toptxn(txn);
 	TestDecodingTxnData *txndata = toptxn->output_plugin_private;
 	bool		xact_wrote_changes = txndata->xact_wrote_changes;
 
-	if (txn->toptxn == NULL)
+	if (isa_toptxn(txn))
 	{
 		Assert(txn->output_plugin_private != NULL);
 		pfree(txndata);
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 2d17c55..3e2edc9 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -717,10 +717,7 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		return;
 
 	/* Get the top transaction. */
-	if (txn->toptxn != NULL)
-		toptxn = txn->toptxn;
-	else
-		toptxn = txn;
+	toptxn = get_toptxn(txn);
 
 	/*
 	 * Indicate a partial change for toast inserts.  The change will be
@@ -812,10 +809,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 		ReorderBufferTXN *toptxn;
 
 		/* get the top transaction */
-		if (txn->toptxn != NULL)
-			toptxn = txn->toptxn;
-		else
-			toptxn = txn;
+		toptxn = get_toptxn(txn);
 
 		toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
 	}
@@ -1667,7 +1661,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
 	 * about the toplevel xact (we send the XID in all messages), but we never
 	 * stream XIDs of empty subxacts.
 	 */
-	if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0)))
+	if ((!txn_prepared) && (isa_toptxn(txn) || (txn->nentries_mem != 0)))
 		txn->txn_flags |= RBTXN_IS_STREAMED;
 
 	if (txn_prepared)
@@ -3207,10 +3201,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 	 * Update the total size in top level as well. This is later used to
 	 * compute the decoding stats.
 	 */
-	if (txn->toptxn != NULL)
-		toptxn = txn->toptxn;
-	else
-		toptxn = txn;
+	toptxn = get_toptxn(txn);
 
 	if (addition)
 	{
@@ -3295,8 +3286,8 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
 	 * so that we can execute them all together.  See comments atop this
 	 * function.
 	 */
-	if (txn->toptxn)
-		txn = txn->toptxn;
+	if (isa_subtxn(txn))
+		txn = get_toptxn(txn);
 
 	Assert(nmsgs > 0);
 
@@ -3354,7 +3345,6 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
 								  XLogRecPtr lsn)
 {
 	ReorderBufferTXN *txn;
-	ReorderBufferTXN *toptxn;
 
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
@@ -3370,11 +3360,15 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
 	 * conveniently check just top-level transaction and decide whether to
 	 * build the hash table or not.
 	 */
-	toptxn = txn->toptxn;
-	if (toptxn != NULL && !rbtxn_has_catalog_changes(toptxn))
+	if (isa_subtxn(txn))
 	{
-		toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
-		dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
+		ReorderBufferTXN *toptxn = get_toptxn(txn);
+
+		if (!rbtxn_has_catalog_changes(toptxn))
+		{
+			toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+			dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
+		}
 	}
 }
 
@@ -3619,7 +3613,7 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 			(txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
 		{
 			/* we know there has to be one, because the size is not zero */
-			Assert(txn && !txn->toptxn);
+			Assert(txn && isa_toptxn(txn));
 			Assert(txn->total_size > 0);
 			Assert(rb->size >= txn->total_size);
 
@@ -4007,7 +4001,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	bool		txn_is_streamed;
 
 	/* We can never reach here for a subtransaction. */
-	Assert(txn->toptxn == NULL);
+	Assert(isa_toptxn(txn));
 
 	/*
 	 * We can't make any assumptions about base snapshot here, similar to what
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 00a2d73..abfeea6 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -694,8 +694,8 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 	if (in_streaming)
 		xid = change->txn->xid;
 
-	if (change->txn->toptxn)
-		topxid = change->txn->toptxn->xid;
+	if (isa_subtxn(change->txn))
+		topxid = get_toptxn(change->txn)->xid;
 	else
 		topxid = xid;
 
@@ -1879,7 +1879,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
 	Assert(!in_streaming);
 
 	/* determine the toplevel transaction */
-	toptxn = (txn->toptxn) ? txn->toptxn : txn;
+	toptxn = get_toptxn(txn);
 
 	Assert(rbtxn_is_streamed(toptxn));
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 215d149..f7234ef 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -296,6 +296,9 @@ typedef struct ReorderBufferTXN
 	XLogRecPtr	end_lsn;
 
 	/* Toplevel transaction for this subxact (NULL for top-level). */
+#define isa_toptxn(rbtxn) (rbtxn->toptxn == NULL)
+#define isa_subtxn(rbtxn) (rbtxn->toptxn != NULL)
+#define get_toptxn(rbtxn) (isa_subtxn(rbtxn) ? rbtxn->toptxn : rbtxn)
 	struct ReorderBufferTXN *toptxn;
 
 	/*
-- 
1.8.3.1

