From 2e4192717429c9675c675eebef00cbee93dd363f Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 17 Mar 2025 11:25:49 +0900
Subject: [PATCH v19_REL_13 2/2] Backpatch introducing invalidation messages in
 ReorderBufferChangeType

---
 .../replication/logical/reorderbuffer.c       | 72 +++++++++++++++++--
 src/include/replication/reorderbuffer.h       | 16 ++++-
 2 files changed, 80 insertions(+), 8 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index fa9413fa2a0..697b45675a6 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -220,7 +220,7 @@ static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
 static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
 									   ReorderBufferIterTXNState *state);
-static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs);
 
 /*
  * ---------------------------------------
@@ -484,6 +484,11 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			break;
+		case REORDER_BUFFER_CHANGE_INVALIDATION:
+			if (change->data.inval.invalidations)
+				pfree(change->data.inval.invalidations);
+			change->data.inval.invalidations = NULL;
+			break;
 	}
 
 	pfree(change);
@@ -1883,7 +1888,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 						 * see new catalog contents, so execute all
 						 * invalidations.
 						 */
-						ReorderBufferExecuteInvalidations(rb, txn);
+						ReorderBufferExecuteInvalidations(txn->ninvalidations,
+														  txn->invalidations);
 					}
 
 					break;
@@ -1891,6 +1897,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 				case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
+				case REORDER_BUFFER_CHANGE_INVALIDATION:
+					/* Execute the invalidation messages locally */
+					ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
+													  change->data.inval.invalidations);
 			}
 		}
 
@@ -1921,7 +1931,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		AbortCurrentTransaction();
 
 		/* make sure there's no cache pollution */
-		ReorderBufferExecuteInvalidations(rb, txn);
+		ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
 
 		if (using_subtxn)
 			RollbackAndReleaseCurrentSubTransaction();
@@ -1947,7 +1957,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		AbortCurrentTransaction();
 
 		/* make sure there's no cache pollution */
-		ReorderBufferExecuteInvalidations(rb, txn);
+		ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
 
 		if (using_subtxn)
 			RollbackAndReleaseCurrentSubTransaction();
@@ -2265,6 +2275,7 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
 {
 	ReorderBufferTXN *txn;
 	MemoryContext oldcontext;
+	ReorderBufferChange *change;
 
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
@@ -2302,6 +2313,16 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
 		txn->ninvalidations += nmsgs;
 	}
 
+	change = ReorderBufferGetChange(rb);
+	change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
+	change->data.inval.ninvalidations = nmsgs;
+	change->data.inval.invalidations = (SharedInvalidationMessage *)
+		palloc(sizeof(SharedInvalidationMessage) * nmsgs);
+	memcpy(change->data.inval.invalidations, msgs,
+		   sizeof(SharedInvalidationMessage) * nmsgs);
+
+	ReorderBufferQueueChange(rb, xid, lsn, change);
+
 	MemoryContextSwitchTo(oldcontext);
 }
 
@@ -2310,12 +2331,12 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
  * in the changestream but we don't know which those are.
  */
 static void
-ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
+ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
 {
 	int			i;
 
-	for (i = 0; i < txn->ninvalidations; i++)
-		LocalExecuteInvalidationMessage(&txn->invalidations[i]);
+	for (i = 0; i < nmsgs; i++)
+		LocalExecuteInvalidationMessage(&msgs[i]);
 }
 
 /*
@@ -2725,6 +2746,24 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			/* ReorderBufferChange contains everything important */
 			break;
+		case REORDER_BUFFER_CHANGE_INVALIDATION:
+		{
+			char	   *data;
+			Size		inval_size = sizeof(SharedInvalidationMessage) *
+				change->data.inval.ninvalidations;
+
+			sz += inval_size;
+
+			ReorderBufferSerializeReserve(rb, sz);
+			data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+
+			/* might have been reallocated above */
+			ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+			memcpy(data, change->data.inval.invalidations, inval_size);
+			data += inval_size;
+
+			break;
+		}
 	}
 
 	ondisk->size = sz;
@@ -2833,6 +2872,12 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			/* ReorderBufferChange contains everything important */
 			break;
+		case REORDER_BUFFER_CHANGE_INVALIDATION:
+		{
+			sz += sizeof(SharedInvalidationMessage) *
+				change->data.inval.ninvalidations;
+			break;
+		}
 	}
 
 	return sz;
@@ -3120,6 +3165,19 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			break;
+		case REORDER_BUFFER_CHANGE_INVALIDATION:
+		{
+			Size		inval_size = sizeof(SharedInvalidationMessage) *
+				change->data.inval.ninvalidations;
+
+			change->data.inval.invalidations =
+				MemoryContextAlloc(rb->context, inval_size);
+
+			/* read the message */
+			memcpy(change->data.inval.invalidations, data, inval_size);
+
+			break;
+		}
 	}
 
 	dlist_push_tail(&txn->changes, &change->node);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 545cee891ed..dff58a2fd8f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -63,7 +63,8 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
 	REORDER_BUFFER_CHANGE_TRUNCATE,
-	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
+	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT,
+	REORDER_BUFFER_CHANGE_INVALIDATION
 };
 
 /* forward declaration */
@@ -150,6 +151,13 @@ typedef struct ReorderBufferChange
 			CommandId	cmax;
 			CommandId	combocid;
 		}			tuplecid;
+
+		/* Invalidation. */
+		struct
+		{
+			uint32		ninvalidations; /* Number of messages */
+			SharedInvalidationMessage *invalidations;	/* invalidation message */
+		}			inval;
 	}			data;
 
 	/*
@@ -467,6 +475,12 @@ uint32		ReorderBufferGetInvalidations(ReorderBuffer *rb,
 										  TransactionId xid,
 										  SharedInvalidationMessage **msgs);
 
+void		ReorderBufferAddInvalidationsForDistribute(ReorderBuffer *,
+													   TransactionId,
+													   XLogRecPtr lsn,
+													   Size nmsgs,
+													   SharedInvalidationMessage *msgs);
+
 void		StartupReorderBuffer(void);
 
 #endif
-- 
2.43.5

