From f8239516407569e1e4b4c96507975f02dd9400ce Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Fri, 5 Jun 2020 09:03:16 +0530
Subject: [PATCH v27] Immediately WAL-log subtransaction and top-level XID
 association.

The logical decoding infrastructure needs to know which top-level
transaction the subxact belongs to, in order to decode all the
changes. Until now that might be delayed until commit, due to the
caching (PGPROC_MAX_CACHED_SUBXIDS), preventing features requiring
incremental decoding.

So we also write the assignment info into WAL immediately, as part
of the next WAL record (to minimize overhead).  However, we cannot
remove the existing XLOG_XACT_ASSIGNMENT WAL record as that is required
for avoiding overflow in the hot standby snapshot.

Author: Tomas Vondra, Dilip Kumar, Amit Kapila
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
---
 src/backend/access/transam/xact.c        | 50 ++++++++++++++++++++++++++++++++
 src/backend/access/transam/xloginsert.c  | 23 +++++++++++++--
 src/backend/access/transam/xlogreader.c  |  5 ++++
 src/backend/replication/logical/decode.c | 44 ++++++++++++++--------------
 src/include/access/xact.h                |  3 ++
 src/include/access/xlog.h                |  1 +
 src/include/access/xlogreader.h          |  3 ++
 src/include/access/xlogrecord.h          |  1 +
 8 files changed, 107 insertions(+), 23 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index cd30b62..04fd5ca 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -191,6 +191,7 @@ typedef struct TransactionStateData
 	bool		didLogXid;		/* has xid been included in WAL record? */
 	int			parallelModeLevel;	/* Enter/ExitParallelMode counter */
 	bool		chain;			/* start a new block after this one */
+	bool		assigned;		/* assigned to top-level XID */
 	struct TransactionStateData *parent;	/* back link to parent */
 } TransactionStateData;
 
@@ -223,6 +224,7 @@ typedef struct SerializedTransactionState
 static TransactionStateData TopTransactionStateData = {
 	.state = TRANS_DEFAULT,
 	.blockState = TBLOCK_DEFAULT,
+	.assigned = false,
 };
 
 /*
@@ -5118,6 +5120,7 @@ PushTransaction(void)
 	GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
 	s->prevXactReadOnly = XactReadOnly;
 	s->parallelModeLevel = 0;
+	s->assigned = false;
 
 	CurrentTransactionState = s;
 
@@ -6020,3 +6023,50 @@ xact_redo(XLogReaderState *record)
 	else
 		elog(PANIC, "xact_redo: unknown op code %u", info);
 }
+
+/*
+ * IsSubTransactionAssignmentPending
+ *
+ * This is used to decide whether we need to WAL log the top-level XID for an
+ * operation in a subtransaction.  We require that for logical decoding, see
+ * LogicalDecodingProcessRecord.
+ *
+ * This returns true if wal_level >= logical and we are inside a valid
+ * subtransaction that has an XID, for which the assignment was not yet
+ * written to any WAL record.
+ */
+bool
+IsSubTransactionAssignmentPending(void)
+{
+	/* wal_level has to be logical */
+	if (!XLogLogicalInfoActive())
+		return false;
+
+	/* we need to be in a transaction state */
+	if (!IsTransactionState())
+		return false;
+
+	/* it has to be a subtransaction */
+	if (!IsSubTransaction())
+		return false;
+
+	/* the subtransaction has to have an XID assigned */
+	if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
+		return false;
+
+	/* and its assignment must not have been marked as logged already */
+	return !CurrentTransactionState->assigned;
+}
+
+/*
+ * MarkSubTransactionAssigned
+ *
+ * Mark the current subtransaction's assignment as completed.
+ */
+void
+MarkSubTransactionAssigned(void)
+{
+	Assert(IsSubTransactionAssignmentPending());
+
+	CurrentTransactionState->assigned = true;
+}
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index b21679f..c526bb1 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -89,11 +89,13 @@ static XLogRecData hdr_rdt;
 static char *hdr_scratch = NULL;
 
 #define SizeOfXlogOrigin	(sizeof(RepOriginId) + sizeof(char))
+#define SizeOfXLogTransactionId	(sizeof(TransactionId) + sizeof(char))
 
 #define HEADER_SCRATCH_SIZE \
 	(SizeOfXLogRecord + \
 	 MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
-	 SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
+	 SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
+	 SizeOfXLogTransactionId)
 
 /*
  * An array of XLogRecData structs, to hold registered data.
@@ -195,6 +197,10 @@ XLogResetInsertion(void)
 {
 	int			i;
 
+	/* mark the subxact as assigned if the top-level XID was included */
+	if (curinsert_flags & XLOG_INCLUDE_XID)
+		MarkSubTransactionAssigned();
+
 	for (i = 0; i < max_registered_block_id; i++)
 		registered_buffers[i].in_use = false;
 
