From 002ce81dbae3df85610f65a235b45d046af0830f Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Fri, 2 Jul 2021 16:54:45 +1000
Subject: [PATCH v17 4/5] PS POC - Implement a plan cache for pgoutput.

This is a POC patch to implement plan cache which gets used inside the pgoutput_row_filter function instead of calling prepare for every row.
This is intended to implement a cache like what Andes was suggesting [1] to see what difference it makes.

Use #if 0/1 to toggle wihout/with caching.
[1] https://www.postgresql.org/message-id/20210128022032.eq2qqc6zxkqn5syt%40alap3.anarazel.de
---
 src/backend/replication/pgoutput/pgoutput.c | 90 +++++++++++++++++++--
 1 file changed, 82 insertions(+), 8 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 10f85365fc..86aa012505 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -35,6 +35,7 @@
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
+#include "optimizer/optimizer.h"
 
 PG_MODULE_MAGIC;
 
@@ -72,8 +73,6 @@ static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 static EState *create_estate_for_relation(Relation rel);
 static ExprState *pgoutput_row_filter_prepare_expr(Node *rfnode, EState *estate);
 static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext);
-static bool pgoutput_row_filter(Relation relation, HeapTuple oldtuple,
-								HeapTuple newtuple, List *rowfilter);
 
 static bool publications_valid;
 static bool in_streaming;
@@ -113,6 +112,7 @@ typedef struct RelationSyncEntry
 	bool		replicate_valid;
 	PublicationActions pubactions;
 	List	   *qual;
+	List	   *exprstate_list;
 
 	/*
 	 * OID of the relation to publish changes as.  For a partition, this may
@@ -144,6 +144,8 @@ static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
 											TransactionId xid);
 static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
 											TransactionId xid);
+static bool pgoutput_row_filter(Relation relation, HeapTuple oldtuple,
+								HeapTuple newtuple, RelationSyncEntry *entry);
 
 /*
  * Specify output plugin callbacks
@@ -578,6 +580,35 @@ pgoutput_row_filter_prepare_expr(Node *rfnode, EState *estate)
 	return exprstate;
 }
 
+static ExprState *
+pgoutput_row_filter_prepare_expr2(Node *rfnode)
+{
+	ExprState  *exprstate;
+	Oid			exprtype;
+	Expr	   *expr;
+	MemoryContext oldctx;
+
+	/* Prepare expression for execution */
+	exprtype = exprType(rfnode);
+	expr = (Expr *) coerce_to_target_type(NULL, rfnode, exprtype, BOOLOID, -1, COERCION_ASSIGNMENT, COERCE_IMPLICIT_CAST, -1);
+
+	if (expr == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_CANNOT_COERCE),
+				 errmsg("row filter returns type %s that cannot be coerced to the expected type %s",
+						format_type_be(exprtype),
+						format_type_be(BOOLOID)),
+				 errhint("You will need to rewrite the row filter.")));
+
+	/* Make the exprstate long-lived by using CacheMemoryContext. */
+	oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+	expr = expression_planner(expr);
+	exprstate = ExecInitExpr(expr, NULL);
+	MemoryContextSwitchTo(oldctx);
+
+	return exprstate;
+}
+
 /*
  * Evaluates row filter.
  *
@@ -610,7 +641,7 @@ pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
  * If it returns true, the change is replicated, otherwise, it is not.
  */
 static bool
-pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, List *rowfilter)
+pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry)
 {
 	TupleDesc	tupdesc;
 	EState	   *estate;
@@ -618,11 +649,20 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, L
 	MemoryContext oldcxt;
 	ListCell   *lc;
 	bool		result = true;
+//#define RF_TIMES
+#ifdef RF_TIMES
+	instr_time	start_time;
+	instr_time	end_time;
+#endif
 
 	/* Bail out if there is no row filter */
-	if (rowfilter == NIL)
+	if (entry->qual == NIL)
 		return true;
 
+#ifdef RF_TIMES
+	INSTR_TIME_SET_CURRENT(start_time);
+#endif
+
 	elog(DEBUG3, "table \"%s.%s\" has row filter",
 		 get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
 		 get_rel_name(relation->rd_id));
@@ -646,7 +686,9 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, L
 	 * different row filter in these publications, all row filters must be
 	 * matched in order to replicate this change.
 	 */
-	foreach(lc, rowfilter)
+#if 0
+	/* Don't use cached plan. */
+	foreach(lc, entry->qual)
 	{
 		Node	   *rfnode = (Node *) lfirst(lc);
 		ExprState  *exprstate;
@@ -667,12 +709,34 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, L
 		if (!result)
 			break;
 	}
+#else
+	/* Use cached plan. */
+	foreach(lc, entry->exprstate_list)
+	{
+		ExprState  *exprstate = (ExprState *) lfirst(lc);
+
+		/* Evaluates row filter */
+		result = pgoutput_row_filter_exec_expr(exprstate, ecxt);
+
+		elog(DEBUG3, "row filter %smatched", result ? "" : " not");
+
+		/* If the tuple does not match one of the row filters, bail out */
+		if (!result)
+			break;
+	}
+#endif
 
 	/* Cleanup allocated resources */
 	ResetExprContext(ecxt);
 	FreeExecutorState(estate);
 	PopActiveSnapshot();
 
+#ifdef RF_TIMES
+	INSTR_TIME_SET_CURRENT(end_time);
+	INSTR_TIME_SUBTRACT(end_time, start_time);
+	elog(LOG, "row filter time: %0.3f us", INSTR_TIME_GET_DOUBLE(end_time) * 1e6);
+#endif
+
 	return result;
 }
 
@@ -735,7 +799,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				HeapTuple	tuple = &change->data.tp.newtuple->tuple;
 
 				/* Check row filter. */
-				if (!pgoutput_row_filter(relation, NULL, tuple, relentry->qual))
+				if (!pgoutput_row_filter(relation, NULL, tuple, relentry))
 					return;
 
 				/*
@@ -768,7 +832,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				HeapTuple	newtuple = &change->data.tp.newtuple->tuple;
 
 				/* Check row filter. */
-				if (!pgoutput_row_filter(relation, oldtuple, newtuple, relentry->qual))
+				if (!pgoutput_row_filter(relation, oldtuple, newtuple, relentry))
 					return;
 
 				maybe_send_schema(ctx, txn, change, relation, relentry);
@@ -802,7 +866,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				HeapTuple	oldtuple = &change->data.tp.oldtuple->tuple;
 
 				/* Check row filter. */
-				if (!pgoutput_row_filter(relation, oldtuple, NULL, relentry->qual))
+				if (!pgoutput_row_filter(relation, oldtuple, NULL, relentry))
 					return;
 
 				maybe_send_schema(ctx, txn, change, relation, relentry);
@@ -1211,6 +1275,7 @@ get_rel_sync_entry(PGOutputData *data, Relation rel)
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
 		entry->qual = NIL;
+		entry->exprstate_list = NIL;
 		entry->publish_as_relid = InvalidOid;
 		entry->map = NULL;		/* will be set by maybe_send_schema() if
 								 * needed */
@@ -1320,10 +1385,16 @@ get_rel_sync_entry(PGOutputData *data, Relation rel)
 				if (!rfisnull)
 				{
 					Node	   *rfnode;
+					ExprState  *exprstate;
 
 					oldctx = MemoryContextSwitchTo(CacheMemoryContext);
 					rfnode = stringToNode(TextDatumGetCString(rfdatum));
 					entry->qual = lappend(entry->qual, rfnode);
+
+					/* Cache the planned row filter */
+					exprstate = pgoutput_row_filter_prepare_expr2(rfnode);
+					entry->exprstate_list = lappend(entry->exprstate_list, exprstate);
+
 					MemoryContextSwitchTo(oldctx);
 				}
 
@@ -1479,6 +1550,9 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 		if (entry->qual != NIL)
 			list_free_deep(entry->qual);
 		entry->qual = NIL;
+
+		/* FIXME - something to be freed here? */
+		entry->exprstate_list = NIL;
 	}
 
 	MemoryContextSwitchTo(oldctx);
-- 
2.27.0

