From e9b4114382ee7606d06a5ceae8f85053f0dca32f Mon Sep 17 00:00:00 2001
From: Greg Nancarrow <gregn4422@gmail.com>
Date: Wed, 7 Jul 2021 13:11:09 +1000
Subject: [PATCH v16] Substantially improve performance of
 pgoutput_row_filter().

Repeated tuple table slot creation in pgoutput_row_filter() results in degraded
performance and large memory usage. This is greatly improved by caching the row
filtering tuple table slot in the relation sync cache.
---
 src/backend/replication/pgoutput/pgoutput.c | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 86aa012505..d49fb37e1f 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -113,6 +113,8 @@ typedef struct RelationSyncEntry
 	PublicationActions pubactions;
 	List	   *qual;
 	List	   *exprstate_list;
+	TupleTableSlot *scantuple;
+	List	   *tuple_table;
 
 	/*
 	 * OID of the relation to publish changes as.  For a partition, this may
@@ -643,10 +645,8 @@ pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
 static bool
 pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry)
 {
-	TupleDesc	tupdesc;
 	EState	   *estate;
 	ExprContext *ecxt;
-	MemoryContext oldcxt;
 	ListCell   *lc;
 	bool		result = true;
 //#define RF_TIMES
@@ -667,17 +667,13 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, R
 		 get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
 		 get_rel_name(relation->rd_id));
 
-	tupdesc = RelationGetDescr(relation);
-
 	PushActiveSnapshot(GetTransactionSnapshot());
 
 	estate = create_estate_for_relation(relation);
 
 	/* Prepare context per tuple */
 	ecxt = GetPerTupleExprContext(estate);
-	oldcxt = MemoryContextSwitchTo(estate->es_query_cxt);
-	ecxt->ecxt_scantuple = ExecInitExtraTupleSlot(estate, tupdesc, &TTSOpsHeapTuple);
-	MemoryContextSwitchTo(oldcxt);
+	ecxt->ecxt_scantuple = entry->scantuple;
 
 	ExecStoreHeapTuple(newtuple ? newtuple : oldtuple, ecxt->ecxt_scantuple, false);
 
@@ -1268,6 +1264,8 @@ get_rel_sync_entry(PGOutputData *data, Relation rel)
 	/* Not found means schema wasn't sent */
 	if (!found)
 	{
+		TupleDesc tupdesc;
+
 		/* immediately make a new entry valid enough to satisfy callbacks */
 		entry->schema_sent = false;
 		entry->streamed_txns = NIL;
@@ -1279,6 +1277,13 @@ get_rel_sync_entry(PGOutputData *data, Relation rel)
 		entry->publish_as_relid = InvalidOid;
 		entry->map = NULL;		/* will be set by maybe_send_schema() if
 								 * needed */
+
+		/* create a tuple table slot for use in row filtering */
+		entry->tuple_table = NIL;
+		tupdesc = RelationGetDescr(rel);
+		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+		entry->scantuple = ExecAllocTableSlot(&entry->tuple_table, tupdesc, &TTSOpsHeapTuple);
+		MemoryContextSwitchTo(oldctx);
 	}
 
 	/* Validate the entry */
-- 
2.27.0