@@ -398,7 +404,7 @@ void
 XLogSetRecordFlags(uint8 flags)
 {
 	Assert(begininsert_called);
-	curinsert_flags = flags;
+	curinsert_flags |= flags;
 }
 
 /*
@@ -748,6 +754,19 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		scratch += sizeof(replorigin_session_origin);
 	}
 
+	/* followed by toplevel XID, if not already included in previous record */
+	if (IsSubTransactionAssignmentPending())
+	{
+		TransactionId xid = GetTopTransactionIdIfAny();
+
+		/* update the flag (later used by XLogResetInsertion) */
+		XLogSetRecordFlags(XLOG_INCLUDE_XID);
+
+		*(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
+		memcpy(scratch, &xid, sizeof(TransactionId));
+		scratch += sizeof(TransactionId);
+	}
+
 	/* followed by main data, if any */
 	if (mainrdata_len > 0)
 	{
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 5995798..560ec27 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1195,6 +1195,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 
 	state->decoded_record = record;
 	state->record_origin = InvalidRepOriginId;
+	state->toplevel_xid = InvalidTransactionId;
 
 	ptr = (char *) record;
 	ptr += SizeOfXLogRecord;
@@ -1233,6 +1234,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 		{
 			COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
 		}
+		else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
+		{
+			COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId));
+		}
 		else if (block_id <= XLR_MAX_BLOCK_ID)
 		{
 			/* XLogRecordBlockHeader */
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c2e5e3a..0c0c371 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -94,11 +94,27 @@ void
 LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
 {
 	XLogRecordBuffer buf;
+	TransactionId txid;
 
 	buf.origptr = ctx->reader->ReadRecPtr;
 	buf.endptr = ctx->reader->EndRecPtr;
 	buf.record = record;
 
+	txid = XLogRecGetTopXid(record);
+
+	/*
+	 * If the top-level xid is valid, we need to assign the subxact to the
+	 * top-level xact. We need to do this for all records, hence we do it
+	 * before the switch.
+	 */
+	if (TransactionIdIsValid(txid))
+	{
+		ReorderBufferAssignChild(ctx->reorder,
+								 txid,
+								 record->decoded_record->xl_xid,
+								 buf.origptr);
+	}
+
 	/* cast so we get a warning when new rmgrs are added */
 	switch ((RmgrId) XLogRecGetRmid(record))
 	{
@@ -216,13 +232,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	/*
 	 * If the snapshot isn't yet fully built, we cannot decode anything, so
 	 * bail out.
-	 *
-	 * However, it's critical to process XLOG_XACT_ASSIGNMENT records even
-	 * when the snapshot is being built: it is possible to get later records
-	 * that require subxids to be properly assigned.
 	 */
-	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT &&
-		info != XLOG_XACT_ASSIGNMENT)
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
 		return;
 
 	switch (info)
@@ -264,22 +275,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				break;
 			}
 		case XLOG_XACT_ASSIGNMENT:
-			{
-				xl_xact_assignment *xlrec;
-				int			i;
-				TransactionId *sub_xid;
 
-				xlrec = (xl_xact_assignment *) XLogRecGetData(r);
-
-				sub_xid = &xlrec->xsub[0];
-
-				for (i = 0; i < xlrec->nsubxacts; i++)
-				{
-					ReorderBufferAssignChild(reorder, xlrec->xtop,
-											 *(sub_xid++), buf->origptr);
-				}
-				break;
-			}
+			/*
+			 * We assign subxact to the toplevel xact while processing each
+			 * record if required.  So, we don't need to do anything here.
+			 * See LogicalDecodingProcessRecord.
+			 */
+			break;
 		case XLOG_XACT_PREPARE:
 
 			/*
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 7ee04ba..8645b38 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -428,6 +428,9 @@ extern void UnregisterXactCallback(XactCallback callback, void *arg);
 extern void RegisterSubXactCallback(SubXactCallback callback, void *arg);
 extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
 
+extern bool IsSubTransactionAssignmentPending(void);
+extern void MarkSubTransactionAssigned(void);
+
 extern int	xactGetCommittedChildren(TransactionId **ptr);
 
 extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index e917dfe..05cc2b6 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -237,6 +237,7 @@ extern bool XLOG_DEBUG;
  */
 #define XLOG_INCLUDE_ORIGIN		0x01	/* include the replication origin */
 #define XLOG_MARK_UNIMPORTANT	0x02	/* record not important for durability */
+#define XLOG_INCLUDE_XID		0x04	/* include XID of top-level xact */
 
 
 /* Checkpoint statistics */
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index d930fe9..24a4c44 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -191,6 +191,8 @@ struct XLogReaderState
 
 	RepOriginId record_origin;
 
+	TransactionId toplevel_xid; /* XID of top-level transaction */
+
 	/* information about blocks referenced by the record. */
 	DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
 
@@ -308,6 +310,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
 #define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
 #define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
 #define XLogRecGetOrigin(decoder) ((decoder)->record_origin)
+#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid)
 #define XLogRecGetData(decoder) ((decoder)->main_data)
 #define XLogRecGetDataLen(decoder) ((decoder)->main_data_len)
 #define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index acd9af0..2f0c8bf 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -223,5 +223,6 @@ typedef struct XLogRecordDataHeaderLong
 #define XLR_BLOCK_ID_DATA_SHORT		255
 #define XLR_BLOCK_ID_DATA_LONG		254
 #define XLR_BLOCK_ID_ORIGIN			253
+#define XLR_BLOCK_ID_TOPLEVEL_XID	252
 
 #endif							/* XLOGRECORD_H */
-- 
1.8.3.1

