From 5755bbd0e0d0af2d6668cd10faf7ef6a874f3bc4 Mon Sep 17 00:00:00 2001
From: dilipkumar <dilipbalaut@gmail.com>
Date: Wed, 30 Sep 2020 12:13:31 +0530
Subject: [PATCH v3] Collect command invalidation in form of changes

Currently, we are accumulating all the invalidations under the
toplevel transactions so that we can execute them all once after
decoding is done to avoid any cache pollution.  In this patch we
are also collecting command level invalidation as a changes so
that during decoding we can execute the invalidation specific to
that command.
---
 .../replication/logical/reorderbuffer.c       | 103 ++++++++++++++----
 src/include/replication/reorderbuffer.h       |  13 ++-
 2 files changed, 93 insertions(+), 23 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 1975d629a6..13bcdaeb45 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -235,7 +235,7 @@ static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
 static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
 									   ReorderBufferIterTXNState *state);
-static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs);
 
 /*
  * ---------------------------------------
@@ -482,6 +482,11 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
 				pfree(change->data.msg.message);
 			change->data.msg.message = NULL;
 			break;
+		case REORDER_BUFFER_CHANGE_INVALIDATION:
+			if (change->data.inval.invalidations)
+				pfree(change->data.inval.invalidations);
+			change->data.inval.invalidations = NULL;
+			break;
 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 			if (change->data.snapshot)
 			{
@@ -2233,17 +2238,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 						TeardownHistoricSnapshot(false);
 						SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
-
-						/*
-						 * Every time the CommandId is incremented, we could
-						 * see new catalog contents, so execute all
-						 * invalidations.
-						 */
-						ReorderBufferExecuteInvalidations(rb, txn);
 					}
 
 					break;
 
+				case REORDER_BUFFER_CHANGE_INVALIDATION:
+					/* Execute the invalidation messages locally */
+					ReorderBufferExecuteInvalidations(
+							change->data.inval.ninvalidations,
+							change->data.inval.invalidations);
+					break;
+
 				case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
@@ -2306,7 +2311,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		AbortCurrentTransaction();
 
 		/* make sure there's no cache pollution */
-		ReorderBufferExecuteInvalidations(rb, txn);
+		ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
 
 		if (using_subtxn)
 			RollbackAndReleaseCurrentSubTransaction();
@@ -2345,7 +2350,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		AbortCurrentTransaction();
 
 		/* make sure there's no cache pollution */
-		ReorderBufferExecuteInvalidations(rb, txn);
+		ReorderBufferExecuteInvalidations(txn->ninvalidations,
+										  txn->invalidations);
 
 		if (using_subtxn)
 			RollbackAndReleaseCurrentSubTransaction();
@@ -2802,10 +2808,13 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
  * Setup the invalidation of the toplevel transaction.
  *
  * This needs to be called for each XLOG_XACT_INVALIDATIONS message and
- * accumulates all the invalidation messages in the toplevel transaction.
- * This is required because in some cases where we skip processing the
- * transaction (see ReorderBufferForget), we need to execute all the
- * invalidations together.
+ * accumulates all the invalidation messages in the toplevel transaction as
+ * well as in the form of change in reorder buffer.  We require to record it in
+ * form of the change so that we can execute only the required invalidations
+ * instead of executing all the invalidations on each CommandId increment.  We
+ * also need to accumulate these in the toplevel transaction because in some
+ * cases we skip processing the transaction (see ReorderBufferForget), we need
+ * to execute all the invalidations together.
  */
 void
 ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
@@ -2813,12 +2822,16 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
 							  SharedInvalidationMessage *msgs)
 {
 	ReorderBufferTXN *txn;
+	MemoryContext oldcontext;
+	ReorderBufferChange *change;
 
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
+	oldcontext = MemoryContextSwitchTo(rb->context);
+
 	/*
-	 * We collect all the invalidations under the top transaction so that we
-	 * can execute them all together.
+	 * Collect all the invalidations under the top transaction so that we can
+	 * execute them all together.  See comment atop this function
 	 */
 	if (txn->toptxn)
 		txn = txn->toptxn;
@@ -2830,8 +2843,7 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
 	{
 		txn->ninvalidations = nmsgs;
 		txn->invalidations = (SharedInvalidationMessage *)
-			MemoryContextAlloc(rb->context,
-							   sizeof(SharedInvalidationMessage) * nmsgs);
+			palloc(sizeof(SharedInvalidationMessage) * nmsgs);
 		memcpy(txn->invalidations, msgs,
 			   sizeof(SharedInvalidationMessage) * nmsgs);
 	}
@@ -2845,6 +2857,18 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
 			   nmsgs * sizeof(SharedInvalidationMessage));
 		txn->ninvalidations += nmsgs;
 	}
