From b6df6bb72e7e7daafb364f338a7044a2f22c79cc Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Fri, 5 Jun 2020 09:03:16 +0530
Subject: [PATCH v31 1/4] Immediately WAL-log subtransaction and top-level XID
 association.

The logical decoding infrastructure needs to know which top-level
transaction the subxact belongs to, in order to decode all the
changes. Until now that might be delayed until commit, due to the
caching (GPROC_MAX_CACHED_SUBXIDS), preventing features requiring
incremental decoding.

So we also write the assignment info into WAL immediately, as part
of the next WAL record (to minimize overhead) only when wal_level=logical.
We can not remove the existing XLOG_XACT_ASSIGNMENT WAL as that is
required for avoiding overflow in the hot standby snapshot.

Author: Tomas Vondra, Dilip Kumar, Amit Kapila
Reviewed-by: Amit Kapila
Tested-by: Neha Sharma and Mahendra Singh Thalor
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
---
 src/backend/access/transam/xact.c        | 50 ++++++++++++++++++++++++++++++++
 src/backend/access/transam/xloginsert.c  | 23 +++++++++++++--
 src/backend/access/transam/xlogreader.c  |  5 ++++
 src/backend/replication/logical/decode.c | 44 ++++++++++++++--------------
 src/include/access/xact.h                |  3 ++
 src/include/access/xlog.h                |  1 +
 src/include/access/xlogreader.h          |  3 ++
 src/include/access/xlogrecord.h          |  1 +
 8 files changed, 107 insertions(+), 23 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 905dc7d..a93fb8a 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -191,6 +191,7 @@ typedef struct TransactionStateData
 	bool		didLogXid;		/* has xid been included in WAL record? */
 	int			parallelModeLevel;	/* Enter/ExitParallelMode counter */
 	bool		chain;			/* start a new block after this one */
+	bool		assigned;		/* assigned to top-level XID */
 	struct TransactionStateData *parent;	/* back link to parent */
 } TransactionStateData;
 
@@ -223,6 +224,7 @@ typedef struct SerializedTransactionState
 static TransactionStateData TopTransactionStateData = {
 	.state = TRANS_DEFAULT,
 	.blockState = TBLOCK_DEFAULT,
+	.assigned = false,
 };
 
 /*
@@ -5120,6 +5122,7 @@ PushTransaction(void)
 	GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
 	s->prevXactReadOnly = XactReadOnly;
 	s->parallelModeLevel = 0;
+	s->assigned = false;
 
 	CurrentTransactionState = s;
 
@@ -6022,3 +6025,50 @@ xact_redo(XLogReaderState *record)
 	else
 		elog(PANIC, "xact_redo: unknown op code %u", info);
 }
+
+/*
+ * IsSubTransactionAssignmentPending
+ *
+ * This is used to decide whether we need to WAL log the top-level XID for
+ * operation in a subtransaction.  We require that for logical decoding, see
+ * LogicalDecodingProcessRecord.
+ *
+ * This returns true if wal_level >= logical and we are inside a valid
+ * subtransaction, for which the assignment was not yet written to any WAL
+ * record.
+ */
+bool
+IsSubTransactionAssignmentPending(void)
+{
+	/* wal_level has to be logical */
+	if (!XLogLogicalInfoActive())
+		return false;
+
+	/* we need to be in a transaction state */
+	if (!IsTransactionState())
+		return false;
+
+	/* it has to be a subtransaction */
+	if (!IsSubTransaction())
+		return false;
+
+	/* the subtransaction has to have a XID assigned */
+	if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
+		return false;
+
+	/* and it should not be already 'assigned' */
+	return !CurrentTransactionState->assigned;
+}
+
+/*
+ * MarkSubTransactionAssigned
+ *
+ * Mark the subtransaction assignment as completed.
+ */
+void
+MarkSubTransactionAssigned(void)
+{
+	Assert(IsSubTransactionAssignmentPending());
+
+	CurrentTransactionState->assigned = true;
+}
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index b21679f..c526bb1 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -89,11 +89,13 @@ static XLogRecData hdr_rdt;
 static char *hdr_scratch = NULL;
 
 #define SizeOfXlogOrigin	(sizeof(RepOriginId) + sizeof(char))
+#define SizeOfXLogTransactionId	(sizeof(TransactionId) + sizeof(char))
 
 #define HEADER_SCRATCH_SIZE \
 	(SizeOfXLogRecord + \
 	 MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
-	 SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
+	 SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
+	 SizeOfXLogTransactionId)
 
 /*
  * An array of XLogRecData structs, to hold registered data.
@@ -195,6 +197,10 @@ XLogResetInsertion(void)
 {
 	int			i;
 
+	/* reset the subxact assignment flag (if needed) */
+	if (curinsert_flags & XLOG_INCLUDE_XID)
+		MarkSubTransactionAssigned();
+
 	for (i = 0; i < max_registered_block_id; i++)
 		registered_buffers[i].in_use = false;
 
