From c3b5b555dfaef57162e3752d693f874d2c241387 Mon Sep 17 00:00:00 2001
From: Greg Nancarrow <gregn4422@gmail.com>
Date: Thu, 8 Jul 2021 10:21:39 +1000
Subject: [PATCH v17 5/5] Substantially improve performance of
 pgoutput_row_filter().

Repeated tuple table slot creation in pgoutput_row_filter() results in degraded
performance and large memory usage. This is greatly improved by caching the row
filtering tuple table slot in the relation sync cache.
---
 src/backend/replication/pgoutput/pgoutput.c | 25 +++++++++++++++------
 1 file changed, 18 insertions(+), 7 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 86aa012505..1b0fc64392 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -113,6 +113,7 @@ typedef struct RelationSyncEntry
 	PublicationActions pubactions;
 	List	   *qual;
 	List	   *exprstate_list;
+	TupleTableSlot *scantuple;
 
 	/*
 	 * OID of the relation to publish changes as.  For a partition, this may
@@ -643,10 +644,8 @@ pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
 static bool
 pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry)
 {
-	TupleDesc	tupdesc;
 	EState	   *estate;
 	ExprContext *ecxt;
-	MemoryContext oldcxt;
 	ListCell   *lc;
 	bool		result = true;
 //#define RF_TIMES
@@ -667,17 +666,13 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, R
 		 get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
 		 get_rel_name(relation->rd_id));
 
-	tupdesc = RelationGetDescr(relation);
-
 	PushActiveSnapshot(GetTransactionSnapshot());
 
 	estate = create_estate_for_relation(relation);
 
 	/* Prepare context per tuple */
 	ecxt = GetPerTupleExprContext(estate);
-	oldcxt = MemoryContextSwitchTo(estate->es_query_cxt);
-	ecxt->ecxt_scantuple = ExecInitExtraTupleSlot(estate, tupdesc, &TTSOpsHeapTuple);
-	MemoryContextSwitchTo(oldcxt);
+	ecxt->ecxt_scantuple = entry->scantuple;
 
 	ExecStoreHeapTuple(newtuple ? newtuple : oldtuple, ecxt->ecxt_scantuple, false);
 
@@ -1279,15 +1274,31 @@ get_rel_sync_entry(PGOutputData *data, Relation rel)
 		entry->publish_as_relid = InvalidOid;
 		entry->map = NULL;		/* will be set by maybe_send_schema() if
 								 * needed */
+
+		entry->scantuple = NULL;
 	}
 
 	/* Validate the entry */
 	if (!entry->replicate_valid)
 	{
+		TupleDesc	tupdesc;
 		List	   *pubids = GetRelationPublications(relid);
 		ListCell   *lc;
 		Oid			publish_as_relid = relid;
 
+		/* Release any existing tuple table slot */
+		if (entry->scantuple)
+		{
+			ExecDropSingleTupleTableSlot(entry->scantuple);
+			entry->scantuple = NULL;
+		}
+
+		/* Create a tuple table slot for use in row filtering */
+		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+		tupdesc = CreateTupleDescCopy(RelationGetDescr(rel));
+		entry->scantuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple);
+		MemoryContextSwitchTo(oldctx);
+
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
 		{
-- 
2.27.0