+
+	change = ReorderBufferGetChange(rb);
+	change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
+	change->data.inval.ninvalidations = nmsgs;
+	change->data.inval.invalidations = (SharedInvalidationMessage *)
+		palloc(rb->context, sizeof(SharedInvalidationMessage) * nmsgs);
+	memcpy(change->data.inval.invalidations, msgs,
+		   sizeof(SharedInvalidationMessage) * nmsgs);
+
+	ReorderBufferQueueChange(rb, xid, lsn, change, false);
+
+	MemoryContextSwitchTo(oldcontext);
 }
 
 /*
@@ -2852,12 +2876,12 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
  * in the changestream but we don't know which those are.
  */
 static void
-ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
+ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
 {
 	int			i;
 
-	for (i = 0; i < txn->ninvalidations; i++)
-		LocalExecuteInvalidationMessage(&txn->invalidations[i]);
+	for (i = 0; i < nmsgs; i++)
+		LocalExecuteInvalidationMessage(&msgs[i]);
 }
 
 /*
@@ -3279,6 +3303,24 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					   change->data.msg.message_size);
 				data += change->data.msg.message_size;
 
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_INVALIDATION:
+			{
+				char	   *data;
+				Size		inval_size = sizeof(SharedInvalidationMessage) *
+										change->data.inval.ninvalidations;
+
+				sz += inval_size;
+
+				ReorderBufferSerializeReserve(rb, sz);
+				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+
+				/* might have been reallocated above */
+				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+				memcpy(data, change->data.inval.invalidations, inval_size);
+				data += inval_size;
+
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
@@ -3556,6 +3598,11 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 
 				break;
 			}
+		case REORDER_BUFFER_CHANGE_INVALIDATION:
+			sz += sizeof(SharedInvalidationMessage) *
+					change->data.inval.ninvalidations;
+			break;
+
 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 			{
 				Snapshot	snap;
@@ -3822,6 +3869,20 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					   change->data.msg.message_size);
 				data += change->data.msg.message_size;
 
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_INVALIDATION:
+			{
+				Size	inval_size = sizeof(SharedInvalidationMessage) *
+									change->data.inval.ninvalidations;
+
+				change->data.inval.invalidations =
+						MemoryContextAlloc(rb->context, inval_size);
+
+				/* read the message */
+				memcpy(change->data.inval.invalidations, data, inval_size);
+				data += inval_size;
+
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 1ae17d5f11..248900070d 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -57,6 +57,7 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_UPDATE,
 	REORDER_BUFFER_CHANGE_DELETE,
 	REORDER_BUFFER_CHANGE_MESSAGE,
+	REORDER_BUFFER_CHANGE_INVALIDATION,
 	REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
 	REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
 	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
@@ -149,6 +150,14 @@ typedef struct ReorderBufferChange
 			CommandId	cmax;
 			CommandId	combocid;
 		}			tuplecid;
+
+		/* Invalidation. */
+		struct
+		{
+			uint32		ninvalidations;		/* Number of messages */
+			SharedInvalidationMessage *invalidations;	/* invalidation
+														 * message */
+		}			inval;
 	}			data;
 
 	/*
@@ -306,8 +315,8 @@ typedef struct ReorderBufferTXN
 	uint64		nentries_mem;
 
 	/*
-	 * List of ReorderBufferChange structs, including new Snapshots and new
-	 * CommandIds
+	 * List of ReorderBufferChange structs, including new Snapshots, new
+	 * CommandIds and command invalidation messages.
 	 */
 	dlist_head	changes;
 
-- 
2.23.0