@@ -398,7 +404,7 @@ void
 XLogSetRecordFlags(uint8 flags)
 {
 	Assert(begininsert_called);
-	curinsert_flags = flags;
+	curinsert_flags |= flags;
 }
 
 /*
@@ -748,6 +754,19 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		scratch += sizeof(replorigin_session_origin);
 	}
 
+	/* followed by toplevel XID, if not already included in previous record */
+	if (IsSubTransactionAssignmentPending())
+	{
+		TransactionId xid = GetTopTransactionIdIfAny();
+
+		/* update the flag (later used by XLogResetInsertion) */
+		XLogSetRecordFlags(XLOG_INCLUDE_XID);
+
+		*(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
+		memcpy(scratch, &xid, sizeof(TransactionId));
+		scratch += sizeof(TransactionId);
+	}
+
 	/* followed by main data, if any */
 	if (mainrdata_len > 0)
 	{
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index cb76be4..a757bac 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1197,6 +1197,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 
 	state->decoded_record = record;
 	state->record_origin = InvalidRepOriginId;
+	state->toplevel_xid = InvalidTransactionId;
 
 	ptr = (char *) record;
 	ptr += SizeOfXLogRecord;
@@ -1235,6 +1236,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 		{
 			COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
 		}
+		else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
+		{
+			COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId));
+		}
 		else if (block_id <= XLR_MAX_BLOCK_ID)
 		{
 			/* XLogRecordBlockHeader */
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c2e5e3a..0c0c371 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -94,11 +94,27 @@ void
 LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
 {
 	XLogRecordBuffer buf;
+	TransactionId txid;
 
 	buf.origptr = ctx->reader->ReadRecPtr;
 	buf.endptr = ctx->reader->EndRecPtr;
 	buf.record = record;
 
+	txid = XLogRecGetTopXid(record);
+
+	/*
+	 * If the top-level xid is valid, we need to assign the subxact to the
+	 * top-level xact. We need to do this for all records, hence we do it
+	 * before the switch.
+	 */
+	if (TransactionIdIsValid(txid))
+	{
+		ReorderBufferAssignChild(ctx->reorder,
+								 txid,
+								 record->decoded_record->xl_xid,
+								 buf.origptr);
+	}
+
 	/* cast so we get a warning when new rmgrs are added */
 	switch ((RmgrId) XLogRecGetRmid(record))
 	{
@@ -216,13 +232,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	/*
 	 * If the snapshot isn't yet fully built, we cannot decode anything, so
 	 * bail out.
-	 *
-	 * However, it's critical to process XLOG_XACT_ASSIGNMENT records even
-	 * when the snapshot is being built: it is possible to get later records
-	 * that require subxids to be properly assigned.
 	 */
-	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT &&
-		info != XLOG_XACT_ASSIGNMENT)
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
 		return;
 
 	switch (info)
@@ -264,22 +275,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				break;
 			}
 		case XLOG_XACT_ASSIGNMENT:
-			{
-				xl_xact_assignment *xlrec;
-				int			i;
-				TransactionId *sub_xid;
 
-				xlrec = (xl_xact_assignment *) XLogRecGetData(r);
-
-				sub_xid = &xlrec->xsub[0];
-
-				for (i = 0; i < xlrec->nsubxacts; i++)
-				{
-					ReorderBufferAssignChild(reorder, xlrec->xtop,
-											 *(sub_xid++), buf->origptr);
-				}
-				break;
-			}
+			/*
+			 * We assign subxact to the toplevel xact while processing each
+			 * record if required.  So, we don't need to do anything here.
+			 * See LogicalDecodingProcessRecord.
+			 */
+			break;
 		case XLOG_XACT_PREPARE:
 
 			/*
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index db19187..aef8555 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -428,6 +428,9 @@ extern void UnregisterXactCallback(XactCallback callback, void *arg);
 extern void RegisterSubXactCallback(SubXactCallback callback, void *arg);
 extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
 
+extern bool IsSubTransactionAssignmentPending(void);
+extern void MarkSubTransactionAssigned(void);
+
 extern int	xactGetCommittedChildren(TransactionId **ptr);
 
 extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 77ac4e7..8058eef 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -237,6 +237,7 @@ extern bool XLOG_DEBUG;
  */
 #define XLOG_INCLUDE_ORIGIN		0x01	/* include the replication origin */
 #define XLOG_MARK_UNIMPORTANT	0x02	/* record not important for durability */
+#define XLOG_INCLUDE_XID		0x04	/* include XID of top-level xact */
 
 
 /* Checkpoint statistics */
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index b0f2a6e..b976882 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -191,6 +191,8 @@ struct XLogReaderState
 
 	RepOriginId record_origin;
 
+	TransactionId toplevel_xid; /* XID of top-level transaction */
+
 	/* information about blocks referenced by the record. */
 	DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
 
@@ -304,6 +306,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
 #define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
 #define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
 #define XLogRecGetOrigin(decoder) ((decoder)->record_origin)
+#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid)
 #define XLogRecGetData(decoder) ((decoder)->main_data)
 #define XLogRecGetDataLen(decoder) ((decoder)->main_data_len)
 #define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index acd9af0..2f0c8bf 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -223,5 +223,6 @@ typedef struct XLogRecordDataHeaderLong
 #define XLR_BLOCK_ID_DATA_SHORT		255
 #define XLR_BLOCK_ID_DATA_LONG		254
 #define XLR_BLOCK_ID_ORIGIN			253
+#define XLR_BLOCK_ID_TOPLEVEL_XID	252
 
 #endif							/* XLOGRECORD_H */
-- 
1.8.3.1

